package org.apache.flink.table.runtime.join.stream;

import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.join.stream.state.JoinStateHandler;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;

/**
 * Two-input stream operator that emits a joined row for every pair of left/right rows that
 * share a join key and satisfy the join condition.
 *
 * <p>Each incoming row is first applied to the state handler of its own side (added, or
 * retracted when it is a retract message) and is then matched against the records that the
 * opposite side's state handler returns for the same join key. Processing-time timers are used
 * to drop per-key state once the inherited {@code needToCleanupState} check allows it.
 */
@Internal
public class InnerJoinStreamOperator extends JoinStreamOperator {
    private static final long serialVersionUID = 1;

    /**
     * Timer namespace value that {@link #onProcessingTime} treats as belonging to the left
     * input; every other value is handled as the right input.
     */
    private static final byte LEFT_TIMER_NAMESPACE = 1;

    /**
     * Creates the operator; all arguments are forwarded unchanged to the
     * {@link JoinStreamOperator} constructor.
     */
    public InnerJoinStreamOperator(BaseRowTypeInfo baseRowTypeInfo, BaseRowTypeInfo baseRowTypeInfo2, GeneratedJoinConditionFunction generatedJoinConditionFunction, KeySelector<BaseRow, BaseRow> keySelector, KeySelector<BaseRow, BaseRow> keySelector2, GeneratedProjection generatedProjection, GeneratedProjection generatedProjection2, JoinStateHandler.Type type, JoinStateHandler.Type type2, long j, long j2, boolean[] zArr) {
        super(baseRowTypeInfo, baseRowTypeInfo2, generatedJoinConditionFunction, keySelector, keySelector2, generatedProjection, generatedProjection2, type, type2, j, j2, zArr);
    }

    /** Runs the parent initialisation and logs the state types configured for both inputs. */
    @Override // org.apache.flink.table.runtime.join.stream.JoinStreamOperator, org.apache.flink.table.runtime.fault.tolerant.TriggerableOperator
    public void open() throws Exception {
        super.open();
        LOG.info("leftJoinStateType {}, rightJoinStateType {}", this.leftJoinStateType, this.rightJoinStateType);
        LOG.info("Init InnerJoinStreamOperator.");
    }

    /**
     * Handles a record from the first (left) input.
     *
     * @param streamRecord the incoming left record
     * @return always {@link TwoInputSelection#ANY}
     */
    public TwoInputSelection processElement1(StreamRecord<BaseRow> streamRecord) throws Exception {
        processElement(getOrCopyBaseRow(streamRecord, true), this.leftStateHandler, this.rightStateHandler, true, this.leftTimerState, this.stateCleaningEnabled);
        return TwoInputSelection.ANY;
    }

    /**
     * Handles a record from the second (right) input.
     *
     * @param streamRecord the incoming right record
     * @return always {@link TwoInputSelection#ANY}
     */
    public TwoInputSelection processElement2(StreamRecord<BaseRow> streamRecord) throws Exception {
        processElement(getOrCopyBaseRow(streamRecord, false), this.rightStateHandler, this.leftStateHandler, false, this.rightTimerState, this.stateCleaningEnabled);
        return TwoInputSelection.ANY;
    }

    /**
     * Processing-time timer callback: removes the state stored under the timer's key on the
     * side selected by the timer namespace, provided {@code needToCleanupState} approves it.
     *
     * @param internalTimer the timer that fired; its namespace selects the left or right side
     */
    public void onProcessingTime(InternalTimer<BaseRow, Byte> internalTimer) throws Exception {
        boolean leftSide = internalTimer.getNamespace().byteValue() == LEFT_TIMER_NAMESPACE;
        BaseRow key = internalTimer.getKey();
        long timestamp = internalTimer.getTimestamp();
        if (leftSide) {
            if (needToCleanupState(key, timestamp, this.leftTimerState)) {
                this.leftStateHandler.remove(key);
            }
        } else if (needToCleanupState(key, timestamp, this.rightTimerState)) {
            this.rightStateHandler.remove(key);
        }
    }

    /**
     * Applies one input row to its own side's state and emits the joined rows it produces
     * against the opposite side.
     *
     * @param input the incoming row
     * @param inputSideState state handler of the side {@code input} arrived on
     * @param otherSideState state handler of the opposite side, used for the lookup
     * @param isLeft {@code true} when {@code input} comes from the left input; decides which
     *     position the row takes in the join condition and in the joined output
     * @param timerState timer state of the input's side, passed to the cleanup-timer registration
     * @param cleanupEnabled whether opposite-side records may be removed while iterating
     */
    private void processElement(BaseRow input, JoinStateHandler inputSideState, JoinStateHandler otherSideState, boolean isLeft, KeyedValueState<BaseRow, Long> timerState, boolean cleanupEnabled) throws Exception {
        inputSideState.extractCurrentJoinKey(input);
        inputSideState.extractCurrentPrimaryKey(input);
        long now = this.internalTimerService.currentProcessingTime();
        if (BaseRowUtil.isRetractMsg(input)) {
            inputSideState.retract(input);
        } else {
            registerProcessingCleanupTimer(inputSideState.getCurrentJoinKey(), now, isLeft, timerState);
            // The second argument is stored with the row; it presumably acts as the row's
            // expiry time (compare the f2 check below) -- confirm against JoinStateHandler.
            inputSideState.add(input, now + this.maxRetentionTime);
        }

        // Read the join key once; it is used for the lookup and for every condition check.
        BaseRow joinKey = inputSideState.getCurrentJoinKey();
        Iterator<Tuple3<BaseRow, Long, Long>> matches = otherSideState.getRecords(joinKey);
        // Output rows carry the header of the incoming row.
        this.joinedRow.setHeader(input.getHeader());
        while (matches.hasNext()) {
            Tuple3<BaseRow, Long, Long> match = matches.next();
            BaseRow matched = match.f0;
            long count = match.f1.longValue();

            // Keep left/right ordering stable no matter which side the input arrived on.
            BaseRow leftRow = isLeft ? input : matched;
            BaseRow rightRow = isLeft ? matched : input;
            if (applyCondition(leftRow, rightRow, joinKey)) {
                this.joinedRow.replace(leftRow, rightRow);
                // f1 is used as the number of times the joined row is emitted. A long counter
                // is required: an int counter would wrap for counts above Integer.MAX_VALUE.
                for (long emitted = 0; emitted < count; emitted++) {
                    this.collector.collect(this.joinedRow);
                }
            }

            // Drop opposite-side records whose f2 timestamp is not after the current time.
            if (match.f2.longValue() <= now && cleanupEnabled) {
                matches.remove();
            }
        }
    }
}
