package org.apache.flink.table.runtime.join.stream;

import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.join.stream.state.JoinStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.NonBatchOnlyEqualityConditionMatchStateHandler;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/join/stream/OuterJoinStreamOperator.class */
abstract class OuterJoinStreamOperator extends JoinStreamOperator {
    private static final long serialVersionUID = 1;
    protected final JoinMatchStateHandler.Type leftMatchStateType;
    protected final JoinMatchStateHandler.Type rightMatchStateType;
    protected transient JoinMatchStateHandler leftMatchStateHandler;
    protected transient JoinMatchStateHandler rightMatchStateHandler;
    protected transient BaseRow leftSideNullRow;
    protected transient BaseRow rightSideNullRow;

    public OuterJoinStreamOperator(BaseRowTypeInfo baseRowTypeInfo, BaseRowTypeInfo baseRowTypeInfo2, GeneratedJoinConditionFunction generatedJoinConditionFunction, KeySelector<BaseRow, BaseRow> keySelector, KeySelector<BaseRow, BaseRow> keySelector2, GeneratedProjection generatedProjection, GeneratedProjection generatedProjection2, JoinStateHandler.Type type, JoinStateHandler.Type type2, long j, long j2, JoinMatchStateHandler.Type type3, JoinMatchStateHandler.Type type4, boolean[] zArr) {
        super(baseRowTypeInfo, baseRowTypeInfo2, generatedJoinConditionFunction, keySelector, keySelector2, generatedProjection, generatedProjection2, type, type2, j, j2, zArr);
        this.leftMatchStateType = type3;
        this.rightMatchStateType = type4;
    }

    @Override // org.apache.flink.table.runtime.join.stream.JoinStreamOperator
    public void open() throws Exception {
        super.open();
        this.leftSideNullRow = new GenericRow(this.leftType.getArity());
        this.rightSideNullRow = new GenericRow(this.rightType.getArity());
        LOG.info("leftJoinStateType {}, rightJoinStateType {}, leftMatchStateType {}, rightMatchStateType {}", new Object[]{this.leftJoinStateType, this.rightJoinStateType, this.leftMatchStateType, this.rightMatchStateType});
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.table.runtime.join.stream.JoinStreamOperator
    public void initAllStates() throws Exception {
        super.initAllStates();
        this.leftMatchStateHandler = createMatchStateHandler(this.leftType, this.leftMatchStateType, this.leftKeyType, "LeftMatchHandler", this.leftPkProjectCode);
        this.rightMatchStateHandler = createMatchStateHandler(this.rightType, this.rightMatchStateType, this.rightKeyType, "RightMatchHandler", this.rightPkProjectCode);
    }

    public void onProcessingTime(InternalTimer<BaseRow, Byte> internalTimer) throws Exception {
        if (((Byte) internalTimer.getNamespace()).byteValue() == 1) {
            if (needToCleanupState((BaseRow) internalTimer.getKey(), internalTimer.getTimestamp(), this.leftTimerState)) {
                this.leftMatchStateHandler.remove((BaseRow) internalTimer.getKey());
                this.leftStateHandler.remove((BaseRow) internalTimer.getKey());
                return;
            }
            return;
        }
        if (needToCleanupState((BaseRow) internalTimer.getKey(), internalTimer.getTimestamp(), this.rightTimerState)) {
            this.rightMatchStateHandler.remove((BaseRow) internalTimer.getKey());
            this.rightStateHandler.remove((BaseRow) internalTimer.getKey());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public TwoInputSelection processElement(BaseRow baseRow, JoinStateHandler joinStateHandler, JoinStateHandler joinStateHandler2, JoinMatchStateHandler joinMatchStateHandler, JoinMatchStateHandler joinMatchStateHandler2, boolean z, boolean z2, boolean z3, KeyedValueState<BaseRow, Long> keyedValueState) throws Exception {
        long add;
        long currentProcessingTime = this.internalTimerService.currentProcessingTime();
        joinStateHandler.extractCurrentJoinKey(baseRow);
        joinStateHandler.extractCurrentPrimaryKey(baseRow);
        long otherSidePossibleMatchJoinCnt = getOtherSidePossibleMatchJoinCnt(joinMatchStateHandler2, joinStateHandler, joinStateHandler.getCurrentJoinKey());
        if (BaseRowUtil.isRetractMsg(baseRow)) {
            add = joinStateHandler.retract(baseRow);
        } else {
            registerProcessingCleanupTimer(joinStateHandler.getCurrentJoinKey(), currentProcessingTime, z, keyedValueState);
            add = joinStateHandler.add(baseRow, currentProcessingTime + this.maxRetentionTime);
        }
        byte header = baseRow.getHeader();
        this.joinedRow.setHeader(header);
        Iterator<Tuple3<BaseRow, Long, Long>> records = joinStateHandler2.getRecords(joinStateHandler.getCurrentJoinKey());
        long j = 0;
        while (records.hasNext()) {
            Tuple3<BaseRow, Long, Long> next = records.next();
            BaseRow baseRow2 = next.f0;
            long longValue = next.f1.longValue();
            boolean z4 = false;
            if (z) {
                if (applyCondition(baseRow, baseRow2, joinStateHandler.getCurrentJoinKey())) {
                    z4 = true;
                }
            } else if (applyCondition(baseRow2, baseRow, joinStateHandler.getCurrentJoinKey())) {
                z4 = true;
            }
            if (z4) {
                j += longValue;
                collectJoinResult(baseRow, joinStateHandler, baseRow2, longValue, z, z3, joinMatchStateHandler2, otherSidePossibleMatchJoinCnt);
            }
            if (next.f2.longValue() < currentProcessingTime && this.stateCleaningEnabled) {
                records.remove();
                joinMatchStateHandler2.remove(joinStateHandler.getCurrentJoinKey(), baseRow2);
            }
        }
        if (j == 0 && z2) {
            if (z) {
                collectResult(this.joinedRow.replace(baseRow, this.rightSideNullRow));
            } else {
                collectResult(this.joinedRow.replace(this.leftSideNullRow, baseRow));
            }
        }
        if (add <= 0) {
            baseRow.setHeader((byte) 0);
            joinMatchStateHandler.remove(joinStateHandler.getCurrentJoinKey(), baseRow);
            baseRow.setHeader(header);
        } else if (add == serialVersionUID && BaseRowUtil.isAccumulateMsg(baseRow)) {
            joinMatchStateHandler.updateRowMatchJoinCnt(joinStateHandler.getCurrentJoinKey(), baseRow, j);
        }
        return TwoInputSelection.ANY;
    }

    private void collectJoinResult(BaseRow baseRow, JoinStateHandler joinStateHandler, BaseRow baseRow2, long j, boolean z, boolean z2, JoinMatchStateHandler joinMatchStateHandler, long j2) {
        byte header = baseRow.getHeader();
        joinMatchStateHandler.extractCurrentRowMatchJoinCount(joinStateHandler.getCurrentJoinKey(), baseRow2, j2);
        if (z2 && joinMatchStateHandler.getCurrentRowMatchJoinCnt() == 0 && BaseRowUtil.isAccumulateMsg(baseRow)) {
            this.joinedRow.setHeader((byte) 1);
            for (int i = 0; i < j; i++) {
                if (z) {
                    collectResult(this.joinedRow.replace(this.leftSideNullRow, baseRow2));
                } else {
                    collectResult(this.joinedRow.replace(baseRow2, this.rightSideNullRow));
                }
            }
            this.joinedRow.setHeader(header);
        }
        joinMatchStateHandler.resetCurrentRowMatchJoinCnt(joinMatchStateHandler.getCurrentRowMatchJoinCnt() + (BaseRowUtil.isRetractMsg(baseRow) ? -1L : serialVersionUID));
        for (int i2 = 0; i2 < j; i2++) {
            if (z) {
                collectResult(this.joinedRow.replace(baseRow, baseRow2));
            } else {
                collectResult(this.joinedRow.replace(baseRow2, baseRow));
            }
        }
        if (z2 && joinMatchStateHandler.getCurrentRowMatchJoinCnt() == 0 && BaseRowUtil.isRetractMsg(baseRow)) {
            this.joinedRow.setHeader((byte) 0);
            for (int i3 = 0; i3 < j; i3++) {
                if (z) {
                    collectResult(this.joinedRow.replace(this.leftSideNullRow, baseRow2));
                } else {
                    collectResult(this.joinedRow.replace(baseRow2, this.rightSideNullRow));
                }
            }
            this.joinedRow.setHeader(header);
        }
    }

    private long getOtherSidePossibleMatchJoinCnt(JoinMatchStateHandler joinMatchStateHandler, JoinStateHandler joinStateHandler, BaseRow baseRow) {
        long j;
        if (!(joinMatchStateHandler instanceof NonBatchOnlyEqualityConditionMatchStateHandler)) {
            return 0L;
        }
        Iterator<Tuple3<BaseRow, Long, Long>> records = joinStateHandler.getRecords(baseRow);
        long j2 = 0;
        while (true) {
            j = j2;
            if (!records.hasNext() || j >= 2) {
                break;
            }
            j2 = j + records.next().f1.longValue();
        }
        return j;
    }

    private void collectResult(BaseRow baseRow) {
        this.collector.collect(baseRow);
    }
}
