package org.apache.flink.table.runtime.operator.bundle;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state2.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state2.partitioned.PartitionedListStateDescriptor;
import org.apache.flink.streaming.api.bundle.BundleTrigger;
import org.apache.flink.streaming.api.bundle.BundleTriggerCallback;
import org.apache.flink.streaming.api.graph.OperatorContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.runtime.functions.ExecutionContextImpl;
import org.apache.flink.table.runtime.functions.bundle.BundleFunction;
import org.apache.flink.table.runtime.operator.StreamRecordCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.LockAndCondition;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/operator/bundle/BundleOperator.class */
/**
 * Stream operator that buffers incoming elements per key in an in-memory map ("bundle") and hands
 * the whole map to a {@link BundleFunction} when the bundle is finished.
 *
 * <p>A bundle is finished when {@link #finishBundle()} is invoked: by this operator on every
 * watermark, on {@link #close()}, and (if {@code finishBundleBeforeSnapshot} is set) before a
 * checkpoint barrier is forwarded. The operator also registers itself as the callback of the
 * configured {@link BundleTrigger}.
 *
 * <p>When {@code finishBundleBeforeSnapshot} is {@code false}, the buffered entries are written to
 * a list state on every snapshot and merged back into the buffer in {@link #open()}.
 *
 * <p>All entry points wait on the condition of the task's checkpoint lock while a bundle is being
 * finished; callers are expected to hold that lock.
 *
 * @param <K> type of the bundle key
 * @param <V> type of the buffered (accumulated) value per key
 * @param <IN> type of the input elements
 * @param <OUT> type of the emitted elements
 */
public class BundleOperator<K, V, IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {
    private static final long serialVersionUID = 5081841938324118594L;

    /** If true, the bundle is flushed before the barrier instead of being stored in state. */
    private final boolean finishBundleBeforeSnapshot;
    private final BundleTrigger<IN> bundleTrigger;
    private final BundleFunction<K, V, IN, OUT> function;
    private final TypeInformation<V> valueType;
    private final TypeInformation<K> keyType;
    private final KeySelector<IN, K> keySelector;

    /** Current bundle: accumulated value per key. */
    private transient Map<K, V> buffer;
    /** Snapshot of {@link #buffer}; only used when {@code finishBundleBeforeSnapshot} is false. */
    private transient ListState<Tuple2<K, V>> bufferState;
    private transient LockAndCondition checkpointingLock;
    private transient Collector<OUT> collector;
    /** Number of elements added to the current bundle (not the number of distinct keys). */
    private transient int numOfElements = 0;
    /** True while {@link #finishBundle()} is flushing the buffer. */
    private volatile transient boolean isInFinishingBundle = false;

    /**
     * Creates the operator.
     *
     * @param bundleFunction function that accumulates inputs and processes finished bundles
     * @param bundleTrigger trigger that decides when a bundle is finished
     * @param keyType type information of the bundle key
     * @param valueType type information of the buffered value
     * @param keySelector extracts the bundle key from an input element
     * @param finishBundleBeforeSnapshot whether to flush the bundle before each checkpoint barrier
     *     instead of storing the buffer in state
     */
    public BundleOperator(BundleFunction<K, V, IN, OUT> bundleFunction, BundleTrigger<IN> bundleTrigger, TypeInformation<K> keyType, TypeInformation<V> valueType, KeySelector<IN, K> keySelector, boolean finishBundleBeforeSnapshot) {
        this.finishBundleBeforeSnapshot = finishBundleBeforeSnapshot;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.function = Preconditions.checkNotNull(bundleFunction, "function is null");
        this.bundleTrigger = Preconditions.checkNotNull(bundleTrigger, "bundleTrigger is null");
        this.keyType = Preconditions.checkNotNull(keyType, "key type is null");
        this.valueType = Preconditions.checkNotNull(valueType, "value type is null");
        this.keySelector = Preconditions.checkNotNull(keySelector, "key selector is null");
    }

    /** Delegates to the superclass and caches the containing task's checkpoint lock. */
    public void setup(StreamTask<?, ?> streamTask, OperatorContext operatorContext, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, operatorContext, output);
        this.checkpointingLock = getContainingTask().getCheckpointLock();
    }

    /**
     * Opens the wrapped function, creates the buffer (restoring it from state when the bundle is
     * not flushed before snapshots), registers this operator with the trigger and registers the
     * {@code bundleSize} and {@code bundleRatio} gauges.
     */
    public void open() throws Exception {
        super.open();
        this.function.open(new ExecutionContextImpl(this, getRuntimeContext()));
        this.collector = new StreamRecordCollector<>(this.output);
        this.buffer = new HashMap<>();
        if (!this.finishBundleBeforeSnapshot) {
            // The descriptor's generic signature is not visible here, so the raw construction is
            // kept and the unchecked conversion is confined to this one local.
            @SuppressWarnings({"unchecked", "rawtypes"})
            ListState<Tuple2<K, V>> state = getPartitionedState(new PartitionedListStateDescriptor("localBufferState", new TupleTypeInfo(new TypeInformation[]{this.keyType, this.valueType}).createSerializer(getExecutionConfig())));
            this.bufferState = state;
            recoverBundleBuffer();
        }
        this.bundleTrigger.registerBundleTriggerCallback(this, () -> super.getProcessingTimeService());
        this.bundleTrigger.reset();
        getRuntimeContext().getMetricGroup().gauge("bundleSize", () -> Integer.valueOf(this.numOfElements));
        getRuntimeContext().getMetricGroup().gauge("bundleRatio", () -> {
            // Average number of input elements per distinct key in the current bundle.
            int distinctKeys = this.buffer.size();
            return distinctKeys == 0 ? Double.valueOf(0.0d) : Double.valueOf((1.0d * this.numOfElements) / distinctKeys);
        });
    }

    /**
     * Adds the element to the current bundle under its key and notifies the trigger.
     *
     * @param streamRecord the incoming record
     */
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        awaitBundleFinished();
        IN input = streamRecord.getValue();
        K key = this.keySelector.getKey(input);
        // The function merges the new input into the value buffered so far (null for a new key).
        V accumulated = this.function.addInput(this.buffer.get(key), input);
        this.buffer.put(key, accumulated);
        this.numOfElements++;
        this.bundleTrigger.onElement(input);
    }

    /**
     * Flushes the current bundle through the function, clears the buffer and resets the trigger.
     *
     * <p>The in-progress flag is always cleared and waiters are always signalled, even if the
     * function throws; otherwise every later call would wait forever.
     */
    public void finishBundle() throws Exception {
        // Check before touching any state so a failed assertion cannot leave the flag set.
        assert this.checkpointingLock.getLock().isHeldByCurrentThread();
        awaitBundleFinished();
        this.isInFinishingBundle = true;
        try {
            if (!this.buffer.isEmpty()) {
                this.numOfElements = 0;
                this.function.finishBundle(this.buffer, this.collector);
                this.buffer.clear();
            }
            this.bundleTrigger.reset();
        } finally {
            // Clear the flag first so it is reset even if signalling itself fails.
            this.isInFinishingBundle = false;
            this.checkpointingLock.getCondition().signalAll();
        }
    }

    /** Finishes the current bundle before forwarding the watermark. */
    public void processWatermark(Watermark watermark) throws Exception {
        awaitBundleFinished();
        finishBundle();
        super.processWatermark(watermark);
    }

    /**
     * Finishes the current bundle before the barrier if {@code finishBundleBeforeSnapshot} is set.
     *
     * @param checkpointId id of the checkpoint about to be taken (unused)
     */
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        awaitBundleFinished();
        if (this.finishBundleBeforeSnapshot) {
            finishBundle();
        }
    }

    /**
     * Snapshots the operator. Unless the bundle is flushed before snapshots, the list state is
     * replaced with the current content of the buffer.
     */
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        awaitBundleFinished();
        super.snapshotState(stateSnapshotContext);
        if (this.finishBundleBeforeSnapshot) {
            return;
        }
        this.bufferState.clear();
        ArrayList<Tuple2<K, V>> entries = new ArrayList<>(this.buffer.size());
        for (Map.Entry<K, V> entry : this.buffer.entrySet()) {
            entries.add(Tuple2.of(entry.getKey(), entry.getValue()));
        }
        this.bufferState.addAll(entries);
    }

    /**
     * Merges every entry of the list state into the buffer via the function's
     * {@code mergeValue}, then clears the state. Each restored entry counts as one element.
     */
    private void recoverBundleBuffer() throws Exception {
        Iterator<Tuple2<K, V>> restored = this.bufferState.iterator();
        while (restored.hasNext()) {
            Tuple2<K, V> entry = restored.next();
            K key = entry.f0;
            this.buffer.put(key, this.function.mergeValue(this.buffer.get(key), entry.f1));
            this.numOfElements++;
        }
        this.bufferState.clear();
    }

    /**
     * Flushes the last bundle, signals end of input to the function, and then closes the
     * superclass and the function. Failures during that final cleanup are logged, not thrown,
     * so they cannot mask an exception from the flush.
     */
    public void close() throws Exception {
        awaitBundleFinished();
        assert this.checkpointingLock.getLock().isHeldByCurrentThread();
        try {
            finishBundle();
            this.function.endInput(this.collector);
        } finally {
            closeOperatorAndFunction();
        }
    }

    /** Returns true when the buffer is kept in state, i.e. not flushed before snapshots. */
    public boolean requireState() {
        return !this.finishBundleBeforeSnapshot;
    }

    /** Blocks on the checkpoint lock's condition until no bundle is being finished. */
    private void awaitBundleFinished() throws Exception {
        while (this.isInFinishingBundle) {
            this.checkpointingLock.getCondition().await();
        }
    }

    /** Closes the superclass and the function, logging (not propagating) any failure. */
    private void closeOperatorAndFunction() {
        try {
            super.close();
            if (this.function != null) {
                FunctionUtils.closeFunction(this.function);
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for code further up the stack.
                Thread.currentThread().interrupt();
            }
            LOG.warn("Errors occurred while closing the BundleOperator.", e);
        }
    }
}
