/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.bundle;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.keyed.KeyedStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.streaming.api.bundle.BundleTriggerCallback;
import org.apache.flink.streaming.api.bundle.CoBundleTrigger;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

public abstract class KeyedCoBundleOperator
extends AbstractStreamOperator<BaseRow>
implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow>,
BundleTriggerCallback {
    private static final String LEFT_STATE_NAME = "_keyed_co_bundle_operator_left_state_";
    private static final String RIGHT_STATE_NAME = "_keyed_co_bundle_operator_right_state_";
    private final CoBundleTrigger<BaseRow, BaseRow> coBundleTrigger;
    private final boolean finishBundleBeforeSnapshot;
    private transient Object checkpointingLock;
    private transient StreamRecordCollector<BaseRow> collector;
    private transient Map<BaseRow, List<BaseRow>> leftBuffer;
    private transient Map<BaseRow, List<BaseRow>> rightBuffer;
    private transient KeyedValueState<BaseRow, List<BaseRow>> leftBufferState;
    private transient KeyedValueState<BaseRow, List<BaseRow>> rightBufferState;
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long currentWatermark = Long.MIN_VALUE;
    private TypeSerializer<BaseRow> lTypeSerializer;
    private TypeSerializer<BaseRow> rTypeSerializer;
    private volatile transient boolean isInFinishingBundle = false;
    private AbstractRowSerializer<BaseRow> inputSer1;
    private AbstractRowSerializer<BaseRow> inputSer2;

    public KeyedCoBundleOperator(CoBundleTrigger<BaseRow, BaseRow> coBundleTrigger, boolean finishBundleBeforeSnapshot) {
        this.finishBundleBeforeSnapshot = finishBundleBeforeSnapshot;
        Preconditions.checkNotNull(coBundleTrigger, "coBundleTrigger is null");
        this.coBundleTrigger = coBundleTrigger;
    }

    public TwoInputSelection processElement1(StreamRecord<BaseRow> element) throws Exception {
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        BaseRow key = (BaseRow)this.getCurrentKey();
        BaseRow row2 = this.inputSer1.copy((BaseRow)element.getValue());
        List records = this.leftBuffer.computeIfAbsent(key, k -> new ArrayList());
        records.add(row2);
        this.coBundleTrigger.onLeftElement((Object)row2);
        return TwoInputSelection.ANY;
    }

    public TwoInputSelection processElement2(StreamRecord<BaseRow> element) throws Exception {
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        BaseRow key = (BaseRow)this.getCurrentKey();
        BaseRow row2 = this.inputSer2.copy((BaseRow)element.getValue());
        List records = this.rightBuffer.computeIfAbsent(key, k -> new ArrayList());
        records.add(row2);
        this.coBundleTrigger.onRightElement((Object)row2);
        return TwoInputSelection.ANY;
    }

    public void processWatermark1(Watermark mark) throws Exception {
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        this.input1Watermark = mark.getTimestamp();
        long newMin = Math.min(this.input1Watermark, this.input2Watermark);
        if (newMin > this.currentWatermark) {
            this.currentWatermark = newMin;
            this.finishBundle();
            this.output.emitWatermark(new Watermark(newMin));
        }
    }

    public void processWatermark2(Watermark mark) throws Exception {
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        this.input2Watermark = mark.getTimestamp();
        long newMin = Math.min(this.input1Watermark, this.input2Watermark);
        if (newMin > this.currentWatermark) {
            this.currentWatermark = newMin;
            this.finishBundle();
            this.output.emitWatermark(new Watermark(newMin));
        }
    }

    public void endInput1() throws Exception {
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        this.finishBundle();
    }

    public void endInput2() throws Exception {
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        this.finishBundle();
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        if (this.finishBundleBeforeSnapshot) {
            this.finishBundle();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void finishBundle() throws Exception {
        Object object = this.checkpointingLock;
        synchronized (object) {
            while (this.isInFinishingBundle) {
                this.checkpointingLock.wait();
            }
            this.isInFinishingBundle = true;
            if (!this.leftBuffer.isEmpty() || !this.rightBuffer.isEmpty()) {
                this.processBundles(this.leftBuffer, this.rightBuffer, this.collector);
                this.leftBuffer.clear();
                this.rightBuffer.clear();
            }
            this.coBundleTrigger.reset();
            this.checkpointingLock.notifyAll();
            this.isInFinishingBundle = false;
        }
    }

    protected abstract void processBundles(Map<BaseRow, List<BaseRow>> var1, Map<BaseRow, List<BaseRow>> var2, Collector<BaseRow> var3) throws Exception;

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<BaseRow>> output2) {
        super.setup(containingTask, config, output2);
        this.checkpointingLock = this.getContainingTask().getCheckpointLock();
    }

    @VisibleForTesting
    public void setupTypeSerializer(TypeSerializer<BaseRow> lSerializer, TypeSerializer<BaseRow> rSerializer) {
        Objects.requireNonNull(lSerializer);
        Objects.requireNonNull(rSerializer);
        this.lTypeSerializer = lSerializer;
        this.rTypeSerializer = rSerializer;
    }

    public void open() throws Exception {
        super.open();
        this.checkpointingLock = this.getContainingTask().getCheckpointLock();
        this.collector = new StreamRecordCollector(this.output);
        this.leftBuffer = new HashMap<BaseRow, List<BaseRow>>();
        this.rightBuffer = new HashMap<BaseRow, List<BaseRow>>();
        this.inputSer1 = (AbstractRowSerializer)(this.lTypeSerializer == null ? this.config.getTypeSerializerIn1(this.getUserCodeClassloader()) : this.lTypeSerializer);
        this.inputSer2 = (AbstractRowSerializer)(this.rTypeSerializer == null ? this.config.getTypeSerializerIn2(this.getUserCodeClassloader()) : this.rTypeSerializer);
        if (!this.finishBundleBeforeSnapshot) {
            KeyedValueStateDescriptor leftBufferStateDesc = new KeyedValueStateDescriptor(LEFT_STATE_NAME, this.getKeySerializer(), new ListSerializer<BaseRow>(this.inputSer1));
            this.leftBufferState = (KeyedValueState)this.getKeyedState((KeyedStateDescriptor)leftBufferStateDesc);
            this.leftBuffer.putAll(this.leftBufferState.getAll());
            this.leftBufferState.removeAll();
            KeyedValueStateDescriptor rightBufferStateDesc = new KeyedValueStateDescriptor(RIGHT_STATE_NAME, this.getKeySerializer(), new ListSerializer<BaseRow>(this.inputSer2));
            this.rightBufferState = (KeyedValueState)this.getKeyedState((KeyedStateDescriptor)rightBufferStateDesc);
        }
        this.coBundleTrigger.registerBundleTriggerCallback((BundleTriggerCallback)this);
        this.coBundleTrigger.reset();
        LOG.info("KeyedCoBundleOperator's trigger info: " + this.coBundleTrigger.explain());
    }

    public void close() throws Exception {
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        try {
            this.finishBundle();
        }
        finally {
            Exception exception = null;
            try {
                super.close();
            }
            catch (InterruptedException interrupted) {
                exception = interrupted;
                Thread.currentThread().interrupt();
            }
            catch (Exception e2) {
                exception = e2;
            }
            if (exception != null) {
                LOG.warn("Errors occurred while closing the KeyedCoBundleOperator.", (Throwable)exception);
            }
        }
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        while (this.isInFinishingBundle) {
            this.checkpointingLock.wait();
        }
        super.snapshotState(context);
        if (!this.finishBundleBeforeSnapshot) {
            this.leftBufferState.removeAll();
            this.rightBufferState.removeAll();
            this.leftBufferState.putAll(this.leftBuffer);
            this.rightBufferState.putAll(this.rightBuffer);
        }
    }

    public boolean requireState() {
        return true;
    }
}

