/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.bundle;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.keyed.KeyedStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.streaming.api.bundle.BundleTrigger;
import org.apache.flink.streaming.api.bundle.BundleTriggerCallback;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.runtime.functions.BundleFunction;
import org.apache.flink.table.runtime.functions.ExecutionContextImpl;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.checkpointlock.CheckpointLockDelegate;

/**
 * One-input operator that accumulates a single value per key in an in-memory buffer (a "bundle")
 * and hands the whole buffer to a {@link BundleFunction} when the bundle is finished.
 *
 * <p>Within this class a bundle is finished on every watermark, on {@link #close()}, and, when
 * {@code finishBundleBeforeSnapshot} is set, right before a checkpoint barrier is forwarded. The
 * operator also registers itself as the callback of the supplied {@link BundleTrigger};
 * presumably the trigger invokes {@link #finishBundle()} as well (the trigger implementation is
 * not part of this file).
 *
 * <p>When {@code finishBundleBeforeSnapshot} is {@code false}, the buffer is instead copied into
 * keyed state on every snapshot and read back in {@link #open()}.
 *
 * @param <K> type of the key the input is partitioned by
 * @param <V> type of the value accumulated per key
 * @param <IN> type of the input elements
 * @param <OUT> type of the emitted elements
 */
public class KeyedBundleOperator<K, V, IN, OUT>
extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT>,
BundleTriggerCallback {
    private static final long serialVersionUID = 5081841938324118594L;

    /** If true the bundle is flushed before a snapshot; otherwise the buffer is stored in state. */
    private final boolean finishBundleBeforeSnapshot;

    /** Decides when a bundle is complete; notified of every incoming element. */
    private final BundleTrigger<IN> bundleTrigger;

    /** User logic that folds inputs into the per-key value and emits results for a bundle. */
    private final BundleFunction<K, V, IN, OUT> function;

    /** Type information used to create the serializer for buffered values. */
    private final TypeInformation<V> valueType;

    /** Wrapper around the containing task's checkpoint lock, used to wait/signal. */
    private transient CheckpointLockDelegate lockDelegate;

    /** The current bundle: accumulated value per key. */
    private transient Map<K, V> buffer;

    /** Collector that forwards results of the function to this operator's output. */
    private transient Collector<OUT> collector;

    /** Keyed state holding the buffer across checkpoints; only set when not flushing on snapshot. */
    private transient KeyedValueState<K, V> bufferState;

    /** Number of elements added to the current bundle; reported by the "bundleSize" gauge. */
    private transient int numOfElements = 0;

    /** True while {@link #finishBundle()} is running; other entry points wait until it is cleared. */
    private volatile transient boolean isInFinishingBundle = false;

    /**
     * Creates the operator.
     *
     * @param function bundle logic; must not be null
     * @param bundleTrigger trigger deciding when a bundle is finished; must not be null
     * @param valueType type information of the buffered value; must not be null
     * @param finishBundleBeforeSnapshot whether to flush the bundle before each snapshot instead
     *     of persisting the buffer in keyed state
     * @throws NullPointerException if {@code function}, {@code bundleTrigger} or
     *     {@code valueType} is null
     */
    public KeyedBundleOperator(BundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger, TypeInformation<V> valueType, boolean finishBundleBeforeSnapshot) {
        this.finishBundleBeforeSnapshot = finishBundleBeforeSnapshot;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        // open() and processElement() dereference the function unconditionally, so fail fast
        // here exactly like for the other mandatory arguments.
        this.function = Preconditions.checkNotNull(function, "function is null");
        this.bundleTrigger = Preconditions.checkNotNull(bundleTrigger, "bundleTrigger is null");
        this.valueType = Preconditions.checkNotNull(valueType, "valueType is null");
    }

    /**
     * Delegates to the superclass and then wraps the containing task's checkpoint lock in a
     * {@link CheckpointLockDelegate}.
     */
    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output2) {
        super.setup(containingTask, config, output2);
        this.lockDelegate = new CheckpointLockDelegate(this.getContainingTask().getCheckpointLock());
    }

    /**
     * Opens the user function, creates the buffer (reloading it from keyed state when the bundle
     * is not flushed before snapshots), registers this operator with the trigger and registers
     * the {@code bundleSize} and {@code bundleRatio} gauges.
     *
     * @throws Exception if the superclass, the function or the state access fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.function.open(new ExecutionContextImpl(this, (RuntimeContext)this.getRuntimeContext()));
        this.collector = new StreamRecordCollector<OUT>(this.output);
        this.buffer = new HashMap<>();
        if (!this.finishBundleBeforeSnapshot) {
            TypeSerializer<V> valueSer = this.valueType.createSerializer(this.getExecutionConfig());
            @SuppressWarnings("unchecked")
            TypeSerializer<K> keySer = (TypeSerializer<K>) this.getKeySerializer();
            KeyedValueStateDescriptor<K, V> bufferStateDesc = new KeyedValueStateDescriptor<>("globalBufferState", keySer, valueSer);
            this.bufferState = this.getKeyedState(bufferStateDesc);
            // Move whatever a previous snapshot stored back into the in-memory buffer.
            this.buffer.putAll(this.bufferState.getAll());
            this.bufferState.removeAll();
        }
        // The per-element count is not persisted, so start from the number of buffered keys.
        this.numOfElements = this.buffer.size();
        this.bundleTrigger.registerBundleTriggerCallback(this);
        this.bundleTrigger.reset();
        LOG.info("KeyedBundleOperator's trigger info: " + this.bundleTrigger.explain());
        this.getRuntimeContext().getMetricGroup().gauge("bundleSize", () -> this.numOfElements);
        // Average number of input elements folded into one key of the current bundle.
        this.getRuntimeContext().getMetricGroup().gauge("bundleRatio", () -> {
            int numOfKeys = this.buffer.size();
            if (numOfKeys == 0) {
                return 0.0;
            }
            return 1.0 * (double)this.numOfElements / (double)numOfKeys;
        });
    }

    /**
     * Folds the element into the buffered value of the current key and notifies the trigger.
     * Waits first if a bundle is currently being finished.
     *
     * @param element the incoming record
     * @throws Exception if the function or the trigger fails
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        while (this.isInFinishingBundle) {
            this.lockDelegate.await();
        }
        // getCurrentKey() is untyped; the key type of a keyed stream feeding this operator is K.
        @SuppressWarnings("unchecked")
        K key = (K) this.getCurrentKey();
        V previous = this.buffer.get(key);
        V updated = this.function.addInput(previous, element.getValue());
        this.buffer.put(key, updated);
        ++this.numOfElements;
        this.bundleTrigger.onElement(element.getValue());
    }

    /**
     * Passes the buffered bundle to the function (if it is not empty), then clears the buffer,
     * resets the trigger and wakes up everything waiting on the lock delegate. The cleanup runs
     * even if the function throws.
     *
     * @throws Exception if the function fails while finishing the bundle
     */
    public void finishBundle() throws Exception {
        assert (this.lockDelegate.isHeldByCurrentThread());
        while (this.isInFinishingBundle) {
            this.lockDelegate.await();
        }
        this.isInFinishingBundle = true;
        try {
            if (!this.buffer.isEmpty()) {
                this.numOfElements = 0;
                this.function.finishBundle(this.buffer, this.collector);
            }
        }
        finally {
            this.buffer.clear();
            this.bundleTrigger.reset();
            this.lockDelegate.signalAll();
            this.isInFinishingBundle = false;
        }
    }

    /**
     * Finishes the current bundle and then forwards the watermark to the superclass; the
     * watermark is forwarded even if finishing the bundle throws.
     *
     * @param mark the watermark to process
     * @throws Exception if finishing the bundle or forwarding the watermark fails
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        while (this.isInFinishingBundle) {
            this.lockDelegate.await();
        }
        try {
            this.finishBundle();
        }
        finally {
            super.processWatermark(mark);
        }
    }

    /** Does nothing; the function's {@code endInput} is invoked from {@link #close()}. */
    public void endInput() throws Exception {
    }

    /**
     * Finishes the current bundle when {@code finishBundleBeforeSnapshot} is set; otherwise only
     * waits for a bundle that is currently being finished.
     *
     * @param checkpointId id of the checkpoint about to be taken (unused here)
     * @throws Exception if finishing the bundle fails
     */
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        while (this.isInFinishingBundle) {
            this.lockDelegate.await();
        }
        if (this.finishBundleBeforeSnapshot) {
            this.finishBundle();
        }
    }

    /**
     * Snapshots the superclass state and, when the bundle is not flushed before snapshots,
     * replaces the content of the buffer state with the current in-memory buffer.
     *
     * @param context snapshot context handed to the superclass
     * @throws Exception if the superclass snapshot or the state update fails
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        while (this.isInFinishingBundle) {
            this.lockDelegate.await();
        }
        super.snapshotState(context);
        if (!this.finishBundleBeforeSnapshot) {
            this.bufferState.removeAll();
            this.bufferState.putAll(this.buffer);
        }
    }

    /**
     * Flushes the last bundle, signals end of input to the function and then closes both the
     * superclass and the function. Failures while closing are logged, not rethrown, and the
     * function is closed even when closing the superclass fails.
     *
     * @throws Exception if finishing the last bundle or {@code endInput} fails
     */
    @Override
    public void close() throws Exception {
        assert (this.lockDelegate.isHeldByCurrentThread());
        while (this.isInFinishingBundle) {
            this.lockDelegate.await();
        }
        try {
            this.finishBundle();
            this.function.endInput(this.collector);
        }
        finally {
            Exception closeFailure = null;
            // Close both resources independently so that a failure in the first one cannot
            // prevent the second one from being released.
            try {
                super.close();
            }
            catch (Exception e) {
                closeFailure = KeyedBundleOperator.mergeCloseFailure(closeFailure, e);
            }
            try {
                this.function.close();
            }
            catch (Exception e) {
                closeFailure = KeyedBundleOperator.mergeCloseFailure(closeFailure, e);
            }
            if (closeFailure != null) {
                LOG.warn("Errors occurred while closing the BundleOperator.", (Throwable)closeFailure);
            }
        }
    }

    /**
     * Combines a new close failure with an earlier one and restores the interrupt flag if the
     * new failure is an {@link InterruptedException}.
     *
     * @param previous failure recorded so far, or null
     * @param current failure that just occurred
     * @return the failure to report: {@code current} if there was none before, otherwise
     *     {@code previous} with {@code current} attached as suppressed
     */
    private static Exception mergeCloseFailure(Exception previous, Exception current) {
        if (current instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        if (previous == null) {
            return current;
        }
        previous.addSuppressed(current);
        return previous;
    }

    /** Always returns {@code true}. */
    public boolean requireState() {
        return true;
    }
}

