/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.bundle;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.bundle.BundleTrigger;
import org.apache.flink.streaming.api.bundle.BundleTriggerCallback;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.runtime.functions.BundleFunction;
import org.apache.flink.table.runtime.functions.ExecutionContextImpl;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

public class BundleOperator<K, V, IN, OUT>
extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT>,
BundleTriggerCallback {
    private static final long serialVersionUID = 5081841938324118594L;
    private static final String STATE_NAME = "_bundle_operator_state_";
    private final boolean finishBundleBeforeSnapshot;
    private final BundleTrigger<IN> bundleTrigger;
    private final BundleFunction<K, V, IN, OUT> function;
    private final TypeInformation<V> valueType;
    private final TypeInformation<K> keyType;
    private final KeySelector<IN, K> keySelector;
    private transient Map<K, V> buffer;
    private transient ListState<Tuple2<K, V>> bufferState;
    private transient Object checkpointingLock;
    private transient Collector<OUT> collector;
    private transient int numOfElements = 0;
    private volatile transient boolean isInFinishingBundle = false;

    public BundleOperator(BundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger, TypeInformation<K> keyType, TypeInformation<V> valueType, KeySelector<IN, K> keySelector, boolean finishBundleBeforeSnapshot) {
        this.finishBundleBeforeSnapshot = finishBundleBeforeSnapshot;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.function = Preconditions.checkNotNull(function, "function is null");
        this.bundleTrigger = Preconditions.checkNotNull(bundleTrigger, "bundleTrigger is null");
        this.keyType = Preconditions.checkNotNull(keyType, "key type is null");
        this.valueType = Preconditions.checkNotNull(valueType, "value type is null");
        this.keySelector = Preconditions.checkNotNull(keySelector, "key selector is null");
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output2) {
        super.setup(containingTask, config, output2);
        this.checkpointingLock = this.getContainingTask().getCheckpointLock();
    }

    public void open() throws Exception {
        super.open();
        this.function.open(new ExecutionContextImpl(this, (RuntimeContext)this.getRuntimeContext()));
        this.collector = new StreamRecordCollector<OUT>(this.output);
        this.buffer = new HashMap();
        if (!this.finishBundleBeforeSnapshot && this.bufferState != null) {
            for (Tuple2 tuple : (Iterable)this.bufferState.get()) {
                Object key = tuple.f0;
                Object value = tuple.f1;
                V prevValue = this.buffer.get(key);
                V newValue = this.function.mergeValue(prevValue, value);
                this.buffer.put(key, newValue);
                ++this.numOfElements;
            }
            this.bufferState = null;
        }
        this.bundleTrigger.registerBundleTriggerCallback((BundleTriggerCallback)this);
        this.bundleTrigger.reset();
        this.getRuntimeContext().getMetricGroup().gauge("bundleSize", () -> this.numOfElements);
        this.getRuntimeContext().getMetricGroup().gauge("bundleRatio", () -> {
            int numOfKeys = this.buffer.size();
            if (numOfKeys == 0) {
                return 0.0;
            }
            return 1.0 * (double)this.numOfElements / (double)numOfKeys;
        });
    }

    public void processElement(StreamRecord<IN> element) throws Exception {
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        K key = this.keySelector.getKey(element.getValue());
        V value = this.buffer.get(key);
        V newValue = this.function.addInput(value, element.getValue());
        this.buffer.put(key, newValue);
        ++this.numOfElements;
        this.bundleTrigger.onElement(element.getValue());
    }

    public void finishBundle() throws Exception {
        assert (Thread.holdsLock(this.checkpointingLock));
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        this.isInFinishingBundle = true;
        if (!this.buffer.isEmpty()) {
            this.numOfElements = 0;
            this.function.finishBundle(this.buffer, this.collector);
            this.buffer.clear();
        }
        this.bundleTrigger.reset();
        this.checkpointingLock.notifyAll();
        this.isInFinishingBundle = false;
    }

    public void processWatermark(Watermark mark) throws Exception {
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        this.finishBundle();
        super.processWatermark(mark);
    }

    public void endInput() throws Exception {
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        TupleTypeInfo tupleType = new TupleTypeInfo(this.keyType, this.valueType);
        this.bufferState = context.getOperatorStateStore().getListState(new ListStateDescriptor(STATE_NAME, tupleType));
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        if (this.finishBundleBeforeSnapshot) {
            this.finishBundle();
        }
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        super.snapshotState(context);
        if (!this.finishBundleBeforeSnapshot) {
            TupleTypeInfo tupleType = new TupleTypeInfo(this.keyType, this.valueType);
            ListState bufferState = this.getOperatorStateBackend().getListState(new ListStateDescriptor(STATE_NAME, tupleType));
            bufferState.clear();
            Iterator<Map.Entry<K, V>> iter = this.buffer.entrySet().iterator();
            ArrayList<Tuple2<K, V>> stateToPut = new ArrayList<Tuple2<K, V>>(this.buffer.size());
            while (iter.hasNext()) {
                Map.Entry<K, V> entry = iter.next();
                K key = entry.getKey();
                V value = entry.getValue();
                stateToPut.add(Tuple2.of(key, value));
            }
            bufferState.addAll(stateToPut);
        }
    }

    public void close() throws Exception {
        assert (Thread.holdsLock(this.checkpointingLock));
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        try {
            this.finishBundle();
            this.function.endInput(this.collector);
        }
        finally {
            Exception exception = null;
            try {
                super.close();
                if (this.function != null) {
                    FunctionUtils.closeFunction(this.function);
                }
            }
            catch (InterruptedException interrupted) {
                exception = interrupted;
                Thread.currentThread().interrupt();
            }
            catch (Exception e2) {
                exception = e2;
            }
            if (exception != null) {
                LOG.warn("Errors occurred while closing the BundleOperator.", (Throwable)exception);
            }
        }
    }

    public boolean requireState() {
        return !this.finishBundleBeforeSnapshot;
    }
}

