package org.apache.flink.table.runtime.operator.bundle;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state2.keyed.KeyedValueState;
import org.apache.flink.runtime.state2.keyed.KeyedValueStateDescriptor;
import org.apache.flink.streaming.api.bundle.BundleTriggerCallback;
import org.apache.flink.streaming.api.bundle.CoBundleTrigger;
import org.apache.flink.streaming.api.graph.OperatorContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InputElementSelection;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.runtime.operator.StreamRecordCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.LockAndCondition;
import org.apache.flink.util.LockGetReleaseWrapper;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/operator/bundle/KeyedCoBundleOperator.class */
/**
 * Two-input keyed operator that buffers incoming elements per key and processes them in bundles.
 *
 * <p>Elements of the left and right input are collected in two in-memory maps keyed by the
 * current key. The buffered contents are handed to {@link #processBundles} whenever
 * {@link #finishBundle()} runs, which this class does when
 * <ul>
 *   <li>the minimum of both input watermarks advances,</li>
 *   <li>either input ends,</li>
 *   <li>the operator is closed,</li>
 *   <li>a checkpoint barrier is about to be forwarded and {@code finishBundleBeforeSnapshot}
 *       is set.</li>
 * </ul>
 * {@code finishBundle()} is also public and this operator registers itself as the
 * {@link BundleTriggerCallback} of the configured {@link CoBundleTrigger}.
 *
 * <p>When {@code finishBundleBeforeSnapshot} is {@code false}, the buffers are written to two
 * keyed value states in {@link #snapshotState} and read back (and the states cleared) in
 * {@link #open()}.
 *
 * <p>NOTE(review): {@code await()} / {@code signalAll()} are invoked on the checkpoint lock's
 * condition without this class acquiring the lock itself; presumably every caller already holds
 * the checkpoint lock - confirm against the task / timer threading model.
 *
 * @param <K> key type
 * @param <L> element type of the first (left) input
 * @param <R> element type of the second (right) input
 * @param <OUT> output element type
 */
public abstract class KeyedCoBundleOperator<K, L, R, OUT> extends AbstractStreamOperator<OUT> implements TwoInputStreamOperator<L, R, OUT>, BundleTriggerCallback {
    private static final String LEFT_STATE_NAME = "_keyed_co_bundle_operator_left_state_";
    private static final String RIGHT_STATE_NAME = "_keyed_co_bundle_operator_right_state_";

    /** If set, the bundle is flushed before each snapshot and no buffer state is kept. */
    private final boolean finishBundleBeforeSnapshot;
    private final CoBundleTrigger<L, R> coBundleTrigger;

    /** Lock/condition pair obtained from the containing task; used to wait for a running flush. */
    private transient LockAndCondition checkpointingLock;
    private transient StreamRecordCollector<OUT> collector;

    /** Per-key buffers of not yet processed elements. */
    private transient Map<K, List<L>> leftBuffer;
    private transient Map<K, List<R>> rightBuffer;

    /** Keyed states the buffers are copied into on snapshot (only used without pre-snapshot flush). */
    private transient KeyedValueState<K, List<L>> leftBufferState;
    private transient KeyedValueState<K, List<R>> rightBufferState;

    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    /** Last watermark emitted downstream: the minimum of both input watermarks. */
    private long currentWatermark = Long.MIN_VALUE;

    /** True while {@link #finishBundle()} is running; other entry points wait until it is cleared. */
    private volatile transient boolean isInFinishingBundle = false;

    /**
     * @param coBundleTrigger trigger that is notified about every buffered element; must not be null
     * @param finishBundleBeforeSnapshot whether to flush the bundle before a snapshot instead of
     *     persisting the buffers in keyed state
     */
    public KeyedCoBundleOperator(CoBundleTrigger<L, R> coBundleTrigger, boolean finishBundleBeforeSnapshot) {
        this.finishBundleBeforeSnapshot = finishBundleBeforeSnapshot;
        this.coBundleTrigger = Preconditions.checkNotNull(coBundleTrigger, "coBundleTrigger is null");
    }

    /**
     * Buffers a left-input element under the current key and notifies the trigger.
     *
     * @return always {@link InputElementSelection#ANY}
     */
    public InputElementSelection processElement1(StreamRecord<L> element) throws Exception {
        awaitInFlightBundle();
        L value = element.getValue();
        leftBuffer.computeIfAbsent(currentBundleKey(), key -> new ArrayList<>()).add(value);
        coBundleTrigger.onLeftElement(value);
        return InputElementSelection.ANY;
    }

    /**
     * Buffers a right-input element under the current key and notifies the trigger.
     *
     * @return always {@link InputElementSelection#ANY}
     */
    public InputElementSelection processElement2(StreamRecord<R> element) throws Exception {
        awaitInFlightBundle();
        R value = element.getValue();
        rightBuffer.computeIfAbsent(currentBundleKey(), key -> new ArrayList<>()).add(value);
        coBundleTrigger.onRightElement(value);
        return InputElementSelection.ANY;
    }

    /** Records the watermark of input 1 and forwards the combined watermark if it advanced. */
    public void processWatermark1(Watermark watermark) throws Exception {
        awaitInFlightBundle();
        input1Watermark = watermark.getTimestamp();
        advanceCombinedWatermark();
    }

    /** Records the watermark of input 2 and forwards the combined watermark if it advanced. */
    public void processWatermark2(Watermark watermark) throws Exception {
        awaitInFlightBundle();
        input2Watermark = watermark.getTimestamp();
        advanceCombinedWatermark();
    }

    /**
     * Flushes the current bundle when the first input ends.
     *
     * @return always {@link InputElementSelection#ANY}
     */
    public InputElementSelection endInput1() throws Exception {
        // finishBundle() itself waits for a flush that is still running.
        finishBundle();
        return InputElementSelection.ANY;
    }

    /**
     * Flushes the current bundle when the second input ends.
     *
     * @return always {@link InputElementSelection#ANY}
     */
    public InputElementSelection endInput2() throws Exception {
        // finishBundle() itself waits for a flush that is still running.
        finishBundle();
        return InputElementSelection.ANY;
    }

    /** Flushes the bundle ahead of the barrier when pre-snapshot flushing is enabled. */
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        if (finishBundleBeforeSnapshot) {
            finishBundle();
        }
    }

    /**
     * Processes and clears both buffers (if either is non-empty) and resets the trigger.
     *
     * <p>The work is done while holding the checkpoint lock via {@link LockGetReleaseWrapper}.
     * The in-progress flag is cleared and waiters are signalled in a {@code finally} block, so a
     * failure in {@link #processBundles} or in the trigger can no longer leave the flag set and
     * block every later caller (including {@link #close()}) forever.
     *
     * @throws Exception whatever {@link #processBundles}, the trigger or the lock wrapper throws
     */
    public void finishBundle() throws Exception {
        awaitInFlightBundle();
        isInFinishingBundle = true;
        try {
            try (LockGetReleaseWrapper ignored = new LockGetReleaseWrapper(checkpointingLock.getLock())) {
                if (!leftBuffer.isEmpty() || !rightBuffer.isEmpty()) {
                    processBundles(leftBuffer, rightBuffer, collector);
                    leftBuffer.clear();
                    rightBuffer.clear();
                }
                coBundleTrigger.reset();
            }
        } finally {
            // Clear the flag before signalling so it is reset even if signalAll() fails.
            isInFinishingBundle = false;
            checkpointingLock.getCondition().signalAll();
        }
    }

    /**
     * Processes one bundle.
     *
     * @param map buffered left-input elements per key; cleared by the caller afterwards
     * @param map2 buffered right-input elements per key; cleared by the caller afterwards
     * @param collector collector wrapping this operator's output
     */
    protected abstract void processBundles(Map<K, List<L>> map, Map<K, List<R>> map2, Collector<OUT> collector) throws Exception;

    /** Delegates to the superclass and fetches the checkpoint lock from the containing task. */
    public void setup(StreamTask<?, ?> streamTask, OperatorContext operatorContext, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, operatorContext, output);
        this.checkpointingLock = getContainingTask().getCheckpointLock();
    }

    /**
     * Creates the collector and the buffers, reloads buffered elements from keyed state (unless
     * bundles are flushed before snapshots) and registers this operator with the trigger.
     */
    // The state descriptors are built from serializers whose static types are not generic here,
    // hence the raw construction; the assignments below are unchecked conversions.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void open() throws Exception {
        super.open();
        this.checkpointingLock = getContainingTask().getCheckpointLock();
        this.collector = new StreamRecordCollector<>(this.output);
        this.leftBuffer = new HashMap<>();
        this.rightBuffer = new HashMap<>();
        if (!finishBundleBeforeSnapshot) {
            // Move whatever the state holds into the in-memory buffers and empty the state.
            this.leftBufferState = getKeyedState(new KeyedValueStateDescriptor(LEFT_STATE_NAME, getKeySerializer(), new ListSerializer(this.config.getTypeSerializerIn1())));
            this.leftBuffer.putAll(this.leftBufferState.getAll());
            this.leftBufferState.removeAll();
            this.rightBufferState = getKeyedState(new KeyedValueStateDescriptor(RIGHT_STATE_NAME, getKeySerializer(), new ListSerializer(this.config.getTypeSerializerIn2())));
            this.rightBuffer.putAll(this.rightBufferState.getAll());
            this.rightBufferState.removeAll();
        }
        this.coBundleTrigger.registerBundleTriggerCallback(this, () -> super.getProcessingTimeService());
        this.coBundleTrigger.reset();
        LOG.info("KeyedCoBundleOperator's trigger info: " + this.coBundleTrigger.explain());
    }

    /**
     * Flushes the remaining bundle and then closes the superclass.
     *
     * <p>The superclass is closed even if the flush fails; failures of the superclass close are
     * logged as warnings and not propagated, while a flush failure is rethrown.
     */
    public void close() throws Exception {
        try {
            // finishBundle() itself waits for a flush that is still running.
            finishBundle();
        } finally {
            closeSuperLoggingFailures();
        }
    }

    /**
     * Snapshots the superclass state and, unless bundles are flushed before snapshots, replaces
     * the contents of both keyed buffer states with the current in-memory buffers.
     */
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        awaitInFlightBundle();
        super.snapshotState(stateSnapshotContext);
        if (finishBundleBeforeSnapshot) {
            return;
        }
        leftBufferState.removeAll();
        rightBufferState.removeAll();
        leftBufferState.putAll(leftBuffer);
        rightBufferState.putAll(rightBuffer);
    }

    /** @return always {@code true} */
    public boolean requireState() {
        return true;
    }

    /** Blocks on the checkpoint lock's condition until no bundle flush is in progress. */
    private void awaitInFlightBundle() throws Exception {
        while (isInFinishingBundle) {
            checkpointingLock.getCondition().await();
        }
    }

    /** Returns the operator's current key viewed as {@code K}. */
    @SuppressWarnings("unchecked")
    private K currentBundleKey() {
        // The buffers are only ever filled with keys obtained this way, so the cast is the
        // single place where the key type is asserted.
        return (K) getCurrentKey();
    }

    /**
     * Flushes the bundle and emits the minimum of both input watermarks if that minimum is
     * greater than the last emitted watermark.
     */
    private void advanceCombinedWatermark() throws Exception {
        long combined = Math.min(input1Watermark, input2Watermark);
        if (combined > currentWatermark) {
            currentWatermark = combined;
            finishBundle();
            output.emitWatermark(new Watermark(combined));
        }
    }

    /** Closes the superclass, logging (not propagating) any exception and restoring the interrupt flag. */
    private void closeSuperLoggingFailures() {
        Exception failure = null;
        try {
            super.close();
        } catch (InterruptedException e) {
            failure = e;
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            failure = e;
        }
        if (failure != null) {
            LOG.warn("Errors occurred while closing the KeyedCoBundleOperator.", failure);
        }
    }
}
