package org.apache.flink.table.runtime.join.stream.bundle;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.runtime.state.keyed.KeyedMapStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.streaming.api.bundle.CoBundleTrigger;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.codegen.JoinConditionFunction;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.runtime.bundle.KeyedCoBundleOperator;
import org.apache.flink.table.runtime.join.batch.NullAwareJoinHelper;
import org.apache.flink.table.runtime.join.stream.state.CountKeySizeStateHandler;
import org.apache.flink.table.runtime.join.stream.state.EmptyJoinStateHandler;
import org.apache.flink.table.runtime.join.stream.state.JoinKeyContainPrimaryKeyStateHandler;
import org.apache.flink.table.runtime.join.stream.state.JoinKeyNotContainPrimaryKeyStateHandler;
import org.apache.flink.table.runtime.join.stream.state.JoinStateHandler;
import org.apache.flink.table.runtime.join.stream.state.WithoutPrimaryKeyStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.EmptyMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinKeyContainPrimaryKeyMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinKeyNotContainPrimaryKeyMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.OnlyEqualityConditionMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.WithoutPrimaryKeyMatchStateHandler;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;

/**
 * Abstract two-input join operator built on top of {@link KeyedCoBundleOperator}.
 *
 * <p>The class itself does not implement the join algorithm. It provides the shared plumbing:
 * <ul>
 *   <li>compiling the generated join condition when the operator is opened,</li>
 *   <li>creating the per-side {@link JoinStateHandler}s (and, for subclasses, the
 *       {@link JoinMatchStateHandler}s),</li>
 *   <li>bookkeeping of processing-time cleanup timers when state cleaning is enabled,</li>
 *   <li>helpers that collapse a list of buffered rows into {@code (row, count)} pairs.</li>
 * </ul>
 */
@Internal
public abstract class MiniBatchJoinStreamOperator extends KeyedCoBundleOperator
        implements Triggerable<BaseRow, Byte> {

    private static final long serialVersionUID = 1L;

    // ---- configuration handed in through the constructor -------------------------------------

    /** Row type of the left input. */
    protected final BaseRowTypeInfo leftType;
    /** Row type of the right input. */
    protected final BaseRowTypeInfo rightType;
    /** Generated join condition; set to {@code null} once it has been compiled in {@link #open()}. */
    protected GeneratedJoinConditionFunction condFuncCode;
    /** Extracts the join key from a left row. */
    protected final KeySelector<BaseRow, BaseRow> leftKeySelector;
    /** Extracts the join key from a right row. */
    protected final KeySelector<BaseRow, BaseRow> rightKeySelector;
    /** Type produced by {@link #leftKeySelector}. */
    protected final BaseRowTypeInfo leftKeyType;
    /** Type produced by {@link #rightKeySelector}. */
    protected final BaseRowTypeInfo rightKeyType;
    /** Generated primary-key projection for the left side; cleared at the end of {@link #open()}. */
    protected GeneratedProjection leftPkProjectCode;
    /** Generated primary-key projection for the right side; cleared at the end of {@link #open()}. */
    protected GeneratedProjection rightPkProjectCode;
    /** Selects which {@link JoinStateHandler} implementation backs the left side. */
    protected final JoinStateHandler.Type leftJoinStateType;
    /** Selects which {@link JoinStateHandler} implementation backs the right side. */
    protected final JoinStateHandler.Type rightJoinStateType;
    /** Lower retention bound used when deciding whether a cleanup timer must be re-registered. */
    protected final long minRetentionTime;
    /** Offset added to the current time to obtain the cleanup timestamp. */
    protected final long maxRetentionTime;
    /** {@code true} when {@link #minRetentionTime} is greater than 1. */
    protected final boolean stateCleaningEnabled;

    // ---- runtime members, (re)created in open() ----------------------------------------------

    /** Last registered cleanup timestamp per key; only created when state cleaning is enabled. */
    protected transient KeyedValueState<BaseRow, Long> leftTimerState;
    /** Last registered cleanup timestamp per key; only created when state cleaning is enabled. */
    protected transient KeyedValueState<BaseRow, Long> rightTimerState;
    /** Compiled instance of {@link #condFuncCode}. */
    protected transient JoinConditionFunction condFunc;
    protected transient JoinStateHandler leftStateHandler;
    protected transient JoinStateHandler rightStateHandler;
    protected transient JoinedRow joinedRow;
    protected transient TimestampedCollector<BaseRow> collector;
    protected transient InternalTimerService<Byte> internalTimerService;

    protected boolean leftIsAccRetract;
    protected boolean rightIsAccRetract;
    /** Per-key flags as passed to the constructor. */
    protected boolean[] filterNullKeys;
    /** Key positions derived from {@link #filterNullKeys}; {@code null} when no flags were given. */
    protected int[] nullFilterKeys;

    /**
     * Stores the configuration; no state or generated code is touched until {@link #open()}.
     *
     * <p>Note the argument order: the <em>maximum</em> retention time comes before the
     * <em>minimum</em> retention time.
     *
     * @param leftType            row type of the left input
     * @param rightType           row type of the right input
     * @param condFuncCode        generated join condition
     * @param leftKeySelector     key selector of the left input; must also be a {@link ResultTypeQueryable}
     * @param rightKeySelector    key selector of the right input; must also be a {@link ResultTypeQueryable}
     * @param leftPkProjectCode   generated primary-key projection for the left input
     * @param rightPkProjectCode  generated primary-key projection for the right input
     * @param leftJoinStateType   state handler kind for the left input
     * @param rightJoinStateType  state handler kind for the right input
     * @param maxRetentionTime    maximum retention time
     * @param minRetentionTime    minimum retention time; state cleaning is enabled when it is &gt; 1
     * @param leftIsAccRetract    must not be {@code null} (it is unboxed)
     * @param rightIsAccRetract   must not be {@code null} (it is unboxed)
     * @param filterNullKeys      per-key null filter flags; may be {@code null} or empty
     * @param coBundleTrigger     forwarded to the super class
     * @param finishBundleBeforeSnapshot forwarded to the super class (name is an assumption — TODO confirm)
     */
    public MiniBatchJoinStreamOperator(
            BaseRowTypeInfo leftType,
            BaseRowTypeInfo rightType,
            GeneratedJoinConditionFunction condFuncCode,
            KeySelector<BaseRow, BaseRow> leftKeySelector,
            KeySelector<BaseRow, BaseRow> rightKeySelector,
            GeneratedProjection leftPkProjectCode,
            GeneratedProjection rightPkProjectCode,
            JoinStateHandler.Type leftJoinStateType,
            JoinStateHandler.Type rightJoinStateType,
            long maxRetentionTime,
            long minRetentionTime,
            Boolean leftIsAccRetract,
            Boolean rightIsAccRetract,
            boolean[] filterNullKeys,
            CoBundleTrigger<BaseRow, BaseRow> coBundleTrigger,
            boolean finishBundleBeforeSnapshot) {
        super(coBundleTrigger, finishBundleBeforeSnapshot);
        this.leftType = leftType;
        this.rightType = rightType;
        this.condFuncCode = condFuncCode;
        this.leftKeySelector = leftKeySelector;
        this.rightKeySelector = rightKeySelector;
        // The key types are not passed in; they are queried from the selectors themselves.
        this.leftKeyType = (BaseRowTypeInfo) ((ResultTypeQueryable) leftKeySelector).getProducedType2();
        this.rightKeyType = (BaseRowTypeInfo) ((ResultTypeQueryable) rightKeySelector).getProducedType2();
        this.leftPkProjectCode = leftPkProjectCode;
        this.rightPkProjectCode = rightPkProjectCode;
        this.leftJoinStateType = leftJoinStateType;
        this.rightJoinStateType = rightJoinStateType;
        this.maxRetentionTime = maxRetentionTime;
        this.minRetentionTime = minRetentionTime;
        this.stateCleaningEnabled = minRetentionTime > 1;
        this.leftIsAccRetract = leftIsAccRetract;
        this.rightIsAccRetract = rightIsAccRetract;
        this.filterNullKeys = filterNullKeys;
        final boolean noFlags = filterNullKeys == null || filterNullKeys.length == 0;
        this.nullFilterKeys = noFlags ? null : NullAwareJoinHelper.getNullFilterKeys(filterNullKeys);
    }

    /**
     * Opens the super class, obtains the {@code "join-timers"} timer service, compiles and
     * instantiates the join condition, creates the collector and the reusable joined row, and
     * finally initialises all states via {@link #initAllStates()}.
     *
     * <p>The generated code holders are set to {@code null} as soon as they are no longer needed.
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.internalTimerService = getInternalTimerService("join-timers", ByteSerializer.INSTANCE, this);

        LOG.debug("Compiling JoinConditionFunction: {} \n\n Code:\n {}", this.condFuncCode.name(), this.condFuncCode.code());
        final Class<?> conditionClass = CodeGenUtils.compile(
                getContainingTask().getUserCodeClassLoader(),
                this.condFuncCode.name(),
                this.condFuncCode.code());
        // Drop the source holder before instantiating, exactly in this order.
        this.condFuncCode = null;
        this.condFunc = (JoinConditionFunction) conditionClass.newInstance();

        this.collector = new TimestampedCollector<>(this.output);
        this.joinedRow = new JoinedRow();

        // initAllStates() still reads the projection code, so it is cleared only afterwards.
        initAllStates();
        this.leftPkProjectCode = null;
        this.rightPkProjectCode = null;
    }

    /** Returns {@code true} when there is at least one key position to be checked for null. */
    private boolean isNotNullSafe() {
        return this.nullFilterKeys != null && this.nullFilterKeys.length != 0;
    }

    /**
     * Creates the left and right join state handlers and, when state cleaning is enabled, the
     * two cleanup-time states.
     */
    public void initAllStates() throws Exception {
        this.leftStateHandler = createJoinStateHandler(
                this.leftType, this.leftJoinStateType, "leftJoinState",
                this.leftKeySelector, this.leftKeyType, this.leftPkProjectCode);
        this.rightStateHandler = createJoinStateHandler(
                this.rightType, this.rightJoinStateType, "rightJoinState",
                this.rightKeySelector, this.rightKeyType, this.rightPkProjectCode);

        if (!this.stateCleaningEnabled) {
            return;
        }
        this.leftTimerState = createCleanupTimeState("left-time-state");
        this.rightTimerState = createCleanupTimeState("right-time-state");
    }

    /**
     * Builds the {@link JoinStateHandler} that corresponds to {@code type}.
     *
     * @param rowType      type of the rows stored for this side
     * @param type         which handler implementation to create
     * @param stateName    name under which the backing state is registered
     * @param keySelector  join key selector of this side
     * @param keyType      type of the join key
     * @param pkProjection generated primary-key projection; only dereferenced for
     *                     {@code JOIN_KEY_NOT_CONTAIN_PRIMARY_KEY}
     * @return the newly created handler
     * @throws IOException if {@code type} is not one of the handled constants
     */
    protected JoinStateHandler createJoinStateHandler(
            BaseRowTypeInfo rowType,
            JoinStateHandler.Type type,
            String stateName,
            KeySelector<BaseRow, BaseRow> keySelector,
            BaseRowTypeInfo keyType,
            GeneratedProjection pkProjection) throws Exception {
        // Both serializers are created up front, regardless of which branch is taken.
        final AbstractRowSerializer<BaseRow> keySerializer = keyType.createSerializer();
        final AbstractRowSerializer<BaseRow> rowSerializer = rowType.createSerializer();

        // The descriptors are intentionally left raw: their generic signatures are not visible here.
        switch (type) {
            case JOIN_KEY_CONTAIN_PRIMARY_KEY:
                // value state: join key -> row
                return new JoinKeyContainPrimaryKeyStateHandler(
                        getKeyedState(new KeyedValueStateDescriptor(stateName, keySerializer, rowSerializer)),
                        keySelector);

            case JOIN_KEY_NOT_CONTAIN_PRIMARY_KEY:
                // map state: join key -> (projected primary key -> (row, long))
                return new JoinKeyNotContainPrimaryKeyStateHandler(
                        getKeyedState(new KeyedMapStateDescriptor(
                                stateName,
                                keySerializer,
                                DataTypes.createInternalSerializer(pkProjection.expr().resultType()),
                                new TupleTypeInfo(rowType, Types.LONG).createSerializer(new ExecutionConfig()))),
                        keySelector,
                        (Projection) CodeGenUtils.compile(
                                getContainingTask().getUserCodeClassLoader(),
                                pkProjection.name(),
                                pkProjection.code()).newInstance());

            case WITHOUT_PRIMARY_KEY:
                // map state: join key -> (row -> (long, long))
                return new WithoutPrimaryKeyStateHandler(
                        getKeyedState(new KeyedMapStateDescriptor(
                                stateName,
                                keySerializer,
                                rowSerializer,
                                new TupleTypeInfo(Types.LONG, Types.LONG).createSerializer(new ExecutionConfig()))),
                        keySelector);

            case EMPTY:
                return new EmptyJoinStateHandler();

            case COUNT_KEY_SIZE:
                // value state: join key -> long
                return new CountKeySizeStateHandler(
                        getKeyedState(new KeyedValueStateDescriptor(
                                stateName,
                                keySerializer,
                                Types.LONG.createSerializer(new ExecutionConfig()))),
                        keySelector);

            default:
                throw new IOException("Unrecognized type: " + type);
        }
    }

    /**
     * Builds the {@link JoinMatchStateHandler} that corresponds to {@code type}.
     *
     * @param rowType      type of the rows; used as map key type for {@code WITHOUT_PRIMARY_KEY_MATCH}
     * @param type         which handler implementation to create
     * @param keyType      type of the join key
     * @param stateName    name under which the backing state is registered
     * @param pkProjection generated primary-key projection; only dereferenced for
     *                     {@code JOIN_KEY_NOT_CONTAIN_PRIMARY_KEY_MATCH}
     * @return the newly created handler
     * @throws IOException if {@code type} is not one of the handled constants
     */
    public JoinMatchStateHandler createMatchStateHandler(
            BaseRowTypeInfo rowType,
            JoinMatchStateHandler.Type type,
            BaseRowTypeInfo keyType,
            String stateName,
            GeneratedProjection pkProjection) throws Exception {
        // Creation order (row first, then key) is kept as is.
        final AbstractRowSerializer<BaseRow> rowSerializer = rowType.createSerializer();
        final AbstractRowSerializer<BaseRow> keySerializer = keyType.createSerializer();
        final LongSerializer countSerializer = LongSerializer.INSTANCE;

        switch (type) {
            case WITHOUT_PRIMARY_KEY_MATCH:
                // map state: join key -> (row -> long)
                return new WithoutPrimaryKeyMatchStateHandler(
                        getKeyedState(new KeyedMapStateDescriptor(stateName, keySerializer, rowSerializer, countSerializer)));

            case JOIN_KEY_NOT_CONTAIN_PRIMARY_KEY_MATCH:
                // map state: join key -> (projected primary key -> long)
                return new JoinKeyNotContainPrimaryKeyMatchStateHandler(
                        getKeyedState(new KeyedMapStateDescriptor(
                                stateName,
                                keySerializer,
                                DataTypes.createInternalSerializer(pkProjection.expr().resultType()),
                                countSerializer)),
                        (Projection) CodeGenUtils.compile(
                                getContainingTask().getUserCodeClassLoader(),
                                pkProjection.name(),
                                pkProjection.code()).newInstance());

            case JOIN_KEY_CONTAIN_PRIMARY_KEY_MATCH:
                // value state: join key -> long
                return new JoinKeyContainPrimaryKeyMatchStateHandler(
                        getKeyedState(new KeyedValueStateDescriptor(stateName, keySerializer, countSerializer)));

            case EMPTY_MATCH:
                return new EmptyMatchStateHandler();

            case ONLY_EQUALITY_CONDITION_EMPTY_MATCH:
                // value state: join key -> long
                return new OnlyEqualityConditionMatchStateHandler(
                        getKeyedState(new KeyedValueStateDescriptor(stateName, keySerializer, countSerializer)));

            default:
                throw new IOException("Unrecognized type: " + type);
        }
    }

    /**
     * Event-time timers are not supported by this operator.
     *
     * @throws UnsupportedOperationException always
     */
    public void onEventTime(InternalTimer<BaseRow, Byte> timer) throws Exception {
        throw new UnsupportedOperationException("Don't support handle event time for join operator!");
    }

    /** Returns the collector created in {@link #open()}. */
    protected TimestampedCollector<BaseRow> getCollector() {
        return this.collector;
    }

    /**
     * Creates a value state (key -&gt; {@code Long}) registered under {@code "left-" + stateName}.
     *
     * <p>NOTE(review): the {@code "left-"} prefix and the left key serializer are used for every
     * call, including the one made for the right side. This looks intentional or at least
     * harmless, but changing the state name would presumably affect restoring existing state —
     * confirm before touching it.
     */
    protected KeyedValueState<BaseRow, Long> createCleanupTimeState(String stateName) throws Exception {
        return getKeyedState(
                new KeyedValueStateDescriptor(
                        "left-" + stateName,
                        this.leftKeyType.createSerializer(),
                        LongSerializer.INSTANCE));
    }

    /**
     * Registers a processing-time cleanup timer for {@code key} if state cleaning is enabled and
     * either no cleanup time is stored yet or {@code currentTime + minRetentionTime} lies beyond
     * the stored one. The new cleanup time is {@code currentTime + maxRetentionTime} and is
     * written back to {@code timerState}.
     *
     * @param key         key the timer belongs to
     * @param currentTime current time
     * @param isLeft      selects the timer namespace: {@code 1} when {@code true}, {@code 2}
     *                    otherwise (presumably left vs. right input — verify against callers)
     * @param timerState  state holding the last registered cleanup time per key
     */
    public void registerProcessingCleanupTimer(
            BaseRow key, long currentTime, boolean isLeft, KeyedValueState<BaseRow, Long> timerState) {
        if (!this.stateCleaningEnabled) {
            return;
        }
        final Long registeredCleanupTime = (Long) timerState.get(key);
        final boolean timerStillCoversKey = registeredCleanupTime != null
                && currentTime + this.minRetentionTime <= registeredCleanupTime.longValue();
        if (timerStillCoversKey) {
            return;
        }
        final long cleanupTime = currentTime + this.maxRetentionTime;
        final byte namespace = isLeft ? (byte) 1 : (byte) 2;
        this.internalTimerService.registerProcessingTimeTimer(Byte.valueOf(namespace), cleanupTime);
        timerState.put(key, Long.valueOf(cleanupTime));
    }

    /**
     * Tells whether {@code timestamp} is exactly the cleanup time currently stored for
     * {@code key}. Returns {@code false} when nothing is stored.
     */
    public boolean needToCleanupState(BaseRow key, long timestamp, KeyedValueState<BaseRow, Long> timerState) {
        final Long registeredCleanupTime = (Long) timerState.get(key);
        if (registeredCleanupTime == null) {
            return false;
        }
        return registeredCleanupTime.longValue() == timestamp;
    }

    /**
     * Evaluates the join condition on the two rows.
     *
     * <p>When null-filter key positions are configured, {@code nullCheckedRow} is inspected
     * first: if any of those positions is null the method returns {@code false} without
     * evaluating the condition. ({@code nullCheckedRow} is presumably the join key — verify
     * against callers.)
     *
     * @param leftRow        first argument passed to the compiled condition
     * @param rightRow       second argument passed to the compiled condition
     * @param nullCheckedRow row whose configured positions must all be non-null
     */
    public boolean applyCondition(BaseRow leftRow, BaseRow rightRow, BaseRow nullCheckedRow) throws Exception {
        if (isNotNullSafe()) {
            for (int position : this.nullFilterKeys) {
                if (nullCheckedRow.isNullAt(position)) {
                    return false;
                }
            }
        }
        return this.condFunc.apply(leftRow, rightRow);
    }

    /**
     * Collapses {@code currentList} into {@code (row, count)} pairs.
     *
     * <p>{@link #appendReduceCurrentList(Iterable)} is used when {@code isAccRetract} is
     * {@code true} or the handler is a {@link WithoutPrimaryKeyStateHandler} or
     * {@link CountKeySizeStateHandler}; otherwise
     * {@link #upsertReduceCurrentList(Iterable, JoinStateHandler)} is used.
     *
     * @param isAccRetract must not be {@code null} (it is unboxed)
     */
    public List<Tuple2<BaseRow, Long>> reduceCurrentList(
            Iterable<BaseRow> currentList, JoinStateHandler stateHandler, Boolean isAccRetract) {
        final boolean appendMode = isAccRetract
                || stateHandler instanceof WithoutPrimaryKeyStateHandler
                || stateHandler instanceof CountKeySizeStateHandler;
        if (appendMode) {
            return appendReduceCurrentList(currentList);
        }
        return upsertReduceCurrentList(currentList, stateHandler);
    }

    /**
     * Folds equal rows into a single entry carrying their net count.
     *
     * <p>Every row contributes {@code +1} when its header is {@code 0} and {@code -1} otherwise
     * (presumably accumulate vs. retract messages — TODO confirm). One entry is emitted per
     * distinct row whose net count is non-zero, at the position of that row's last occurrence.
     *
     * <p>Side effects on the argument: the header of every row is reset to {@code 0}, and every
     * element that does not end up in the result is removed through the iterator, so the
     * iterable's iterator must support {@code remove()}.
     */
    public List<Tuple2<BaseRow, Long>> appendReduceCurrentList(Iterable<BaseRow> currentList) {
        final LinkedList<Tuple2<BaseRow, Long>> reduced = new LinkedList<>();
        // row -> (f0: occurrences not yet revisited, f1: net count)
        final HashMap<BaseRow, Tuple2<Long, Long>> tallies = new HashMap<>();

        // Pass 1: count occurrences and net contribution per distinct row.
        for (BaseRow row : currentList) {
            final boolean headerIsZero = row.getHeader() == 0;
            // Normalise the header first so that rows differing only in header share one entry.
            row.setHeader((byte) 0);
            Tuple2<Long, Long> tally = tallies.get(row);
            if (tally == null) {
                tally = new Tuple2<>(0L, 0L);
            }
            tally.f0 = tally.f0 + 1;
            tally.f1 = tally.f1 + (headerIsZero ? 1L : -1L);
            tallies.put(row, tally);
        }

        // Pass 2: keep only the last occurrence of each row, and only if its net count is non-zero.
        for (Iterator<BaseRow> rows = currentList.iterator(); rows.hasNext(); ) {
            final BaseRow row = rows.next();
            row.setHeader((byte) 0);
            final Tuple2<Long, Long> tally = tallies.get(row);
            tally.f0 = tally.f0 - 1;
            final boolean lastOccurrence = tally.f0 == 0L;
            if (lastOccurrence && tally.f1 != 0L) {
                reduced.add(new Tuple2<>(row, tally.f1));
            } else {
                rows.remove();
            }
        }
        return reduced;
    }

    /**
     * Keeps only the latest row per primary key, each with count {@code 1}.
     *
     * <ul>
     *   <li>{@link JoinKeyContainPrimaryKeyStateHandler}: the last row of the list is the only
     *       result (nothing is returned for an empty list).</li>
     *   <li>{@link JoinKeyNotContainPrimaryKeyStateHandler}: for each primary key extracted by the
     *       handler, the last row seen with that key is remembered; every row that
     *       {@code equals} the remembered row of its key is emitted.</li>
     * </ul>
     *
     * @throws RuntimeException if the handler is of neither type
     */
    public List<Tuple2<BaseRow, Long>> upsertReduceCurrentList(
            Iterable<BaseRow> currentList, JoinStateHandler stateHandler) {
        final LinkedList<Tuple2<BaseRow, Long>> reduced = new LinkedList<>();

        if (stateHandler instanceof JoinKeyContainPrimaryKeyStateHandler) {
            BaseRow lastRow = null;
            for (BaseRow row : currentList) {
                lastRow = row;
            }
            if (lastRow != null) {
                reduced.add(Tuple2.of(lastRow, 1L));
            }
            return reduced;
        }

        if (!(stateHandler instanceof JoinKeyNotContainPrimaryKeyStateHandler)) {
            throw new RuntimeException("This is a bug, upsertReduceCurrentList should not be called.");
        }

        // primary key -> last row carrying that key
        final HashMap<Object, BaseRow> latestByPrimaryKey = new HashMap<>();
        for (BaseRow row : currentList) {
            stateHandler.extractCurrentPrimaryKey(row);
            latestByPrimaryKey.put(stateHandler.getCurrentPrimaryKey(), row);
        }
        for (BaseRow row : currentList) {
            stateHandler.extractCurrentPrimaryKey(row);
            final BaseRow latest = latestByPrimaryKey.get(stateHandler.getCurrentPrimaryKey());
            if (row.equals(latest)) {
                reduced.add(Tuple2.of(row, 1L));
            }
        }
        return reduced;
    }

    /** Emits {@code row} once through the collector. */
    public void collectResult(BaseRow row) {
        this.collector.collect(row);
    }

    /**
     * Emits {@code row} {@code |count|} times through the collector; the sign of {@code count}
     * is ignored.
     */
    public void collectResult(BaseRow row, long count) {
        final long repetitions = Math.abs(count);
        for (long emitted = 0; emitted < repetitions; emitted++) {
            this.collector.collect(row);
        }
    }
}
