package org.apache.flink.table.runtime.operator.join.stream.bundle;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state2.keyed.KeyedValueState;
import org.apache.flink.streaming.api.bundle.CoBundleTrigger;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.operator.join.stream.state.JoinKeyContainPrimaryKeyStateHandler;
import org.apache.flink.table.runtime.operator.join.stream.state.JoinStateHandler;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.util.Collector;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operator/join/stream/bundle/BatchInnerJoinStreamOperator.class */
public class BatchInnerJoinStreamOperator extends BatchJoinStreamOperator {
    private static final long serialVersionUID = 1;

    public BatchInnerJoinStreamOperator(BaseRowTypeInfo<BaseRow> baseRowTypeInfo, BaseRowTypeInfo<BaseRow> baseRowTypeInfo2, GeneratedJoinConditionFunction generatedJoinConditionFunction, KeySelector<BaseRow, BaseRow> keySelector, KeySelector<BaseRow, BaseRow> keySelector2, GeneratedProjection generatedProjection, GeneratedProjection generatedProjection2, JoinStateHandler.Type type, JoinStateHandler.Type type2, long j, long j2, Boolean bool, Boolean bool2, boolean[] zArr, CoBundleTrigger<BaseRow, BaseRow> coBundleTrigger, boolean z) {
        super(baseRowTypeInfo, baseRowTypeInfo2, generatedJoinConditionFunction, keySelector, keySelector2, generatedProjection, generatedProjection2, type, type2, j, j2, bool, bool2, zArr, coBundleTrigger, z);
    }

    @Override // org.apache.flink.table.runtime.operator.join.stream.bundle.BatchJoinStreamOperator, org.apache.flink.table.runtime.operator.bundle.KeyedCoBundleOperator
    public void open() throws Exception {
        super.open();
        LOG.info("Init BatchInnerJoinStreamOperator");
        LOG.info("leftJoinStateType {}, rightJoinStateType {}", this.leftJoinStateType, this.rightJoinStateType);
        this.leftPkProjectCode = null;
        this.rightPkProjectCode = null;
    }

    public void processSingleSideBundles(Map<BaseRow, List<BaseRow>> map, JoinStateHandler.Type type, JoinStateHandler.Type type2, JoinStateHandler joinStateHandler, JoinStateHandler joinStateHandler2, KeyedValueState<BaseRow, Long> keyedValueState, Boolean bool, Collector<BaseRow> collector) throws Exception {
        if (type2 == JoinStateHandler.Type.JOIN_KEY_CONTAIN_PRIMARY_KEY) {
            HashSet hashSet = new HashSet();
            Iterator<BaseRow> it = map.keySet().iterator();
            while (it.hasNext()) {
                hashSet.add(it.next());
            }
            joinStateHandler2.batchGet(hashSet);
        }
        HashMap hashMap = new HashMap();
        Set<BaseRow> hashSet2 = new HashSet<>();
        Boolean valueOf = Boolean.valueOf(bool.booleanValue() ? this.leftIsAccRetract : this.rightIsAccRetract);
        for (Map.Entry<BaseRow, List<BaseRow>> entry : map.entrySet()) {
            List<Tuple2<BaseRow, Long>> reduceCurrentList = reduceCurrentList(entry.getValue(), joinStateHandler, valueOf);
            joinCurrentList(entry.getKey(), reduceCurrentList, joinStateHandler, joinStateHandler2, bool.booleanValue(), keyedValueState, this.stateCleaningEnabled);
            if (type == JoinStateHandler.Type.JOIN_KEY_CONTAIN_PRIMARY_KEY) {
                Tuple2<BaseRow, Long> tuple2 = null;
                Iterator<Tuple2<BaseRow, Long>> it2 = reduceCurrentList.iterator();
                while (it2.hasNext()) {
                    tuple2 = it2.next();
                }
                if (tuple2 != null) {
                    if (((Long) tuple2.f1).longValue() < 0) {
                        hashSet2.add(entry.getKey());
                    } else {
                        hashMap.put(entry.getKey(), tuple2.f0);
                    }
                }
            }
        }
        if (type == JoinStateHandler.Type.JOIN_KEY_CONTAIN_PRIMARY_KEY) {
            joinStateHandler.putAll(hashMap);
            joinStateHandler.removeAll(hashSet2);
        }
    }

    @Override // org.apache.flink.table.runtime.operator.bundle.KeyedCoBundleOperator
    public void processBundles(Map<BaseRow, List<BaseRow>> map, Map<BaseRow, List<BaseRow>> map2, Collector<BaseRow> collector) throws Exception {
        processSingleSideBundles(map2, this.rightJoinStateType, this.leftJoinStateType, this.rightStateHandler, this.leftStateHandler, this.rightTimerState, false, collector);
        processSingleSideBundles(map, this.leftJoinStateType, this.rightJoinStateType, this.leftStateHandler, this.rightStateHandler, this.leftTimerState, true, collector);
    }

    public void onProcessingTime(InternalTimer<BaseRow, Byte> internalTimer) throws Exception {
        if (((Byte) internalTimer.getNamespace()).byteValue() == 1) {
            if (needToCleanupState((BaseRow) internalTimer.getKey(), internalTimer.getTimestamp(), this.leftTimerState)) {
                this.leftStateHandler.remove((BaseRow) internalTimer.getKey());
            }
        } else if (needToCleanupState((BaseRow) internalTimer.getKey(), internalTimer.getTimestamp(), this.rightTimerState)) {
            this.rightStateHandler.remove((BaseRow) internalTimer.getKey());
        }
    }

    private void joinCurrentList(BaseRow baseRow, List<Tuple2<BaseRow, Long>> list, JoinStateHandler joinStateHandler, JoinStateHandler joinStateHandler2, boolean z, KeyedValueState<BaseRow, Long> keyedValueState, boolean z2) throws Exception {
        joinStateHandler.setCurrentJoinKey(baseRow);
        long currentProcessingTime = this.internalTimerService.currentProcessingTime();
        joinStateHandler.batchUpdate(baseRow, list, currentProcessingTime + this.maxRetentionTime);
        registerProcessingCleanupTimer(joinStateHandler.getCurrentJoinKey(), currentProcessingTime, z, keyedValueState);
        Iterator<Tuple3<BaseRow, Long, Long>> recordsFromCache = joinStateHandler2 instanceof JoinKeyContainPrimaryKeyStateHandler ? ((JoinKeyContainPrimaryKeyStateHandler) joinStateHandler2).getRecordsFromCache(baseRow) : joinStateHandler2.getRecords(baseRow);
        while (recordsFromCache.hasNext()) {
            Tuple3<BaseRow, Long, Long> next = recordsFromCache.next();
            BaseRow baseRow2 = (BaseRow) next.f0;
            Long l = (Long) next.f1;
            if (z) {
                for (Tuple2<BaseRow, Long> tuple2 : list) {
                    BaseRow baseRow3 = (BaseRow) tuple2.f0;
                    this.joinedRow.setHeader(((Long) tuple2.f1).longValue() < 0 ? (byte) 1 : (byte) 0);
                    if (applyCondition(baseRow3, baseRow2, joinStateHandler.getCurrentJoinKey())) {
                        collectResult(this.joinedRow.replace(baseRow3, baseRow2), l.longValue() * ((Long) tuple2.f1).longValue());
                    }
                }
            } else {
                for (Tuple2<BaseRow, Long> tuple22 : list) {
                    BaseRow baseRow4 = (BaseRow) tuple22.f0;
                    this.joinedRow.setHeader(((Long) tuple22.f1).longValue() < 0 ? (byte) 1 : (byte) 0);
                    if (applyCondition(baseRow2, baseRow4, joinStateHandler.getCurrentJoinKey())) {
                        collectResult(this.joinedRow.replace(baseRow2, baseRow4), l.longValue() * ((Long) tuple22.f1).longValue());
                    }
                }
            }
            if (((Long) next.f2).longValue() <= currentProcessingTime && this.stateCleaningEnabled) {
                recordsFromCache.remove();
            }
        }
    }
}
