/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.join.stream;

import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.join.stream.JoinStreamOperator;
import org.apache.flink.table.runtime.join.stream.state.JoinStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.NonBatchOnlyEqualityConditionMatchStateHandler;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;

/**
 * Shared base for streaming outer-join operators.
 *
 * <p>On top of the per-side join state kept by {@link JoinStreamOperator}, this class keeps one
 * {@link JoinMatchStateHandler} per input side. While joining, the handler of the matched side
 * is asked for (and updated with) the number of rows that currently join with each stored row;
 * a count of zero on an outer side is what triggers emitting or withdrawing a null-padded row.
 *
 * <p>NOTE(review): header byte {@code 0} is used where an accumulate message is expected and
 * {@code 1} where a retract message is expected; the symbolic constants are not visible from
 * this file, so confirm against {@code BaseRowUtil}.
 */
@Internal
abstract class OuterJoinStreamOperator
extends JoinStreamOperator {
    private static final long serialVersionUID = 1L;

    /** Kind of match-state handler to create for the left input. */
    protected final JoinMatchStateHandler.Type leftMatchStateType;
    /** Kind of match-state handler to create for the right input. */
    protected final JoinMatchStateHandler.Type rightMatchStateType;
    /** Match-count state for rows of the left input; created in {@link #initAllStates()}. */
    protected transient JoinMatchStateHandler leftMatchStateHandler;
    /** Match-count state for rows of the right input; created in {@link #initAllStates()}. */
    protected transient JoinMatchStateHandler rightMatchStateHandler;
    /** Freshly constructed row with the left input's arity, used as left-side padding. */
    protected transient BaseRow leftSideNullRow;
    /** Freshly constructed row with the right input's arity, used as right-side padding. */
    protected transient BaseRow rightSideNullRow;

    /**
     * Creates the operator. All arguments except the two match-state types are passed straight
     * to the {@link JoinStreamOperator} constructor.
     *
     * @param leftMatchStateType  match-state handler type for the left input
     * @param rightMatchStateType match-state handler type for the right input
     */
    public OuterJoinStreamOperator(BaseRowTypeInfo leftType, BaseRowTypeInfo rightType, GeneratedJoinConditionFunction condFuncCode, KeySelector<BaseRow, BaseRow> leftKeySelector, KeySelector<BaseRow, BaseRow> rightKeySelector, GeneratedProjection leftPkProjectCode, GeneratedProjection rightPkProjectCode, JoinStateHandler.Type leftJoinStateType, JoinStateHandler.Type rightJoinStateType, long maxRetentionTime, long minRetentionTime, JoinMatchStateHandler.Type leftMatchStateType, JoinMatchStateHandler.Type rightMatchStateType, boolean[] filterNullKeys) {
        super(leftType, rightType, condFuncCode, leftKeySelector, rightKeySelector, leftPkProjectCode, rightPkProjectCode, leftJoinStateType, rightJoinStateType, maxRetentionTime, minRetentionTime, filterNullKeys);
        this.leftMatchStateType = leftMatchStateType;
        this.rightMatchStateType = rightMatchStateType;
    }

    /**
     * Opens the parent operator, allocates the two padding rows and logs the configured
     * join-state and match-state types.
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.leftSideNullRow = new GenericRow(this.leftType.getArity());
        this.rightSideNullRow = new GenericRow(this.rightType.getArity());
        LOG.info("leftJoinStateType {}, rightJoinStateType {}, leftMatchStateType {}, rightMatchStateType {}", this.leftJoinStateType, this.rightJoinStateType, this.leftMatchStateType, this.rightMatchStateType);
    }

    /** Initializes the parent's states, then creates the left and right match-state handlers. */
    @Override
    protected void initAllStates() throws Exception {
        super.initAllStates();
        this.leftMatchStateHandler = this.createMatchStateHandler(this.leftType, this.leftMatchStateType, this.leftKeyType, "LeftMatchHandler", this.leftPkProjectCode);
        this.rightMatchStateHandler = this.createMatchStateHandler(this.rightType, this.rightMatchStateType, this.rightKeyType, "RightMatchHandler", this.rightPkProjectCode);
    }

    /**
     * Handles a fired processing-time timer: if the parent decides the key's state is due for
     * cleanup, removes both the match state and the join state of the corresponding side.
     *
     * @param timer the fired timer; namespace {@code 1} selects the left side, any other value
     *              the right side (presumably matching what the parent registers - confirm)
     */
    public void onProcessingTime(InternalTimer<BaseRow, Byte> timer) throws Exception {
        byte namespace = timer.getNamespace();
        BaseRow key = timer.getKey();
        if (namespace == 1) {
            if (this.needToCleanupState(key, timer.getTimestamp(), (KeyedValueState<BaseRow, Long>)this.leftTimerState)) {
                this.leftMatchStateHandler.remove(key);
                this.leftStateHandler.remove(key);
            }
        } else if (this.needToCleanupState(key, timer.getTimestamp(), (KeyedValueState<BaseRow, Long>)this.rightTimerState)) {
            this.rightMatchStateHandler.remove(key);
            this.rightStateHandler.remove(key);
        }
    }

    /**
     * Processes one input row of an outer join.
     *
     * <p>Steps, in order: extract the join key and primary key of {@code input}; add the row to
     * (or retract it from) the input side's state; iterate the other side's records for the same
     * join key, emitting a joined row for every record that passes the join condition and
     * dropping expired records; emit a null-padded row when nothing matched and the input side
     * is outer; finally update or remove the input row's entry in the input side's match state.
     *
     * @param input                      the incoming row (accumulate or retract message)
     * @param inputSideStateHandler      join state of the side {@code input} arrived on
     * @param otherSideStateHandler      join state of the opposite side
     * @param inputSideMatchStateHandler match state of the side {@code input} arrived on
     * @param otherSideMatchStateHandler match state of the opposite side
     * @param inputIsLeft                whether {@code input} belongs to the left input
     * @param inputIsOuter               whether the input side is an outer side of the join
     * @param otherSideIsOuter           whether the opposite side is an outer side of the join
     * @param timerState                 state handed to the cleanup-timer registration
     * @return always {@link TwoInputSelection#ANY}
     */
    protected TwoInputSelection processElement(BaseRow input, JoinStateHandler inputSideStateHandler, JoinStateHandler otherSideStateHandler, JoinMatchStateHandler inputSideMatchStateHandler, JoinMatchStateHandler otherSideMatchStateHandler, boolean inputIsLeft, boolean inputIsOuter, boolean otherSideIsOuter, KeyedValueState<BaseRow, Long> timerState) throws Exception {
        long currentTime = this.internalTimerService.currentProcessingTime();
        inputSideStateHandler.extractCurrentJoinKey(input);
        inputSideStateHandler.extractCurrentPrimaryKey(input);

        // Must be computed before the input row is added to / retracted from the state below.
        long possibleJoinCnt = this.getOtherSidePossibleMatchJoinCnt(otherSideMatchStateHandler, inputSideStateHandler, inputSideStateHandler.getCurrentJoinKey());

        long inputCount;
        if (BaseRowUtil.isRetractMsg(input)) {
            inputCount = inputSideStateHandler.retract(input);
        } else {
            this.registerProcessingCleanupTimer(inputSideStateHandler.getCurrentJoinKey(), currentTime, inputIsLeft, timerState);
            inputCount = inputSideStateHandler.add(input, currentTime + this.maxRetentionTime);
        }

        // Joined output carries the same header as the input row.
        byte reservedHeader = input.getHeader();
        this.joinedRow.setHeader(reservedHeader);

        Iterator<Tuple3<BaseRow, Long, Long>> otherSideRecords = otherSideStateHandler.getRecords(inputSideStateHandler.getCurrentJoinKey());
        long inputJoinOtherSideRowNum = 0L;
        while (otherSideRecords.hasNext()) {
            // Tuple layout as used here: f0 = stored row, f1 = its count, f2 = a timestamp
            // compared against the current processing time to detect expiry.
            Tuple3<BaseRow, Long, Long> record = otherSideRecords.next();
            BaseRow matchRow = record.f0;
            long matchRowCount = record.f1;

            // The condition always receives (left row, right row, join key).
            boolean isMatched = inputIsLeft
                    ? this.applyCondition(input, matchRow, inputSideStateHandler.getCurrentJoinKey())
                    : this.applyCondition(matchRow, input, inputSideStateHandler.getCurrentJoinKey());
            if (isMatched) {
                inputJoinOtherSideRowNum += matchRowCount;
                this.collectJoinResult(input, inputSideStateHandler, matchRow, matchRowCount, inputIsLeft, otherSideIsOuter, otherSideMatchStateHandler, possibleJoinCnt);
            }

            // Drop expired records (and their match state) while iterating.
            if (record.f2 < currentTime && this.stateCleaningEnabled) {
                otherSideRecords.remove();
                otherSideMatchStateHandler.remove(inputSideStateHandler.getCurrentJoinKey(), matchRow);
            }
        }

        // Nothing on the other side joined: an outer input row is emitted padded with nulls.
        if (inputJoinOtherSideRowNum == 0L && inputIsOuter) {
            if (inputIsLeft) {
                this.collectResult(this.joinedRow.replace(input, this.rightSideNullRow));
            } else {
                this.collectResult(this.joinedRow.replace(this.leftSideNullRow, input));
            }
        }

        if (inputCount <= 0L) {
            // The row is gone from the join state; remove its match state as well. The header is
            // temporarily forced to 0 for the removal and restored afterwards.
            input.setHeader((byte)0);
            inputSideMatchStateHandler.remove(inputSideStateHandler.getCurrentJoinKey(), input);
            input.setHeader(reservedHeader);
        } else if (inputCount == 1L && BaseRowUtil.isAccumulateMsg(input)) {
            // First occurrence of this row: record how many other-side rows it joined with.
            inputSideMatchStateHandler.updateRowMatchJoinCnt(inputSideStateHandler.getCurrentJoinKey(), input, inputJoinOtherSideRowNum);
        }
        return TwoInputSelection.ANY;
    }

    /**
     * Emits the output for one matching pair and maintains the matched row's match count.
     *
     * <p>If the matched side is outer and the matched row had a match count of zero, an
     * accumulate input first emits null-padded rows with header {@code 1}. The match count is
     * then adjusted by +1 (accumulate) or -1 (retract) and the joined rows are emitted. If the
     * count dropped to zero because of a retract input, null-padded rows with header {@code 0}
     * are emitted last. Every emission is repeated {@code matchedRowCount} times.
     */
    private void collectJoinResult(BaseRow input, JoinStateHandler inputSideStateHandler, BaseRow matchedRow, long matchedRowCount, boolean inputIsLeft, boolean matchSideIsOuter, JoinMatchStateHandler matchSideMatchStateHandler, long possibleJoinCnt) {
        byte reservedHeader = input.getHeader();
        BaseRow currentJoinKey = inputSideStateHandler.getCurrentJoinKey();
        matchSideMatchStateHandler.extractCurrentRowMatchJoinCount(currentJoinKey, matchedRow, possibleJoinCnt);

        if (matchSideIsOuter && matchSideMatchStateHandler.getCurrentRowMatchJoinCnt() == 0L && BaseRowUtil.isAccumulateMsg(input)) {
            this.emitNullPaddedMatchRow(matchedRow, matchedRowCount, inputIsLeft, (byte)1, reservedHeader);
        }

        long appendJoinCnt = BaseRowUtil.isRetractMsg(input) ? -1L : 1L;
        long updateJoinCnt = matchSideMatchStateHandler.getCurrentRowMatchJoinCnt() + appendJoinCnt;
        matchSideMatchStateHandler.resetCurrentRowMatchJoinCnt(updateJoinCnt);

        if (inputIsLeft) {
            this.emitJoinedRowRepeatedly(input, matchedRow, matchedRowCount);
        } else {
            this.emitJoinedRowRepeatedly(matchedRow, input, matchedRowCount);
        }

        if (matchSideIsOuter && matchSideMatchStateHandler.getCurrentRowMatchJoinCnt() == 0L && BaseRowUtil.isRetractMsg(input)) {
            this.emitNullPaddedMatchRow(matchedRow, matchedRowCount, inputIsLeft, (byte)0, reservedHeader);
        }
    }

    /**
     * Emits {@code matchedRow} padded with nulls on the input's side, {@code times} times, using
     * {@code header} on the shared joined row and restoring {@code restoreHeader} afterwards.
     */
    private void emitNullPaddedMatchRow(BaseRow matchedRow, long times, boolean inputIsLeft, byte header, byte restoreHeader) {
        this.joinedRow.setHeader(header);
        if (inputIsLeft) {
            this.emitJoinedRowRepeatedly(this.leftSideNullRow, matchedRow, times);
        } else {
            this.emitJoinedRowRepeatedly(matchedRow, this.rightSideNullRow, times);
        }
        this.joinedRow.setHeader(restoreHeader);
    }

    /**
     * Emits the joined row built from {@code left} and {@code right} exactly {@code times} times.
     * The counter is a {@code long}: an {@code int} counter would wrap around and never reach a
     * count above {@code Integer.MAX_VALUE}.
     */
    private void emitJoinedRowRepeatedly(BaseRow left, BaseRow right, long times) {
        for (long emitted = 0L; emitted < times; ++emitted) {
            this.collectResult(this.joinedRow.replace(left, right));
        }
    }

    /**
     * Sums the record counts stored on the input side for {@code joinKey}, stopping as soon as
     * the sum reaches 2. Only computed when the other side's match-state handler is a
     * {@link NonBatchOnlyEqualityConditionMatchStateHandler}; otherwise returns 0.
     */
    private long getOtherSidePossibleMatchJoinCnt(JoinMatchStateHandler otherSideMatchStateHandler, JoinStateHandler inputSideStateHandler, BaseRow joinKey) {
        if (!(otherSideMatchStateHandler instanceof NonBatchOnlyEqualityConditionMatchStateHandler)) {
            return 0L;
        }
        Iterator<Tuple3<BaseRow, Long, Long>> records = inputSideStateHandler.getRecords(joinKey);
        long matchJoinCount = 0L;
        while (records.hasNext() && matchJoinCount < 2L) {
            matchJoinCount += records.next().f1;
        }
        return matchJoinCount;
    }

    /** Forwards {@code row} to the operator's output collector. */
    private void collectResult(BaseRow row) {
        this.collector.collect(row);
    }
}

