/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.context;

import java.util.Collection;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.context.ContextSubKeyedAppendingState;
import org.apache.flink.runtime.state.heap.KeyContextImpl;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueState;
import org.apache.flink.util.Preconditions;

/**
 * Aggregating state whose accumulators are stored in a {@link SubKeyedValueState}.
 *
 * <p>Every access is addressed by the current key reported by the supplied
 * {@link KeyContextImpl} and by the namespace most recently passed to
 * {@code setCurrentNamespace}. Elements are folded into the stored accumulator with the
 * supplied {@link AggregateFunction}.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <IN> type of the elements added to the state
 * @param <ACC> type of the stored accumulator
 * @param <OUT> type of the aggregation result
 */
public class ContextSubKeyedAggregatingState<K, N, IN, ACC, OUT>
implements ContextSubKeyedAppendingState<K, N, IN, ACC, OUT>,
InternalAggregatingState<K, N, IN, ACC, OUT> {
    /** Namespace used by {@code get}, {@code add} and {@code clear}; null until set. */
    private N namespace;
    /** Source of the current key and of the key serializer. */
    private final KeyContextImpl<K> operator;
    /** Backing state holding one accumulator per (key, namespace) pair. */
    private final SubKeyedValueState<Object, N, ACC> subKeyedValueState;
    /** Transformation applied by {@code add}. */
    private final AggregateTransformation aggregateTransformation;
    /** Transformation applied when writing a merged accumulator into the target namespace. */
    private final MergeTransformation mergeTransformation;

    /**
     * Creates the state on top of an existing sub-keyed value state.
     *
     * @param operator provider of the current key and key serializer
     * @param subKeyedValueState backing state in which accumulators are stored
     * @param aggregateFunction function used to create, update, merge and finish accumulators
     * @throws NullPointerException if any argument is null
     */
    public ContextSubKeyedAggregatingState(KeyContextImpl<K> operator, SubKeyedValueState<Object, N, ACC> subKeyedValueState, AggregateFunction<IN, ACC, OUT> aggregateFunction) {
        Preconditions.checkNotNull(operator);
        Preconditions.checkNotNull(subKeyedValueState);
        Preconditions.checkNotNull(aggregateFunction);
        this.operator = operator;
        this.subKeyedValueState = subKeyedValueState;
        this.aggregateTransformation = new AggregateTransformation(aggregateFunction);
        this.mergeTransformation = new MergeTransformation(aggregateFunction);
    }

    /**
     * Returns the aggregation result for the current key and namespace.
     *
     * @return the result computed from the stored accumulator, or null when the backing
     *     state returns no accumulator
     */
    @Override
    public OUT get() {
        ACC accumulator = this.subKeyedValueState.get(this.operator.getCurrentKey(), this.namespace);
        if (accumulator == null) {
            return null;
        }
        return this.aggregateTransformation.aggregateFunction.getResult(accumulator);
    }

    /**
     * Adds a value for the current key and namespace by asking the backing state to apply the
     * aggregate transformation to the stored accumulator.
     *
     * @param value element to add
     */
    @Override
    public void add(IN value) {
        this.subKeyedValueState.transform(this.operator.getCurrentKey(), this.namespace, value, this.aggregateTransformation);
    }

    /** Removes the entry for the current key and namespace from the backing state. */
    @Override
    public void clear() {
        this.subKeyedValueState.remove(this.operator.getCurrentKey(), this.namespace);
    }

    /**
     * Merges the accumulators of the source namespaces into the target namespace for the
     * current key.
     *
     * <p>Each source entry is fetched with {@code getAndRemove}. If at least one source
     * yielded a non-null accumulator, the combined accumulator is written to {@code target}
     * through the merge transformation.
     *
     * @param target namespace receiving the merged accumulator
     * @param sources namespaces to merge; a null or empty collection is a no-op
     */
    @Override
    public void mergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return;
        }
        K currentKey = this.operator.getCurrentKey();
        AggregateFunction<IN, ACC, OUT> mergeFunction = this.mergeTransformation.aggregateFunction;
        ACC merged = null;
        for (N source : sources) {
            ACC sourceState = this.subKeyedValueState.getAndRemove(currentKey, source);
            if (sourceState == null) {
                // Nothing returned for this namespace; keep whatever has been merged so far.
                continue;
            }
            merged = merged == null ? sourceState : mergeFunction.merge(merged, sourceState);
        }
        if (merged != null) {
            this.subKeyedValueState.transform(currentKey, target, merged, this.mergeTransformation);
        }
    }

    /** Returns the key serializer reported by the key context. */
    @Override
    public TypeSerializer<K> getKeySerializer() {
        return this.operator.getKeySerializer();
    }

    /** Returns the namespace serializer taken from the backing state's descriptor. */
    @Override
    public TypeSerializer<N> getNamespaceSerializer() {
        return this.subKeyedValueState.getDescriptor().getNamespaceSerializer();
    }

    /** Returns the accumulator serializer taken from the backing state's descriptor. */
    @Override
    public TypeSerializer<ACC> getValueSerializer() {
        return this.subKeyedValueState.getDescriptor().getValueSerializer();
    }

    /**
     * Sets the namespace used by subsequent {@code get}, {@code add} and {@code clear} calls.
     *
     * @param namespace namespace to use from now on
     */
    @Override
    public void setCurrentNamespace(N namespace) {
        this.namespace = namespace;
    }

    /**
     * Delegates to the backing state's {@code getSerializedValue} with the given arguments.
     *
     * @param serializedKeyAndNamespace serialized key and namespace identifying the entry
     * @param safeKeySerializer key serializer forwarded to the backing state
     * @param safeNamespaceSerializer namespace serializer forwarded to the backing state
     * @param safeValueSerializer value serializer forwarded to the backing state
     * @return whatever the backing state returns for the request
     * @throws Exception if the backing state throws
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public byte[] getSerializedValue(byte[] serializedKeyAndNamespace, TypeSerializer safeKeySerializer, TypeSerializer safeNamespaceSerializer, TypeSerializer safeValueSerializer) throws Exception {
        return this.subKeyedValueState.getSerializedValue(serializedKeyAndNamespace, (TypeSerializer<Object>)safeKeySerializer, safeNamespaceSerializer, safeValueSerializer);
    }

    /** Returns the backing sub-keyed state itself (not a copy). */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public SubKeyedState<K, N, ACC> getSubKeyedState() {
        // The backing state is keyed by Object, so the view as SubKeyedState<K, ...> is an
        // unchecked conversion; the same instance is returned.
        return (SubKeyedState)this.subKeyedValueState;
    }

    /** Combines a stored accumulator with another accumulator using the aggregate function. */
    private class MergeTransformation
    implements StateTransformationFunction<ACC, ACC> {
        private final AggregateFunction<IN, ACC, OUT> aggregateFunction;

        /**
         * @param aggregateFunction function whose {@code merge} is used
         * @throws NullPointerException if {@code aggregateFunction} is null
         */
        public MergeTransformation(AggregateFunction<IN, ACC, OUT> aggregateFunction) {
            this.aggregateFunction = Preconditions.checkNotNull(aggregateFunction);
        }

        /**
         * @param v1 first accumulator, may be null
         * @param v2 second accumulator
         * @return {@code v2} when {@code v1} is null, otherwise the merge of {@code v1} and {@code v2}
         */
        @Override
        public ACC apply(ACC v1, ACC v2) {
            return v1 == null ? v2 : this.aggregateFunction.merge(v1, v2);
        }
    }

    /** Adds a single input element to an accumulator using the aggregate function. */
    private class AggregateTransformation
    implements StateTransformationFunction<ACC, IN> {
        private final AggregateFunction<IN, ACC, OUT> aggregateFunction;

        /**
         * @param aggregateFunction function whose {@code createAccumulator} and {@code add} are used
         * @throws NullPointerException if {@code aggregateFunction} is null
         */
        public AggregateTransformation(AggregateFunction<IN, ACC, OUT> aggregateFunction) {
            this.aggregateFunction = Preconditions.checkNotNull(aggregateFunction);
        }

        /**
         * @param accumulator accumulator to update; a fresh one is created when this is null
         * @param value element to add
         * @return the accumulator returned by the aggregate function's {@code add}
         */
        @Override
        public ACC apply(ACC accumulator, IN value) {
            ACC target = accumulator != null ? accumulator : this.aggregateFunction.createAccumulator();
            return this.aggregateFunction.add(value, target);
        }
    }
}

