package org.apache.flink.table.runtime.bundle;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.streaming.api.bundle.BundleTriggerCallback;
import org.apache.flink.streaming.api.bundle.CoBundleTrigger;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.fault.tolerant.TriggerableOperator;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.checkpointlock.CheckpointLockDelegate;

/* loaded from: input_file:org/apache/flink/table/runtime/bundle/KeyedCoBundleOperator.class */
/**
 * Two-input keyed operator that buffers incoming rows per key and processes them in bundles.
 *
 * <p>Rows from the first input are collected in {@code leftBuffer}, rows from the second input in
 * {@code rightBuffer}, both keyed by the current key. When a bundle is finished (see
 * {@link #finishBundle()}), both buffers are handed to
 * {@link #processBundles(Map, Map, Collector)} and then cleared.
 *
 * <p>Within this class a bundle is finished when the combined watermark advances, when either
 * input ends, before a snapshot (only if {@code finishBundleBeforeSnapshot} is set) and on
 * {@link #close()}. The operator also registers itself as the callback of the
 * {@link CoBundleTrigger}; presumably the trigger calls {@link #finishBundle()} through
 * {@link BundleTriggerCallback} - confirm against the trigger implementations.
 *
 * <p>If {@code finishBundleBeforeSnapshot} is {@code false}, the buffered rows are written to
 * keyed state in {@link #snapshotState(StateSnapshotContext)} and read back in {@link #open()}.
 */
public abstract class KeyedCoBundleOperator extends TriggerableOperator<BaseRow, Byte, BaseRow> implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow>, BundleTriggerCallback {
    private static final String LEFT_STATE_NAME = "_keyed_co_bundle_operator_left_state_";
    private static final String RIGHT_STATE_NAME = "_keyed_co_bundle_operator_right_state_";
    private final CoBundleTrigger<BaseRow, BaseRow> coBundleTrigger;
    private final boolean finishBundleBeforeSnapshot;
    private transient CheckpointLockDelegate lockDelegate;
    private transient StreamRecordCollector<BaseRow> collector;
    private transient Map<BaseRow, List<BaseRow>> leftBuffer;
    private transient Map<BaseRow, List<BaseRow>> rightBuffer;
    private transient KeyedValueState<BaseRow, List<BaseRow>> leftBufferState;
    private transient KeyedValueState<BaseRow, List<BaseRow>> rightBufferState;
    private TypeSerializer<BaseRow> lTypeSerializer;
    private TypeSerializer<BaseRow> rTypeSerializer;
    private AbstractRowSerializer<BaseRow> inputSer1;
    private AbstractRowSerializer<BaseRow> inputSer2;
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long currentWatermark = Long.MIN_VALUE;
    // True while finishBundle() is handing the buffers to processBundles(); every entry point
    // waits on lockDelegate until it is false again before touching the buffers.
    private volatile transient boolean isInFinishingBundle = false;

    /**
     * Creates the operator.
     *
     * @param coBundleTrigger trigger notified about every buffered element; must not be null
     * @param z whether the current bundle is finished before each snapshot instead of being
     *     written to keyed state (stored as {@code finishBundleBeforeSnapshot})
     */
    public KeyedCoBundleOperator(CoBundleTrigger<BaseRow, BaseRow> coBundleTrigger, boolean z) {
        this.finishBundleBeforeSnapshot = z;
        Preconditions.checkNotNull(coBundleTrigger, "coBundleTrigger is null");
        this.coBundleTrigger = coBundleTrigger;
    }

    /**
     * Buffers a copy of a first-input row under the current key and notifies the trigger.
     *
     * @param streamRecord record from the first input
     * @return always {@link TwoInputSelection#ANY}
     */
    public TwoInputSelection processElement1(StreamRecord<BaseRow> streamRecord) throws Exception {
        awaitBundleFinished();
        BaseRow key = (BaseRow) getCurrentKey();
        // The row is copied before buffering so the buffer does not alias the incoming record.
        BaseRow row = (BaseRow) this.inputSer1.copy(streamRecord.getValue());
        this.leftBuffer.computeIfAbsent(key, ignored -> new ArrayList<>()).add(row);
        this.coBundleTrigger.onLeftElement(row);
        return TwoInputSelection.ANY;
    }

    /**
     * Buffers a copy of a second-input row under the current key and notifies the trigger.
     *
     * @param streamRecord record from the second input
     * @return always {@link TwoInputSelection#ANY}
     */
    public TwoInputSelection processElement2(StreamRecord<BaseRow> streamRecord) throws Exception {
        awaitBundleFinished();
        BaseRow key = (BaseRow) getCurrentKey();
        // The row is copied before buffering so the buffer does not alias the incoming record.
        BaseRow row = (BaseRow) this.inputSer2.copy(streamRecord.getValue());
        this.rightBuffer.computeIfAbsent(key, ignored -> new ArrayList<>()).add(row);
        this.coBundleTrigger.onRightElement(row);
        return TwoInputSelection.ANY;
    }

    /**
     * Records the first input's watermark and advances the combined watermark if possible.
     *
     * @param watermark watermark received on the first input
     */
    public void processWatermark1(Watermark watermark) throws Exception {
        awaitBundleFinished();
        this.input1Watermark = watermark.getTimestamp();
        advanceCombinedWatermark();
    }

    /**
     * Records the second input's watermark and advances the combined watermark if possible.
     *
     * @param watermark watermark received on the second input
     */
    public void processWatermark2(Watermark watermark) throws Exception {
        awaitBundleFinished();
        this.input2Watermark = watermark.getTimestamp();
        advanceCombinedWatermark();
    }

    /** Finishes the current bundle once the first input is exhausted. */
    public void endInput1() throws Exception {
        awaitBundleFinished();
        finishBundle();
    }

    /** Finishes the current bundle once the second input is exhausted. */
    public void endInput2() throws Exception {
        awaitBundleFinished();
        finishBundle();
    }

    /**
     * Finishes the current bundle ahead of the snapshot when {@code finishBundleBeforeSnapshot}
     * is set; otherwise does nothing.
     *
     * @param j not used by this implementation (presumably the checkpoint id)
     */
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        if (this.finishBundleBeforeSnapshot) {
            finishBundle();
        }
    }

    /**
     * Hands both buffers to {@link #processBundles(Map, Map, Collector)} (skipped when both are
     * empty), then clears the buffers and resets the trigger.
     *
     * <p>The work runs through {@code lockDelegate.lockAndRun}. The buffers are cleared and the
     * trigger is reset even if {@code processBundles} throws, and the in-progress flag is always
     * cleared and waiters are always signalled, so a failing cleanup step cannot leave other
     * callers waiting forever.
     */
    public void finishBundle() throws Exception {
        this.lockDelegate.lockAndRun(() -> {
            while (this.isInFinishingBundle) {
                this.lockDelegate.await();
            }
            this.isInFinishingBundle = true;
            try {
                boolean nothingBuffered = this.leftBuffer.isEmpty() && this.rightBuffer.isEmpty();
                if (!nothingBuffered) {
                    processBundles(this.leftBuffer, this.rightBuffer, this.collector);
                }
            } finally {
                try {
                    this.leftBuffer.clear();
                    this.rightBuffer.clear();
                    this.coBundleTrigger.reset();
                } finally {
                    // Clear the flag before waking waiters so they observe the finished state.
                    this.isInFinishingBundle = false;
                    this.lockDelegate.signalAll();
                }
            }
        });
    }

    /**
     * Processes one bundle of buffered rows.
     *
     * @param map rows buffered from the first input, grouped by key
     * @param map2 rows buffered from the second input, grouped by key
     * @param collector collector wrapping this operator's output
     */
    protected abstract void processBundles(Map<BaseRow, List<BaseRow>> map, Map<BaseRow, List<BaseRow>> map2, Collector<BaseRow> collector) throws Exception;

    /**
     * Delegates to the parent setup and creates the lock delegate from the containing task's
     * checkpoint lock.
     */
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<BaseRow>> output) {
        super.setup(streamTask, streamConfig, output);
        this.lockDelegate = new CheckpointLockDelegate(getContainingTask().getCheckpointLock());
    }

    /**
     * Overrides the input serializers that {@link #open()} would otherwise read from the stream
     * config.
     *
     * @param typeSerializer serializer for first-input rows; must not be null
     * @param typeSerializer2 serializer for second-input rows; must not be null
     */
    @VisibleForTesting
    public void setupTypeSerializer(TypeSerializer<BaseRow> typeSerializer, TypeSerializer<BaseRow> typeSerializer2) {
        Objects.requireNonNull(typeSerializer);
        Objects.requireNonNull(typeSerializer2);
        this.lTypeSerializer = typeSerializer;
        this.rTypeSerializer = typeSerializer2;
    }

    /**
     * Creates the collector and buffers, resolves the input serializers and, when bundles are
     * not finished before snapshots, reloads BOTH buffers from keyed state. Finally registers
     * this operator as the trigger callback and resets the trigger.
     */
    @Override // org.apache.flink.table.runtime.fault.tolerant.TriggerableOperator
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void open() throws Exception {
        super.open();
        this.collector = new StreamRecordCollector<>(this.output);
        this.leftBuffer = new HashMap<>();
        this.rightBuffer = new HashMap<>();
        // Serializers injected through setupTypeSerializer() take precedence over the config.
        this.inputSer1 = (AbstractRowSerializer) (this.lTypeSerializer == null ? this.config.getTypeSerializerIn1(getUserCodeClassloader()) : this.lTypeSerializer);
        this.inputSer2 = (AbstractRowSerializer) (this.rTypeSerializer == null ? this.config.getTypeSerializerIn2(getUserCodeClassloader()) : this.rTypeSerializer);
        if (!this.finishBundleBeforeSnapshot) {
            this.leftBufferState = getKeyedState(new KeyedValueStateDescriptor(LEFT_STATE_NAME, getKeySerializer(), new ListSerializer(this.inputSer1)));
            this.leftBuffer.putAll(this.leftBufferState.getAll());
            this.leftBufferState.removeAll();
            this.rightBufferState = getKeyedState(new KeyedValueStateDescriptor(RIGHT_STATE_NAME, getKeySerializer(), new ListSerializer(this.inputSer2)));
            // snapshotState() persists the right buffer too, so it has to be read back here;
            // otherwise right-side rows buffered at snapshot time are dropped on restore.
            this.rightBuffer.putAll(this.rightBufferState.getAll());
            this.rightBufferState.removeAll();
        }
        this.coBundleTrigger.registerBundleTriggerCallback(this);
        this.coBundleTrigger.reset();
        LOG.info("KeyedCoBundleOperator's trigger info: " + this.coBundleTrigger.explain());
    }

    /**
     * Finishes the last bundle and then closes the parent operator.
     *
     * <p>The parent is closed exactly once, whether or not finishing the bundle fails. A failure
     * of the parent close is only logged; a failure of {@link #finishBundle()} is propagated.
     */
    public void close() throws Exception {
        awaitBundleFinished();
        try {
            finishBundle();
        } finally {
            closeParentAndLogFailure();
        }
    }

    /**
     * Writes the current buffers to keyed state, replacing whatever was stored before. Does
     * nothing beyond the parent snapshot when bundles are finished before snapshots.
     *
     * @param stateSnapshotContext context forwarded to the parent implementation
     */
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        awaitBundleFinished();
        super.snapshotState(stateSnapshotContext);
        if (this.finishBundleBeforeSnapshot) {
            return;
        }
        this.leftBufferState.removeAll();
        this.rightBufferState.removeAll();
        this.leftBufferState.putAll(this.leftBuffer);
        this.rightBufferState.putAll(this.rightBuffer);
    }

    /** @return always {@code true} */
    public boolean requireState() {
        return true;
    }

    /** Waits on the lock delegate until no bundle is being finished. */
    private void awaitBundleFinished() throws Exception {
        while (this.isInFinishingBundle) {
            this.lockDelegate.await();
        }
    }

    /**
     * Recomputes the combined watermark as the minimum of both inputs. If it advanced, finishes
     * the bundle and emits the new watermark; the watermark is emitted even when finishing the
     * bundle fails.
     */
    private void advanceCombinedWatermark() throws Exception {
        long combined = Math.min(this.input1Watermark, this.input2Watermark);
        if (combined <= this.currentWatermark) {
            return;
        }
        this.currentWatermark = combined;
        try {
            finishBundle();
        } finally {
            this.output.emitWatermark(new Watermark(combined));
        }
    }

    /** Closes the parent operator; exceptions are logged and the interrupt flag is restored. */
    private void closeParentAndLogFailure() {
        try {
            super.close();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            LOG.warn("Errors occurred while closing the KeyedCoBundleOperator.", e);
        }
    }
}
