/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.sort;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedSorter;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics;
import org.apache.flink.table.runtime.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.sort.NormalizedKeyComputer;
import org.apache.flink.table.runtime.sort.RecordComparator;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.codehaus.commons.compiler.CompileException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SortOperator
extends AbstractStreamOperatorWithMetrics<BinaryRow>
implements OneInputStreamOperator<BaseRow, BinaryRow> {
    private final long reservedMemorySize;
    private final long maxMemorySize;
    private final long perRequestMemorySize;
    private GeneratedSorter gSorter;
    private static final Logger LOG = LoggerFactory.getLogger(SortOperator.class);
    protected transient Class<NormalizedKeyComputer> computerClass;
    protected transient Class<RecordComparator> comparatorClass;
    private transient BinaryExternalSorter sorter;
    private transient StreamRecordCollector<BinaryRow> collector;
    private transient BinaryRowSerializer binarySerializer;

    public SortOperator(long reservedMemorySize, long maxMemorySize, long perRequestMemorySize, GeneratedSorter gSorter) {
        this.reservedMemorySize = reservedMemorySize;
        this.maxMemorySize = maxMemorySize;
        this.perRequestMemorySize = perRequestMemorySize;
        this.gSorter = gSorter;
    }

    @Override
    public void open() throws Exception {
        super.open();
        LOG.info("Opening SortOperator");
        this.cookGeneratedClasses(this.getContainingTask().getUserCodeClassLoader());
        TypeSerializer inputSerializer = this.getOperatorConfig().getTypeSerializerIn1(this.getUserCodeClassloader());
        this.binarySerializer = new BinaryRowSerializer(((AbstractRowSerializer)inputSerializer).getTypes());
        MemoryManager memManager = this.getContainingTask().getEnvironment().getMemoryManager();
        IOManager ioManager = this.getContainingTask().getEnvironment().getIOManager();
        NormalizedKeyComputer computer = this.computerClass.newInstance();
        RecordComparator comparator = this.comparatorClass.newInstance();
        computer.init(this.gSorter.serializers(), this.gSorter.comparators());
        comparator.init(this.gSorter.serializers(), this.gSorter.comparators());
        this.sorter = new BinaryExternalSorter(this.getContainingTask(), memManager, this.reservedMemorySize, this.maxMemorySize, this.perRequestMemorySize, ioManager, inputSerializer, this.binarySerializer, computer, comparator, this.getSqlConf());
        this.sorter.startThreads();
        this.gSorter = null;
        this.collector = new StreamRecordCollector(this.output);
        this.getMetricGroup().gauge("memoryUsedSizeInBytes", this.sorter::getUsedMemoryInBytes);
        this.getMetricGroup().gauge("numSpillFiles", this.sorter::getNumSpillFiles);
        this.getMetricGroup().gauge("spillInBytes", this.sorter::getSpillInBytes);
    }

    protected void cookGeneratedClasses(ClassLoader cl) throws CompileException {
        this.computerClass = CodeGenUtils.compile(cl, this.gSorter.computer().name(), this.gSorter.computer().code());
        this.comparatorClass = CodeGenUtils.compile(cl, this.gSorter.comparator().name(), this.gSorter.comparator().code());
    }

    public void processElement(StreamRecord<BaseRow> element) throws Exception {
        this.sorter.write((BaseRow)element.getValue());
    }

    public void endInput() throws Exception {
        BinaryRow row2 = this.binarySerializer.createInstance();
        MutableObjectIterator<BinaryRow> iterator = this.sorter.getIterator();
        while ((row2 = iterator.next(row2)) != null) {
            this.collector.collect(row2);
        }
    }

    @Override
    public void close() throws Exception {
        LOG.info("Closing SortOperator");
        super.close();
        if (this.sorter != null) {
            this.sorter.close();
        }
    }
}

