package org.apache.flink.table.runtime.join.stream;

import java.io.IOException;
import java.util.Arrays;
import java.util.stream.IntStream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.runtime.state.keyed.KeyedMapStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.codegen.JoinConditionFunction;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.runtime.fault.tolerant.TriggerableOperator;
import org.apache.flink.table.runtime.join.batch.NullAwareJoinHelper;
import org.apache.flink.table.runtime.join.stream.state.EmptyJoinStateHandler;
import org.apache.flink.table.runtime.join.stream.state.JoinKeyContainPrimaryKeyStateHandler;
import org.apache.flink.table.runtime.join.stream.state.JoinKeyNotContainPrimaryKeyStateHandler;
import org.apache.flink.table.runtime.join.stream.state.JoinStateHandler;
import org.apache.flink.table.runtime.join.stream.state.WithoutPrimaryKeyStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.EmptyMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinKeyContainPrimaryKeyMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinKeyNotContainPrimaryKeyMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.NonBatchOnlyEqualityConditionMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.WithoutPrimaryKeyMatchStateHandler;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;

/**
 * Abstract base for two-input streaming join operators working on {@link BaseRow} records.
 *
 * <p>The operator owns one {@link JoinStateHandler} per input side, the compiled
 * {@link JoinConditionFunction}, and (when state cleaning is enabled) one cleanup-time state per
 * side together with a processing-time timer service registered under the name
 * {@code "join-timers"}. Event-time timers are not supported.
 *
 * <p>State cleaning is enabled iff the configured minimum retention time is greater than 1.
 */
@Internal
public abstract class JoinStreamOperator extends TriggerableOperator<BaseRow, Byte, BaseRow> implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> {
    private static final long serialVersionUID = 1;
    protected final BaseRowTypeInfo leftType;
    protected final BaseRowTypeInfo rightType;
    /** Generated join condition; set to {@code null} in {@link #open()} once compiled. */
    protected GeneratedJoinConditionFunction condFuncCode;
    protected final KeySelector<BaseRow, BaseRow> leftKeySelector;
    protected final KeySelector<BaseRow, BaseRow> rightKeySelector;
    protected final BaseRowTypeInfo leftKeyType;
    protected final BaseRowTypeInfo rightKeyType;
    /** Generated primary-key projections; set to {@code null} in {@link #open()} after state init. */
    protected GeneratedProjection leftPkProjectCode;
    protected GeneratedProjection rightPkProjectCode;
    protected final JoinStateHandler.Type leftJoinStateType;
    protected final JoinStateHandler.Type rightJoinStateType;
    protected final long minRetentionTime;
    protected final long maxRetentionTime;
    protected final boolean stateCleaningEnabled;
    /** Only initialised when {@link #stateCleaningEnabled} is true. */
    protected transient KeyedValueState<BaseRow, Long> leftTimerState;
    /** Only initialised when {@link #stateCleaningEnabled} is true. */
    protected transient KeyedValueState<BaseRow, Long> rightTimerState;
    protected transient JoinConditionFunction condFunc;
    protected transient JoinStateHandler leftStateHandler;
    protected transient JoinStateHandler rightStateHandler;
    protected transient JoinedRow joinedRow;
    protected transient TimestampedCollector<BaseRow> collector;
    protected transient InternalTimerService<Byte> internalTimerService;
    protected boolean[] filterNullKeys;
    /** Field indexes derived from {@link #filterNullKeys}; {@code null} when no filter flags were given. */
    protected int[] nullFilterKeys;
    protected AbstractRowSerializer<BaseRow> leftSer;
    protected AbstractRowSerializer<BaseRow> rightSer;
    private boolean isObjectReuse;

    /**
     * Creates the operator.
     *
     * @param leftType row type of the left input
     * @param rightType row type of the right input
     * @param condFuncCode generated join condition, compiled in {@link #open()}
     * @param leftKeySelector key selector of the left input; must also implement
     *     {@link ResultTypeQueryable} producing a {@link BaseRowTypeInfo}
     * @param rightKeySelector key selector of the right input; same requirement as the left one
     * @param leftPkProjectCode generated projection handed to the left state handler factory
     * @param rightPkProjectCode generated projection handed to the right state handler factory
     * @param leftJoinStateType kind of state handler to create for the left side
     * @param rightJoinStateType kind of state handler to create for the right side
     * @param maxRetentionTime offset added to the current time when a cleanup timer is registered
     * @param minRetentionTime minimum retention; state cleaning is enabled iff this is &gt; 1
     * @param filterNullKeys per-key flags converted via
     *     {@link NullAwareJoinHelper#getNullFilterKeys(boolean[])}; may be {@code null} or empty
     */
    public JoinStreamOperator(BaseRowTypeInfo leftType, BaseRowTypeInfo rightType, GeneratedJoinConditionFunction condFuncCode, KeySelector<BaseRow, BaseRow> leftKeySelector, KeySelector<BaseRow, BaseRow> rightKeySelector, GeneratedProjection leftPkProjectCode, GeneratedProjection rightPkProjectCode, JoinStateHandler.Type leftJoinStateType, JoinStateHandler.Type rightJoinStateType, long maxRetentionTime, long minRetentionTime, boolean[] filterNullKeys) {
        this.leftType = leftType;
        this.rightType = rightType;
        this.condFuncCode = condFuncCode;
        this.leftKeySelector = leftKeySelector;
        this.rightKeySelector = rightKeySelector;
        // The key type is obtained from the selector itself, which therefore has to be ResultTypeQueryable.
        this.leftKeyType = (BaseRowTypeInfo) ((ResultTypeQueryable<?>) leftKeySelector).getProducedType();
        this.rightKeyType = (BaseRowTypeInfo) ((ResultTypeQueryable<?>) rightKeySelector).getProducedType();
        this.leftPkProjectCode = leftPkProjectCode;
        this.rightPkProjectCode = rightPkProjectCode;
        this.leftJoinStateType = leftJoinStateType;
        this.rightJoinStateType = rightJoinStateType;
        this.maxRetentionTime = maxRetentionTime;
        this.minRetentionTime = minRetentionTime;
        this.stateCleaningEnabled = minRetentionTime > 1;
        this.filterNullKeys = filterNullKeys;
        final boolean noFilterFlags = filterNullKeys == null || filterNullKeys.length == 0;
        this.nullFilterKeys = noFilterFlags ? null : NullAwareJoinHelper.getNullFilterKeys(filterNullKeys);
    }

    /**
     * Initialises the runtime parts of the operator: timer service, compiled join condition,
     * output collector, reusable {@link JoinedRow}, all keyed states and the row serializers.
     *
     * <p>The generated code holders are cleared once they have been consumed.
     */
    @Override // org.apache.flink.table.runtime.fault.tolerant.TriggerableOperator
    public void open() throws Exception {
        super.open();
        this.internalTimerService = getInternalTimerService("join-timers", ByteSerializer.INSTANCE, this);

        LOG.debug("Compiling JoinConditionFunction: {} \n\n Code:\n {}", this.condFuncCode.name(), this.condFuncCode.code());
        Class<?> condClass = CodeGenUtils.compile(getContainingTask().getUserCodeClassLoader(), this.condFuncCode.name(), this.condFuncCode.code());
        this.condFuncCode = null;
        this.condFunc = (JoinConditionFunction) condClass.newInstance();

        this.collector = new TimestampedCollector<>(this.output);
        this.joinedRow = new JoinedRow();

        // State creation reads the pk projection code, so it must run before the fields are cleared.
        initAllStates();
        this.leftPkProjectCode = null;
        this.rightPkProjectCode = null;

        this.leftSer = (AbstractRowSerializer) this.leftType.createSerializer(getExecutionConfig());
        this.rightSer = (AbstractRowSerializer) this.rightType.createSerializer(getExecutionConfig());
        this.isObjectReuse = getExecutionConfig().isObjectReuseEnabled();
    }

    /** Returns true when at least one null-filter key index is configured. */
    private boolean isNotNullSafe() {
        return this.nullFilterKeys != null && this.nullFilterKeys.length != 0;
    }

    /**
     * Creates the left/right join state handlers and, if state cleaning is enabled, the two
     * cleanup-time states.
     *
     * <p>Decompiler note: the original access modifier of this method was {@code protected}.
     */
    public void initAllStates() throws Exception {
        this.leftStateHandler = createJoinStateHandler(this.leftType, this.leftJoinStateType, "leftJoinState", this.leftKeySelector, this.leftKeyType, this.leftPkProjectCode);
        this.rightStateHandler = createJoinStateHandler(this.rightType, this.rightJoinStateType, "rightJoinState", this.rightKeySelector, this.rightKeyType, this.rightPkProjectCode);
        if (!this.stateCleaningEnabled) {
            return;
        }
        this.leftTimerState = createCleanupTimeState("left-time-state");
        this.rightTimerState = createCleanupTimeState("right-time-state");
    }

    /**
     * Builds the {@link JoinStateHandler} matching {@code type}.
     *
     * <p>Decompiler note: the original access modifier of this method was {@code protected}.
     *
     * @param rowType type of the rows kept in state
     * @param type which handler implementation to create
     * @param stateName name of the keyed state descriptor
     * @param keySelector selector passed on to the handler
     * @param keyType type of the join key; its serializer is the descriptor's key serializer
     * @param pkProjectCode generated projection; only read for
     *     {@code JOIN_KEY_NOT_CONTAIN_PRIMARY_KEY}
     * @return the created handler
     * @throws IOException if {@code type} is not one of the handled constants
     */
    public JoinStateHandler createJoinStateHandler(BaseRowTypeInfo rowType, JoinStateHandler.Type type, String stateName, KeySelector<BaseRow, BaseRow> keySelector, BaseRowTypeInfo keyType, GeneratedProjection pkProjectCode) throws Exception {
        AbstractRowSerializer<BaseRow> keySerializer = keyType.createSerializer();
        AbstractRowSerializer<BaseRow> rowSerializer = rowType.createSerializer();
        // Descriptors are intentionally left as raw types, exactly as in the original code.
        switch (type) {
            case JOIN_KEY_CONTAIN_PRIMARY_KEY:
                return new JoinKeyContainPrimaryKeyStateHandler(getKeyedState(new KeyedValueStateDescriptor(stateName, keySerializer, rowSerializer)), keySelector);
            case JOIN_KEY_NOT_CONTAIN_PRIMARY_KEY:
                return new JoinKeyNotContainPrimaryKeyStateHandler(getKeyedState(new KeyedMapStateDescriptor(stateName, keySerializer, DataTypes.createInternalSerializer(pkProjectCode.expr().resultType()), new TupleTypeInfo(rowType, Types.LONG).createSerializer(new ExecutionConfig()))), keySelector, (Projection) CodeGenUtils.compile(getContainingTask().getUserCodeClassLoader(), pkProjectCode.name(), pkProjectCode.code()).newInstance());
            case WITHOUT_PRIMARY_KEY:
                return new WithoutPrimaryKeyStateHandler(getKeyedState(new KeyedMapStateDescriptor(stateName, keySerializer, rowSerializer, new TupleTypeInfo(Types.LONG, Types.LONG).createSerializer(new ExecutionConfig()))), keySelector);
            case EMPTY:
                return new EmptyJoinStateHandler();
            default:
                throw new IOException("Unrecognized type: " + type);
        }
    }

    /**
     * Builds the {@link JoinMatchStateHandler} matching {@code type}.
     *
     * <p>Decompiler note: the original access modifier of this method was {@code protected}.
     *
     * @param rowType type whose serializer is used as the map-key serializer for
     *     {@code WITHOUT_PRIMARY_KEY_MATCH}
     * @param type which handler implementation to create
     * @param keyType type whose serializer is the descriptor's key serializer (presumably the join
     *     key type, by analogy with {@link #createJoinStateHandler} - confirm against callers)
     * @param stateName name of the keyed state descriptor
     * @param pkProjectCode generated projection; only read for
     *     {@code JOIN_KEY_NOT_CONTAIN_PRIMARY_KEY_MATCH}
     * @return the created handler
     * @throws IOException if {@code type} is not one of the handled constants
     */
    public JoinMatchStateHandler createMatchStateHandler(BaseRowTypeInfo rowType, JoinMatchStateHandler.Type type, BaseRowTypeInfo keyType, String stateName, GeneratedProjection pkProjectCode) throws Exception {
        AbstractRowSerializer<BaseRow> rowSerializer = rowType.createSerializer();
        AbstractRowSerializer<BaseRow> keySerializer = keyType.createSerializer();
        LongSerializer countSerializer = LongSerializer.INSTANCE;
        switch (type) {
            case WITHOUT_PRIMARY_KEY_MATCH:
                return new WithoutPrimaryKeyMatchStateHandler(getKeyedState(new KeyedMapStateDescriptor(stateName, keySerializer, rowSerializer, countSerializer)));
            case JOIN_KEY_NOT_CONTAIN_PRIMARY_KEY_MATCH:
                return new JoinKeyNotContainPrimaryKeyMatchStateHandler(getKeyedState(new KeyedMapStateDescriptor(stateName, keySerializer, DataTypes.createInternalSerializer(pkProjectCode.expr().resultType()), countSerializer)), (Projection) CodeGenUtils.compile(getContainingTask().getUserCodeClassLoader(), pkProjectCode.name(), pkProjectCode.code()).newInstance());
            case JOIN_KEY_CONTAIN_PRIMARY_KEY_MATCH:
                return new JoinKeyContainPrimaryKeyMatchStateHandler(getKeyedState(new KeyedValueStateDescriptor(stateName, keySerializer, countSerializer)));
            case EMPTY_MATCH:
                return new EmptyMatchStateHandler();
            case ONLY_EQUALITY_CONDITION_EMPTY_MATCH:
                return new NonBatchOnlyEqualityConditionMatchStateHandler();
            default:
                throw new IOException("Unrecognized type: " + type);
        }
    }

    /**
     * Event-time timers are not supported by this operator.
     *
     * @throws UnsupportedOperationException always
     */
    public void onEventTime(InternalTimer<BaseRow, Byte> internalTimer) throws Exception {
        throw new UnsupportedOperationException("Don't support handle event time for join operator!");
    }

    /** Returns the collector created in {@link #open()}. */
    protected TimestampedCollector<BaseRow> getCollector() {
        return this.collector;
    }

    /**
     * Creates a keyed value state holding a cleanup timestamp per key.
     *
     * <p>NOTE(review): the descriptor name is always prefixed with {@code "left-"} and the key
     * serializer always comes from {@link #leftKeyType}, also when called for the right side
     * (yielding e.g. {@code "left-right-time-state"}). This is kept as is because renaming a
     * state would make previously written state unreadable; confirm whether it is intentional.
     *
     * <p>Decompiler note: the original access modifier of this method was {@code protected}.
     *
     * @param stateName suffix of the state descriptor name
     */
    public KeyedValueState<BaseRow, Long> createCleanupTimeState(String stateName) throws Exception {
        return getKeyedState(new KeyedValueStateDescriptor("left-" + stateName, this.leftKeyType.createSerializer(), LongSerializer.INSTANCE));
    }

    /**
     * Registers (or pushes back) the processing-time cleanup timer for {@code key}.
     *
     * <p>Does nothing when state cleaning is disabled. Otherwise a new timer at
     * {@code currentTime + maxRetentionTime} is registered and remembered in {@code timerState}
     * whenever no cleanup time is stored yet or {@code currentTime + minRetentionTime} lies beyond
     * the stored one.
     *
     * <p>Decompiler note: the original access modifier of this method was {@code protected}.
     *
     * @param key key whose cleanup time is tracked
     * @param currentTime current processing time
     * @param isLeft {@code true} registers the timer under namespace 1, {@code false} under
     *     namespace 2 (presumably left/right side - confirm against the timer callbacks)
     * @param timerState state storing the last registered cleanup time per key
     */
    public void registerProcessingCleanupTimer(BaseRow key, long currentTime, boolean isLeft, KeyedValueState<BaseRow, Long> timerState) {
        if (!this.stateCleaningEnabled) {
            return;
        }
        final Long registeredCleanupTime = timerState.get(key);
        final boolean stillCovered = registeredCleanupTime != null && currentTime + this.minRetentionTime <= registeredCleanupTime.longValue();
        if (stillCovered) {
            return;
        }
        final long cleanupTime = currentTime + this.maxRetentionTime;
        this.internalTimerService.registerProcessingTimeTimer(Byte.valueOf((byte) (isLeft ? 1 : 2)), cleanupTime);
        timerState.put(key, cleanupTime);
    }

    /**
     * Tells whether a timer firing at {@code timestamp} is the latest one registered for
     * {@code key}, i.e. whether the stored cleanup time exists and equals {@code timestamp}.
     *
     * <p>Decompiler note: the original access modifier of this method was {@code protected}.
     */
    public boolean needToCleanupState(BaseRow key, long timestamp, KeyedValueState<BaseRow, Long> timerState) {
        final Long registeredCleanupTime = timerState.get(key);
        return registeredCleanupTime != null && registeredCleanupTime.longValue() == timestamp;
    }

    /**
     * Evaluates the join condition for a pair of rows.
     *
     * <p>When null-filter key indexes are configured, the pair is rejected without evaluating the
     * condition as soon as {@code keyRow} is null at any of those indexes.
     *
     * <p>Decompiler note: the original access modifier of this method was {@code protected}.
     *
     * @param input1 first argument passed to the compiled condition
     * @param input2 second argument passed to the compiled condition
     * @param keyRow row inspected for nulls (presumably the join key - confirm against callers)
     * @return {@code false} if a filtered key field is null, otherwise the condition's result
     */
    public boolean applyCondition(BaseRow input1, BaseRow input2, BaseRow keyRow) throws Exception {
        if (isNotNullSafe()) {
            // Plain loop instead of a stream: this runs per record and exits on the first null.
            for (int keyIndex : this.nullFilterKeys) {
                if (keyRow.isNullAt(keyIndex)) {
                    return false;
                }
            }
        }
        return this.condFunc.apply(input1, input2);
    }

    /** Always returns {@code true}. */
    public boolean requireState() {
        return true;
    }

    /** No-op. */
    public void endInput1() throws Exception {
    }

    /** No-op. */
    public void endInput2() throws Exception {
    }

    /** Always returns {@link TwoInputSelection#ANY}. */
    public TwoInputSelection firstInputSelection() {
        return TwoInputSelection.ANY;
    }

    /**
     * Returns the record's row, copied when object reuse is enabled in the execution config.
     *
     * @param streamRecord record carrying the row
     * @param bool {@code true} to copy with the left serializer, {@code false} with the right one;
     *     must not be {@code null}
     * @return the row itself when object reuse is disabled, otherwise a serializer copy of it
     */
    public BaseRow getOrCopyBaseRow(StreamRecord<BaseRow> streamRecord, Boolean bool) {
        // Unbox first so that a null flag fails regardless of the object-reuse setting.
        final boolean useLeftSerializer = bool.booleanValue();
        final BaseRow row = streamRecord.getValue();
        if (!this.isObjectReuse) {
            return row;
        }
        return useLeftSerializer ? this.leftSer.copy(row) : this.rightSer.copy(row);
    }
}
