package org.apache.flink.table.runtime.operator.sort;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedSorter;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.operator.AbstractStreamOperatorWithMetrics;
import org.apache.flink.table.runtime.operator.StreamRecordCollector;
import org.apache.flink.table.runtime.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.sort.NormalizedKeyComputer;
import org.apache.flink.table.runtime.sort.RecordComparator;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.codehaus.commons.compiler.CompileException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operator/sort/SortOperator.class */
/**
 * Stream operator that feeds every incoming row into a {@link BinaryExternalSorter} and, once the
 * input has ended, emits the rows in the order produced by the sorter's iterator.
 *
 * <p>The normalized-key computer and record comparator used by the sorter are compiled from the
 * code carried by the {@link GeneratedSorter} when the operator is opened.
 */
public class SortOperator extends AbstractStreamOperatorWithMetrics<BinaryRow> implements OneInputStreamOperator<BaseRow, BinaryRow> {
    private static final Logger LOG = LoggerFactory.getLogger(SortOperator.class);

    private final long reservedMemorySize;
    private final long maxMemorySize;
    private final long perRequestMemorySize;

    /** Generated sorter code; released (set to {@code null}) at the end of {@link #open()}. */
    private GeneratedSorter gSorter;

    protected transient Class<NormalizedKeyComputer> computerClass;
    protected transient Class<RecordComparator> comparatorClass;
    private transient BinaryExternalSorter sorter;
    private transient StreamRecordCollector<BinaryRow> collector;
    private transient BinaryRowSerializer binarySerializer;

    /**
     * Creates the operator.
     *
     * @param reservedMemorySize reserved memory size handed to the {@link BinaryExternalSorter}
     * @param maxMemorySize maximum memory size handed to the {@link BinaryExternalSorter}
     * @param perRequestMemorySize per-request memory size handed to the {@link BinaryExternalSorter}
     * @param generatedSorter generated computer/comparator code plus the serializers and
     *     comparators used to initialize them
     */
    public SortOperator(long reservedMemorySize, long maxMemorySize, long perRequestMemorySize, GeneratedSorter generatedSorter) {
        this.reservedMemorySize = reservedMemorySize;
        this.maxMemorySize = maxMemorySize;
        this.perRequestMemorySize = perRequestMemorySize;
        this.gSorter = generatedSorter;
    }

    /**
     * Compiles the generated classes, builds and starts the external sorter, and registers the
     * sorter's memory/spill gauges on the operator metric group.
     *
     * @throws Exception if the super class fails to open, the generated code cannot be compiled or
     *     instantiated, or the sorter cannot be created
     */
    @Override // org.apache.flink.table.runtime.operator.AbstractStreamOperatorWithMetrics
    public void open() throws Exception {
        super.open();
        LOG.info("Opening SortOperator");
        cookGeneratedClasses(getContainingTask().getUserCodeClassLoader());

        // Kept as a raw type: it is cast to AbstractRowSerializer below and also passed
        // unchanged to the sorter as the input serializer.
        TypeSerializer inputSerializer = getOperatorContext().getTypeSerializerIn1();
        this.binarySerializer = new BinaryRowSerializer(((AbstractRowSerializer) inputSerializer).getTypes());

        MemoryManager memoryManager = getContainingTask().getEnvironment().getMemoryManager();
        IOManager ioManager = getContainingTask().getEnvironment().getIOManager();

        NormalizedKeyComputer computer = this.computerClass.newInstance();
        RecordComparator comparator = this.comparatorClass.newInstance();
        computer.init(this.gSorter.serializers(), this.gSorter.comparators());
        comparator.init(this.gSorter.serializers(), this.gSorter.comparators());

        this.sorter = new BinaryExternalSorter(getContainingTask(), memoryManager, this.reservedMemorySize, this.maxMemorySize, this.perRequestMemorySize, ioManager, inputSerializer, this.binarySerializer, computer, comparator);
        this.sorter.startThreads();

        // The generated code has been compiled and the instances initialized; drop the reference.
        this.gSorter = null;

        this.collector = new StreamRecordCollector<>(this.output);

        getMetricGroup().gauge("memoryUsedSizeInBytes", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return Long.valueOf(SortOperator.this.sorter.getUsedMemoryInBytes());
            }
        });
        getMetricGroup().gauge("numSpillFiles", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return Long.valueOf(SortOperator.this.sorter.getNumSpillFiles());
            }
        });
        getMetricGroup().gauge("spillInBytes", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return Long.valueOf(SortOperator.this.sorter.getSpillInBytes());
            }
        });
    }

    /**
     * Compiles the generated normalized-key computer and record comparator and stores the
     * resulting classes in {@link #computerClass} and {@link #comparatorClass}.
     *
     * @param classLoader class loader passed to the code compiler
     * @throws CompileException if the generated code fails to compile
     */
    protected void cookGeneratedClasses(ClassLoader classLoader) throws CompileException {
        this.computerClass = CodeGenUtils.compile(classLoader, this.gSorter.computer().name(), this.gSorter.computer().code());
        this.comparatorClass = CodeGenUtils.compile(classLoader, this.gSorter.comparator().name(), this.gSorter.comparator().code());
    }

    /**
     * Writes the record's row into the sorter; nothing is emitted here.
     *
     * @param streamRecord the incoming record
     * @throws Exception if the sorter fails to accept the row
     */
    @Override
    public void processElement(StreamRecord<BaseRow> streamRecord) throws Exception {
        this.sorter.write(streamRecord.getValue());
    }

    /**
     * Signals that stage 0 is done, then drains the sorter's iterator and emits every row through
     * the collector.
     *
     * @throws Exception if reading from the sorter or emitting a row fails
     */
    public void endInput() throws Exception {
        sendStageDoneEvent(0);
        // The instance is offered to the iterator as a reuse object; the iterator's return value
        // is what gets emitted and offered again on the next call.
        BinaryRow row = this.binarySerializer.createInstance();
        MutableObjectIterator<BinaryRow> iterator = this.sorter.getIterator();
        while ((row = iterator.next(row)) != null) {
            this.collector.collect(row);
        }
    }

    /**
     * Closes the super class and then the sorter. The sorter is closed even when the super class
     * throws, so that it is not leaked on a failing shutdown.
     *
     * @throws Exception if closing the super class or the sorter fails
     */
    @Override // org.apache.flink.table.runtime.operator.AbstractStreamOperatorWithMetrics
    public void close() throws Exception {
        LOG.info("Closing SortOperator");
        try {
            super.close();
        } finally {
            // sorter is null when open() failed before creating it.
            if (this.sorter != null) {
                this.sorter.close();
            }
        }
    }
}
