package org.apache.flink.table.runtime.operator.join.stream;

import java.util.Iterator;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.operators.InputElementSelection;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.operator.join.stream.state.JoinStateHandler;
import org.apache.flink.table.runtime.operator.join.stream.state.match.JoinMatchStateHandler;
import org.apache.flink.table.runtime.operator.join.stream.state.match.NonBatchOnlyEqualityConditionMatchStateHandler;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.util.BaseRowUtil;

/* loaded from: input_file:org/apache/flink/table/runtime/operator/join/stream/SemiAntiJoinStreamOperator.class */
/**
 * Two-input stream operator that evaluates a semi join or an anti join.
 *
 * <p>Only left-side rows are ever emitted. Left rows are kept in one of two states: the
 * "emitted" state for rows that have been sent downstream and the "not emitted" state for the
 * others. A match-count state optionally tracks how many right rows each left row currently
 * matches. Right rows are kept in the right state inherited from {@link JoinStreamOperator}.
 *
 * <p>When a right row arrives (or is retracted), left rows are moved between the two left states
 * and the corresponding accumulate/retract messages are emitted.
 */
public class SemiAntiJoinStreamOperator extends JoinStreamOperator {
    /**
     * Header byte set on a left row before it is emitted as a retraction.
     * NOTE(review): presumably equals the retract-message marker of BaseRowUtil - confirm.
     */
    private static final byte RETRACT_HEADER = 1;

    /** Header byte every buffered left row is reset to after it has been emitted. */
    private static final byte ACCUMULATE_HEADER = 0;

    private boolean isAntiJoin;
    private final JoinMatchStateHandler.Type leftMatchStateType;
    /** False when the match state type is one of the two "empty match" kinds. */
    private final boolean careLeftMatchTimes;
    protected final JoinStateHandler.Type leftEmitedJoinStateType;
    protected final JoinStateHandler.Type leftNotEmitedJoinStateType;
    private transient JoinStateHandler leftEmitedStateHandler;
    private transient JoinStateHandler leftNotEmitedStateHandler;
    private transient JoinMatchStateHandler leftMatchStateHandler;
    private boolean rightNotEmitRetraction;

    /**
     * Creates the operator.
     *
     * <p>All arguments except {@code isAntiJoin}, {@code leftMatchStateType} and
     * {@code rightNotEmitRetraction} are forwarded unchanged to the {@link JoinStreamOperator}
     * constructor, in the same order.
     *
     * @param leftJoinStateType state type used for the left-side states; also forwarded to the
     *     super constructor
     * @param isAntiJoin {@code true} for an anti join, {@code false} for a semi join
     * @param leftMatchStateType type of the state that counts matches per left row
     * @param rightNotEmitRetraction when {@code true}, one of the two left states is replaced by
     *     the {@code EMPTY} state type: the "not emitted" state for an anti join, the "emitted"
     *     state for a semi join
     */
    public SemiAntiJoinStreamOperator(
            BaseRowTypeInfo<BaseRow> baseRowTypeInfo,
            BaseRowTypeInfo<BaseRow> baseRowTypeInfo2,
            GeneratedJoinConditionFunction generatedJoinConditionFunction,
            KeySelector<BaseRow, BaseRow> keySelector,
            KeySelector<BaseRow, BaseRow> keySelector2,
            GeneratedProjection generatedProjection,
            GeneratedProjection generatedProjection2,
            JoinStateHandler.Type leftJoinStateType,
            JoinStateHandler.Type type2,
            long j,
            long j2,
            boolean isAntiJoin,
            JoinMatchStateHandler.Type leftMatchStateType,
            boolean rightNotEmitRetraction,
            boolean[] zArr) {
        super(baseRowTypeInfo, baseRowTypeInfo2, generatedJoinConditionFunction, keySelector, keySelector2, generatedProjection, generatedProjection2, leftJoinStateType, type2, j, j2, zArr);
        this.isAntiJoin = isAntiJoin;
        this.rightNotEmitRetraction = rightNotEmitRetraction;
        // NOTE(review): presumably, without right-side retractions a left row that already has a
        // match can never change status again, so that side needs no real state - confirm.
        if (isAntiJoin) {
            this.leftNotEmitedJoinStateType = rightNotEmitRetraction ? JoinStateHandler.Type.EMPTY : leftJoinStateType;
            this.leftEmitedJoinStateType = leftJoinStateType;
        } else {
            this.leftEmitedJoinStateType = rightNotEmitRetraction ? JoinStateHandler.Type.EMPTY : leftJoinStateType;
            this.leftNotEmitedJoinStateType = leftJoinStateType;
        }
        this.leftMatchStateType = leftMatchStateType;
        this.careLeftMatchTimes = !(leftMatchStateType.equals(JoinMatchStateHandler.Type.EMPTY_MATCH)
                || leftMatchStateType.equals(JoinMatchStateHandler.Type.ONLY_EQUALITY_CONDITION_EMPTY_MATCH));
    }

    /**
     * Opens the parent operator, logs the chosen state types and clears both primary-key
     * projection fields.
     */
    @Override
    public void open() throws Exception {
        super.open();
        // One argument per placeholder; rightJoinStateType was previously missing from the list.
        LOG.info("leftNotEmitedJoinStateType {}, leftEmitedJoinStateType {}, leftMatchStateType {}, rightJoinStateType {}, rightNotEmitRetraction {}", new Object[]{this.leftNotEmitedJoinStateType, this.leftEmitedJoinStateType, this.leftMatchStateType, this.rightJoinStateType, Boolean.valueOf(this.rightNotEmitRetraction)});
        LOG.info("Init SemiAntiJoinStreamOperator.");
        this.leftPkProjectCode = null;
        this.rightPkProjectCode = null;
    }

    /**
     * Creates the two left-side row states, the left match-count state, the right row state and,
     * if state cleaning is enabled, the cleanup-time states for both inputs.
     */
    @Override
    protected void initAllStates() throws Exception {
        this.leftEmitedStateHandler = createJoinStateHandler(this.leftType, this.leftEmitedJoinStateType, "leftEmitedState", this.leftKeySelector, this.leftKeyType, this.leftPkProjectCode);
        this.leftNotEmitedStateHandler = createJoinStateHandler(this.leftType, this.leftNotEmitedJoinStateType, "leftNotEmitedState", this.leftKeySelector, this.leftKeyType, this.leftPkProjectCode);
        this.leftMatchStateHandler = createMatchStateHandler(this.leftType, this.leftMatchStateType, this.leftKeyType, "LeftMatchHandler", this.leftPkProjectCode);
        this.rightStateHandler = createJoinStateHandler(this.rightType, this.rightJoinStateType, "rightJoinState", this.rightKeySelector, this.rightKeyType, this.rightPkProjectCode);
        if (this.stateCleaningEnabled) {
            this.leftTimerState = createCleanupTimeState("left-time-state");
            this.rightTimerState = createCleanupTimeState("right-time-state");
        }
    }

    /**
     * Handles a row from the left input.
     *
     * <p>The row is emitted if it is already present in the "emitted" state, or if probing the
     * right state yields the outcome required by the join kind (a match for a semi join, no match
     * for an anti join). The row is then added to, or retracted from, the left state that
     * corresponds to that decision.
     *
     * @param streamRecord record carrying the left row
     * @return always {@link InputElementSelection#ANY}
     */
    public InputElementSelection processElement1(StreamRecord<BaseRow> streamRecord) throws Exception {
        long now = this.internalTimerService.currentProcessingTime();
        BaseRow leftRow = streamRecord.getValue();
        BaseRow joinKey = (BaseRow) this.leftKeySelector.getKey(leftRow);
        if (BaseRowUtil.isAccumulateMsg(leftRow)) {
            registerProcessingCleanupTimer(joinKey, now, true, this.leftTimerState);
        }

        // Without any matching right row an anti join emits and a semi join does not.
        boolean emit = this.isAntiJoin;
        if (this.leftEmitedStateHandler.contains(joinKey, leftRow)) {
            emit = true;
        } else {
            Iterator<Tuple3<BaseRow, Long, Long>> rightRecords = this.rightStateHandler.getRecords(joinKey);
            long matchCount = 0;
            while (rightRecords.hasNext()) {
                Tuple3<BaseRow, Long, Long> rightRecord = rightRecords.next();
                if (isStaleRecord(rightRecord, now)) {
                    rightRecords.remove();
                } else if (applyCondition(leftRow, rightRecord.f0, (BaseRow) this.leftKeySelector.getKey(leftRow))) {
                    matchCount += rightRecord.f1.longValue();
                    emit = !this.isAntiJoin;
                    if (!this.careLeftMatchTimes) {
                        // The exact number of matches is irrelevant: one match decides.
                        break;
                    }
                }
            }
            if (this.careLeftMatchTimes && matchCount > 0 && BaseRowUtil.isAccumulateMsg(leftRow)) {
                this.leftMatchStateHandler.updateRowMatchJoinCnt(joinKey, leftRow, matchCount);
            }
        }

        if (emit) {
            this.collector.collect(leftRow);
            storeLeftRow(this.leftEmitedStateHandler, leftRow, now);
        } else {
            storeLeftRow(this.leftNotEmitedStateHandler, leftRow, now);
        }
        return InputElementSelection.ANY;
    }

    /**
     * Handles a row from the right input by delegating to
     * {@link #processReceivedRightRow(BaseRow, JoinStateHandler, JoinStateHandler)}. The left
     * state holding rows that currently have a match is passed first: the "not emitted" state for
     * an anti join, the "emitted" state for a semi join.
     *
     * @param streamRecord record carrying the right row
     * @return always {@link InputElementSelection#ANY}
     */
    public InputElementSelection processElement2(StreamRecord<BaseRow> streamRecord) throws Exception {
        BaseRow rightRow = streamRecord.getValue();
        if (this.isAntiJoin) {
            processReceivedRightRow(rightRow, this.leftNotEmitedStateHandler, this.leftEmitedStateHandler);
        } else {
            processReceivedRightRow(rightRow, this.leftEmitedStateHandler, this.leftNotEmitedStateHandler);
        }
        return InputElementSelection.ANY;
    }

    /**
     * Applies an accumulate or retract message from the right input.
     *
     * @param baseRow the right row
     * @param joinStateHandler left state holding rows that currently have a matching right row
     * @param joinStateHandler2 left state holding rows that currently have no matching right row
     */
    private void processReceivedRightRow(BaseRow baseRow, JoinStateHandler joinStateHandler, JoinStateHandler joinStateHandler2) throws Exception {
        long now = this.internalTimerService.currentProcessingTime();
        this.rightStateHandler.extractCurrentJoinKey(baseRow);
        this.rightStateHandler.extractCurrentPrimaryKey(baseRow);
        BaseRow joinKey = this.rightStateHandler.getCurrentJoinKey();
        if (BaseRowUtil.isRetractMsg(baseRow)) {
            retractRightRow(baseRow, joinKey, now, joinStateHandler, joinStateHandler2);
        } else {
            accumulateRightRow(baseRow, joinKey, now, joinStateHandler, joinStateHandler2);
        }
    }

    /**
     * Handles a retracted right row: every matched left row that satisfies the join condition
     * loses one match; rows whose match count drops to zero are emitted and moved to the
     * unmatched state. Finally the right row is retracted from the right state.
     */
    private void retractRightRow(BaseRow rightRow, BaseRow joinKey, long now, JoinStateHandler matchedLeftState, JoinStateHandler unmatchedLeftState) throws Exception {
        long otherSidePossibleMatchJoinCnt = getOtherSidePossibleMatchJoinCnt(this.leftMatchStateHandler, this.rightStateHandler, joinKey);
        Iterator<Tuple3<BaseRow, Long, Long>> matchedRecords = matchedLeftState.getRecords(joinKey);
        while (matchedRecords.hasNext()) {
            Tuple3<BaseRow, Long, Long> leftRecord = matchedRecords.next();
            BaseRow leftRow = leftRecord.f0;
            if (isStaleRecord(leftRecord, now)) {
                matchedRecords.remove();
                this.leftMatchStateHandler.remove(joinKey, leftRow);
            } else if (applyCondition(leftRow, rightRow, this.rightStateHandler.getCurrentJoinKey())) {
                this.leftMatchStateHandler.extractCurrentRowMatchJoinCount(joinKey, leftRow, otherSidePossibleMatchJoinCnt);
                long remainingMatches = this.leftMatchStateHandler.getCurrentRowMatchJoinCnt() - 1;
                if (remainingMatches == 0) {
                    long multiplicity = leftRecord.f1.longValue();
                    long expireTime = leftRecord.f2.longValue();
                    this.leftMatchStateHandler.remove(joinKey, leftRow);
                    // Last match gone: a semi join retracts the row, an anti join emits it.
                    emitLeftRowRepeatedly(leftRow, multiplicity, !this.isAntiJoin);
                    unmatchedLeftState.update(joinKey, leftRow, multiplicity, expireTime);
                    matchedRecords.remove();
                } else {
                    this.leftMatchStateHandler.resetCurrentRowMatchJoinCnt(remainingMatches);
                }
            }
        }
        this.rightStateHandler.retract(rightRow);
    }

    /**
     * Handles a newly accumulated right row: matched left rows satisfying the join condition gain
     * one match; unmatched left rows satisfying it are emitted and moved to the matched state.
     * Finally the right row is added to the right state.
     */
    private void accumulateRightRow(BaseRow rightRow, BaseRow joinKey, long now, JoinStateHandler matchedLeftState, JoinStateHandler unmatchedLeftState) throws Exception {
        registerProcessingCleanupTimer(joinKey, now, false, this.rightTimerState);

        Iterator<Tuple3<BaseRow, Long, Long>> matchedRecords = matchedLeftState.getRecords(joinKey);
        while (matchedRecords.hasNext()) {
            Tuple3<BaseRow, Long, Long> leftRecord = matchedRecords.next();
            BaseRow leftRow = leftRecord.f0;
            if (isStaleRecord(leftRecord, now)) {
                matchedRecords.remove();
                this.leftMatchStateHandler.remove(joinKey, leftRow);
            } else if (applyCondition(leftRow, rightRow, this.rightStateHandler.getCurrentJoinKey())) {
                this.leftMatchStateHandler.addRowMatchJoinCnt(joinKey, leftRow, 1L);
            }
        }

        Iterator<Tuple3<BaseRow, Long, Long>> unmatchedRecords = unmatchedLeftState.getRecords(joinKey);
        while (unmatchedRecords.hasNext()) {
            Tuple3<BaseRow, Long, Long> leftRecord = unmatchedRecords.next();
            BaseRow leftRow = leftRecord.f0;
            if (isStaleRecord(leftRecord, now)) {
                // Must go through the iterator being traversed here; removing through the
                // already-exhausted iterator of the matched state would leave this stale row in
                // place and could throw IllegalStateException.
                unmatchedRecords.remove();
                this.leftMatchStateHandler.remove(joinKey, leftRow);
            } else if (applyCondition(leftRow, rightRow, this.rightStateHandler.getCurrentJoinKey())) {
                long multiplicity = leftRecord.f1.longValue();
                long expireTime = leftRecord.f2.longValue();
                // First match found: an anti join retracts the row, a semi join emits it.
                emitLeftRowRepeatedly(leftRow, multiplicity, this.isAntiJoin);
                this.leftMatchStateHandler.updateRowMatchJoinCnt(joinKey, leftRow, 1L);
                matchedLeftState.update(joinKey, leftRow, multiplicity, expireTime);
                unmatchedRecords.remove();
            }
        }
        this.rightStateHandler.add(rightRow, now + this.maxRetentionTime);
    }

    /**
     * Sums the counts ({@code f1}) of the records stored under {@code baseRow} in
     * {@code joinStateHandler}, stopping as soon as the sum reaches 2.
     *
     * @return 0 unless {@code joinMatchStateHandler} is a
     *     {@link NonBatchOnlyEqualityConditionMatchStateHandler}; otherwise the (possibly
     *     truncated) sum described above
     */
    private long getOtherSidePossibleMatchJoinCnt(JoinMatchStateHandler joinMatchStateHandler, JoinStateHandler joinStateHandler, BaseRow baseRow) {
        if (!(joinMatchStateHandler instanceof NonBatchOnlyEqualityConditionMatchStateHandler)) {
            return 0L;
        }
        Iterator<Tuple3<BaseRow, Long, Long>> records = joinStateHandler.getRecords(baseRow);
        long count = 0;
        while (records.hasNext() && count < 2) {
            count += records.next().f1.longValue();
        }
        return count;
    }

    /**
     * Cleanup timer callback. A timer whose namespace is 1 clears all left-side states of the
     * timer key; any other namespace clears the right state. Nothing is cleared unless
     * {@code needToCleanupState} agrees for the respective cleanup-time state.
     *
     * @param internalTimer the fired processing-time timer
     */
    public void onProcessingTime(InternalTimer<BaseRow, Byte> internalTimer) throws Exception {
        BaseRow key = internalTimer.getKey();
        boolean isLeftTimer = internalTimer.getNamespace().byteValue() == 1;
        if (isLeftTimer) {
            if (needToCleanupState(key, internalTimer.getTimestamp(), this.leftTimerState)) {
                this.leftEmitedStateHandler.remove(key);
                this.leftNotEmitedStateHandler.remove(key);
                this.leftMatchStateHandler.remove(key);
            }
        } else if (needToCleanupState(key, internalTimer.getTimestamp(), this.rightTimerState)) {
            this.rightStateHandler.remove(key);
        }
    }

    /** Returns true when the record's timestamp ({@code f2}) lies before {@code now} and state cleaning is enabled. */
    private boolean isStaleRecord(Tuple3<BaseRow, Long, Long> record, long now) {
        return record.f2.longValue() < now && this.stateCleaningEnabled;
    }

    /** Retracts the left row from {@code handler}, or adds it with a timestamp of {@code now + maxRetentionTime}. */
    private void storeLeftRow(JoinStateHandler handler, BaseRow leftRow, long now) throws Exception {
        handler.extractCurrentJoinKey(leftRow);
        handler.extractCurrentPrimaryKey(leftRow);
        if (BaseRowUtil.isRetractMsg(leftRow)) {
            handler.retract(leftRow);
        } else {
            handler.add(leftRow, now + this.maxRetentionTime);
        }
    }

    /** Emits {@code row} {@code times} times, optionally flagged as retraction, then resets its header. */
    private void emitLeftRowRepeatedly(BaseRow row, long times, boolean asRetraction) {
        if (asRetraction) {
            row.setHeader(RETRACT_HEADER);
        }
        for (long emitted = 0; emitted < times; emitted++) {
            this.collector.collect(row);
        }
        // The same row object is written back to state afterwards, so its header is reset.
        row.setHeader(ACCUMULATE_HEADER);
    }
}
