/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.join.batch;

import java.io.IOException;
import java.util.List;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.codegen.GeneratedSorter;
import org.apache.flink.table.codegen.JoinConditionFunction;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.plan.FlinkJoinRelType;
import org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics;
import org.apache.flink.table.runtime.join.batch.SortMergeInnerJoinIterator;
import org.apache.flink.table.runtime.join.batch.SortMergeJoinHelper;
import org.apache.flink.table.runtime.join.batch.SortMergeJoinIterator;
import org.apache.flink.table.runtime.join.batch.SortMergeOneSideOuterJoinIterator;
import org.apache.flink.table.runtime.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.sort.NormalizedKeyComputer;
import org.apache.flink.table.runtime.sort.RecordComparator;
import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
import org.apache.flink.table.runtime.util.ResettableListBuffer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.codehaus.commons.compiler.CompileException;

public class OneSideSortMergeJoinOperator
extends AbstractStreamOperatorWithMetrics<BaseRow>
implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> {
    private final long reservedSortMemory;
    private final long maxSortMemory;
    private final long perRequestMemory;
    private final long probeBufferMemory;
    private final long joinBufferMemory;
    private final FlinkJoinRelType type;
    private final boolean leftNeedsSort;
    private final boolean[] filterNulls;
    private GeneratedJoinConditionFunction condFuncCode;
    private final GeneratedProjection projectionCode1;
    private final GeneratedProjection projectionCode2;
    private final GeneratedSorter gSorter;
    private final GeneratedSorter keyGSorter;
    private final int bufferThreshold;
    private transient CookedClasses classes;
    private transient MemoryManager memManager;
    private transient IOManager ioManager;
    private transient AbstractRowSerializer inputSerializer1;
    private transient AbstractRowSerializer inputSerializer2;
    private transient BinaryRowSerializer serializer1;
    private transient BinaryRowSerializer serializer2;
    private transient BinaryExternalSorter sorter;
    private transient BaseRow probeRow;
    private transient ResettableExternalBuffer probeBuffer;
    private transient ProbeIterator probeIter;
    private transient SortMergeJoinIterator joinIterator;
    private transient BaseRow leftNullRow;
    private transient BaseRow rightNullRow;
    private transient boolean isFinished1;
    private transient boolean isFinished2;
    private transient SortMergeJoinHelper helper;

    public OneSideSortMergeJoinOperator(long reservedSortMemory, long maxSortMemory, long perRequestMemory, long probeBufferMemory, long joinBufferMemory, FlinkJoinRelType type, boolean leftNeedsSort, GeneratedJoinConditionFunction condFuncCode, GeneratedProjection projectionCode1, GeneratedProjection projectionCode2, GeneratedSorter gSorter, GeneratedSorter keyGSorter, boolean[] filterNulls) {
        this(reservedSortMemory, maxSortMemory, perRequestMemory, probeBufferMemory, joinBufferMemory, type, leftNeedsSort, condFuncCode, projectionCode1, projectionCode2, gSorter, keyGSorter, filterNulls, 4096);
    }

    public OneSideSortMergeJoinOperator(long reservedSortMemory, long maxSortMemory, long perRequestMemory, long probeBufferMemory, long joinBufferMemory, FlinkJoinRelType type, boolean leftNeedsSort, GeneratedJoinConditionFunction condFuncCode, GeneratedProjection projectionCode1, GeneratedProjection projectionCode2, GeneratedSorter gSorter, GeneratedSorter keyGSorter, boolean[] filterNulls, int bufferThreshold) {
        if (type != FlinkJoinRelType.INNER && type != FlinkJoinRelType.LEFT && type != FlinkJoinRelType.RIGHT) {
            throw new RuntimeException("One side sort merge join operator only supports inner/left outer/right outer join currently.");
        }
        LOG.info("Initializing one side sort merge join operator...\nleftNeedsSort = " + leftNeedsSort + ", reservedSortMemory = " + reservedSortMemory + ", preferredSortMemory = " + maxSortMemory + ", perRequestMemory = " + perRequestMemory + ", probeBufferMemory = " + probeBufferMemory + ", joinBufferMemory = " + joinBufferMemory);
        this.reservedSortMemory = reservedSortMemory;
        this.maxSortMemory = maxSortMemory;
        this.perRequestMemory = perRequestMemory;
        this.probeBufferMemory = probeBufferMemory;
        this.joinBufferMemory = joinBufferMemory;
        this.type = type;
        this.leftNeedsSort = leftNeedsSort;
        this.filterNulls = filterNulls;
        this.condFuncCode = condFuncCode;
        this.projectionCode1 = projectionCode1;
        this.projectionCode2 = projectionCode2;
        this.gSorter = gSorter;
        this.keyGSorter = keyGSorter;
        this.bufferThreshold = bufferThreshold;
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.isFinished1 = false;
        this.isFinished2 = false;
        this.classes = this.cookGeneratedClasses(this.getContainingTask().getUserCodeClassLoader());
        StreamRecordCollector<BaseRow> collector = new StreamRecordCollector<BaseRow>(this.output);
        this.inputSerializer1 = (AbstractRowSerializer)this.getOperatorConfig().getTypeSerializerIn1(this.getUserCodeClassloader());
        this.serializer1 = new BinaryRowSerializer(this.inputSerializer1.getTypes());
        this.inputSerializer2 = (AbstractRowSerializer)this.getOperatorConfig().getTypeSerializerIn2(this.getUserCodeClassloader());
        this.serializer2 = new BinaryRowSerializer(this.inputSerializer2.getTypes());
        this.memManager = this.getContainingTask().getEnvironment().getMemoryManager();
        this.ioManager = this.getContainingTask().getEnvironment().getIOManager();
        JoinConditionFunction condFunc = this.classes.condFuncClass.newInstance();
        this.leftNullRow = new GenericRow(this.serializer1.getNumFields());
        this.rightNullRow = new GenericRow(this.serializer2.getNumFields());
        JoinedRow joinedRow = new JoinedRow();
        this.helper = new SortMergeJoinHelper(collector, condFunc, this.leftNullRow, this.rightNullRow, joinedRow);
        this.initSorter();
        this.probeBuffer = this.newBuffer(this.probeBufferMemory, this.leftNeedsSort ? this.inputSerializer2 : this.inputSerializer1);
        this.initGauge();
    }

    protected CookedClasses cookGeneratedClasses(ClassLoader cl) throws CompileException {
        return new CookedClasses(CodeGenUtils.compile(cl, this.condFuncCode.name(), this.condFuncCode.code()), CodeGenUtils.compile(cl, this.keyGSorter.comparator().name(), this.keyGSorter.comparator().code()), CodeGenUtils.compile(cl, this.projectionCode1.name(), this.projectionCode1.code()), CodeGenUtils.compile(cl, this.projectionCode2.name(), this.projectionCode2.code()), CodeGenUtils.compile(cl, this.gSorter.computer().name(), this.gSorter.computer().code()), CodeGenUtils.compile(cl, this.gSorter.comparator().name(), this.gSorter.comparator().code()));
    }

    private void initGauge() {
        this.getMetricGroup().gauge("memoryUsedSizeInBytes", () -> this.sorter.getUsedMemoryInBytes() + this.probeBuffer.getUsedMemoryInBytes());
        this.getMetricGroup().gauge("numSpillFiles", () -> this.sorter.getNumSpillFiles() + (long)this.probeBuffer.getNumSpillFiles());
        this.getMetricGroup().gauge("spillInBytes", () -> this.sorter.getSpillInBytes() + this.probeBuffer.getSpillInBytes());
    }

    private void initSorter() throws Exception {
        BinaryRowSerializer serializer;
        AbstractRowSerializer inputSerializer;
        NormalizedKeyComputer computer = this.classes.computerClass.newInstance();
        computer.init(this.gSorter.serializers(), this.gSorter.comparators());
        RecordComparator comparator = this.classes.comparatorClass.newInstance();
        comparator.init(this.gSorter.serializers(), this.gSorter.comparators());
        if (this.leftNeedsSort) {
            inputSerializer = this.inputSerializer1;
            serializer = this.serializer1;
        } else {
            inputSerializer = this.inputSerializer2;
            serializer = this.serializer2;
        }
        this.sorter = new BinaryExternalSorter(this.getContainingTask(), this.memManager, this.reservedSortMemory, this.maxSortMemory, this.perRequestMemory, this.ioManager, inputSerializer, serializer, computer, comparator, this.getSqlConf());
        this.sorter.startThreads();
    }

    private void initJoinIterator() throws Exception {
        Projection projection1 = this.classes.projectionClass1.newInstance();
        Projection projection2 = this.classes.projectionClass2.newInstance();
        this.probeIter = new ProbeIterator();
        MutableObjectIterator<BinaryRow> sortIter = this.sorter.getIterator();
        RecordComparator keyComparator = this.classes.keyComparatorClass.newInstance();
        keyComparator.init(this.keyGSorter.serializers(), this.keyGSorter.comparators());
        if (this.type == FlinkJoinRelType.INNER) {
            this.joinIterator = this.leftNeedsSort ? new SortMergeInnerJoinIterator(this.serializer2, this.serializer1, projection2, projection1, keyComparator, this.probeIter, sortIter, this.newListBuffer(this.joinBufferMemory, this.serializer1), this.filterNulls) : new SortMergeInnerJoinIterator(this.serializer1, this.serializer2, projection1, projection2, keyComparator, this.probeIter, sortIter, this.newListBuffer(this.joinBufferMemory, this.serializer2), this.filterNulls);
        } else if (this.type == FlinkJoinRelType.LEFT) {
            this.joinIterator = new SortMergeOneSideOuterJoinIterator(this.serializer1, this.serializer2, projection1, projection2, keyComparator, this.probeIter, sortIter, this.newListBuffer(this.joinBufferMemory, this.serializer2), this.filterNulls);
        } else if (this.type == FlinkJoinRelType.RIGHT) {
            this.joinIterator = new SortMergeOneSideOuterJoinIterator(this.serializer2, this.serializer1, projection2, projection1, keyComparator, this.probeIter, sortIter, this.newListBuffer(this.joinBufferMemory, this.serializer1), this.filterNulls);
        } else {
            throw new RuntimeException("Not support type: " + (Object)((Object)this.type));
        }
    }

    public TwoInputSelection firstInputSelection() {
        return TwoInputSelection.ANY;
    }

    public TwoInputSelection processElement1(StreamRecord<BaseRow> record) throws Exception {
        if (this.leftNeedsSort) {
            this.sorter.write((BaseRow)record.getValue());
        } else {
            BaseRow row2 = (BaseRow)record.getValue();
            if (this.isFinished2) {
                this.probeRow = row2;
                this.runJoin();
            } else {
                this.probeBuffer.add(row2);
            }
        }
        return TwoInputSelection.ANY;
    }

    public TwoInputSelection processElement2(StreamRecord<BaseRow> record) throws Exception {
        if (this.leftNeedsSort) {
            BaseRow row2 = (BaseRow)record.getValue();
            if (this.isFinished1) {
                this.probeRow = row2;
                this.runJoin();
            } else {
                this.probeBuffer.add(row2);
            }
        } else {
            this.sorter.write((BaseRow)record.getValue());
        }
        return TwoInputSelection.ANY;
    }

    public void endInput1() throws Exception {
        this.isFinished1 = true;
        if (this.leftNeedsSort) {
            this.initJoinIterator();
            this.runJoin();
        }
    }

    public void endInput2() throws Exception {
        this.isFinished2 = true;
        if (!this.leftNeedsSort) {
            this.initJoinIterator();
            this.runJoin();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (this.probeIter != null) {
            this.probeIter.close();
        }
        if (this.probeBuffer != null) {
            this.probeBuffer.close();
        }
        if (this.sorter != null) {
            this.sorter.close();
        }
        if (this.joinIterator != null) {
            this.joinIterator.close();
        }
    }

    private void runJoin() throws Exception {
        if (this.type == FlinkJoinRelType.INNER) {
            this.helper.innerJoin((SortMergeInnerJoinIterator)this.joinIterator, this.leftNeedsSort);
        } else if (this.type == FlinkJoinRelType.LEFT) {
            if (this.leftNeedsSort) {
                throw new RuntimeException("Not support yet!");
            }
            this.helper.oneSideOuterJoin((SortMergeOneSideOuterJoinIterator)this.joinIterator, false, this.rightNullRow);
        } else if (this.type == FlinkJoinRelType.RIGHT) {
            if (this.leftNeedsSort) {
                this.helper.oneSideOuterJoin((SortMergeOneSideOuterJoinIterator)this.joinIterator, true, this.leftNullRow);
            } else {
                throw new RuntimeException("Not support yet!");
            }
        }
    }

    private ResettableListBuffer newListBuffer(long memorySize, AbstractRowSerializer serializer) throws MemoryAllocationException {
        List externalBufferSegments = this.memManager.allocatePages((Object)this.getContainingTask(), (int)(memorySize / (long)this.memManager.getPageSize()));
        return new ResettableListBuffer(this.memManager, this.ioManager, externalBufferSegments, serializer, this.bufferThreshold);
    }

    private ResettableExternalBuffer newBuffer(long memorySize, AbstractRowSerializer serializer) throws MemoryAllocationException {
        List externalBufferSegments = this.memManager.allocatePages((Object)this.getContainingTask(), (int)(memorySize / (long)this.memManager.getPageSize()));
        return new ResettableExternalBuffer(this.memManager, this.ioManager, externalBufferSegments, serializer);
    }

    private class ProbeIterator
    implements MutableObjectIterator<BaseRow> {
        private ResettableExternalBuffer.BufferIterator bufferIterator;

        private ProbeIterator() {
            this.bufferIterator = OneSideSortMergeJoinOperator.this.probeBuffer.newIterator();
        }

        @Override
        public BaseRow next(BaseRow reuse) throws IOException {
            return this.next();
        }

        @Override
        public BaseRow next() throws IOException {
            if (this.bufferIterator.hasNext()) {
                this.bufferIterator.advanceNext();
                return this.bufferIterator.getRow();
            }
            if (OneSideSortMergeJoinOperator.this.probeRow == null) {
                return null;
            }
            BaseRow row2 = OneSideSortMergeJoinOperator.this.probeRow;
            OneSideSortMergeJoinOperator.this.probeRow = null;
            return row2;
        }

        public void close() {
            this.bufferIterator.close();
        }
    }

    protected static class CookedClasses {
        protected final Class<JoinConditionFunction> condFuncClass;
        protected final Class<RecordComparator> keyComparatorClass;
        protected final Class<Projection> projectionClass1;
        protected final Class<Projection> projectionClass2;
        protected final Class<NormalizedKeyComputer> computerClass;
        protected final Class<RecordComparator> comparatorClass;

        protected CookedClasses(Class<JoinConditionFunction> condFuncClass, Class<RecordComparator> keyComparatorClass, Class<Projection> projectionClass1, Class<Projection> projectionClass2, Class<NormalizedKeyComputer> computerClass, Class<RecordComparator> comparatorClass) {
            this.condFuncClass = condFuncClass;
            this.keyComparatorClass = keyComparatorClass;
            this.projectionClass1 = projectionClass1;
            this.projectionClass2 = projectionClass2;
            this.computerClass = computerClass;
            this.comparatorClass = comparatorClass;
        }
    }
}

