/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.sort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.io.blockcompression.BlockCompressionFactory;
import org.apache.flink.api.common.io.blockcompression.BlockCompressionFactoryLoader;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.operators.sort.IndexedSortable;
import org.apache.flink.runtime.operators.sort.IndexedSorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.table.api.TableConfigOptions;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.sort.BinaryKVExternalMerger;
import org.apache.flink.table.runtime.sort.BinaryKVInMemorySortBuffer;
import org.apache.flink.table.runtime.sort.BinaryMergeIterator;
import org.apache.flink.table.runtime.sort.NormalizedKeyComputer;
import org.apache.flink.table.runtime.sort.RecordComparator;
import org.apache.flink.table.runtime.sort.SpillChannelManager;
import org.apache.flink.table.runtime.util.AbstractChannelWriterOutputView;
import org.apache.flink.table.runtime.util.ChannelWithMeta;
import org.apache.flink.table.runtime.util.FileChannelUtil;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BufferedKVExternalSorter {
    private static final Logger LOG = LoggerFactory.getLogger(BufferedKVExternalSorter.class);
    private volatile boolean closed = false;
    private final NormalizedKeyComputer nKeyComputer;
    private final RecordComparator comparator;
    private final BinaryRowSerializer keySerializer;
    private final BinaryRowSerializer valueSerializer;
    private final IndexedSorter sorter;
    private final BinaryKVExternalMerger merger;
    private final IOManager ioManager;
    private final int maxNumFileHandles;
    private final FileIOChannel.Enumerator enumerator;
    private final List<ChannelWithMeta> channelIDs = new ArrayList<ChannelWithMeta>();
    private final SpillChannelManager channelManager;
    private int pageSize;
    private long numSpillFiles;
    private long spillInBytes;
    private long spillInCompressedBytes;
    private final boolean compressionEnable;
    private final BlockCompressionFactory compressionCodecFactory;
    private final int compressionBlockSize;

    public BufferedKVExternalSorter(IOManager ioManager, BinaryRowSerializer keySerializer, BinaryRowSerializer valueSerializer, NormalizedKeyComputer nKeyComputer, RecordComparator comparator, int pageSize, Configuration conf) throws IOException {
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.nKeyComputer = nKeyComputer;
        this.comparator = comparator;
        this.pageSize = pageSize;
        this.sorter = new QuickSort();
        this.maxNumFileHandles = conf.getInteger(TableConfigOptions.SQL_EXEC_SORT_FILE_HANDLES_MAX_NUM);
        this.compressionEnable = conf.getBoolean(TableConfigOptions.SQL_EXEC_SPILL_COMPRESSION_ENABLED);
        this.compressionCodecFactory = this.compressionEnable ? BlockCompressionFactoryLoader.createBlockCompressionFactory(conf.getString(TableConfigOptions.SQL_EXEC_SPILL_COMPRESSION_CODEC), conf) : null;
        this.compressionBlockSize = conf.getInteger(TableConfigOptions.SQL_EXEC_SPILL_COMPRESSION_BLOCK_SIZE);
        this.ioManager = ioManager;
        this.enumerator = this.ioManager.createChannelEnumerator();
        this.channelManager = new SpillChannelManager();
        this.merger = new BinaryKVExternalMerger(ioManager, pageSize, this.maxNumFileHandles, this.channelManager, keySerializer, valueSerializer, comparator, this.compressionEnable, this.compressionCodecFactory, this.compressionBlockSize);
    }

    public MutableObjectIterator<Tuple2<BinaryRow, BinaryRow>> getKVIterator() throws IOException {
        List<ChannelWithMeta> channelIDs = this.channelIDs;
        while (!this.closed && channelIDs.size() > this.maxNumFileHandles) {
            channelIDs = this.merger.mergeChannelList(channelIDs);
        }
        ArrayList<FileIOChannel> openChannels = new ArrayList<FileIOChannel>();
        BinaryMergeIterator<Tuple2<BinaryRow, BinaryRow>> iterator = this.merger.getMergingIterator(channelIDs, openChannels);
        this.channelManager.addOpenChannels(openChannels);
        return iterator;
    }

    public void sortAndSpill(ArrayList<MemorySegment> recordBufferSegments, long numElements, MemorySegmentPool pool) throws IOException {
        int blockCount;
        int bytesInLastBuffer;
        BinaryKVInMemorySortBuffer buffer = BinaryKVInMemorySortBuffer.createBuffer(this.nKeyComputer, this.keySerializer, this.valueSerializer, this.comparator, recordBufferSegments, numElements, pool);
        this.sorter.sort((IndexedSortable)buffer);
        FileIOChannel.ID channel = this.enumerator.next();
        this.channelManager.addChannel(channel);
        AbstractChannelWriterOutputView output2 = null;
        try {
            ++this.numSpillFiles;
            output2 = FileChannelUtil.createOutputView(this.ioManager, channel, this.compressionEnable, this.compressionCodecFactory, this.compressionBlockSize, this.pageSize);
            buffer.writeToOutput(output2);
            this.spillInBytes += output2.getNumBytes();
            this.spillInCompressedBytes += output2.getNumCompressedBytes();
            bytesInLastBuffer = output2.close();
            blockCount = output2.getBlockCount();
            LOG.info("here spill the {}th kv external buffer data with {} bytes and {} compressed bytes", new Object[]{this.numSpillFiles, this.spillInBytes, this.spillInCompressedBytes});
        }
        catch (IOException e2) {
            if (output2 != null) {
                output2.closeAndDelete();
            }
            throw e2;
        }
        this.channelIDs.add(new ChannelWithMeta(channel, blockCount, bytesInLastBuffer));
    }

    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.merger.close();
        this.channelManager.close();
    }
}

