package org.apache.flink.table.runtime.join.batch;

import java.io.IOException;
import java.util.BitSet;
import java.util.List;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.codegen.GeneratedSorter;
import org.apache.flink.table.codegen.JoinConditionFunction;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BinaryRowUtil;
import org.apache.flink.table.plan.FlinkJoinRelType;
import org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics;
import org.apache.flink.table.runtime.sort.RecordComparator;
import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.codehaus.commons.compiler.CompileException;

/* loaded from: input_file:org/apache/flink/table/runtime/join/batch/MergeJoinOperator.class */
public class MergeJoinOperator extends AbstractStreamOperatorWithMetrics<BaseRow> implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> {
    private final long leftBufferMemory;
    private final long rightBufferMemory;
    private final FlinkJoinRelType type;
    private final GeneratedJoinConditionFunction condFuncCode;
    private final GeneratedProjection projectionCode1;
    private final GeneratedProjection projectionCode2;
    private final GeneratedSorter keyGSorter;
    private transient JoinConditionFunction condFunc;
    private transient RecordComparator keyComparator;
    private transient Collector<BaseRow> collector;
    private transient boolean isFinished1;
    private transient boolean isFinished2;
    private transient AbstractRowSerializer<BaseRow> serializer1;
    private transient AbstractRowSerializer<BaseRow> serializer2;
    private transient WrappedBuffer buffer1;
    private transient WrappedBuffer buffer2;
    private transient BaseRow leftNullRow;
    private transient BaseRow rightNullRow;
    private transient JoinedRow joinedRow;
    private transient boolean isFindStage;
    private transient boolean advance1;
    private transient boolean advance2;
    private transient BinaryRow mergeKey;
    private transient int mergeCount1;
    private transient int mergeCount2;
    private transient BitSet mergeBs1;
    private transient BitSet mergeBs2;
    private transient boolean leftIsBuild;
    private transient MemoryManager memManager;
    private transient IOManager ioManager;
    private final int[] nullFilterKeys;
    private final boolean nullSafe;
    private final boolean filterAllNulls;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/table/runtime/join/batch/MergeJoinOperator$CookedClasses.class */
    public static class CookedClasses {
        protected final Class<JoinConditionFunction> condFuncClass;
        protected final Class<RecordComparator> keyComparatorClass;
        protected final Class<Projection> projectionClass1;
        protected final Class<Projection> projectionClass2;

        protected CookedClasses(Class<JoinConditionFunction> cls, Class<RecordComparator> cls2, Class<Projection> cls3, Class<Projection> cls4) {
            this.condFuncClass = cls;
            this.keyComparatorClass = cls2;
            this.projectionClass1 = cls3;
            this.projectionClass2 = cls4;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/join/batch/MergeJoinOperator$WrappedBuffer.class */
    public class WrappedBuffer {
        private BaseRow current;
        private BinaryRow currentKey;
        private Projection<BaseRow, BinaryRow> projection;
        private boolean currentKeyShouldFilter;
        private BaseRow cache;
        private ResettableExternalBuffer externalBuffer;
        private ResettableExternalBuffer.BufferIterator externalIterator;

        private WrappedBuffer(List<MemorySegment> list, AbstractRowSerializer abstractRowSerializer, Projection<BaseRow, BinaryRow> projection) {
            this.current = null;
            this.currentKey = null;
            this.currentKeyShouldFilter = true;
            this.cache = null;
            this.projection = projection;
            this.externalBuffer = new ResettableExternalBuffer(MergeJoinOperator.this.memManager, MergeJoinOperator.this.ioManager, list, abstractRowSerializer);
            this.externalIterator = this.externalBuffer.newIterator();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void add(BaseRow baseRow) {
            Preconditions.checkState(this.cache == null, "Old cache must be materialized to buffer. This is a bug.");
            this.cache = baseRow;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void materializeCache() throws IOException {
            if (this.cache != null) {
                this.externalBuffer.add(this.cache);
                if (isCacheJustRead()) {
                    this.externalIterator.advanceNext();
                    this.current = this.externalIterator.getRow();
                }
                this.cache = null;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void discard() {
            if (isCacheJustRead()) {
                reset();
            }
        }

        private boolean isCacheJustRead() {
            return this.current == this.cache;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean hasNext() {
            return !(this.cache == null || this.current == this.cache) || this.externalIterator.hasNext();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public BaseRow nextRow() {
            if (this.externalIterator.hasNext()) {
                this.externalIterator.advanceNext();
                this.current = this.externalIterator.getRow();
            } else {
                this.current = this.cache;
            }
            if (this.current == null || MergeJoinOperator.isEndRow(this.current)) {
                this.currentKey = null;
                this.currentKeyShouldFilter = true;
            } else {
                this.currentKey = this.projection.apply(this.current);
                this.currentKeyShouldFilter = MergeJoinOperator.this.shouldFilterNull(this.currentKey);
            }
            return this.current;
        }

        private void reset() {
            this.current = null;
            this.currentKey = null;
            this.cache = null;
            clearBuffer();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void close() {
            this.externalIterator.close();
            this.externalBuffer.close();
        }

        private void clearBuffer() {
            if (this.externalBuffer.size() > 0) {
                this.externalIterator.close();
                this.externalBuffer.reset();
                this.externalIterator = this.externalBuffer.newIterator();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isBuild() {
            return this.current == this.cache;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void enterBuildMode() {
            clearBuffer();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void leaveBuildMode() {
            if (this.externalIterator.hasNext()) {
                this.externalIterator.advanceNext();
            } else {
                clearBuffer();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long getUsedMemoryInBytes() {
            return this.externalBuffer.getUsedMemoryInBytes();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getNumSpillFiles() {
            return this.externalBuffer.getNumSpillFiles();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long getSpillInBytes() {
            return this.externalBuffer.getSpillInBytes();
        }
    }

    public MergeJoinOperator(long j, long j2, FlinkJoinRelType flinkJoinRelType, GeneratedJoinConditionFunction generatedJoinConditionFunction, GeneratedProjection generatedProjection, GeneratedProjection generatedProjection2, GeneratedSorter generatedSorter, boolean[] zArr) {
        if (flinkJoinRelType != FlinkJoinRelType.INNER && flinkJoinRelType != FlinkJoinRelType.LEFT && flinkJoinRelType != FlinkJoinRelType.RIGHT && flinkJoinRelType != FlinkJoinRelType.FULL) {
            throw new RuntimeException("Merge join operator only supports inner/left outer/right outer/full outer join currently.");
        }
        LOG.info("Initializing merge join operator...\nleftBufferMemory = " + j + ", rightBufferMemory = " + j2);
        this.leftBufferMemory = j;
        this.rightBufferMemory = j2;
        this.type = flinkJoinRelType;
        this.condFuncCode = generatedJoinConditionFunction;
        this.projectionCode1 = generatedProjection;
        this.projectionCode2 = generatedProjection2;
        this.keyGSorter = generatedSorter;
        this.nullFilterKeys = NullAwareJoinHelper.getNullFilterKeys(zArr);
        this.nullSafe = this.nullFilterKeys.length == 0;
        this.filterAllNulls = this.nullFilterKeys.length == zArr.length;
    }

    @Override // org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics
    public void open() throws Exception {
        super.open();
        this.isFinished1 = false;
        this.isFinished2 = false;
        this.collector = new StreamRecordCollector(this.output);
        this.serializer1 = (AbstractRowSerializer) getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
        this.serializer2 = (AbstractRowSerializer) getOperatorConfig().getTypeSerializerIn2(getUserCodeClassloader());
        this.leftNullRow = new GenericRow(this.serializer1.getNumFields());
        this.rightNullRow = new GenericRow(this.serializer2.getNumFields());
        this.joinedRow = new JoinedRow();
        CookedClasses cookGeneratedClasses = cookGeneratedClasses(getContainingTask().getUserCodeClassLoader());
        this.condFunc = cookGeneratedClasses.condFuncClass.newInstance();
        this.keyComparator = cookGeneratedClasses.keyComparatorClass.newInstance();
        this.keyComparator.init(this.keyGSorter.serializers(), this.keyGSorter.comparators());
        this.memManager = getContainingTask().getEnvironment().getMemoryManager();
        this.ioManager = getContainingTask().getEnvironment().getIOManager();
        Projection newInstance = cookGeneratedClasses.projectionClass1.newInstance();
        Projection newInstance2 = cookGeneratedClasses.projectionClass2.newInstance();
        this.buffer1 = new WrappedBuffer(this.memManager.allocatePages(getContainingTask(), (int) (this.leftBufferMemory / this.memManager.getPageSize())), this.serializer1, newInstance);
        this.buffer2 = new WrappedBuffer(this.memManager.allocatePages(getContainingTask(), (int) (this.rightBufferMemory / this.memManager.getPageSize())), this.serializer2, newInstance2);
        initGauge();
        initJoin();
    }

    protected CookedClasses cookGeneratedClasses(ClassLoader classLoader) throws CompileException {
        return new CookedClasses(CodeGenUtils.compile(classLoader, this.condFuncCode.name(), this.condFuncCode.code()), CodeGenUtils.compile(classLoader, this.keyGSorter.comparator().name(), this.keyGSorter.comparator().code()), CodeGenUtils.compile(classLoader, this.projectionCode1.name(), this.projectionCode1.code()), CodeGenUtils.compile(classLoader, this.projectionCode2.name(), this.projectionCode2.code()));
    }

    private void initJoin() {
        this.advance1 = true;
        this.advance2 = true;
        this.isFindStage = true;
        if (this.type.isLeftOuter()) {
            this.mergeBs1 = new BitSet();
        }
        if (this.type.isRightOuter()) {
            this.mergeBs2 = new BitSet();
        }
    }

    private void initGauge() {
        getMetricGroup().gauge("memoryUsedSizeInBytes", () -> {
            return Long.valueOf(this.buffer1.getUsedMemoryInBytes() + this.buffer2.getUsedMemoryInBytes());
        });
        getMetricGroup().gauge("numSpillFiles", () -> {
            return Integer.valueOf(this.buffer1.getNumSpillFiles() + this.buffer2.getNumSpillFiles());
        });
        getMetricGroup().gauge("spillInBytes", () -> {
            return Long.valueOf(this.buffer1.getSpillInBytes() + this.buffer2.getSpillInBytes());
        });
    }

    public TwoInputSelection firstInputSelection() {
        return TwoInputSelection.ANY;
    }

    public TwoInputSelection processElement1(StreamRecord<BaseRow> streamRecord) throws Exception {
        this.buffer1.add((BaseRow) streamRecord.getValue());
        runJoin();
        this.buffer1.materializeCache();
        return TwoInputSelection.ANY;
    }

    public TwoInputSelection processElement2(StreamRecord<BaseRow> streamRecord) throws Exception {
        this.buffer2.add((BaseRow) streamRecord.getValue());
        runJoin();
        this.buffer2.materializeCache();
        return TwoInputSelection.ANY;
    }

    public void endInput1() throws Exception {
        this.buffer1.add(BinaryRowUtil.EMPTY_ROW);
        this.isFinished1 = true;
        if (isAllFinished()) {
            runJoin();
        }
    }

    public void endInput2() throws Exception {
        this.buffer2.add(BinaryRowUtil.EMPTY_ROW);
        this.isFinished2 = true;
        if (isAllFinished()) {
            runJoin();
        }
    }

    @Override // org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics
    public void close() throws Exception {
        super.close();
        if (this.buffer1 != null) {
            this.buffer1.close();
        }
        if (this.buffer2 != null) {
            this.buffer2.close();
        }
    }

    private boolean isAllFinished() {
        return this.isFinished1 && this.isFinished2;
    }

    private void runJoin() throws Exception {
        boolean z = true;
        while (z) {
            z = this.isFindStage ? runFindStep() : runMergeStep();
        }
    }

    private boolean runFindStep() {
        if (this.advance1 && !this.buffer1.hasNext()) {
            return false;
        }
        if (this.advance2 && !this.buffer2.hasNext()) {
            return false;
        }
        BaseRow nextRow = this.advance1 ? this.buffer1.nextRow() : this.buffer1.current;
        BaseRow nextRow2 = this.advance2 ? this.buffer2.nextRow() : this.buffer2.current;
        boolean isEndRow = isEndRow(nextRow);
        boolean isEndRow2 = isEndRow(nextRow2);
        if (isEndRow && isEndRow2) {
            return false;
        }
        int compare = isEndRow ? 1 : isEndRow2 ? -1 : this.buffer1.currentKeyShouldFilter ? -1 : this.buffer2.currentKeyShouldFilter ? 1 : this.keyComparator.compare((BaseRow) this.buffer1.currentKey, (BaseRow) this.buffer2.currentKey);
        if (compare < 0) {
            this.advance1 = true;
            this.advance2 = false;
            if (this.type.isLeftOuter()) {
                collect(nextRow, this.rightNullRow);
            }
            this.buffer1.discard();
            return true;
        }
        if (compare <= 0) {
            initMergeStage();
            return true;
        }
        this.advance1 = false;
        this.advance2 = true;
        if (this.type.isRightOuter()) {
            collect(this.leftNullRow, nextRow2);
        }
        this.buffer2.discard();
        return true;
    }

    private void initMergeStage() {
        this.isFindStage = false;
        boolean isBuild = this.buffer1.isBuild();
        this.leftIsBuild = isBuild;
        if (isBuild) {
            this.advance1 = true;
            this.advance2 = false;
            this.buffer1.enterBuildMode();
        } else {
            this.advance1 = false;
            this.advance2 = true;
            this.buffer2.enterBuildMode();
        }
        this.mergeKey = this.buffer1.currentKey.copy();
        this.mergeCount1 = 0;
        this.mergeCount2 = 0;
        if (this.mergeBs1 != null) {
            this.mergeBs1.clear();
        }
        if (this.mergeBs2 != null) {
            this.mergeBs2.clear();
        }
    }

    private boolean runMergeStep() throws Exception {
        if (this.advance1 && this.buffer1.hasNext()) {
            this.mergeCount1++;
            if (!this.leftIsBuild) {
                this.advance1 = joinCurrentProbeRow(this.buffer1, this.buffer2, this.mergeCount2);
                return true;
            }
            this.advance1 = checkNextRowIsSameKey(this.buffer1);
            if (this.advance1 || !this.leftIsBuild) {
                return true;
            }
            this.advance2 = true;
            return true;
        }
        if (!this.advance2 || !this.buffer2.hasNext()) {
            if (this.advance1 || this.advance2) {
                return false;
            }
            endMergeStage();
            return true;
        }
        this.mergeCount2++;
        if (this.leftIsBuild) {
            this.advance2 = joinCurrentProbeRow(this.buffer2, this.buffer1, this.mergeCount1);
            return true;
        }
        this.advance2 = checkNextRowIsSameKey(this.buffer2);
        if (this.advance2 || this.leftIsBuild) {
            return true;
        }
        this.advance1 = true;
        return true;
    }

    private boolean checkNextRowIsSameKey(WrappedBuffer wrappedBuffer) {
        if (isEndRow(wrappedBuffer.nextRow())) {
            return false;
        }
        return !wrappedBuffer.currentKeyShouldFilter && this.mergeKey.equals(wrappedBuffer.currentKey);
    }

    private boolean joinCurrentProbeRow(WrappedBuffer wrappedBuffer, WrappedBuffer wrappedBuffer2, int i) throws Exception {
        BaseRow baseRow = wrappedBuffer.current;
        boolean z = false;
        ResettableExternalBuffer.BufferIterator bufferIterator = wrappedBuffer2.externalIterator;
        bufferIterator.reset();
        for (int i2 = 0; i2 < i; i2++) {
            Preconditions.checkState(bufferIterator.advanceNext(), "There is no next row in build buffer. This is a bug.");
            BinaryRow row = bufferIterator.getRow();
            if (this.leftIsBuild) {
                if (this.condFunc.apply(row, baseRow)) {
                    z = true;
                    collect(row, baseRow);
                    if (this.mergeBs1 != null) {
                        this.mergeBs1.set(i2);
                    }
                }
            } else if (this.condFunc.apply(baseRow, row)) {
                z = true;
                collect(baseRow, row);
                if (this.mergeBs2 != null) {
                    this.mergeBs2.set(i2);
                }
            }
        }
        if (!z) {
            if (this.mergeBs1 != null && !this.leftIsBuild) {
                collect(baseRow, this.rightNullRow);
            } else if (this.mergeBs2 != null && this.leftIsBuild) {
                collect(this.leftNullRow, baseRow);
            }
        }
        return checkNextRowIsSameKey(wrappedBuffer);
    }

    private void endMergeStage() throws IOException {
        if (this.leftIsBuild && this.buffer1.externalIterator.rowInSpill(this.buffer1.externalIterator.getBeginRow())) {
            LOG.warn("(In merge join operator) Build side iterator is in spilled file, this may decrease performance.");
        } else if (!this.leftIsBuild && this.buffer2.externalIterator.rowInSpill(this.buffer2.externalIterator.getBeginRow())) {
            LOG.warn("(In merge join operator) Build side iterator is in spilled file, this may decrease performance.");
        }
        if (this.leftIsBuild) {
            if (this.mergeBs1 != null) {
                this.buffer1.externalIterator.reset();
                for (int i = 0; i < this.mergeCount1; i++) {
                    this.buffer1.externalIterator.advanceNext();
                    if (!this.mergeBs1.get(i)) {
                        collect(this.buffer1.externalIterator.getRow(), this.rightNullRow);
                    }
                }
            }
        } else if (this.mergeBs2 != null) {
            this.buffer2.externalIterator.reset();
            for (int i2 = 0; i2 < this.mergeCount2; i2++) {
                this.buffer2.externalIterator.advanceNext();
                if (!this.mergeBs2.get(i2)) {
                    collect(this.leftNullRow, this.buffer2.externalIterator.getRow());
                }
            }
        }
        this.isFindStage = true;
        this.advance1 = false;
        this.advance2 = false;
        if (this.leftIsBuild) {
            this.buffer1.leaveBuildMode();
        } else {
            this.buffer2.leaveBuildMode();
        }
    }

    private void collect(BaseRow baseRow, BaseRow baseRow2) {
        this.collector.collect(this.joinedRow.replace(baseRow, baseRow2));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean isEndRow(BaseRow baseRow) {
        return baseRow.getArity() == 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean shouldFilterNull(BinaryRow binaryRow) {
        return NullAwareJoinHelper.shouldFilter(this.nullSafe, this.filterAllNulls, this.nullFilterKeys, binaryRow);
    }
}
