/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.join.stream.bundle;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.streaming.api.bundle.CoBundleTrigger;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.join.stream.bundle.MiniBatchJoinStreamOperator;
import org.apache.flink.table.runtime.join.stream.state.JoinKeyContainPrimaryKeyStateHandler;
import org.apache.flink.table.runtime.join.stream.state.JoinStateHandler;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.util.Collector;

@Internal
public class MiniBatchInnerJoinStreamOperator
extends MiniBatchJoinStreamOperator {
    private static final long serialVersionUID = 1L;

    public MiniBatchInnerJoinStreamOperator(BaseRowTypeInfo leftType, BaseRowTypeInfo rightType, GeneratedJoinConditionFunction condFuncCode, KeySelector<BaseRow, BaseRow> leftKeySelector, KeySelector<BaseRow, BaseRow> rightKeySelector, GeneratedProjection leftPkProjectCode, GeneratedProjection rightPkProjectCode, JoinStateHandler.Type leftJoinStateType, JoinStateHandler.Type rightJoinStateType, long maxRetentionTime, long minRetentionTime, Boolean leftIsAccRetract, Boolean rightIsAccRetract, boolean[] filterNullKeys, CoBundleTrigger<BaseRow, BaseRow> coBundleTrigger, boolean finishBundleBeforeSnapshot) {
        super(leftType, rightType, condFuncCode, leftKeySelector, rightKeySelector, leftPkProjectCode, rightPkProjectCode, leftJoinStateType, rightJoinStateType, maxRetentionTime, minRetentionTime, leftIsAccRetract, rightIsAccRetract, filterNullKeys, coBundleTrigger, finishBundleBeforeSnapshot);
    }

    @Override
    public void open() throws Exception {
        super.open();
        LOG.info("Init MiniBatchInnerJoinStreamOperator");
        LOG.info("leftJoinStateType {}, rightJoinStateType {}", (Object)this.leftJoinStateType, (Object)this.rightJoinStateType);
    }

    public void processSingleSideBundles(Map<BaseRow, List<BaseRow>> inputSide, JoinStateHandler.Type inputSideJoinStateType, JoinStateHandler.Type otherSideJoinStateType, JoinStateHandler inputSideStateHandler, JoinStateHandler otherSideStateHandler, KeyedValueState<BaseRow, Long> timerState, Boolean inputSideIsLeft, Collector<BaseRow> out) throws Exception {
        if (otherSideJoinStateType == JoinStateHandler.Type.JOIN_KEY_CONTAIN_PRIMARY_KEY) {
            HashSet<BaseRow> keySet = new HashSet<BaseRow>();
            for (BaseRow stateKey : inputSide.keySet()) {
                keySet.add(stateKey);
            }
            otherSideStateHandler.batchGet(keySet);
        }
        HashMap<BaseRow, BaseRow> putMap = new HashMap<BaseRow, BaseRow>();
        HashSet<BaseRow> deleteSet = new HashSet<BaseRow>();
        Boolean isAccRetract = inputSideIsLeft != false ? this.leftIsAccRetract : this.rightIsAccRetract;
        for (Map.Entry<BaseRow, List<BaseRow>> entry : inputSide.entrySet()) {
            List<Tuple2<BaseRow, Long>> reducedList = this.reduceCurrentList((Iterable<BaseRow>)entry.getValue(), inputSideStateHandler, isAccRetract);
            this.joinCurrentList(entry.getKey(), reducedList, inputSideStateHandler, otherSideStateHandler, inputSideIsLeft, timerState, this.stateCleaningEnabled);
            if (inputSideJoinStateType != JoinStateHandler.Type.JOIN_KEY_CONTAIN_PRIMARY_KEY) continue;
            Tuple2<BaseRow, Long> lastTuple = null;
            Iterator<Tuple2<BaseRow, Long>> iterator = reducedList.iterator();
            while (iterator.hasNext()) {
                Tuple2<BaseRow, Long> tuple2;
                lastTuple = tuple2 = iterator.next();
            }
            if (lastTuple == null) continue;
            if ((Long)lastTuple.f1 < 0L) {
                deleteSet.add(entry.getKey());
                continue;
            }
            putMap.put(entry.getKey(), (BaseRow)lastTuple.f0);
        }
        if (inputSideJoinStateType == JoinStateHandler.Type.JOIN_KEY_CONTAIN_PRIMARY_KEY) {
            inputSideStateHandler.putAll(putMap);
            inputSideStateHandler.removeAll(deleteSet);
        }
    }

    @Override
    public void processBundles(Map<BaseRow, List<BaseRow>> left, Map<BaseRow, List<BaseRow>> right, Collector<BaseRow> out) throws Exception {
        this.processSingleSideBundles(right, this.rightJoinStateType, this.leftJoinStateType, this.rightStateHandler, this.leftStateHandler, (KeyedValueState<BaseRow, Long>)this.rightTimerState, false, out);
        this.processSingleSideBundles(left, this.leftJoinStateType, this.rightJoinStateType, this.leftStateHandler, this.rightStateHandler, (KeyedValueState<BaseRow, Long>)this.leftTimerState, true, out);
    }

    public void onProcessingTime(InternalTimer<BaseRow, Byte> timer) throws Exception {
        byte namespace = (Byte)timer.getNamespace();
        if (namespace == 1) {
            if (this.needToCleanupState((BaseRow)timer.getKey(), timer.getTimestamp(), (KeyedValueState<BaseRow, Long>)this.leftTimerState)) {
                this.leftStateHandler.remove((BaseRow)timer.getKey());
            }
        } else if (this.needToCleanupState((BaseRow)timer.getKey(), timer.getTimestamp(), (KeyedValueState<BaseRow, Long>)this.rightTimerState)) {
            this.rightStateHandler.remove((BaseRow)timer.getKey());
        }
    }

    private void joinCurrentList(BaseRow currentJoinKey, List<Tuple2<BaseRow, Long>> inputList, JoinStateHandler inputStateHandler, JoinStateHandler otherStateHandler, boolean inputIsLeft, KeyedValueState<BaseRow, Long> timerState, boolean cleaningBasedTimer) throws Exception {
        inputStateHandler.setCurrentJoinKey(currentJoinKey);
        long currentTime2 = this.internalTimerService.currentProcessingTime();
        inputStateHandler.batchUpdate(currentJoinKey, inputList, currentTime2 + this.maxRetentionTime);
        this.registerProcessingCleanupTimer(inputStateHandler.getCurrentJoinKey(), currentTime2, inputIsLeft, timerState);
        Iterator<Tuple3<BaseRow, Long, Long>> iterator = null;
        iterator = otherStateHandler instanceof JoinKeyContainPrimaryKeyStateHandler ? ((JoinKeyContainPrimaryKeyStateHandler)otherStateHandler).getRecordsFromCache(currentJoinKey) : otherStateHandler.getRecords(currentJoinKey);
        while (iterator.hasNext()) {
            byte header;
            BaseRow input;
            Tuple3<BaseRow, Long, Long> tuple3 = iterator.next();
            BaseRow otherRow = (BaseRow)tuple3.f0;
            Long count = (Long)tuple3.f1;
            if (inputIsLeft) {
                for (Tuple2<BaseRow, Long> tuple2 : inputList) {
                    input = (BaseRow)tuple2.f0;
                    header = (Long)tuple2.f1 < 0L ? (byte)1 : 0;
                    this.joinedRow.setHeader(header);
                    if (!this.applyCondition(input, otherRow, inputStateHandler.getCurrentJoinKey())) continue;
                    this.collectResult(this.joinedRow.replace(input, otherRow), count * (Long)tuple2.f1);
                }
            } else {
                for (Tuple2<BaseRow, Long> tuple2 : inputList) {
                    input = (BaseRow)tuple2.f0;
                    header = (Long)tuple2.f1 < 0L ? (byte)1 : 0;
                    this.joinedRow.setHeader(header);
                    if (!this.applyCondition(otherRow, input, inputStateHandler.getCurrentJoinKey())) continue;
                    this.collectResult(this.joinedRow.replace(otherRow, input), count * (Long)tuple2.f1);
                }
            }
            if ((Long)tuple3.f2 > currentTime2 || !this.stateCleaningEnabled) continue;
            iterator.remove();
        }
    }

    public TwoInputSelection firstInputSelection() {
        return TwoInputSelection.ANY;
    }
}

