/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.join.batch;

import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.codegen.GeneratedSorter;
import org.apache.flink.table.codegen.JoinConditionFunction;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.plan.FlinkJoinRelType;
import org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics;
import org.apache.flink.table.runtime.join.batch.SortMergeFullOuterJoinIterator;
import org.apache.flink.table.runtime.join.batch.SortMergeInnerJoinIterator;
import org.apache.flink.table.runtime.join.batch.SortMergeJoinHelper;
import org.apache.flink.table.runtime.join.batch.SortMergeJoinIterator;
import org.apache.flink.table.runtime.join.batch.SortMergeOneSideOuterJoinIterator;
import org.apache.flink.table.runtime.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.sort.NormalizedKeyComputer;
import org.apache.flink.table.runtime.sort.RecordComparator;
import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.codehaus.commons.compiler.CompileException;

/**
 * Two-input operator that performs a sort-merge join.
 *
 * <p>Every record of either input is written into a per-input {@link BinaryExternalSorter}.
 * Once both inputs have signalled their end, the sorted iterators of the two sorters are merged
 * by a join iterator that matches the configured {@link FlinkJoinRelType}, and the joined rows
 * are emitted through the operator output.
 *
 * <p>The generated code (join condition, key projections, sorters) is compiled in
 * {@link #open()}; the references to the generated code are dropped afterwards.
 */
public class SortMergeJoinOperator extends AbstractStreamOperatorWithMetrics<BaseRow>
        implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> {
    // Memory settings handed to the sorter of input 1 / input 2.
    private final long reservedSortMemory1;
    private final long maxSortMemory1;
    private final long reservedSortMemory2;
    private final long maxSortMemory2;
    private final long perRequestMemory;
    // Number of bytes used to size each ResettableExternalBuffer (converted to pages in newBuffer).
    private final long externalBufferMemory;
    private final FlinkJoinRelType type;
    // Only consulted for INNER joins, where it selects which input gets buffered.
    private final boolean leftIsSmaller;
    // Generated code; set to null at the end of open() once compiled and initialised.
    private GeneratedJoinConditionFunction condFuncCode;
    private GeneratedProjection projectionCode1;
    private GeneratedProjection projectionCode2;
    private GeneratedSorter gSorter1;
    private GeneratedSorter gSorter2;
    private GeneratedSorter keyGSorter;
    private final boolean[] filterNulls;
    private transient CookedClasses classes;
    private transient Configuration conf;
    private transient MemoryManager memManager;
    private transient IOManager ioManager;
    private transient TypeSerializer<BaseRow> inputSerializer1;
    private transient TypeSerializer<BaseRow> inputSerializer2;
    private transient BinaryRowSerializer serializer1;
    private transient BinaryRowSerializer serializer2;
    private transient BinaryExternalSorter sorter1;
    private transient BinaryExternalSorter sorter2;
    // Join iterators are kept in fields so that close() can release them.
    private transient SortMergeJoinIterator joinIterator1;
    private transient SortMergeJoinIterator joinIterator2;
    private transient SortMergeFullOuterJoinIterator fullOuterJoinIterator;
    private transient Collector<BaseRow> collector;
    // isFinished[0] / isFinished[1]: whether input 1 / input 2 has ended.
    private transient boolean[] isFinished;
    private transient JoinConditionFunction condFunc;
    private transient RecordComparator keyComparator;
    private transient BaseRow leftNullRow;
    private transient BaseRow rightNullRow;
    private transient SortMergeJoinHelper helper;

    /**
     * Creates an operator whose two sorters share the same reserved / max memory settings.
     * Delegates to the full constructor, passing the sort memory values for both inputs.
     */
    public SortMergeJoinOperator(long reservedSortMemory, long maxSortMemory, long perRequestMemory, long externalBufferMemory, FlinkJoinRelType type, boolean leftIsSmaller, GeneratedJoinConditionFunction condFuncCode, GeneratedProjection projectionCode1, GeneratedProjection projectionCode2, GeneratedSorter gSorter1, GeneratedSorter gSorter2, GeneratedSorter keyGSorter, boolean[] filterNulls) {
        this(reservedSortMemory, maxSortMemory, reservedSortMemory, maxSortMemory, perRequestMemory, externalBufferMemory, type, leftIsSmaller, condFuncCode, projectionCode1, projectionCode2, gSorter1, gSorter2, keyGSorter, filterNulls);
    }

    /**
     * Creates an operator with separate sort memory settings per input.
     *
     * @param reservedSortMemory1 reserved memory passed to the sorter of input 1
     * @param maxSortMemory1 max memory passed to the sorter of input 1
     * @param reservedSortMemory2 reserved memory passed to the sorter of input 2
     * @param maxSortMemory2 max memory passed to the sorter of input 2
     * @param perRequestMemory per-request memory passed to both sorters
     * @param externalBufferMemory bytes used to size each external match buffer
     * @param type join type to execute
     * @param leftIsSmaller for INNER joins, selects which input is buffered
     * @param condFuncCode generated join condition
     * @param projectionCode1 generated projection for input 1
     * @param projectionCode2 generated projection for input 2
     * @param gSorter1 generated sorter for input 1, must not be null
     * @param gSorter2 generated sorter for input 2, must not be null
     * @param keyGSorter generated sorter providing the key comparator, must not be null
     * @param filterNulls flags forwarded unchanged to the join iterators
     */
    public SortMergeJoinOperator(long reservedSortMemory1, long maxSortMemory1, long reservedSortMemory2, long maxSortMemory2, long perRequestMemory, long externalBufferMemory, FlinkJoinRelType type, boolean leftIsSmaller, GeneratedJoinConditionFunction condFuncCode, GeneratedProjection projectionCode1, GeneratedProjection projectionCode2, GeneratedSorter gSorter1, GeneratedSorter gSorter2, GeneratedSorter keyGSorter, boolean[] filterNulls) {
        this.reservedSortMemory1 = reservedSortMemory1;
        this.maxSortMemory1 = maxSortMemory1;
        this.reservedSortMemory2 = reservedSortMemory2;
        this.maxSortMemory2 = maxSortMemory2;
        this.perRequestMemory = perRequestMemory;
        this.externalBufferMemory = externalBufferMemory;
        this.type = type;
        this.leftIsSmaller = leftIsSmaller;
        this.condFuncCode = condFuncCode;
        this.projectionCode1 = projectionCode1;
        this.projectionCode2 = projectionCode2;
        this.gSorter1 = Preconditions.checkNotNull(gSorter1);
        this.gSorter2 = Preconditions.checkNotNull(gSorter2);
        this.keyGSorter = Preconditions.checkNotNull(keyGSorter);
        this.filterNulls = filterNulls;
    }

    /**
     * Compiles the generated classes, creates serializers, sorters, the key comparator, the
     * join condition and the join helper, and registers the sorter metrics.
     */
    @Override
    public void open() throws Exception {
        super.open();
        conf = getSqlConf();
        classes = cookGeneratedClasses(getContainingTask().getUserCodeClassLoader());

        // A fresh boolean[] is all-false: neither input has finished yet.
        isFinished = new boolean[2];
        collector = new StreamRecordCollector<BaseRow>(output);

        inputSerializer1 = getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
        serializer1 = new BinaryRowSerializer(((AbstractRowSerializer) inputSerializer1).getTypes());
        inputSerializer2 = getOperatorConfig().getTypeSerializerIn2(getUserCodeClassloader());
        serializer2 = new BinaryRowSerializer(((AbstractRowSerializer) inputSerializer2).getTypes());

        memManager = getContainingTask().getEnvironment().getMemoryManager();
        ioManager = getContainingTask().getEnvironment().getIOManager();

        // Both init methods read the generated sorters, so they must run before the
        // generated-code fields are cleared below.
        initSorter();
        initKeyComparator();

        condFunc = classes.condFuncClass.newInstance();
        leftNullRow = new GenericRow(serializer1.getNumFields());
        rightNullRow = new GenericRow(serializer2.getNumFields());
        JoinedRow joinedRow = new JoinedRow();
        helper = new SortMergeJoinHelper(collector, condFunc, leftNullRow, rightNullRow, joinedRow);

        // The generated code is no longer needed once compiled and initialised.
        condFuncCode = null;
        keyGSorter = null;
        projectionCode1 = null;
        projectionCode2 = null;
        gSorter1 = null;
        gSorter2 = null;

        getMetricGroup().gauge("memoryUsedSizeInBytes", () -> sorter1.getUsedMemoryInBytes() + sorter2.getUsedMemoryInBytes());
        getMetricGroup().gauge("numSpillFiles", () -> sorter1.getNumSpillFiles() + sorter2.getNumSpillFiles());
        getMetricGroup().gauge("spillInBytes", () -> sorter1.getSpillInBytes() + sorter2.getSpillInBytes());
    }

    /** Instantiates and starts the external sorter of each input. */
    private void initSorter() throws Exception {
        NormalizedKeyComputer computer1 = classes.computerClass1.newInstance();
        RecordComparator comparator1 = classes.comparatorClass1.newInstance();
        computer1.init(gSorter1.serializers(), gSorter1.comparators());
        comparator1.init(gSorter1.serializers(), gSorter1.comparators());
        sorter1 = new BinaryExternalSorter(getContainingTask(), memManager, reservedSortMemory1, maxSortMemory1, perRequestMemory, ioManager, inputSerializer1, serializer1, computer1, comparator1, conf);
        sorter1.startThreads();

        NormalizedKeyComputer computer2 = classes.computerClass2.newInstance();
        RecordComparator comparator2 = classes.comparatorClass2.newInstance();
        computer2.init(gSorter2.serializers(), gSorter2.comparators());
        comparator2.init(gSorter2.serializers(), gSorter2.comparators());
        sorter2 = new BinaryExternalSorter(getContainingTask(), memManager, reservedSortMemory2, maxSortMemory2, perRequestMemory, ioManager, inputSerializer2, serializer2, computer2, comparator2, conf);
        sorter2.startThreads();
    }

    /** Instantiates the comparator used by the join iterators to compare join keys. */
    private void initKeyComparator() throws Exception {
        keyComparator = classes.keyComparatorClass.newInstance();
        keyComparator.init(keyGSorter.serializers(), keyGSorter.comparators());
    }

    /**
     * Compiles all generated classes with the given class loader.
     *
     * @param cl class loader handed to {@link CodeGenUtils#compile}
     * @return holder of the compiled classes
     * @throws CompileException if compilation of generated code fails
     */
    protected CookedClasses cookGeneratedClasses(ClassLoader cl) throws CompileException {
        return new CookedClasses(
                CodeGenUtils.compile(cl, condFuncCode.name(), condFuncCode.code()),
                CodeGenUtils.compile(cl, keyGSorter.comparator().name(), keyGSorter.comparator().code()),
                CodeGenUtils.compile(cl, projectionCode1.name(), projectionCode1.code()),
                CodeGenUtils.compile(cl, projectionCode2.name(), projectionCode2.code()),
                CodeGenUtils.compile(cl, gSorter1.computer().name(), gSorter1.computer().code()),
                CodeGenUtils.compile(cl, gSorter2.computer().name(), gSorter2.computer().code()),
                CodeGenUtils.compile(cl, gSorter1.comparator().name(), gSorter1.comparator().code()),
                CodeGenUtils.compile(cl, gSorter2.comparator().name(), gSorter2.comparator().code()));
    }

    /** Either input may be read first. */
    public TwoInputSelection firstInputSelection() {
        return TwoInputSelection.ANY;
    }

    /** Writes a record of input 1 into its sorter; imposes no preference on the next input. */
    public TwoInputSelection processElement1(StreamRecord<BaseRow> element) throws Exception {
        sorter1.write(element.getValue());
        return TwoInputSelection.ANY;
    }

    /** Writes a record of input 2 into its sorter; imposes no preference on the next input. */
    public TwoInputSelection processElement2(StreamRecord<BaseRow> element) throws Exception {
        sorter2.write(element.getValue());
        return TwoInputSelection.ANY;
    }

    /** Marks input 1 as finished and runs the join if input 2 has finished as well. */
    public void endInput1() throws Exception {
        finishInput(0);
    }

    /** Marks input 2 as finished and runs the join if input 1 has finished as well. */
    public void endInput2() throws Exception {
        finishInput(1);
    }

    /** Records the end of the input at {@code inputIndex} and joins once both inputs ended. */
    private void finishInput(int inputIndex) throws Exception {
        isFinished[inputIndex] = true;
        if (isAllFinished()) {
            doSortMergeJoin();
        }
    }

    /**
     * Merges the two sorted inputs with the iterator matching the join type and emits results.
     *
     * <p>Each iterator is stored in a field right after construction, before any row is
     * processed, so that {@link #close()} releases it even when the join itself fails.
     */
    private void doSortMergeJoin() throws Exception {
        Projection projection1 = classes.projectionClass1.newInstance();
        Projection projection2 = classes.projectionClass2.newInstance();
        MutableObjectIterator<BaseRow> iterator1 = sorter1.getIterator();
        MutableObjectIterator<BaseRow> iterator2 = sorter2.getIterator();

        if (type.equals(FlinkJoinRelType.INNER)) {
            if (leftIsSmaller) {
                // Input 2 leads and the match buffer holds input 1 rows (serializer1).
                // NOTE(review): the boolean presumably tells the helper that the iterator's
                // sides are swapped relative to left/right - confirm in SortMergeJoinHelper.
                SortMergeInnerJoinIterator swapped = new SortMergeInnerJoinIterator(serializer2, serializer1, projection2, projection1, keyComparator, iterator2, iterator1, newBuffer(serializer1), filterNulls);
                joinIterator1 = swapped;
                helper.innerJoin(swapped, true);
            } else {
                // Input 1 leads and the match buffer holds input 2 rows (serializer2).
                SortMergeInnerJoinIterator inner = new SortMergeInnerJoinIterator(serializer1, serializer2, projection1, projection2, keyComparator, iterator1, iterator2, newBuffer(serializer2), filterNulls);
                joinIterator2 = inner;
                helper.innerJoin(inner, false);
            }
        } else if (type.equals(FlinkJoinRelType.LEFT)) {
            SortMergeOneSideOuterJoinIterator leftOuter = new SortMergeOneSideOuterJoinIterator(serializer1, serializer2, projection1, projection2, keyComparator, iterator1, iterator2, newBuffer(serializer2), filterNulls);
            joinIterator2 = leftOuter;
            helper.oneSideOuterJoin(leftOuter, false, rightNullRow);
        } else if (type.equals(FlinkJoinRelType.RIGHT)) {
            SortMergeOneSideOuterJoinIterator rightOuter = new SortMergeOneSideOuterJoinIterator(serializer2, serializer1, projection2, projection1, keyComparator, iterator2, iterator1, newBuffer(serializer1), filterNulls);
            joinIterator1 = rightOuter;
            helper.oneSideOuterJoin(rightOuter, true, leftNullRow);
        } else if (type.equals(FlinkJoinRelType.FULL)) {
            fullOuterJoinIterator = new SortMergeFullOuterJoinIterator(serializer1, serializer2, projection1, projection2, keyComparator, iterator1, iterator2, newBuffer(serializer1), newBuffer(serializer2), filterNulls);
            helper.fullOuterJoin(fullOuterJoinIterator);
        } else if (type.equals(FlinkJoinRelType.SEMI)) {
            SortMergeInnerJoinIterator semi = new SortMergeInnerJoinIterator(serializer1, serializer2, projection1, projection2, keyComparator, iterator1, iterator2, newBuffer(serializer2), filterNulls);
            joinIterator2 = semi;
            // Emit a probe row once if at least one buffered row satisfies the condition.
            while (semi.nextInnerJoin()) {
                BaseRow probeRow = semi.getProbeRow();
                if (hasMatch(probeRow, semi.getMatchBuffer())) {
                    collector.collect(probeRow);
                }
            }
        } else if (type.equals(FlinkJoinRelType.ANTI)) {
            SortMergeOneSideOuterJoinIterator anti = new SortMergeOneSideOuterJoinIterator(serializer1, serializer2, projection1, projection2, keyComparator, iterator1, iterator2, newBuffer(serializer2), filterNulls);
            joinIterator2 = anti;
            // Emit a probe row only if no buffered row satisfies the condition; a null
            // match buffer counts as "no match".
            while (anti.nextOuterJoin()) {
                BaseRow probeRow = anti.getProbeRow();
                ResettableExternalBuffer matchBuffer = anti.getMatchBuffer();
                boolean matched = matchBuffer != null && hasMatch(probeRow, matchBuffer);
                if (!matched) {
                    collector.collect(probeRow);
                }
            }
        } else {
            throw new RuntimeException("Not support yet!");
        }
    }

    /**
     * Returns whether any row of {@code matchBuffer} satisfies the join condition together with
     * {@code probeRow}. Stops at the first match; the buffer iterator is always closed.
     */
    private boolean hasMatch(BaseRow probeRow, ResettableExternalBuffer matchBuffer) throws Exception {
        try (ResettableExternalBuffer.BufferIterator iter = matchBuffer.newIterator()) {
            while (iter.advanceNext()) {
                BinaryRow buffered = iter.getRow();
                if (condFunc.apply(probeRow, buffered)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Allocates {@code externalBufferMemory / pageSize} pages (integer division) from the memory
     * manager on behalf of the containing task and wraps them in a new external buffer.
     */
    private ResettableExternalBuffer newBuffer(BinaryRowSerializer serializer) throws MemoryAllocationException {
        int numPages = (int) (externalBufferMemory / (long) memManager.getPageSize());
        return new ResettableExternalBuffer(memManager, ioManager, memManager.allocatePages(getContainingTask(), numPages), serializer);
    }

    private boolean isAllFinished() {
        return isFinished[0] && isFinished[1];
    }

    /**
     * Closes the operator, both sorters and whichever join iterators were created.
     *
     * <p>Every resource is attempted even if an earlier close fails, so one failure cannot
     * leak the remaining sorters or buffers. The first failure is rethrown at the end with
     * any later failures attached as suppressed exceptions.
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            super.close();
        } catch (Exception e) {
            failure = e;
        }
        try {
            if (sorter1 != null) {
                sorter1.close();
            }
        } catch (Exception e) {
            failure = chain(failure, e);
        }
        try {
            if (sorter2 != null) {
                sorter2.close();
            }
        } catch (Exception e) {
            failure = chain(failure, e);
        }
        try {
            if (joinIterator1 != null) {
                joinIterator1.close();
            }
        } catch (Exception e) {
            failure = chain(failure, e);
        }
        try {
            if (joinIterator2 != null) {
                joinIterator2.close();
            }
        } catch (Exception e) {
            failure = chain(failure, e);
        }
        try {
            if (fullOuterJoinIterator != null) {
                fullOuterJoinIterator.close();
            }
        } catch (Exception e) {
            failure = chain(failure, e);
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Keeps the first failure as primary and records {@code next} on it as suppressed. */
    private static Exception chain(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /** Immutable holder of the classes compiled from the generated code. */
    protected static class CookedClasses {
        protected final Class<JoinConditionFunction> condFuncClass;
        protected final Class<RecordComparator> keyComparatorClass;
        protected final Class<Projection> projectionClass1;
        protected final Class<Projection> projectionClass2;
        protected final Class<NormalizedKeyComputer> computerClass1;
        protected final Class<NormalizedKeyComputer> computerClass2;
        protected final Class<RecordComparator> comparatorClass1;
        protected final Class<RecordComparator> comparatorClass2;

        protected CookedClasses(Class<JoinConditionFunction> condFuncClass, Class<RecordComparator> keyComparatorClass, Class<Projection> projectionClass1, Class<Projection> projectionClass2, Class<NormalizedKeyComputer> computerClass1, Class<NormalizedKeyComputer> computerClass2, Class<RecordComparator> comparatorClass1, Class<RecordComparator> comparatorClass2) {
            this.condFuncClass = condFuncClass;
            this.keyComparatorClass = keyComparatorClass;
            this.projectionClass1 = projectionClass1;
            this.projectionClass2 = projectionClass2;
            this.computerClass1 = computerClass1;
            this.computerClass2 = computerClass2;
            this.comparatorClass1 = comparatorClass1;
            this.comparatorClass2 = comparatorClass2;
        }
    }
}

