/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.join.stream;

import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.join.stream.JoinStreamOperator;
import org.apache.flink.table.runtime.join.stream.state.JoinStateHandler;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;

/**
 * Stream operator that inner-joins two {@link BaseRow} inputs.
 *
 * <p>Every incoming row is first applied to the state of its own side (added, or retracted
 * when it is a retract message) and is then joined against the records that the other
 * side's state holds for the same join key. Each matching pair is emitted through the
 * collector using the shared {@code joinedRow} instance.
 */
@Internal
public class InnerJoinStreamOperator
extends JoinStreamOperator {
    private static final long serialVersionUID = 1L;

    /**
     * Creates the operator; all arguments are forwarded unchanged to
     * {@link JoinStreamOperator}.
     */
    public InnerJoinStreamOperator(BaseRowTypeInfo leftType, BaseRowTypeInfo rightType, GeneratedJoinConditionFunction condFuncCode, KeySelector<BaseRow, BaseRow> leftKeySelector, KeySelector<BaseRow, BaseRow> rightKeySelector, GeneratedProjection leftPkProjectCode, GeneratedProjection rightPkProjectCode, JoinStateHandler.Type leftJoinStateType, JoinStateHandler.Type rightJoinStateType, long maxRetentionTime, long minRetentionTime, boolean[] filterNullKeys) {
        super(leftType, rightType, condFuncCode, leftKeySelector, rightKeySelector, leftPkProjectCode, rightPkProjectCode, leftJoinStateType, rightJoinStateType, maxRetentionTime, minRetentionTime, filterNullKeys);
    }

    /**
     * Runs the parent's {@code open()} and logs the join state types configured for both sides.
     *
     * @throws Exception if the parent's initialization fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        LOG.info("leftJoinStateType {}, rightJoinStateType {}", this.leftJoinStateType, this.rightJoinStateType);
        LOG.info("Init InnerJoinStreamOperator.");
    }

    /**
     * Handles a record from the first (left) input.
     *
     * @param element the incoming left-side record
     * @return always {@link TwoInputSelection#ANY}
     * @throws Exception if state access or condition evaluation fails
     */
    public TwoInputSelection processElement1(StreamRecord<BaseRow> element) throws Exception {
        this.processElement(this.getOrCopyBaseRow(element, true), this.leftStateHandler, this.rightStateHandler, true, (KeyedValueState<BaseRow, Long>)this.leftTimerState, this.stateCleaningEnabled);
        return TwoInputSelection.ANY;
    }

    /**
     * Handles a record from the second (right) input.
     *
     * @param element the incoming right-side record
     * @return always {@link TwoInputSelection#ANY}
     * @throws Exception if state access or condition evaluation fails
     */
    public TwoInputSelection processElement2(StreamRecord<BaseRow> element) throws Exception {
        this.processElement(this.getOrCopyBaseRow(element, false), this.rightStateHandler, this.leftStateHandler, false, (KeyedValueState<BaseRow, Long>)this.rightTimerState, this.stateCleaningEnabled);
        return TwoInputSelection.ANY;
    }

    /**
     * Reacts to a fired processing-time timer by removing the state of the timer's key on the
     * side the timer belongs to, provided {@code needToCleanupState} approves the cleanup.
     *
     * <p>A timer namespace of {@code 1} selects the left side; every other namespace value is
     * handled as the right side.
     *
     * @param timer the fired timer, carrying the key, the timestamp and the side namespace
     * @throws Exception if state access fails
     */
    public void onProcessingTime(InternalTimer<BaseRow, Byte> timer) throws Exception {
        byte namespace = timer.getNamespace();
        if (namespace == 1) {
            if (this.needToCleanupState(timer.getKey(), timer.getTimestamp(), (KeyedValueState<BaseRow, Long>)this.leftTimerState)) {
                this.leftStateHandler.remove(timer.getKey());
            }
        } else if (this.needToCleanupState(timer.getKey(), timer.getTimestamp(), (KeyedValueState<BaseRow, Long>)this.rightTimerState)) {
            this.rightStateHandler.remove(timer.getKey());
        }
    }

    /**
     * Applies {@code input} to the state of its own side and emits the join results against
     * the records stored on the other side.
     *
     * @param input the incoming row
     * @param inputStateHandler state handler of the side {@code input} arrived on
     * @param otherStateHandler state handler of the opposite side
     * @param inputIsLeft {@code true} when {@code input} comes from the left input; decides the
     *     argument order for the join condition and for the joined row
     * @param timerState timer state of the input side, handed to the cleanup-timer registration
     * @param cleaningBasedTimer whether records of the other side whose expiry time has been
     *     reached are dropped while iterating
     * @throws Exception if state access or condition evaluation fails
     */
    private void processElement(BaseRow input, JoinStateHandler inputStateHandler, JoinStateHandler otherStateHandler, boolean inputIsLeft, KeyedValueState<BaseRow, Long> timerState, boolean cleaningBasedTimer) throws Exception {
        inputStateHandler.extractCurrentJoinKey(input);
        inputStateHandler.extractCurrentPrimaryKey(input);
        long currentTime = this.internalTimerService.currentProcessingTime();

        // Step 1: update the state of the input side.
        if (BaseRowUtil.isRetractMsg(input)) {
            inputStateHandler.retract(input);
        } else {
            this.registerProcessingCleanupTimer(inputStateHandler.getCurrentJoinKey(), currentTime, inputIsLeft, timerState);
            inputStateHandler.add(input, currentTime + this.maxRetentionTime);
        }

        // Step 2: join against everything the other side stores for the same join key.
        Iterator<Tuple3<BaseRow, Long, Long>> iterator = otherStateHandler.getRecords(inputStateHandler.getCurrentJoinKey());
        // The emitted rows carry the header of the incoming row.
        this.joinedRow.setHeader(input.getHeader());
        while (iterator.hasNext()) {
            Tuple3<BaseRow, Long, Long> tuple3 = iterator.next();
            BaseRow otherRow = tuple3.f0;
            // f1 is the number of times the joined row is emitted for this stored record.
            long count = tuple3.f1;

            // The condition and the joined row always take the left row first.
            boolean matched = inputIsLeft
                    ? this.applyCondition(input, otherRow, inputStateHandler.getCurrentJoinKey())
                    : this.applyCondition(otherRow, input, inputStateHandler.getCurrentJoinKey());
            if (matched) {
                if (inputIsLeft) {
                    this.joinedRow.replace(input, otherRow);
                } else {
                    this.joinedRow.replace(otherRow, input);
                }
                // A long counter is required: an int counter would wrap around and loop
                // forever once count exceeds Integer.MAX_VALUE.
                for (long i = 0; i < count; i++) {
                    this.collector.collect(this.joinedRow);
                }
            }

            // f2 is compared with the current processing time: once it has been reached the
            // stored record is removed, but only when state cleaning is enabled.
            if (cleaningBasedTimer && tuple3.f2 <= currentTime) {
                iterator.remove();
            }
        }
    }
}

