package org.apache.flink.table.runtime.join.stream.bundle;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.streaming.api.bundle.CoBundleTrigger;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.runtime.join.stream.state.JoinKeyContainPrimaryKeyStateHandler;
import org.apache.flink.table.runtime.join.stream.state.JoinStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.OnlyEqualityConditionMatchStateHandler;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.util.Collector;

/**
 * Shared base for the mini-batch outer-join stream operators.
 *
 * <p>In addition to what {@link MiniBatchJoinStreamOperator} provides, this class keeps one
 * {@link JoinMatchStateHandler} per input side and two placeholder rows ({@link #leftSideNullRow},
 * {@link #rightSideNullRow}) that stand in for the missing side when a row is emitted without a
 * join partner.
 */
@Internal
abstract class MiniBatchOuterJoinStreamOperator extends MiniBatchJoinStreamOperator {
    private static final long serialVersionUID = 1;

    // Header written for rows with a non-negative count.
    // NOTE(review): presumably the "accumulate" message flag - confirm against the row header constants.
    private static final byte ACCUMULATE_HEADER = (byte) 0;

    // Header written for rows with a negative count.
    // NOTE(review): presumably the "retract" message flag - confirm against the row header constants.
    private static final byte RETRACT_HEADER = (byte) 1;

    /** Kind of match-state handler created for the left input. */
    protected final JoinMatchStateHandler.Type leftMatchStateType;

    /** Kind of match-state handler created for the right input. */
    protected final JoinMatchStateHandler.Type rightMatchStateType;

    /** Match-state handler of the left input; created in {@link #initAllStates()}. */
    protected transient JoinMatchStateHandler leftMatchStateHandler;

    /** Match-state handler of the right input; created in {@link #initAllStates()}. */
    protected transient JoinMatchStateHandler rightMatchStateHandler;

    /** Freshly constructed row with the left input's arity; used in place of a missing left row. */
    protected transient BaseRow leftSideNullRow;

    /** Freshly constructed row with the right input's arity; used in place of a missing right row. */
    protected transient BaseRow rightSideNullRow;

    /**
     * Creates the operator. Every argument except {@code type3} and {@code type4} is handed to the
     * parent constructor unchanged and in the same order.
     *
     * @param type3 match-state handler type for the left input
     * @param type4 match-state handler type for the right input
     */
    public MiniBatchOuterJoinStreamOperator(BaseRowTypeInfo baseRowTypeInfo, BaseRowTypeInfo baseRowTypeInfo2, GeneratedJoinConditionFunction generatedJoinConditionFunction, KeySelector<BaseRow, BaseRow> keySelector, KeySelector<BaseRow, BaseRow> keySelector2, GeneratedProjection generatedProjection, GeneratedProjection generatedProjection2, JoinStateHandler.Type type, JoinStateHandler.Type type2, long j, long j2, JoinMatchStateHandler.Type type3, JoinMatchStateHandler.Type type4, Boolean bool, Boolean bool2, boolean[] zArr, CoBundleTrigger<BaseRow, BaseRow> coBundleTrigger, boolean z) {
        super(baseRowTypeInfo, baseRowTypeInfo2, generatedJoinConditionFunction, keySelector, keySelector2, generatedProjection, generatedProjection2, type, type2, j, j2, bool, bool2, zArr, coBundleTrigger, z);
        this.leftMatchStateType = type3;
        this.rightMatchStateType = type4;
    }

    /**
     * Opens the parent operator, allocates the two placeholder rows and logs the configured
     * join-state and match-state handler types.
     */
    @Override // org.apache.flink.table.runtime.join.stream.bundle.MiniBatchJoinStreamOperator, org.apache.flink.table.runtime.bundle.KeyedCoBundleOperator, org.apache.flink.table.runtime.fault.tolerant.TriggerableOperator
    public void open() throws Exception {
        super.open();
        this.leftSideNullRow = new GenericRow(this.leftType.getArity());
        this.rightSideNullRow = new GenericRow(this.rightType.getArity());
        LOG.info("leftJoinStateType {}, rightJoinStateType {}, leftMatchStateType {}, rightMatchStateType {}", this.leftJoinStateType, this.rightJoinStateType, this.leftMatchStateType, this.rightMatchStateType);
    }

    /** Initializes the parent's states, then creates the left and right match-state handlers. */
    @Override // org.apache.flink.table.runtime.join.stream.bundle.MiniBatchJoinStreamOperator
    public void initAllStates() throws Exception {
        super.initAllStates();
        this.leftMatchStateHandler = createMatchStateHandler(this.leftType, this.leftMatchStateType, this.leftKeyType, "LeftMatchHandler", this.leftPkProjectCode);
        this.rightMatchStateHandler = createMatchStateHandler(this.rightType, this.rightMatchStateType, this.rightKeyType, "RightMatchHandler", this.rightPkProjectCode);
    }

    /**
     * Handles a processing-time timer. A timer whose namespace is {@code 1} is checked against the
     * left timer state and, if cleanup is due, clears the left match state and left join state for
     * the timer's key; any other namespace does the same for the right side.
     *
     * @param internalTimer the timer that fired
     */
    public void onProcessingTime(InternalTimer<BaseRow, Byte> internalTimer) throws Exception {
        final boolean isLeftTimer = internalTimer.getNamespace().byteValue() == 1;
        final BaseRow key = internalTimer.getKey();
        if (isLeftTimer) {
            if (needToCleanupState(key, internalTimer.getTimestamp(), this.leftTimerState)) {
                this.leftMatchStateHandler.remove(key);
                this.leftStateHandler.remove(key);
            }
            return;
        }
        if (needToCleanupState(key, internalTimer.getTimestamp(), this.rightTimerState)) {
            this.rightMatchStateHandler.remove(key);
            this.rightStateHandler.remove(key);
        }
    }

    /**
     * Processes the buffered rows of one input side, one join key at a time: each key's rows are
     * reduced, joined against the records stored for the other side, and - when this side's state
     * type is {@code JOIN_KEY_CONTAIN_PRIMARY_KEY} - the last reduced row per key is written back
     * (or the key removed) in one batch at the end.
     *
     * @param map buffered rows of the side being processed, grouped by join key
     * @param map2 not used by this method
     * @param type join-state type of the side being processed
     * @param type2 join-state type of the other side
     * @param joinStateHandler join state of the side being processed
     * @param joinStateHandler2 join state of the other side
     * @param joinMatchStateHandler match state of the side being processed
     * @param joinMatchStateHandler2 match state of the other side
     * @param keyedValueState state handed to the cleanup-timer registration
     * @param bool {@code true} when the side being processed is the left input
     * @param z whether unmatched rows of the side being processed are emitted padded with the
     *     placeholder row (NOTE(review): i.e. this side is presumably an outer side - confirm)
     * @param z2 same switch for the other side
     * @param collector not used by this method
     */
    public void processSingleSideBundles(Map<BaseRow, List<BaseRow>> map, Map<BaseRow, List<BaseRow>> map2, JoinStateHandler.Type type, JoinStateHandler.Type type2, JoinStateHandler joinStateHandler, JoinStateHandler joinStateHandler2, JoinMatchStateHandler joinMatchStateHandler, JoinMatchStateHandler joinMatchStateHandler2, KeyedValueState<BaseRow, Long> keyedValueState, Boolean bool, boolean z, boolean z2, Collector<BaseRow> collector) throws Exception {
        if (type2 == JoinStateHandler.Type.JOIN_KEY_CONTAIN_PRIMARY_KEY) {
            // Hand the other side every join key of this bundle up front;
            // joinCurrentList later reads those records through getRecordsFromCache.
            Set<BaseRow> bundleKeys = new HashSet<>(map.keySet());
            joinStateHandler2.batchGet(bundleKeys);
        }

        final boolean inputIsLeft = bool.booleanValue();
        final boolean writeBackPerKey = type == JoinStateHandler.Type.JOIN_KEY_CONTAIN_PRIMARY_KEY;
        Map<BaseRow, BaseRow> rowsToPut = new HashMap<>();
        Set<BaseRow> keysToRemove = new HashSet<>();
        Boolean isAccRetract = Boolean.valueOf(inputIsLeft ? this.leftIsAccRetract : this.rightIsAccRetract);

        for (Map.Entry<BaseRow, List<BaseRow>> entry : map.entrySet()) {
            BaseRow joinKey = entry.getKey();
            List<Tuple2<BaseRow, Long>> reducedRows = reduceCurrentList(entry.getValue(), joinStateHandler, isAccRetract);
            joinCurrentList(joinKey, reducedRows, joinStateHandler, joinStateHandler2, joinMatchStateHandler, joinMatchStateHandler2, inputIsLeft, z, z2, keyedValueState);

            if (writeBackPerKey && !reducedRows.isEmpty()) {
                // Only the last reduced row decides what is stored for this key.
                Tuple2<BaseRow, Long> lastRow = reducedRows.get(reducedRows.size() - 1);
                if (lastRow != null) {
                    if (lastRow.f1.longValue() < 0) {
                        keysToRemove.add(joinKey);
                    } else {
                        rowsToPut.put(joinKey, lastRow.f0);
                    }
                }
            }
        }

        if (writeBackPerKey) {
            joinStateHandler.putAll(rowsToPut);
            joinStateHandler.removeAll(keysToRemove);
        }
    }

    /**
     * Joins the reduced rows of one join key against every record the other side holds for that
     * key, emitting joined rows and - depending on the two outer flags - rows padded with a
     * placeholder row, while keeping both match states up to date.
     *
     * @param joinKey join key shared by all {@code inputRows}
     * @param inputRows reduced input rows paired with their signed counts
     * @param inputStateHandler join state of the input side
     * @param otherStateHandler join state of the other side
     * @param inputMatchStateHandler match state of the input side
     * @param otherMatchStateHandler match state of the other side
     * @param inputIsLeft whether the input side is the left input (fixes argument order)
     * @param inputIsOuter whether unmatched input rows are emitted padded and the input match
     *     state is maintained
     * @param otherIsOuter whether padded rows of the other side are emitted when its match count
     *     moves between zero and non-zero
     * @param timerState state handed to the cleanup-timer registration
     */
    private void joinCurrentList(BaseRow joinKey, List<Tuple2<BaseRow, Long>> inputRows, JoinStateHandler inputStateHandler, JoinStateHandler otherStateHandler, JoinMatchStateHandler inputMatchStateHandler, JoinMatchStateHandler otherMatchStateHandler, boolean inputIsLeft, boolean inputIsOuter, boolean otherIsOuter, KeyedValueState<BaseRow, Long> timerState) throws Exception {
        inputStateHandler.setCurrentJoinKey(joinKey);
        long now = this.internalTimerService.currentProcessingTime();
        registerProcessingCleanupTimer(inputStateHandler.getCurrentJoinKey(), now, inputIsLeft, timerState);

        // The other side's records are fetched BEFORE this side's state is updated.
        Iterator<Tuple3<BaseRow, Long, Long>> otherRecords = otherStateHandler instanceof JoinKeyContainPrimaryKeyStateHandler ? ((JoinKeyContainPrimaryKeyStateHandler) otherStateHandler).getRecordsFromCache(joinKey) : otherStateHandler.getRecords(joinKey);
        long[] updateStatus = inputStateHandler.batchUpdate(joinKey, inputRows, now + this.maxRetentionTime);

        // matchedOtherCounts[i]: summed counts of the other-side records that input row i matched.
        long[] matchedOtherCounts = new long[inputRows.size()];
        long matchCntBefore = 0;
        long matchCntAfter = 0;

        // With an equality-only match handler the count is loaded once per join key
        // instead of once per other-side row.
        final boolean equalityOnlyMatchState = otherMatchStateHandler instanceof OnlyEqualityConditionMatchStateHandler;
        if (equalityOnlyMatchState) {
            otherMatchStateHandler.extractCurrentRowMatchJoinCount(joinKey, null, 0L);
        }

        while (otherRecords.hasNext()) {
            Tuple3<BaseRow, Long, Long> otherRecord = otherRecords.next();
            BaseRow otherRow = otherRecord.f0;
            long otherCount = otherRecord.f1.longValue();
            if (!equalityOnlyMatchState) {
                otherMatchStateHandler.extractCurrentRowMatchJoinCount(joinKey, otherRow, 0L);
            }
            matchCntBefore = otherMatchStateHandler.getCurrentRowMatchJoinCnt();

            // Pass 1: work out how this batch changes the other row's match count.
            int index = 0;
            long matchDelta = 0;
            for (Tuple2<BaseRow, Long> inputRow : inputRows) {
                if (matchesInJoinOrder(inputRow.f0, otherRow, inputIsLeft, inputStateHandler.getCurrentJoinKey())) {
                    matchDelta += inputRow.f1.longValue();
                    matchedOtherCounts[index] += otherCount;
                }
                index++;
            }
            matchCntAfter = matchCntBefore + matchDelta;

            // The other row had no match before and has one now: emit its padded row
            // with the retract header (presumably withdrawing the earlier padded output).
            if (otherIsOuter && matchCntBefore == 0 && matchCntAfter > 0) {
                this.joinedRow.setHeader(RETRACT_HEADER);
                collectNullPadded(otherRow, !inputIsLeft, otherCount);
            }

            // Pass 2: emit the joined rows themselves.
            for (Tuple2<BaseRow, Long> inputRow : inputRows) {
                this.joinedRow.setHeader(headerForCount(inputRow.f1.longValue()));
                if (matchesInJoinOrder(inputRow.f0, otherRow, inputIsLeft, inputStateHandler.getCurrentJoinKey())) {
                    collectJoined(inputRow.f0, otherRow, inputIsLeft, inputRow.f1.longValue() * otherCount);
                }
            }

            // The other row had matches before and has none left: emit its padded row
            // with the accumulate header.
            if (otherIsOuter && matchCntBefore > 0 && matchCntAfter <= 0) {
                this.joinedRow.setHeader(ACCUMULATE_HEADER);
                collectNullPadded(otherRow, !inputIsLeft, otherCount);
            }

            if (!equalityOnlyMatchState) {
                otherMatchStateHandler.updateRowMatchJoinCnt(joinKey, otherRow, matchCntAfter);
            }

            // Drop other-side records whose stored timestamp (f2) is already in the past.
            if (otherRecord.f2.longValue() < now && this.stateCleaningEnabled) {
                otherRecords.remove();
                otherMatchStateHandler.remove(joinKey, otherRow);
            }
        }

        if (equalityOnlyMatchState && matchCntAfter != matchCntBefore) {
            otherMatchStateHandler.updateRowMatchJoinCnt(joinKey, null, matchCntAfter);
        }

        if (inputIsOuter) {
            int index = 0;
            Set<BaseRow> matchRowsToRemove = new HashSet<>();
            Map<BaseRow, Long> matchRowsToAdd = new HashMap<>();
            for (Tuple2<BaseRow, Long> inputRow : inputRows) {
                if (matchedOtherCounts[index] == 0) {
                    // No partner on the other side: emit the input row padded with the placeholder row.
                    this.joinedRow.setHeader(headerForCount(inputRow.f1.longValue()));
                    collectNullPadded(inputRow.f0, inputIsLeft, inputRow.f1.longValue());
                }
                // NOTE(review): -1 / 1 are the batchUpdate result codes acted on here; presumably
                // "row removed from state" / "row newly added" - confirm against JoinStateHandler.
                if (updateStatus[index] == -1) {
                    matchRowsToRemove.add(inputRow.f0);
                } else if (updateStatus[index] == 1) {
                    matchRowsToAdd.put(inputRow.f0, Long.valueOf(matchedOtherCounts[index]));
                }
                index++;
            }
            inputMatchStateHandler.removeAll(joinKey, matchRowsToRemove);
            inputMatchStateHandler.addAll(joinKey, matchRowsToAdd);
        }
    }

    /** Returns the header byte used for a row with the given signed count. */
    private static byte headerForCount(long count) {
        return count < 0 ? RETRACT_HEADER : ACCUMULATE_HEADER;
    }

    /** Evaluates the join condition with the two rows arranged as (left, right). */
    private boolean matchesInJoinOrder(BaseRow inputRow, BaseRow otherRow, boolean inputIsLeft, BaseRow currentJoinKey) throws Exception {
        if (inputIsLeft) {
            return applyCondition(inputRow, otherRow, currentJoinKey);
        }
        return applyCondition(otherRow, inputRow, currentJoinKey);
    }

    /** Emits {@code inputRow} joined with {@code otherRow}, arranged as (left, right). */
    private void collectJoined(BaseRow inputRow, BaseRow otherRow, boolean inputIsLeft, long count) throws Exception {
        if (inputIsLeft) {
            collectResult(this.joinedRow.replace(inputRow, otherRow), count);
        } else {
            collectResult(this.joinedRow.replace(otherRow, inputRow), count);
        }
    }

    /** Emits {@code row} with the opposite side filled by that side's placeholder row. */
    private void collectNullPadded(BaseRow row, boolean rowIsLeft, long count) throws Exception {
        if (rowIsLeft) {
            collectResult(this.joinedRow.replace(row, this.rightSideNullRow), count);
        } else {
            collectResult(this.joinedRow.replace(this.leftSideNullRow, row), count);
        }
    }
}
