package org.apache.flink.table.runtime.join.batch;

import java.io.IOException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.codegen.GeneratedSorter;
import org.apache.flink.table.codegen.JoinConditionFunction;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.plan.FlinkJoinRelType;
import org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics;
import org.apache.flink.table.runtime.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.sort.NormalizedKeyComputer;
import org.apache.flink.table.runtime.sort.RecordComparator;
import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
import org.apache.flink.table.runtime.util.ResettableListBuffer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.codehaus.commons.compiler.CompileException;

/* loaded from: input_file:org/apache/flink/table/runtime/join/batch/OneSideSortMergeJoinOperator.class */
public class OneSideSortMergeJoinOperator extends AbstractStreamOperatorWithMetrics<BaseRow> implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> {
    // --- Memory budgets -------------------------------------------------------------------
    // The three sort values are forwarded unchanged to the BinaryExternalSorter constructor.
    private final long reservedSortMemory;
    private final long maxSortMemory;
    private final long perRequestMemory;
    // Divided by the memory manager's page size to get the page count of the probe buffer.
    private final long probeBufferMemory;
    // Divided by the memory manager's page size to get the page count of the join list buffer.
    private final long joinBufferMemory;

    // --- Join configuration ---------------------------------------------------------------
    // Only INNER, LEFT and RIGHT pass the constructor check.
    private final FlinkJoinRelType type;
    // true: input 1 is written to the sorter and input 2 is buffered/probed; false: the reverse.
    private final boolean leftNeedsSort;
    // Passed through to the join iterators; its per-key meaning is defined there (not visible here).
    private final boolean[] filterNulls;

    // --- Generated code, compiled in cookGeneratedClasses() -------------------------------
    private GeneratedJoinConditionFunction condFuncCode;
    private final GeneratedProjection projectionCode1;
    private final GeneratedProjection projectionCode2;
    // Supplies the normalized key computer and record comparator used by the external sorter.
    private final GeneratedSorter gSorter;
    // Supplies the comparator handed to the join iterator.
    private final GeneratedSorter keyGSorter;
    // Last constructor argument of the ResettableListBuffer; 4096 when the short constructor is used.
    private final int bufferThreshold;

    // --- Runtime state, (re)created in open() ---------------------------------------------
    private transient CookedClasses classes;
    private transient MemoryManager memManager;
    private transient IOManager ioManager;
    private transient AbstractRowSerializer inputSerializer1;
    private transient AbstractRowSerializer inputSerializer2;
    private transient BinaryRowSerializer serializer1;
    private transient BinaryRowSerializer serializer2;
    private transient BinaryExternalSorter sorter;
    // Row of the non-sorted input that arrived after the sorted input finished;
    // ProbeIterator hands it out once and resets it to null.
    private transient BaseRow probeRow;
    // Holds rows of the non-sorted input that arrive while the sorted input is still running.
    private transient ResettableExternalBuffer probeBuffer;
    // Both iterators are created lazily in initJoinIterator().
    private transient ProbeIterator probeIter;
    private transient SortMergeJoinIterator joinIterator;
    // GenericRows sized to the field count of input 1 / input 2, passed to the join helper.
    private transient BaseRow leftNullRow;
    private transient BaseRow rightNullRow;
    // Set by endInput1() / endInput2().
    private transient boolean isFinished1;
    private transient boolean isFinished2;
    private transient SortMergeJoinHelper helper;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/table/runtime/join/batch/OneSideSortMergeJoinOperator$CookedClasses.class */
    /**
     * Immutable holder for the classes obtained by compiling the operator's generated code.
     * Instances are produced by {@code cookGeneratedClasses(ClassLoader)}.
     */
    public static class CookedClasses {
        /** Compiled join condition function. */
        protected final Class<JoinConditionFunction> condFuncClass;
        /** Compiled comparator taken from the key sorter. */
        protected final Class<RecordComparator> keyComparatorClass;
        /** Compiled projection generated for the first projection code. */
        protected final Class<Projection> projectionClass1;
        /** Compiled projection generated for the second projection code. */
        protected final Class<Projection> projectionClass2;
        /** Compiled normalized key computer used by the sorter. */
        protected final Class<NormalizedKeyComputer> computerClass;
        /** Compiled record comparator used by the sorter. */
        protected final Class<RecordComparator> comparatorClass;

        /**
         * @param cls  join condition function class
         * @param cls2 key comparator class
         * @param cls3 first projection class
         * @param cls4 second projection class
         * @param cls5 normalized key computer class
         * @param cls6 sort comparator class
         */
        protected CookedClasses(
                Class<JoinConditionFunction> cls,
                Class<RecordComparator> cls2,
                Class<Projection> cls3,
                Class<Projection> cls4,
                Class<NormalizedKeyComputer> cls5,
                Class<RecordComparator> cls6) {
            // Join-side classes.
            condFuncClass = cls;
            keyComparatorClass = cls2;
            projectionClass1 = cls3;
            projectionClass2 = cls4;
            // Sorter-side classes.
            computerClass = cls5;
            comparatorClass = cls6;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/join/batch/OneSideSortMergeJoinOperator$ProbeIterator.class */
    /**
     * Iterator over the probe side: first every row stored in the operator's probe buffer,
     * then — at most once per assignment — the operator's pending {@code probeRow}.
     */
    public class ProbeIterator implements MutableObjectIterator<BaseRow> {
        private ResettableExternalBuffer.BufferIterator bufferIterator;

        /** Opens a fresh iterator over the enclosing operator's probe buffer. */
        private ProbeIterator() {
            this.bufferIterator = OneSideSortMergeJoinOperator.this.probeBuffer.newIterator2();
        }

        /** The reuse argument is ignored; this simply forwards to {@link #next()}. */
        @Override
        public BaseRow next(BaseRow baseRow) throws IOException {
            return next();
        }

        /**
         * @return the next buffered row; once the buffer is drained, the operator's pending
         *         probe row (which is cleared as it is handed out), or {@code null} if none
         */
        @Override
        public BaseRow next() throws IOException {
            if (!bufferIterator.hasNext()) {
                // Buffer drained: take the pending row, if any, and clear the slot so the
                // same row is never returned twice.
                final BaseRow pending = OneSideSortMergeJoinOperator.this.probeRow;
                OneSideSortMergeJoinOperator.this.probeRow = null;
                return pending;
            }
            bufferIterator.advanceNext();
            return bufferIterator.getRow();
        }

        /** Closes the underlying buffer iterator. */
        public void close() {
            bufferIterator.close();
        }
    }

    public OneSideSortMergeJoinOperator(long j, long j2, long j3, long j4, long j5, FlinkJoinRelType flinkJoinRelType, boolean z, GeneratedJoinConditionFunction generatedJoinConditionFunction, GeneratedProjection generatedProjection, GeneratedProjection generatedProjection2, GeneratedSorter generatedSorter, GeneratedSorter generatedSorter2, boolean[] zArr) {
        this(j, j2, j3, j4, j5, flinkJoinRelType, z, generatedJoinConditionFunction, generatedProjection, generatedProjection2, generatedSorter, generatedSorter2, zArr, 4096);
    }

    public OneSideSortMergeJoinOperator(long j, long j2, long j3, long j4, long j5, FlinkJoinRelType flinkJoinRelType, boolean z, GeneratedJoinConditionFunction generatedJoinConditionFunction, GeneratedProjection generatedProjection, GeneratedProjection generatedProjection2, GeneratedSorter generatedSorter, GeneratedSorter generatedSorter2, boolean[] zArr, int i) {
        if (flinkJoinRelType != FlinkJoinRelType.INNER && flinkJoinRelType != FlinkJoinRelType.LEFT && flinkJoinRelType != FlinkJoinRelType.RIGHT) {
            throw new RuntimeException("One side sort merge join operator only supports inner/left outer/right outer join currently.");
        }
        LOG.info("Initializing one side sort merge join operator...\nleftNeedsSort = " + z + ", reservedSortMemory = " + j + ", preferredSortMemory = " + j2 + ", perRequestMemory = " + j3 + ", probeBufferMemory = " + j4 + ", joinBufferMemory = " + j5);
        this.reservedSortMemory = j;
        this.maxSortMemory = j2;
        this.perRequestMemory = j3;
        this.probeBufferMemory = j4;
        this.joinBufferMemory = j5;
        this.type = flinkJoinRelType;
        this.leftNeedsSort = z;
        this.filterNulls = zArr;
        this.condFuncCode = generatedJoinConditionFunction;
        this.projectionCode1 = generatedProjection;
        this.projectionCode2 = generatedProjection2;
        this.gSorter = generatedSorter;
        this.keyGSorter = generatedSorter2;
        this.bufferThreshold = i;
    }

    /**
     * Prepares all runtime state: compiles the generated classes, resolves both input
     * serializers, obtains the memory and I/O managers, builds the join helper, starts the
     * sorter, allocates the probe buffer and registers the metric gauges.
     */
    @Override // org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics
    public void open() throws Exception {
        super.open();
        isFinished1 = false;
        isFinished2 = false;

        classes = cookGeneratedClasses(getContainingTask().getUserCodeClassLoader());
        StreamRecordCollector collector = new StreamRecordCollector(output);

        // Serializers of both inputs, plus binary-row variants built from the same types.
        inputSerializer1 = (AbstractRowSerializer) getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
        serializer1 = new BinaryRowSerializer(inputSerializer1.getTypes());
        inputSerializer2 = (AbstractRowSerializer) getOperatorConfig().getTypeSerializerIn2(getUserCodeClassloader());
        serializer2 = new BinaryRowSerializer(inputSerializer2.getTypes());

        memManager = getContainingTask().getEnvironment().getMemoryManager();
        ioManager = getContainingTask().getEnvironment().getIOManager();

        JoinConditionFunction condition = classes.condFuncClass.newInstance();
        leftNullRow = new GenericRow(serializer1.getNumFields());
        rightNullRow = new GenericRow(serializer2.getNumFields());
        helper = new SortMergeJoinHelper(collector, condition, leftNullRow, rightNullRow, new JoinedRow());

        initSorter();

        // The probe buffer stores rows of whichever input is NOT being sorted.
        AbstractRowSerializer probeSideSerializer;
        if (leftNeedsSort) {
            probeSideSerializer = inputSerializer2;
        } else {
            probeSideSerializer = inputSerializer1;
        }
        probeBuffer = newBuffer(probeBufferMemory, probeSideSerializer);

        initGauge();
    }

    /**
     * Compiles every piece of generated code with the given class loader and bundles the
     * resulting classes.
     *
     * @param classLoader class loader passed to each {@code CodeGenUtils.compile} call
     * @return holder with the six compiled classes
     * @throws CompileException declared for failures of the compilation
     */
    protected CookedClasses cookGeneratedClasses(ClassLoader classLoader) throws CompileException {
        // Compilation order matches the CookedClasses constructor argument order.
        Class<JoinConditionFunction> condition =
                CodeGenUtils.compile(classLoader, condFuncCode.name(), condFuncCode.code());
        Class<RecordComparator> keyComparator =
                CodeGenUtils.compile(classLoader, keyGSorter.comparator().name(), keyGSorter.comparator().code());
        Class<Projection> firstProjection =
                CodeGenUtils.compile(classLoader, projectionCode1.name(), projectionCode1.code());
        Class<Projection> secondProjection =
                CodeGenUtils.compile(classLoader, projectionCode2.name(), projectionCode2.code());
        Class<NormalizedKeyComputer> keyComputer =
                CodeGenUtils.compile(classLoader, gSorter.computer().name(), gSorter.computer().code());
        Class<RecordComparator> sortComparator =
                CodeGenUtils.compile(classLoader, gSorter.comparator().name(), gSorter.comparator().code());
        return new CookedClasses(condition, keyComparator, firstProjection, secondProjection, keyComputer, sortComparator);
    }

    /**
     * Registers three gauges, each reporting the sum of the sorter's and the probe buffer's
     * value: used memory in bytes, number of spill files, and spilled bytes.
     */
    private void initGauge() {
        getMetricGroup().gauge(
                "memoryUsedSizeInBytes",
                () -> Long.valueOf(sorter.getUsedMemoryInBytes() + probeBuffer.getUsedMemoryInBytes()));
        getMetricGroup().gauge(
                "numSpillFiles",
                () -> Long.valueOf(sorter.getNumSpillFiles() + probeBuffer.getNumSpillFiles()));
        getMetricGroup().gauge(
                "spillInBytes",
                () -> Long.valueOf(sorter.getSpillInBytes() + probeBuffer.getSpillInBytes()));
    }

    /**
     * Instantiates and initialises the generated key computer and comparator, creates the
     * external sorter for the input selected by {@code leftNeedsSort}, and starts its threads.
     */
    private void initSorter() throws Exception {
        NormalizedKeyComputer keyComputer = classes.computerClass.newInstance();
        keyComputer.init(gSorter.serializers(), gSorter.comparators());
        RecordComparator sortComparator = classes.comparatorClass.newInstance();
        sortComparator.init(gSorter.serializers(), gSorter.comparators());

        // Input 1's serializers when the left side is sorted, input 2's otherwise.
        final AbstractRowSerializer sortInputSerializer = leftNeedsSort ? inputSerializer1 : inputSerializer2;
        final BinaryRowSerializer sortBinarySerializer = leftNeedsSort ? serializer1 : serializer2;

        sorter = new BinaryExternalSorter(
                getContainingTask(),
                memManager,
                reservedSortMemory,
                maxSortMemory,
                perRequestMemory,
                ioManager,
                sortInputSerializer,
                sortBinarySerializer,
                keyComputer,
                sortComparator,
                getSqlConf());
        sorter.startThreads();
    }

    /**
     * Builds the probe iterator and the join iterator once the sorted input is complete.
     *
     * <p>The iterator's "first" side is input 2 for an INNER join whose left input is sorted
     * and for every RIGHT join; otherwise it is input 1. The list buffer is always created
     * with the serializer of the other ("second") side.
     *
     * @throws RuntimeException if the join type is not INNER, LEFT or RIGHT
     */
    private void initJoinIterator() throws Exception {
        final Projection projection1 = classes.projectionClass1.newInstance();
        final Projection projection2 = classes.projectionClass2.newInstance();
        probeIter = new ProbeIterator();
        final MutableObjectIterator<BinaryRow> sortedRows = sorter.getIterator();
        final RecordComparator keyComparator = classes.keyComparatorClass.newInstance();
        keyComparator.init(keyGSorter.serializers(), keyGSorter.comparators());

        final boolean innerJoin = type == FlinkJoinRelType.INNER;
        if (!innerJoin && type != FlinkJoinRelType.LEFT && type != FlinkJoinRelType.RIGHT) {
            throw new RuntimeException("Not support type: " + type);
        }

        // INNER follows leftNeedsSort; LEFT/RIGHT are fixed by the join type alone.
        final boolean input2First = innerJoin ? leftNeedsSort : type == FlinkJoinRelType.RIGHT;

        final BinaryRowSerializer firstSerializer = input2First ? serializer2 : serializer1;
        final BinaryRowSerializer secondSerializer = input2First ? serializer1 : serializer2;
        final Projection firstProjection = input2First ? projection2 : projection1;
        final Projection secondProjection = input2First ? projection1 : projection2;
        final ResettableListBuffer matchBuffer = newListBuffer(joinBufferMemory, secondSerializer);

        if (innerJoin) {
            joinIterator = new SortMergeInnerJoinIterator(firstSerializer, secondSerializer, firstProjection, secondProjection, keyComparator, probeIter, sortedRows, matchBuffer, filterNulls);
        } else {
            joinIterator = new SortMergeOneSideOuterJoinIterator(firstSerializer, secondSerializer, firstProjection, secondProjection, keyComparator, probeIter, sortedRows, matchBuffer, filterNulls);
        }
    }

    /**
     * Always returns {@link TwoInputSelection#ANY}.
     * NOTE(review): presumably this tells the runtime it may read either input first — confirm
     * against the TwoInputStreamOperator contract.
     */
    public TwoInputSelection firstInputSelection() {
        return TwoInputSelection.ANY;
    }

    /**
     * Handles a row from input 1. If input 1 is the sorted side the row goes to the sorter.
     * Otherwise it is buffered while input 2 is still running, or joined immediately as the
     * pending probe row once input 2 has finished.
     *
     * @return always {@link TwoInputSelection#ANY}
     */
    public TwoInputSelection processElement1(StreamRecord<BaseRow> streamRecord) throws Exception {
        final BaseRow row = (BaseRow) streamRecord.getValue();
        if (leftNeedsSort) {
            sorter.write(row);
            return TwoInputSelection.ANY;
        }
        if (!isFinished2) {
            // Sorted side still incoming: park the probe row for later.
            probeBuffer.add(row);
        } else {
            probeRow = row;
            runJoin();
        }
        return TwoInputSelection.ANY;
    }

    /**
     * Handles a row from input 2. If input 2 is the sorted side (left does not need sorting)
     * the row goes to the sorter. Otherwise it is buffered while input 1 is still running, or
     * joined immediately as the pending probe row once input 1 has finished.
     *
     * @return always {@link TwoInputSelection#ANY}
     */
    public TwoInputSelection processElement2(StreamRecord<BaseRow> streamRecord) throws Exception {
        final BaseRow row = (BaseRow) streamRecord.getValue();
        if (!leftNeedsSort) {
            sorter.write(row);
            return TwoInputSelection.ANY;
        }
        if (!isFinished1) {
            // Sorted side still incoming: park the probe row for later.
            probeBuffer.add(row);
        } else {
            probeRow = row;
            runJoin();
        }
        return TwoInputSelection.ANY;
    }

    /**
     * Marks input 1 as finished. When input 1 is the sorted side, this also creates the join
     * iterator and runs the join over what is available so far.
     */
    public void endInput1() throws Exception {
        isFinished1 = true;
        if (!leftNeedsSort) {
            return;
        }
        initJoinIterator();
        runJoin();
    }

    /**
     * Marks input 2 as finished. When input 2 is the sorted side (left does not need sorting),
     * this also creates the join iterator and runs the join over what is available so far.
     */
    public void endInput2() throws Exception {
        isFinished2 = true;
        if (!leftNeedsSort) {
            initJoinIterator();
            runJoin();
        }
    }

    /**
     * Closes the operator and releases the probe iterator, probe buffer, sorter and join
     * iterator, in that order.
     *
     * <p>Every release step is attempted even if an earlier one (including
     * {@code super.close()}) fails, so a single failure cannot leave the remaining resources
     * open. The first exception is rethrown after all steps ran; later exceptions are attached
     * to it as suppressed.
     *
     * @throws Exception the first failure raised by any of the close steps
     */
    @Override // org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics
    public void close() throws Exception {
        Exception failure = null;
        try {
            super.close();
        } catch (Exception e) {
            failure = e;
        }
        try {
            if (this.probeIter != null) {
                this.probeIter.close();
            }
        } catch (Exception e) {
            failure = firstOrSuppressed(failure, e);
        }
        try {
            if (this.probeBuffer != null) {
                this.probeBuffer.close();
            }
        } catch (Exception e) {
            failure = firstOrSuppressed(failure, e);
        }
        try {
            if (this.sorter != null) {
                this.sorter.close();
            }
        } catch (Exception e) {
            failure = firstOrSuppressed(failure, e);
        }
        try {
            if (this.joinIterator != null) {
                this.joinIterator.close();
            }
        } catch (Exception e) {
            failure = firstOrSuppressed(failure, e);
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Keeps the earliest failure and records any later one on it as suppressed. */
    private static Exception firstOrSuppressed(Exception first, Exception later) {
        if (first == null) {
            return later;
        }
        first.addSuppressed(later);
        return first;
    }

    /**
     * Drives the join helper over the current join iterator.
     *
     * <p>INNER joins are delegated to {@code innerJoin}. LEFT joins require that the left
     * input is not the sorted one, RIGHT joins require that it is; either is delegated to
     * {@code oneSideOuterJoin} with the null row of the opposite side. Any other type is a
     * no-op.
     *
     * @throws RuntimeException for a LEFT join with a sorted left input or a RIGHT join
     *         without one
     */
    private void runJoin() throws Exception {
        if (type == FlinkJoinRelType.INNER) {
            helper.innerJoin((SortMergeInnerJoinIterator) joinIterator, leftNeedsSort);
            return;
        }

        final boolean rightOuter = type == FlinkJoinRelType.RIGHT;
        if (!rightOuter && type != FlinkJoinRelType.LEFT) {
            return;
        }
        // LEFT needs leftNeedsSort == false, RIGHT needs leftNeedsSort == true.
        if (leftNeedsSort != rightOuter) {
            throw new RuntimeException("Not support yet!");
        }
        helper.oneSideOuterJoin(
                (SortMergeOneSideOuterJoinIterator) joinIterator,
                rightOuter,
                rightOuter ? leftNullRow : rightNullRow);
    }

    /**
     * Allocates pages from the memory manager and wraps them in a list buffer.
     *
     * @param j memory budget in bytes; the page count is this value divided by the page size
     * @param abstractRowSerializer serializer handed to the buffer
     * @throws MemoryAllocationException declared for failures of the page allocation
     */
    private ResettableListBuffer newListBuffer(long j, AbstractRowSerializer abstractRowSerializer) throws MemoryAllocationException {
        final int pageCount = (int) (j / memManager.getPageSize());
        return new ResettableListBuffer(
                memManager,
                ioManager,
                memManager.allocatePages(getContainingTask(), pageCount),
                abstractRowSerializer,
                bufferThreshold);
    }

    /**
     * Allocates pages from the memory manager and wraps them in an external buffer.
     *
     * @param j memory budget in bytes; the page count is this value divided by the page size
     * @param abstractRowSerializer serializer handed to the buffer
     * @throws MemoryAllocationException declared for failures of the page allocation
     */
    private ResettableExternalBuffer newBuffer(long j, AbstractRowSerializer abstractRowSerializer) throws MemoryAllocationException {
        final int pageCount = (int) (j / memManager.getPageSize());
        return new ResettableExternalBuffer(
                memManager,
                ioManager,
                memManager.allocatePages(getContainingTask(), pageCount),
                abstractRowSerializer);
    }
}
