package org.apache.flink.table.runtime.join.batch;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.codegen.GeneratedSorter;
import org.apache.flink.table.codegen.JoinConditionFunction;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.plan.FlinkJoinRelType;
import org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics;
import org.apache.flink.table.runtime.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.sort.NormalizedKeyComputer;
import org.apache.flink.table.runtime.sort.RecordComparator;
import org.apache.flink.table.runtime.util.ResettableListBuffer;
import org.apache.flink.table.runtime.util.ResettableRowBuffer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.codehaus.commons.compiler.CompileException;

/* loaded from: input_file:org/apache/flink/table/runtime/join/batch/SortMergeJoinOperator.class */
public class SortMergeJoinOperator extends AbstractStreamOperatorWithMetrics<BaseRow> implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> {
    private final long reservedSortMemory1;
    private final long maxSortMemory1;
    private final long reservedSortMemory2;
    private final long maxSortMemory2;
    private final long perRequestMemory;
    private final long externalBufferMemory;
    private final FlinkJoinRelType type;
    private final boolean leftIsSmaller;
    private final int bufferThreshold;
    private GeneratedJoinConditionFunction condFuncCode;
    private GeneratedProjection projectionCode1;
    private GeneratedProjection projectionCode2;
    private GeneratedSorter gSorter1;
    private GeneratedSorter gSorter2;
    private GeneratedSorter keyGSorter;
    private final boolean[] filterNulls;
    private transient CookedClasses classes;
    private transient Configuration conf;
    private transient MemoryManager memManager;
    private transient IOManager ioManager;
    private transient TypeSerializer<BaseRow> inputSerializer1;
    private transient TypeSerializer<BaseRow> inputSerializer2;
    private transient BinaryRowSerializer serializer1;
    private transient BinaryRowSerializer serializer2;
    private transient BinaryExternalSorter sorter1;
    private transient BinaryExternalSorter sorter2;
    private transient SortMergeJoinIterator joinIterator1;
    private transient SortMergeJoinIterator joinIterator2;
    private transient SortMergeFullOuterJoinIterator fullOuterJoinIterator;
    private transient Collector<BaseRow> collector;
    private transient boolean[] isFinished;
    private transient JoinConditionFunction condFunc;
    private transient RecordComparator keyComparator;
    private transient BaseRow leftNullRow;
    private transient BaseRow rightNullRow;
    private transient SortMergeJoinHelper helper;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/table/runtime/join/batch/SortMergeJoinOperator$CookedClasses.class */
    public static class CookedClasses {
        protected final Class<JoinConditionFunction> condFuncClass;
        protected final Class<RecordComparator> keyComparatorClass;
        protected final Class<Projection> projectionClass1;
        protected final Class<Projection> projectionClass2;
        protected final Class<NormalizedKeyComputer> computerClass1;
        protected final Class<NormalizedKeyComputer> computerClass2;
        protected final Class<RecordComparator> comparatorClass1;
        protected final Class<RecordComparator> comparatorClass2;

        protected CookedClasses(Class<JoinConditionFunction> cls, Class<RecordComparator> cls2, Class<Projection> cls3, Class<Projection> cls4, Class<NormalizedKeyComputer> cls5, Class<NormalizedKeyComputer> cls6, Class<RecordComparator> cls7, Class<RecordComparator> cls8) {
            this.condFuncClass = cls;
            this.keyComparatorClass = cls2;
            this.projectionClass1 = cls3;
            this.projectionClass2 = cls4;
            this.computerClass1 = cls5;
            this.computerClass2 = cls6;
            this.comparatorClass1 = cls7;
            this.comparatorClass2 = cls8;
        }
    }

    public SortMergeJoinOperator(long j, long j2, long j3, long j4, FlinkJoinRelType flinkJoinRelType, boolean z, GeneratedJoinConditionFunction generatedJoinConditionFunction, GeneratedProjection generatedProjection, GeneratedProjection generatedProjection2, GeneratedSorter generatedSorter, GeneratedSorter generatedSorter2, GeneratedSorter generatedSorter3, boolean[] zArr) {
        this(j, j2, j, j2, j3, j4, flinkJoinRelType, z, generatedJoinConditionFunction, generatedProjection, generatedProjection2, generatedSorter, generatedSorter2, generatedSorter3, zArr, 4096);
    }

    public SortMergeJoinOperator(long j, long j2, long j3, long j4, long j5, long j6, FlinkJoinRelType flinkJoinRelType, boolean z, GeneratedJoinConditionFunction generatedJoinConditionFunction, GeneratedProjection generatedProjection, GeneratedProjection generatedProjection2, GeneratedSorter generatedSorter, GeneratedSorter generatedSorter2, GeneratedSorter generatedSorter3, boolean[] zArr, int i) {
        this.reservedSortMemory1 = j;
        this.maxSortMemory1 = j2;
        this.reservedSortMemory2 = j3;
        this.maxSortMemory2 = j4;
        this.perRequestMemory = j5;
        this.externalBufferMemory = j6;
        this.type = flinkJoinRelType;
        this.leftIsSmaller = z;
        this.condFuncCode = generatedJoinConditionFunction;
        this.projectionCode1 = generatedProjection;
        this.projectionCode2 = generatedProjection2;
        this.gSorter1 = (GeneratedSorter) Preconditions.checkNotNull(generatedSorter);
        this.gSorter2 = (GeneratedSorter) Preconditions.checkNotNull(generatedSorter2);
        this.keyGSorter = (GeneratedSorter) Preconditions.checkNotNull(generatedSorter3);
        this.filterNulls = zArr;
        this.bufferThreshold = i;
    }

    @Override // org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics
    public void open() throws Exception {
        super.open();
        this.conf = getSqlConf();
        this.classes = cookGeneratedClasses(getContainingTask().getUserCodeClassLoader());
        this.isFinished = new boolean[2];
        this.isFinished[0] = false;
        this.isFinished[1] = false;
        this.collector = new StreamRecordCollector(this.output);
        this.inputSerializer1 = getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
        this.serializer1 = new BinaryRowSerializer(((AbstractRowSerializer) this.inputSerializer1).getTypes());
        this.inputSerializer2 = getOperatorConfig().getTypeSerializerIn2(getUserCodeClassloader());
        this.serializer2 = new BinaryRowSerializer(((AbstractRowSerializer) this.inputSerializer2).getTypes());
        this.memManager = getContainingTask().getEnvironment().getMemoryManager();
        this.ioManager = getContainingTask().getEnvironment().getIOManager();
        initSorter();
        initKeyComparator();
        this.condFunc = this.classes.condFuncClass.newInstance();
        this.leftNullRow = new GenericRow(this.serializer1.getNumFields());
        this.rightNullRow = new GenericRow(this.serializer2.getNumFields());
        this.helper = new SortMergeJoinHelper(this.collector, this.condFunc, this.leftNullRow, this.rightNullRow, new JoinedRow());
        this.condFuncCode = null;
        this.keyGSorter = null;
        this.projectionCode1 = null;
        this.projectionCode2 = null;
        this.gSorter1 = null;
        this.gSorter2 = null;
        getMetricGroup().gauge("memoryUsedSizeInBytes", () -> {
            return Long.valueOf(this.sorter1.getUsedMemoryInBytes() + this.sorter2.getUsedMemoryInBytes());
        });
        getMetricGroup().gauge("numSpillFiles", () -> {
            return Long.valueOf(this.sorter1.getNumSpillFiles() + this.sorter2.getNumSpillFiles());
        });
        getMetricGroup().gauge("spillInBytes", () -> {
            return Long.valueOf(this.sorter1.getSpillInBytes() + this.sorter2.getSpillInBytes());
        });
    }

    private void initSorter() throws Exception {
        NormalizedKeyComputer newInstance = this.classes.computerClass1.newInstance();
        RecordComparator newInstance2 = this.classes.comparatorClass1.newInstance();
        newInstance.init(this.gSorter1.serializers(), this.gSorter1.comparators());
        newInstance2.init(this.gSorter1.serializers(), this.gSorter1.comparators());
        this.sorter1 = new BinaryExternalSorter(getContainingTask(), this.memManager, this.reservedSortMemory1, this.maxSortMemory1, this.perRequestMemory, this.ioManager, this.inputSerializer1, this.serializer1, newInstance, newInstance2, this.conf);
        this.sorter1.startThreads();
        NormalizedKeyComputer newInstance3 = this.classes.computerClass2.newInstance();
        RecordComparator newInstance4 = this.classes.comparatorClass2.newInstance();
        newInstance3.init(this.gSorter2.serializers(), this.gSorter2.comparators());
        newInstance4.init(this.gSorter2.serializers(), this.gSorter2.comparators());
        this.sorter2 = new BinaryExternalSorter(getContainingTask(), this.memManager, this.reservedSortMemory2, this.maxSortMemory2, this.perRequestMemory, this.ioManager, this.inputSerializer2, this.serializer2, newInstance3, newInstance4, this.conf);
        this.sorter2.startThreads();
    }

    private void initKeyComparator() throws Exception {
        this.keyComparator = this.classes.keyComparatorClass.newInstance();
        this.keyComparator.init(this.keyGSorter.serializers(), this.keyGSorter.comparators());
    }

    protected CookedClasses cookGeneratedClasses(ClassLoader classLoader) throws CompileException {
        return new CookedClasses(CodeGenUtils.compile(classLoader, this.condFuncCode.name(), this.condFuncCode.code()), CodeGenUtils.compile(classLoader, this.keyGSorter.comparator().name(), this.keyGSorter.comparator().code()), CodeGenUtils.compile(classLoader, this.projectionCode1.name(), this.projectionCode1.code()), CodeGenUtils.compile(classLoader, this.projectionCode2.name(), this.projectionCode2.code()), CodeGenUtils.compile(classLoader, this.gSorter1.computer().name(), this.gSorter1.computer().code()), CodeGenUtils.compile(classLoader, this.gSorter2.computer().name(), this.gSorter2.computer().code()), CodeGenUtils.compile(classLoader, this.gSorter1.comparator().name(), this.gSorter1.comparator().code()), CodeGenUtils.compile(classLoader, this.gSorter2.comparator().name(), this.gSorter2.comparator().code()));
    }

    public TwoInputSelection firstInputSelection() {
        return TwoInputSelection.ANY;
    }

    public TwoInputSelection processElement1(StreamRecord<BaseRow> streamRecord) throws Exception {
        this.sorter1.write((BaseRow) streamRecord.getValue());
        return TwoInputSelection.ANY;
    }

    public TwoInputSelection processElement2(StreamRecord<BaseRow> streamRecord) throws Exception {
        this.sorter2.write((BaseRow) streamRecord.getValue());
        return TwoInputSelection.ANY;
    }

    public void endInput1() throws Exception {
        this.isFinished[0] = true;
        if (isAllFinished()) {
            doSortMergeJoin();
        }
    }

    public void endInput2() throws Exception {
        this.isFinished[1] = true;
        if (isAllFinished()) {
            doSortMergeJoin();
        }
    }

    private void doSortMergeJoin() throws Exception {
        ResettableRowBuffer.ResettableIterator<BaseRow> newIterator2;
        Projection newInstance = this.classes.projectionClass1.newInstance();
        Projection newInstance2 = this.classes.projectionClass2.newInstance();
        MutableObjectIterator<BinaryRow> iterator = this.sorter1.getIterator();
        MutableObjectIterator<BinaryRow> iterator2 = this.sorter2.getIterator();
        if (this.type.equals(FlinkJoinRelType.INNER)) {
            if (this.leftIsSmaller) {
                this.joinIterator1 = new SortMergeInnerJoinIterator(this.serializer2, this.serializer1, newInstance2, newInstance, this.keyComparator, iterator2, iterator, newBuffer(this.serializer1), this.filterNulls);
                this.helper.innerJoin((SortMergeInnerJoinIterator) this.joinIterator1, true);
                return;
            } else {
                this.joinIterator2 = new SortMergeInnerJoinIterator(this.serializer1, this.serializer2, newInstance, newInstance2, this.keyComparator, iterator, iterator2, newBuffer(this.serializer2), this.filterNulls);
                this.helper.innerJoin((SortMergeInnerJoinIterator) this.joinIterator2, false);
                return;
            }
        }
        if (this.type.equals(FlinkJoinRelType.LEFT)) {
            this.joinIterator2 = new SortMergeOneSideOuterJoinIterator(this.serializer1, this.serializer2, newInstance, newInstance2, this.keyComparator, iterator, iterator2, newBuffer(this.serializer2), this.filterNulls);
            this.helper.oneSideOuterJoin((SortMergeOneSideOuterJoinIterator) this.joinIterator2, false, this.rightNullRow);
            return;
        }
        if (this.type.equals(FlinkJoinRelType.RIGHT)) {
            this.joinIterator1 = new SortMergeOneSideOuterJoinIterator(this.serializer2, this.serializer1, newInstance2, newInstance, this.keyComparator, iterator2, iterator, newBuffer(this.serializer1), this.filterNulls);
            this.helper.oneSideOuterJoin((SortMergeOneSideOuterJoinIterator) this.joinIterator1, true, this.leftNullRow);
            return;
        }
        if (this.type.equals(FlinkJoinRelType.FULL)) {
            this.fullOuterJoinIterator = new SortMergeFullOuterJoinIterator(this.serializer1, this.serializer2, newInstance, newInstance2, this.keyComparator, iterator, iterator2, newBuffer(this.serializer1), newBuffer(this.serializer2), this.filterNulls);
            this.helper.fullOuterJoin(this.fullOuterJoinIterator);
            return;
        }
        if (this.type.equals(FlinkJoinRelType.SEMI)) {
            this.joinIterator2 = new SortMergeInnerJoinIterator(this.serializer1, this.serializer2, newInstance, newInstance2, this.keyComparator, iterator, iterator2, newBuffer(this.serializer2), this.filterNulls);
            while (((SortMergeInnerJoinIterator) this.joinIterator2).nextInnerJoin()) {
                BaseRow probeRow = this.joinIterator2.getProbeRow();
                boolean z = false;
                newIterator2 = this.joinIterator2.getMatchBuffer().newIterator2();
                Throwable th = null;
                while (true) {
                    try {
                        try {
                            if (!newIterator2.advanceNext()) {
                                break;
                            }
                            if (this.condFunc.apply(probeRow, newIterator2.getRow())) {
                                z = true;
                                break;
                            }
                        } catch (Throwable th2) {
                            th = th2;
                            throw th2;
                        }
                    } finally {
                    }
                }
                if (newIterator2 != null) {
                    if (0 != 0) {
                        try {
                            newIterator2.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        newIterator2.close();
                    }
                }
                if (z) {
                    this.collector.collect(probeRow);
                }
            }
            return;
        }
        if (!this.type.equals(FlinkJoinRelType.ANTI)) {
            throw new RuntimeException("Not support yet!");
        }
        this.joinIterator2 = new SortMergeOneSideOuterJoinIterator(this.serializer1, this.serializer2, newInstance, newInstance2, this.keyComparator, iterator, iterator2, newBuffer(this.serializer2), this.filterNulls);
        while (((SortMergeOneSideOuterJoinIterator) this.joinIterator2).nextOuterJoin()) {
            BaseRow probeRow2 = this.joinIterator2.getProbeRow();
            ResettableListBuffer matchBuffer = this.joinIterator2.getMatchBuffer();
            boolean z2 = false;
            if (matchBuffer != null) {
                newIterator2 = matchBuffer.newIterator2();
                Throwable th4 = null;
                while (true) {
                    try {
                        try {
                            if (!newIterator2.advanceNext()) {
                                break;
                            }
                            if (this.condFunc.apply(probeRow2, newIterator2.getRow())) {
                                z2 = true;
                                break;
                            }
                        } catch (Throwable th5) {
                            th4 = th5;
                            throw th5;
                        }
                    } finally {
                    }
                }
                if (newIterator2 != null) {
                    if (0 != 0) {
                        try {
                            newIterator2.close();
                        } catch (Throwable th6) {
                            th4.addSuppressed(th6);
                        }
                    } else {
                        newIterator2.close();
                    }
                }
            }
            if (!z2) {
                this.collector.collect(probeRow2);
            }
        }
    }

    private ResettableListBuffer newBuffer(BinaryRowSerializer binaryRowSerializer) throws MemoryAllocationException {
        return new ResettableListBuffer(this.memManager, this.ioManager, this.memManager.allocatePages(getContainingTask(), (int) (this.externalBufferMemory / this.memManager.getPageSize())), binaryRowSerializer, this.bufferThreshold);
    }

    private boolean isAllFinished() {
        return this.isFinished[0] && this.isFinished[1];
    }

    @Override // org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics
    public void close() throws Exception {
        super.close();
        if (this.sorter1 != null) {
            this.sorter1.close();
        }
        if (this.sorter2 != null) {
            this.sorter2.close();
        }
        if (this.joinIterator1 != null) {
            this.joinIterator1.close();
        }
        if (this.joinIterator2 != null) {
            this.joinIterator2.close();
        }
        if (this.fullOuterJoinIterator != null) {
            this.fullOuterJoinIterator.close();
        }
    }
}
