package org.apache.flink.table.runtime.operator.join.batch;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.InputElementSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.codegen.JoinConditionFunction;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.runtime.operator.AbstractStreamOperatorWithMetrics;
import org.apache.flink.table.runtime.operator.StreamRecordCollector;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.util.RowIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.codehaus.commons.compiler.CompileException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operator/join/batch/HashJoinOperator.class */
public abstract class HashJoinOperator extends AbstractStreamOperatorWithMetrics<BaseRow> implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> {
    private static final Logger LOG = LoggerFactory.getLogger(HashJoinOperator.class);
    private final long reservedMemorySize;
    private final long maxMemorySize;
    private final long perRequestMemorySize;
    private final boolean reverseJoinFunction;
    private final boolean[] filterNullKeys;
    final HashJoinType type;
    private final boolean tryDistinctBuildRow;
    private final int buildRowSize;
    private final long buildRowCount;
    private GeneratedJoinConditionFunction condFuncCode;
    private GeneratedProjection buildProjectionCode;
    private GeneratedProjection probeProjectionCode;
    protected transient Class<JoinConditionFunction> condFuncClass;
    protected transient Class<Projection<BaseRow, BinaryRow>> buildProjectionClass;
    protected transient Class<Projection<BaseRow, BinaryRow>> probeProjectionClass;
    private transient BinaryHashTable table;
    transient Collector<BaseRow> collector;
    private transient MemoryManager memManager;
    transient BaseRow buildSideNullRow;
    transient BaseRow probeSideNullRow;
    private transient JoinedRow joinedRow;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operator/join/batch/HashJoinOperator$AntiHashJoinOperator.class */
    public static class AntiHashJoinOperator extends HashJoinOperator {
        AntiHashJoinOperator(HashJoinParameter hashJoinParameter) {
            super(hashJoinParameter);
        }

        @Override // org.apache.flink.table.runtime.operator.join.batch.HashJoinOperator
        public void join(RowIterator<BinaryRow> rowIterator, BaseRow baseRow) throws Exception {
            Preconditions.checkNotNull(baseRow);
            if (rowIterator.advanceNext()) {
                return;
            }
            this.collector.collect(baseRow);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operator/join/batch/HashJoinOperator$BuildLeftSemiOrAntiHashJoinOperator.class */
    public static class BuildLeftSemiOrAntiHashJoinOperator extends HashJoinOperator {
        BuildLeftSemiOrAntiHashJoinOperator(HashJoinParameter hashJoinParameter) {
            super(hashJoinParameter);
        }

        @Override // org.apache.flink.table.runtime.operator.join.batch.HashJoinOperator
        public void join(RowIterator<BinaryRow> rowIterator, BaseRow baseRow) throws Exception {
            if (rowIterator.advanceNext()) {
                if (baseRow == null) {
                    this.collector.collect(rowIterator.getRow());
                    while (rowIterator.advanceNext()) {
                        this.collector.collect(rowIterator.getRow());
                    }
                    return;
                }
                do {
                } while (rowIterator.advanceNext());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operator/join/batch/HashJoinOperator$BuildOuterHashJoinOperator.class */
    public static class BuildOuterHashJoinOperator extends HashJoinOperator {
        BuildOuterHashJoinOperator(HashJoinParameter hashJoinParameter) {
            super(hashJoinParameter);
        }

        @Override // org.apache.flink.table.runtime.operator.join.batch.HashJoinOperator
        public void join(RowIterator<BinaryRow> rowIterator, BaseRow baseRow) throws Exception {
            if (rowIterator.advanceNext()) {
                if (baseRow != null) {
                    innerJoin(rowIterator, baseRow);
                } else {
                    buildOuterJoin(rowIterator);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operator/join/batch/HashJoinOperator$FullOuterHashJoinOperator.class */
    public static class FullOuterHashJoinOperator extends HashJoinOperator {
        FullOuterHashJoinOperator(HashJoinParameter hashJoinParameter) {
            super(hashJoinParameter);
        }

        @Override // org.apache.flink.table.runtime.operator.join.batch.HashJoinOperator
        public void join(RowIterator<BinaryRow> rowIterator, BaseRow baseRow) throws Exception {
            if (!rowIterator.advanceNext()) {
                if (baseRow != null) {
                    collect(this.buildSideNullRow, baseRow);
                }
            } else if (baseRow != null) {
                innerJoin(rowIterator, baseRow);
            } else {
                buildOuterJoin(rowIterator);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operator/join/batch/HashJoinOperator$HashJoinParameter.class */
    public static class HashJoinParameter {
        long reservedMemorySize;
        long maxMemorySize;
        long perRequestMemorySize;
        HashJoinType type;
        GeneratedJoinConditionFunction condFuncCode;
        boolean reverseJoinFunction;
        boolean[] filterNullKeys;
        GeneratedProjection buildProjectionCode;
        GeneratedProjection probeProjectionCode;
        boolean tryDistinctBuildRow;
        int buildRowSize;
        long buildRowCount;

        HashJoinParameter(long j, long j2, long j3, HashJoinType hashJoinType, GeneratedJoinConditionFunction generatedJoinConditionFunction, boolean z, boolean[] zArr, GeneratedProjection generatedProjection, GeneratedProjection generatedProjection2, boolean z2, int i, long j4) {
            this.reservedMemorySize = j;
            this.maxMemorySize = j2;
            this.perRequestMemorySize = j3;
            this.type = hashJoinType;
            this.condFuncCode = generatedJoinConditionFunction;
            this.reverseJoinFunction = z;
            this.filterNullKeys = zArr;
            this.buildProjectionCode = generatedProjection;
            this.probeProjectionCode = generatedProjection2;
            this.tryDistinctBuildRow = z2;
            this.buildRowSize = i;
            this.buildRowCount = j4;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operator/join/batch/HashJoinOperator$InnerHashJoinOperator.class */
    public static class InnerHashJoinOperator extends HashJoinOperator {
        InnerHashJoinOperator(HashJoinParameter hashJoinParameter) {
            super(hashJoinParameter);
        }

        @Override // org.apache.flink.table.runtime.operator.join.batch.HashJoinOperator
        public void join(RowIterator<BinaryRow> rowIterator, BaseRow baseRow) throws Exception {
            if (!rowIterator.advanceNext() || baseRow == null) {
                return;
            }
            innerJoin(rowIterator, baseRow);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operator/join/batch/HashJoinOperator$ProbeOuterHashJoinOperator.class */
    public static class ProbeOuterHashJoinOperator extends HashJoinOperator {
        ProbeOuterHashJoinOperator(HashJoinParameter hashJoinParameter) {
            super(hashJoinParameter);
        }

        @Override // org.apache.flink.table.runtime.operator.join.batch.HashJoinOperator
        public void join(RowIterator<BinaryRow> rowIterator, BaseRow baseRow) throws Exception {
            if (rowIterator.advanceNext()) {
                if (baseRow != null) {
                    innerJoin(rowIterator, baseRow);
                }
            } else if (baseRow != null) {
                collect(this.buildSideNullRow, baseRow);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operator/join/batch/HashJoinOperator$SemiHashJoinOperator.class */
    public static class SemiHashJoinOperator extends HashJoinOperator {
        SemiHashJoinOperator(HashJoinParameter hashJoinParameter) {
            super(hashJoinParameter);
        }

        @Override // org.apache.flink.table.runtime.operator.join.batch.HashJoinOperator
        public void join(RowIterator<BinaryRow> rowIterator, BaseRow baseRow) throws Exception {
            Preconditions.checkNotNull(baseRow);
            if (rowIterator.advanceNext()) {
                this.collector.collect(baseRow);
            }
        }
    }

    public HashJoinOperator(HashJoinParameter hashJoinParameter) {
        this.reservedMemorySize = hashJoinParameter.reservedMemorySize;
        this.maxMemorySize = hashJoinParameter.maxMemorySize;
        this.perRequestMemorySize = hashJoinParameter.perRequestMemorySize;
        this.type = hashJoinParameter.type;
        this.condFuncCode = hashJoinParameter.condFuncCode;
        this.reverseJoinFunction = hashJoinParameter.reverseJoinFunction;
        this.filterNullKeys = hashJoinParameter.filterNullKeys;
        this.buildProjectionCode = hashJoinParameter.buildProjectionCode;
        this.probeProjectionCode = hashJoinParameter.probeProjectionCode;
        this.tryDistinctBuildRow = hashJoinParameter.tryDistinctBuildRow;
        this.buildRowSize = hashJoinParameter.buildRowSize;
        this.buildRowCount = hashJoinParameter.buildRowCount;
    }

    @Override // org.apache.flink.table.runtime.operator.AbstractStreamOperatorWithMetrics
    public void open() throws Exception {
        super.open();
        cookGeneratedClasses(getContainingTask().getUserCodeClassLoader());
        this.memManager = getContainingTask().getEnvironment().getMemoryManager();
        IOManager iOManager = getContainingTask().getEnvironment().getIOManager();
        AbstractRowSerializer abstractRowSerializer = (AbstractRowSerializer) getOperatorContext().getTypeSerializerIn1();
        AbstractRowSerializer abstractRowSerializer2 = (AbstractRowSerializer) getOperatorContext().getTypeSerializerIn2();
        this.table = new BinaryHashTable(getContainingTask(), abstractRowSerializer, abstractRowSerializer2, this.buildProjectionClass.newInstance(), this.probeProjectionClass.newInstance(), this.memManager, this.reservedMemorySize, this.maxMemorySize, this.perRequestMemorySize, iOManager, this.buildRowSize, this.buildRowCount / getRuntimeContext().getNumberOfParallelSubtasks(), getContainingTask().getEnvironment().getTaskConfiguration().getBoolean("taskmanager.runtime.hashjoin-bloom-filters", false), this.type, this.condFuncClass.newInstance(), this.reverseJoinFunction, this.filterNullKeys, this.tryDistinctBuildRow);
        this.collector = new StreamRecordCollector(this.output);
        this.buildSideNullRow = new GenericRow(abstractRowSerializer.getNumFields());
        this.probeSideNullRow = new GenericRow(abstractRowSerializer2.getNumFields());
        this.joinedRow = new JoinedRow();
        getMetricGroup().gauge("memoryUsedSizeInBytes", new Gauge<Long>() { // from class: org.apache.flink.table.runtime.operator.join.batch.HashJoinOperator.1
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m5171getValue() {
                return Long.valueOf(HashJoinOperator.this.table.getUsedMemoryInBytes());
            }
        });
        getMetricGroup().gauge("numSpillFiles", new Gauge<Long>() { // from class: org.apache.flink.table.runtime.operator.join.batch.HashJoinOperator.2
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m5172getValue() {
                return Long.valueOf(HashJoinOperator.this.table.getNumSpillFiles());
            }
        });
        getMetricGroup().gauge("spillInBytes", new Gauge<Long>() { // from class: org.apache.flink.table.runtime.operator.join.batch.HashJoinOperator.3
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m5173getValue() {
                return Long.valueOf(HashJoinOperator.this.table.getSpillInBytes());
            }
        });
    }

    protected void cookGeneratedClasses(ClassLoader classLoader) throws CompileException {
        long currentTimeMillis = System.currentTimeMillis();
        this.condFuncClass = CodeGenUtils.compile(classLoader, this.condFuncCode.name(), this.condFuncCode.code());
        this.buildProjectionClass = CodeGenUtils.compile(classLoader, this.buildProjectionCode.name(), this.buildProjectionCode.code());
        this.probeProjectionClass = CodeGenUtils.compile(classLoader, this.probeProjectionCode.name(), this.probeProjectionCode.code());
        this.condFuncCode = null;
        this.buildProjectionCode = null;
        this.probeProjectionCode = null;
        LOG.info("Compiling generated codes, used time: " + (System.currentTimeMillis() - currentTimeMillis) + "ms.");
    }

    public InputElementSelection firstInputSelection() {
        return InputElementSelection.FIRST;
    }

    public InputElementSelection processElement1(StreamRecord<BaseRow> streamRecord) throws Exception {
        this.table.putBuildRow((BaseRow) streamRecord.getValue());
        return InputElementSelection.FIRST;
    }

    public InputElementSelection processElement2(StreamRecord<BaseRow> streamRecord) throws Exception {
        if (this.table.tryProbe((BaseRow) streamRecord.getValue())) {
            joinWithNextKey();
        }
        return InputElementSelection.SECOND;
    }

    public InputElementSelection endInput1() throws Exception {
        sendStageDoneEvent(0);
        LOG.info("Finish build phase.");
        this.table.endBuild();
        return InputElementSelection.SECOND;
    }

    public InputElementSelection endInput2() throws Exception {
        if (this.type.buildLeftSemiOrAnti()) {
            sendStageDoneEvent(1);
        }
        LOG.info("Finish probe phase.");
        while (this.table.nextMatching()) {
            joinWithNextKey();
        }
        LOG.info("Finish rebuild phase.");
        return InputElementSelection.NONE;
    }

    private void joinWithNextKey() throws Exception {
        join(this.table.getBuildSideIterator(), this.table.getCurrentProbeRow());
    }

    public abstract void join(RowIterator<BinaryRow> rowIterator, BaseRow baseRow) throws Exception;

    void innerJoin(RowIterator<BinaryRow> rowIterator, BaseRow baseRow) throws Exception {
        collect(rowIterator.getRow(), baseRow);
        while (rowIterator.advanceNext()) {
            collect(rowIterator.getRow(), baseRow);
        }
    }

    void buildOuterJoin(RowIterator<BinaryRow> rowIterator) throws Exception {
        collect(rowIterator.getRow(), this.probeSideNullRow);
        while (rowIterator.advanceNext()) {
            collect(rowIterator.getRow(), this.probeSideNullRow);
        }
    }

    void collect(BaseRow baseRow, BaseRow baseRow2) throws Exception {
        if (this.reverseJoinFunction) {
            this.collector.collect(this.joinedRow.replace(baseRow2, baseRow));
        } else {
            this.collector.collect(this.joinedRow.replace(baseRow, baseRow2));
        }
    }

    @Override // org.apache.flink.table.runtime.operator.AbstractStreamOperatorWithMetrics
    public void close() throws Exception {
        super.close();
        if (this.table != null) {
            this.table.close();
            this.table.free();
        }
    }

    public static HashJoinOperator newHashJoinOperator(long j, long j2, long j3, HashJoinType hashJoinType, GeneratedJoinConditionFunction generatedJoinConditionFunction, boolean z, boolean[] zArr, GeneratedProjection generatedProjection, GeneratedProjection generatedProjection2, boolean z2, int i, long j4) {
        HashJoinParameter hashJoinParameter = new HashJoinParameter(j, j2, j3, hashJoinType, generatedJoinConditionFunction, z, zArr, generatedProjection, generatedProjection2, z2, i, j4);
        switch (hashJoinType) {
            case INNER:
                return new InnerHashJoinOperator(hashJoinParameter);
            case BUILD_OUTER:
                return new BuildOuterHashJoinOperator(hashJoinParameter);
            case PROBE_OUTER:
                return new ProbeOuterHashJoinOperator(hashJoinParameter);
            case FULL_OUTER:
                return new FullOuterHashJoinOperator(hashJoinParameter);
            case SEMI:
                return new SemiHashJoinOperator(hashJoinParameter);
            case ANTI:
                return new AntiHashJoinOperator(hashJoinParameter);
            case BUILD_LEFT_SEMI:
            case BUILD_LEFT_ANTI:
                return new BuildLeftSemiOrAntiHashJoinOperator(hashJoinParameter);
            default:
                throw new IllegalArgumentException("invalid: " + hashJoinType);
        }
    }
}
