/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.join.stream.bundle;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.streaming.api.bundle.CoBundleTrigger;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.runtime.join.stream.bundle.MiniBatchJoinStreamOperator;
import org.apache.flink.table.runtime.join.stream.state.JoinKeyContainPrimaryKeyStateHandler;
import org.apache.flink.table.runtime.join.stream.state.JoinStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.OnlyEqualityConditionMatchStateHandler;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.util.Collector;

@Internal
abstract class MiniBatchOuterJoinStreamOperator
extends MiniBatchJoinStreamOperator {
    private static final long serialVersionUID = 1L;
    protected final JoinMatchStateHandler.Type leftMatchStateType;
    protected final JoinMatchStateHandler.Type rightMatchStateType;
    protected transient JoinMatchStateHandler leftMatchStateHandler;
    protected transient JoinMatchStateHandler rightMatchStateHandler;
    protected transient BaseRow leftSideNullRow;
    protected transient BaseRow rightSideNullRow;

    public MiniBatchOuterJoinStreamOperator(BaseRowTypeInfo leftType, BaseRowTypeInfo rightType, GeneratedJoinConditionFunction condFuncCode, KeySelector<BaseRow, BaseRow> leftKeySelector, KeySelector<BaseRow, BaseRow> rightKeySelector, GeneratedProjection leftPkProjectCode, GeneratedProjection rightPkProjectCode, JoinStateHandler.Type leftJoinStateType, JoinStateHandler.Type rightJoinStateType, long maxRetentionTime, long minRetentionTime, JoinMatchStateHandler.Type leftMatchStateType, JoinMatchStateHandler.Type rightMatchStateType, Boolean leftIsAccRetract, Boolean rightIsAccRetract, boolean[] filterNullKeys, CoBundleTrigger<BaseRow, BaseRow> coBundleTrigger, boolean finishBundleBeforeSnapshot) {
        super(leftType, rightType, condFuncCode, leftKeySelector, rightKeySelector, leftPkProjectCode, rightPkProjectCode, leftJoinStateType, rightJoinStateType, maxRetentionTime, minRetentionTime, leftIsAccRetract, rightIsAccRetract, filterNullKeys, coBundleTrigger, finishBundleBeforeSnapshot);
        this.leftMatchStateType = leftMatchStateType;
        this.rightMatchStateType = rightMatchStateType;
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.leftSideNullRow = new GenericRow(this.leftType.getArity());
        this.rightSideNullRow = new GenericRow(this.rightType.getArity());
        LOG.info("leftJoinStateType {}, rightJoinStateType {}, leftMatchStateType {}, rightMatchStateType {}", new Object[]{this.leftJoinStateType, this.rightJoinStateType, this.leftMatchStateType, this.rightMatchStateType});
    }

    @Override
    protected void initAllStates() throws Exception {
        super.initAllStates();
        this.leftMatchStateHandler = this.createMatchStateHandler(this.leftType, this.leftMatchStateType, this.leftKeyType, "LeftMatchHandler", this.leftPkProjectCode);
        this.rightMatchStateHandler = this.createMatchStateHandler(this.rightType, this.rightMatchStateType, this.rightKeyType, "RightMatchHandler", this.rightPkProjectCode);
    }

    public void onProcessingTime(InternalTimer<BaseRow, Byte> timer) throws Exception {
        byte namespace = (Byte)timer.getNamespace();
        if (namespace == 1) {
            if (this.needToCleanupState((BaseRow)timer.getKey(), timer.getTimestamp(), (KeyedValueState<BaseRow, Long>)this.leftTimerState)) {
                this.leftMatchStateHandler.remove((BaseRow)timer.getKey());
                this.leftStateHandler.remove((BaseRow)timer.getKey());
            }
        } else if (this.needToCleanupState((BaseRow)timer.getKey(), timer.getTimestamp(), (KeyedValueState<BaseRow, Long>)this.rightTimerState)) {
            this.rightMatchStateHandler.remove((BaseRow)timer.getKey());
            this.rightStateHandler.remove((BaseRow)timer.getKey());
        }
    }

    public void processSingleSideBundles(Map<BaseRow, List<BaseRow>> inputSide, Map<BaseRow, List<BaseRow>> otherSide, JoinStateHandler.Type inputSideJoinStateType, JoinStateHandler.Type otherSideJoinStateType, JoinStateHandler inputSideStateHandler, JoinStateHandler otherSideStateHandler, JoinMatchStateHandler inputSideMatchStateHandler, JoinMatchStateHandler otherSideMatchStateHandler, KeyedValueState<BaseRow, Long> timerState, Boolean inputSideIsLeft, boolean inputIsOuter, boolean otherSideIsOuter, Collector<BaseRow> out) throws Exception {
        if (otherSideJoinStateType == JoinStateHandler.Type.JOIN_KEY_CONTAIN_PRIMARY_KEY) {
            HashSet<BaseRow> keySet = new HashSet<BaseRow>();
            for (BaseRow stateKey : inputSide.keySet()) {
                keySet.add(stateKey);
            }
            otherSideStateHandler.batchGet(keySet);
        }
        HashMap<BaseRow, BaseRow> putMap = new HashMap<BaseRow, BaseRow>();
        HashSet<BaseRow> deleteSet = new HashSet<BaseRow>();
        Boolean isAccRetract = inputSideIsLeft != false ? this.leftIsAccRetract : this.rightIsAccRetract;
        for (Map.Entry<BaseRow, List<BaseRow>> entry : inputSide.entrySet()) {
            List<Tuple2<BaseRow, Long>> reducedList = this.reduceCurrentList((Iterable<BaseRow>)entry.getValue(), inputSideStateHandler, isAccRetract);
            this.joinCurrentList(entry.getKey(), reducedList, inputSideStateHandler, otherSideStateHandler, inputSideMatchStateHandler, otherSideMatchStateHandler, inputSideIsLeft, inputIsOuter, otherSideIsOuter, timerState, this.stateCleaningEnabled);
            if (inputSideJoinStateType != JoinStateHandler.Type.JOIN_KEY_CONTAIN_PRIMARY_KEY) continue;
            Tuple2<BaseRow, Long> lastTuple = null;
            Iterator<Tuple2<BaseRow, Long>> iterator = reducedList.iterator();
            while (iterator.hasNext()) {
                Tuple2<BaseRow, Long> tuple2;
                lastTuple = tuple2 = iterator.next();
            }
            if (lastTuple == null) continue;
            if ((Long)lastTuple.f1 < 0L) {
                deleteSet.add(entry.getKey());
                continue;
            }
            putMap.put(entry.getKey(), (BaseRow)lastTuple.f0);
        }
        if (inputSideJoinStateType == JoinStateHandler.Type.JOIN_KEY_CONTAIN_PRIMARY_KEY) {
            inputSideStateHandler.putAll(putMap);
            inputSideStateHandler.removeAll(deleteSet);
        }
    }

    private void joinCurrentList(BaseRow currentJoinKey, List<Tuple2<BaseRow, Long>> inputList, JoinStateHandler inputSideStateHandler, JoinStateHandler otherSideStateHandler, JoinMatchStateHandler inputSideMatchStateHandler, JoinMatchStateHandler otherSideMatchStateHandler, boolean inputIsLeft, boolean inputIsOuter, boolean otherSideIsOuter, KeyedValueState<BaseRow, Long> timerState, boolean cleaningBasedTimer) throws Exception {
        inputSideStateHandler.setCurrentJoinKey(currentJoinKey);
        long currentTime2 = this.internalTimerService.currentProcessingTime();
        this.registerProcessingCleanupTimer(inputSideStateHandler.getCurrentJoinKey(), currentTime2, inputIsLeft, timerState);
        Iterator<Tuple3<BaseRow, Long, Long>> iterator = null;
        iterator = otherSideStateHandler instanceof JoinKeyContainPrimaryKeyStateHandler ? ((JoinKeyContainPrimaryKeyStateHandler)otherSideStateHandler).getRecordsFromCache(currentJoinKey) : otherSideStateHandler.getRecords(currentJoinKey);
        long[] updateStatus = inputSideStateHandler.batchUpdate(currentJoinKey, inputList, currentTime2 + this.maxRetentionTime);
        long[] inputSideJoinCnt = new long[inputList.size()];
        long otherSideOldJoinCnt = 0L;
        long otherSideNewJoinCnt = 0L;
        if (otherSideMatchStateHandler instanceof OnlyEqualityConditionMatchStateHandler) {
            otherSideMatchStateHandler.extractCurrentRowMatchJoinCount(currentJoinKey, null, 0L);
        }
        while (iterator.hasNext()) {
            Tuple3<BaseRow, Long, Long> tuple3 = iterator.next();
            BaseRow matchRow = (BaseRow)tuple3.f0;
            long matchRowCount = (Long)tuple3.f1;
            if (!(otherSideMatchStateHandler instanceof OnlyEqualityConditionMatchStateHandler)) {
                otherSideMatchStateHandler.extractCurrentRowMatchJoinCount(currentJoinKey, matchRow, 0L);
            }
            otherSideOldJoinCnt = otherSideMatchStateHandler.getCurrentRowMatchJoinCnt();
            int idx = 0;
            long matchCnt = 0L;
            for (Tuple2<BaseRow, Long> tuple2 : inputList) {
                if (inputIsLeft) {
                    if (this.applyCondition((BaseRow)tuple2.f0, matchRow, inputSideStateHandler.getCurrentJoinKey())) {
                        matchCnt += ((Long)tuple2.f1).longValue();
                        int n = idx;
                        inputSideJoinCnt[n] = inputSideJoinCnt[n] + matchRowCount;
                    }
                } else if (this.applyCondition(matchRow, (BaseRow)tuple2.f0, inputSideStateHandler.getCurrentJoinKey())) {
                    matchCnt += ((Long)tuple2.f1).longValue();
                    int n = idx;
                    inputSideJoinCnt[n] = inputSideJoinCnt[n] + matchRowCount;
                }
                ++idx;
            }
            otherSideNewJoinCnt = otherSideOldJoinCnt + matchCnt;
            if (otherSideIsOuter && otherSideOldJoinCnt == 0L && otherSideNewJoinCnt > 0L) {
                this.joinedRow.setHeader((byte)1);
                if (inputIsLeft) {
                    this.collectResult(this.joinedRow.replace(this.leftSideNullRow, matchRow), matchRowCount);
                } else {
                    this.collectResult(this.joinedRow.replace(matchRow, this.rightSideNullRow), matchRowCount);
                }
            }
            for (Tuple2<BaseRow, Long> tuple2 : inputList) {
                this.joinedRow.setHeader((Long)tuple2.f1 < 0L ? (byte)1 : 0);
                if (inputIsLeft) {
                    if (!this.applyCondition((BaseRow)tuple2.f0, matchRow, inputSideStateHandler.getCurrentJoinKey())) continue;
                    this.collectResult(this.joinedRow.replace((BaseRow)tuple2.f0, matchRow), (Long)tuple2.f1 * matchRowCount);
                    continue;
                }
                if (!this.applyCondition(matchRow, (BaseRow)tuple2.f0, inputSideStateHandler.getCurrentJoinKey())) continue;
                this.collectResult(this.joinedRow.replace(matchRow, (BaseRow)tuple2.f0), (Long)tuple2.f1 * matchRowCount);
            }
            if (otherSideIsOuter && otherSideOldJoinCnt > 0L && otherSideNewJoinCnt <= 0L) {
                this.joinedRow.setHeader((byte)0);
                if (inputIsLeft) {
                    this.collectResult(this.joinedRow.replace(this.leftSideNullRow, matchRow), matchRowCount);
                } else {
                    this.collectResult(this.joinedRow.replace(matchRow, this.rightSideNullRow), matchRowCount);
                }
            }
            if (!(otherSideMatchStateHandler instanceof OnlyEqualityConditionMatchStateHandler)) {
                otherSideMatchStateHandler.updateRowMatchJoinCnt(currentJoinKey, matchRow, otherSideNewJoinCnt);
            }
            if ((Long)tuple3.f2 >= currentTime2 || !this.stateCleaningEnabled) continue;
            iterator.remove();
            otherSideMatchStateHandler.remove(currentJoinKey, matchRow);
        }
        if (otherSideMatchStateHandler instanceof OnlyEqualityConditionMatchStateHandler && otherSideNewJoinCnt != otherSideOldJoinCnt) {
            otherSideMatchStateHandler.updateRowMatchJoinCnt(currentJoinKey, null, otherSideNewJoinCnt);
        }
        if (inputIsOuter) {
            int idx = 0;
            HashSet<BaseRow> deleteSet = new HashSet<BaseRow>();
            HashMap<BaseRow, Long> addMap = new HashMap<BaseRow, Long>();
            for (Tuple2<BaseRow, Long> tuple2 : inputList) {
                if (inputSideJoinCnt[idx] == 0L) {
                    this.joinedRow.setHeader((Long)tuple2.f1 < 0L ? (byte)1 : 0);
                    if (inputIsLeft) {
                        this.collectResult(this.joinedRow.replace((BaseRow)tuple2.f0, this.rightSideNullRow), (Long)tuple2.f1);
                    } else {
                        this.collectResult(this.joinedRow.replace(this.leftSideNullRow, (BaseRow)tuple2.f0), (Long)tuple2.f1);
                    }
                }
                if (updateStatus[idx] == -1L) {
                    deleteSet.add((BaseRow)tuple2.f0);
                } else if (updateStatus[idx] == 1L) {
                    addMap.put((BaseRow)tuple2.f0, inputSideJoinCnt[idx]);
                }
                ++idx;
            }
            inputSideMatchStateHandler.removeAll(currentJoinKey, deleteSet);
            inputSideMatchStateHandler.addAll(currentJoinKey, addMap);
        }
    }
}

