/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.join.stream;

import java.io.IOException;
import java.util.Arrays;
import java.util.OptionalInt;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.runtime.state.keyed.KeyedMapState;
import org.apache.flink.runtime.state.keyed.KeyedMapStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.codegen.JoinConditionFunction;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.runtime.join.batch.NullAwareJoinHelper;
import org.apache.flink.table.runtime.join.stream.state.EmptyJoinStateHandler;
import org.apache.flink.table.runtime.join.stream.state.JoinKeyContainPrimaryKeyStateHandler;
import org.apache.flink.table.runtime.join.stream.state.JoinKeyNotContainPrimaryKeyStateHandler;
import org.apache.flink.table.runtime.join.stream.state.JoinStateHandler;
import org.apache.flink.table.runtime.join.stream.state.WithoutPrimaryKeyStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.EmptyMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinKeyContainPrimaryKeyMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinKeyNotContainPrimaryKeyMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.NonBatchOnlyEqualityConditionMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.WithoutPrimaryKeyMatchStateHandler;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;

@Internal
public abstract class JoinStreamOperator
extends AbstractStreamOperator<BaseRow>
implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow>,
Triggerable<BaseRow, Byte> {
    private static final long serialVersionUID = 1L;
    protected final BaseRowTypeInfo leftType;
    protected final BaseRowTypeInfo rightType;
    protected GeneratedJoinConditionFunction condFuncCode;
    protected final KeySelector<BaseRow, BaseRow> leftKeySelector;
    protected final KeySelector<BaseRow, BaseRow> rightKeySelector;
    protected final BaseRowTypeInfo leftKeyType;
    protected final BaseRowTypeInfo rightKeyType;
    protected GeneratedProjection leftPkProjectCode;
    protected GeneratedProjection rightPkProjectCode;
    protected final JoinStateHandler.Type leftJoinStateType;
    protected final JoinStateHandler.Type rightJoinStateType;
    protected final long minRetentionTime;
    protected final long maxRetentionTime;
    protected final boolean stateCleaningEnabled;
    protected transient KeyedValueState<BaseRow, Long> leftTimerState;
    protected transient KeyedValueState<BaseRow, Long> rightTimerState;
    protected transient JoinConditionFunction condFunc;
    protected transient JoinStateHandler leftStateHandler;
    protected transient JoinStateHandler rightStateHandler;
    protected transient JoinedRow joinedRow;
    protected transient TimestampedCollector<BaseRow> collector;
    protected transient InternalTimerService<Byte> internalTimerService;
    protected boolean[] filterNullKeys;
    protected int[] nullFilterKeys;
    protected AbstractRowSerializer<BaseRow> leftSer;
    protected AbstractRowSerializer<BaseRow> rightSer;
    private boolean isObjectReuse;

    public JoinStreamOperator(BaseRowTypeInfo leftType, BaseRowTypeInfo rightType, GeneratedJoinConditionFunction condFuncCode, KeySelector<BaseRow, BaseRow> leftKeySelector, KeySelector<BaseRow, BaseRow> rightKeySelector, GeneratedProjection leftPkProjectCode, GeneratedProjection rightPkProjectCode, JoinStateHandler.Type leftJoinStateType, JoinStateHandler.Type rightJoinStateType, long maxRetentionTime, long minRetentionTime, boolean[] filterNullKeys) {
        this.leftType = leftType;
        this.rightType = rightType;
        this.condFuncCode = condFuncCode;
        this.leftKeySelector = leftKeySelector;
        this.rightKeySelector = rightKeySelector;
        this.leftKeyType = (BaseRowTypeInfo)((ResultTypeQueryable)((Object)leftKeySelector)).getProducedType();
        this.rightKeyType = (BaseRowTypeInfo)((ResultTypeQueryable)((Object)rightKeySelector)).getProducedType();
        this.leftPkProjectCode = leftPkProjectCode;
        this.rightPkProjectCode = rightPkProjectCode;
        this.leftJoinStateType = leftJoinStateType;
        this.rightJoinStateType = rightJoinStateType;
        this.maxRetentionTime = maxRetentionTime;
        this.minRetentionTime = minRetentionTime;
        this.stateCleaningEnabled = minRetentionTime > 1L;
        this.filterNullKeys = filterNullKeys;
        this.nullFilterKeys = (int[])(filterNullKeys == null || filterNullKeys.length == 0 ? null : NullAwareJoinHelper.getNullFilterKeys(filterNullKeys));
    }

    public void open() throws Exception {
        super.open();
        this.internalTimerService = this.getInternalTimerService("join-timers", ByteSerializer.INSTANCE, this);
        LOG.debug("Compiling JoinConditionFunction: {} \n\n Code:\n {}", (Object)this.condFuncCode.name(), (Object)this.condFuncCode.code());
        Class condFuncClass = CodeGenUtils.compile(this.getContainingTask().getUserCodeClassLoader(), this.condFuncCode.name(), this.condFuncCode.code());
        this.condFuncCode = null;
        this.condFunc = (JoinConditionFunction)condFuncClass.newInstance();
        this.collector = new TimestampedCollector(this.output);
        this.joinedRow = new JoinedRow();
        this.initAllStates();
        this.leftPkProjectCode = null;
        this.rightPkProjectCode = null;
        this.leftSer = (AbstractRowSerializer)this.leftType.createSerializer(this.getExecutionConfig());
        this.rightSer = (AbstractRowSerializer)this.rightType.createSerializer(this.getExecutionConfig());
        this.isObjectReuse = this.getExecutionConfig().isObjectReuseEnabled();
    }

    private boolean isNotNullSafe() {
        return this.nullFilterKeys != null && this.nullFilterKeys.length != 0;
    }

    protected void initAllStates() throws Exception {
        this.leftStateHandler = this.createJoinStateHandler(this.leftType, this.leftJoinStateType, "leftJoinState", this.leftKeySelector, this.leftKeyType, this.leftPkProjectCode);
        this.rightStateHandler = this.createJoinStateHandler(this.rightType, this.rightJoinStateType, "rightJoinState", this.rightKeySelector, this.rightKeyType, this.rightPkProjectCode);
        if (this.stateCleaningEnabled) {
            this.leftTimerState = this.createCleanupTimeState("left-time-state");
            this.rightTimerState = this.createCleanupTimeState("right-time-state");
        }
    }

    protected JoinStateHandler createJoinStateHandler(BaseRowTypeInfo recordType, JoinStateHandler.Type type, String name, KeySelector<BaseRow, BaseRow> keySelector, BaseRowTypeInfo keyType, GeneratedProjection pkProjectCode) throws Exception {
        JoinStateHandler state;
        AbstractRowSerializer<BaseRow> joinKeySer = keyType.createSerializer();
        AbstractRowSerializer<BaseRow> recordSer = recordType.createSerializer();
        switch (type) {
            case JOIN_KEY_CONTAIN_PRIMARY_KEY: {
                KeyedValueStateDescriptor valueStateDescriptor = new KeyedValueStateDescriptor(name, joinKeySer, recordSer);
                state = new JoinKeyContainPrimaryKeyStateHandler((KeyedValueState<BaseRow, BaseRow>)((KeyedValueState)this.getKeyedState((KeyedStateDescriptor)valueStateDescriptor)), keySelector);
                break;
            }
            case JOIN_KEY_NOT_CONTAIN_PRIMARY_KEY: {
                Class pkProj = CodeGenUtils.compile(this.getContainingTask().getUserCodeClassLoader(), pkProjectCode.name(), pkProjectCode.code());
                Projection pkProjection = (Projection)pkProj.newInstance();
                TypeSerializer leftPkSer = DataTypes.createInternalSerializer(pkProjectCode.expr().resultType());
                TypeSerializer record2TimeSer = new TupleTypeInfo(recordType, Types.LONG).createSerializer(new ExecutionConfig());
                KeyedMapStateDescriptor mapStatePkDescriptor = new KeyedMapStateDescriptor(name, joinKeySer, leftPkSer, record2TimeSer);
                state = new JoinKeyNotContainPrimaryKeyStateHandler((KeyedMapState<BaseRow, BaseRow, Tuple2<BaseRow, Long>>)((KeyedMapState)this.getKeyedState((KeyedStateDescriptor)mapStatePkDescriptor)), keySelector, pkProjection);
                break;
            }
            case WITHOUT_PRIMARY_KEY: {
                TypeSerializer count2TimeSer = new TupleTypeInfo(Types.LONG, Types.LONG).createSerializer(new ExecutionConfig());
                KeyedMapStateDescriptor mapStateCountDescriptor = new KeyedMapStateDescriptor(name, joinKeySer, recordSer, count2TimeSer);
                state = new WithoutPrimaryKeyStateHandler((KeyedMapState<BaseRow, BaseRow, Tuple2<Long, Long>>)((KeyedMapState)this.getKeyedState((KeyedStateDescriptor)mapStateCountDescriptor)), keySelector);
                break;
            }
            case EMPTY: {
                state = new EmptyJoinStateHandler();
                break;
            }
            default: {
                throw new IOException("Unrecognized type: " + (Object)((Object)type));
            }
        }
        return state;
    }

    protected JoinMatchStateHandler createMatchStateHandler(BaseRowTypeInfo recordType, JoinMatchStateHandler.Type type, BaseRowTypeInfo keyType, String name, GeneratedProjection pkProjectCode) throws Exception {
        JoinMatchStateHandler state;
        AbstractRowSerializer<BaseRow> recordSer = recordType.createSerializer();
        AbstractRowSerializer<BaseRow> joinKeySer = keyType.createSerializer();
        LongSerializer joinCntSer = LongSerializer.INSTANCE;
        switch (type) {
            case WITHOUT_PRIMARY_KEY_MATCH: {
                KeyedMapStateDescriptor mapStateDescriptor = new KeyedMapStateDescriptor(name, joinKeySer, recordSer, (TypeSerializer)joinCntSer);
                state = new WithoutPrimaryKeyMatchStateHandler((KeyedMapState<BaseRow, BaseRow, Long>)((KeyedMapState)this.getKeyedState((KeyedStateDescriptor)mapStateDescriptor)));
                break;
            }
            case JOIN_KEY_NOT_CONTAIN_PRIMARY_KEY_MATCH: {
                TypeSerializer pkSer = DataTypes.createInternalSerializer(pkProjectCode.expr().resultType());
                Class pkProj = CodeGenUtils.compile(this.getContainingTask().getUserCodeClassLoader(), pkProjectCode.name(), pkProjectCode.code());
                Projection pkProjection = (Projection)pkProj.newInstance();
                KeyedMapStateDescriptor pkStateDescriptor = new KeyedMapStateDescriptor(name, joinKeySer, pkSer, (TypeSerializer)joinCntSer);
                state = new JoinKeyNotContainPrimaryKeyMatchStateHandler((KeyedMapState<BaseRow, BaseRow, Long>)((KeyedMapState)this.getKeyedState((KeyedStateDescriptor)pkStateDescriptor)), pkProjection);
                break;
            }
            case JOIN_KEY_CONTAIN_PRIMARY_KEY_MATCH: {
                KeyedValueStateDescriptor valueStateDescriptor = new KeyedValueStateDescriptor(name, joinKeySer, (TypeSerializer)joinCntSer);
                state = new JoinKeyContainPrimaryKeyMatchStateHandler((KeyedValueState<BaseRow, Long>)((KeyedValueState)this.getKeyedState((KeyedStateDescriptor)valueStateDescriptor)));
                break;
            }
            case EMPTY_MATCH: {
                state = new EmptyMatchStateHandler();
                break;
            }
            case ONLY_EQUALITY_CONDITION_EMPTY_MATCH: {
                state = new NonBatchOnlyEqualityConditionMatchStateHandler();
                break;
            }
            default: {
                throw new IOException("Unrecognized type: " + (Object)((Object)type));
            }
        }
        return state;
    }

    public void onEventTime(InternalTimer<BaseRow, Byte> timer) throws Exception {
        throw new UnsupportedOperationException("Don't support handle event time for join operator!");
    }

    protected TimestampedCollector<BaseRow> getCollector() {
        return this.collector;
    }

    protected KeyedValueState<BaseRow, Long> createCleanupTimeState(String timeStateName) throws Exception {
        AbstractRowSerializer<BaseRow> joinKeySer = this.leftKeyType.createSerializer();
        LongSerializer timeSer = LongSerializer.INSTANCE;
        KeyedValueStateDescriptor valueStateDescriptor = new KeyedValueStateDescriptor("left-" + timeStateName, joinKeySer, (TypeSerializer)timeSer);
        return (KeyedValueState)this.getKeyedState((KeyedStateDescriptor)valueStateDescriptor);
    }

    protected void registerProcessingCleanupTimer(BaseRow key, long currentTime2, boolean isLeft, KeyedValueState<BaseRow, Long> timerState) {
        Long curCleanupTime;
        if (this.stateCleaningEnabled && ((curCleanupTime = (Long)timerState.get((Object)key)) == null || currentTime2 + this.minRetentionTime > curCleanupTime)) {
            long cleanupTime = currentTime2 + this.maxRetentionTime;
            byte namespace = (byte)(isLeft ? 1 : 2);
            this.internalTimerService.registerProcessingTimeTimer((Object)namespace, cleanupTime);
            timerState.put((Object)key, (Object)cleanupTime);
        }
    }

    protected boolean needToCleanupState(BaseRow key, long timestamp, KeyedValueState<BaseRow, Long> timerState) {
        Long cleanupTime = (Long)timerState.get((Object)key);
        return null != cleanupTime && timestamp == cleanupTime;
    }

    protected boolean applyCondition(BaseRow leftRow, BaseRow rightRow, BaseRow joinKey) throws Exception {
        if (this.isNotNullSafe()) {
            OptionalInt result = Arrays.stream(this.nullFilterKeys).filter(joinKey::isNullAt).findFirst();
            if (result.isPresent()) {
                return false;
            }
        }
        return this.condFunc.apply(leftRow, rightRow);
    }

    public boolean requireState() {
        return true;
    }

    public void endInput1() throws Exception {
    }

    public void endInput2() throws Exception {
    }

    public TwoInputSelection firstInputSelection() {
        return TwoInputSelection.ANY;
    }

    public BaseRow getOrCopyBaseRow(StreamRecord<BaseRow> element, Boolean isLeft) {
        if (isLeft.booleanValue()) {
            return this.isObjectReuse ? this.leftSer.copy((BaseRow)element.getValue()) : (BaseRow)element.getValue();
        }
        return this.isObjectReuse ? this.rightSer.copy((BaseRow)element.getValue()) : (BaseRow)element.getValue();
    }
}

