package org.apache.flink.table.runtime.sort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.operators.sort.IndexedSorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.table.util.BinaryMergeIterator;
import org.apache.flink.table.util.ChannelWithMeta;
import org.apache.flink.table.util.MemorySegmentPool;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/sort/BufferedKVExternalSorter.class */
/**
 * Sorts buffered key/value {@link BinaryRow} pairs in memory, spills each sorted run to a
 * channel obtained from the {@link IOManager}, and exposes all spilled runs through a single
 * merging iterator.
 *
 * <p>Typical usage, as far as this class shows: call
 * {@link #sortAndSpill(ArrayList, long, MemorySegmentPool)} once per filled buffer, then
 * {@link #getKVIterator(MemorySegmentPool)} to read the merged result, and finally
 * {@link #close()}.
 *
 * <p>Only the {@code closed} flag is {@code volatile}; the list of spilled channels is a plain
 * {@link ArrayList}, so spilling and iterating are presumably expected to happen on a single
 * thread (NOTE(review): confirm against callers).
 */
public class BufferedKVExternalSorter {
    private static final Logger LOG = LoggerFactory.getLogger(BufferedKVExternalSorter.class);

    /** Number of memory segments reserved as write buffers while merging spilled channels. */
    private static final int WRITE_MEMORY_NUM = 2;

    private final NormalizedKeyComputer nKeyComputer;
    private final RecordComparator comparator;
    private final BinaryRowSerializer keySerializer;
    private final BinaryRowSerializer valueSerializer;
    private final BinaryKVExternalMerger merger;
    private final IOManager ioManager;
    private final int maxNumFileHandles;
    private final FileIOChannel.Enumerator enumerator;

    /** Set by {@link #close()}; also stops the intermediate merge loop in getKVIterator. */
    private volatile boolean closed = false;

    /** One entry per spilled run, in the order the runs were written. */
    private final List<ChannelWithMeta> channelIDs = new ArrayList<>();
    private final IndexedSorter sorter = new QuickSort();
    private final SpillChannelManager channelManager = new SpillChannelManager();

    /**
     * Creates a sorter that spills through the given I/O manager.
     *
     * @param iOManager used to enumerate spill channels and to create the block writers
     * @param binaryRowSerializer serializer for the key rows
     * @param binaryRowSerializer2 serializer for the value rows
     * @param normalizedKeyComputer normalized-key computer handed to the in-memory sort buffer
     * @param recordComparator comparator handed to the in-memory sort buffer and to the merger
     * @param i maximum number of spilled channels that may be merged at once
     * @param i2 passed through as the second constructor argument of
     *     {@link BinaryKVExternalMerger} (presumably the page size - TODO confirm)
     * @throws IOException declared by the original signature
     */
    public BufferedKVExternalSorter(IOManager iOManager, BinaryRowSerializer binaryRowSerializer, BinaryRowSerializer binaryRowSerializer2, NormalizedKeyComputer normalizedKeyComputer, RecordComparator recordComparator, int i, int i2) throws IOException {
        this.keySerializer = binaryRowSerializer;
        this.valueSerializer = binaryRowSerializer2;
        this.nKeyComputer = normalizedKeyComputer;
        this.comparator = recordComparator;
        this.maxNumFileHandles = i;
        this.ioManager = iOManager;
        this.enumerator = this.ioManager.createChannelEnumerator();
        this.merger = new BinaryKVExternalMerger(iOManager, i2, i, this.channelManager, binaryRowSerializer, binaryRowSerializer2, recordComparator);
    }

    /**
     * Returns a merging iterator over all spilled runs, taking every required memory segment
     * from the given pool.
     *
     * @param memorySegmentPool pool that supplies the merge read and write buffers
     * @return iterator over the merged key/value pairs
     * @throws IOException if merging or opening the spilled channels fails
     */
    public MutableObjectIterator<Tuple2<BinaryRow, BinaryRow>> getKVIterator(MemorySegmentPool memorySegmentPool) throws IOException {
        return getKVIterator(new ArrayList<>(), memorySegmentPool);
    }

    /**
     * Returns a merging iterator over all spilled runs, reusing the supplied segments and
     * topping them up from the pool when there are too few.
     *
     * <p>The first {@code WRITE_MEMORY_NUM} segments serve as write buffers for intermediate
     * merges; all remaining segments serve as read buffers. While more channels exist than
     * {@code maxNumFileHandles} (and the sorter is not closed), channels are merged down
     * before the final merging iterator is created.
     *
     * @param list segments the caller already owns; the list itself is never modified
     * @param memorySegmentPool pool used to obtain any missing segments
     * @return iterator over the merged key/value pairs
     * @throws IOException if merging or opening the spilled channels fails
     */
    public MutableObjectIterator<Tuple2<BinaryRow, BinaryRow>> getKVIterator(List<MemorySegment> list, MemorySegmentPool memorySegmentPool) throws IOException {
        // One read buffer per channel that can be open at once, plus the write buffers.
        int requiredSegments = WRITE_MEMORY_NUM + Math.min(this.channelIDs.size(), this.maxNumFileHandles);
        List<MemorySegment> segments = list;
        int missingSegments = requiredSegments - list.size();
        if (missingSegments > 0) {
            // Copy first so the caller's list is left untouched.
            segments = new ArrayList<>(list);
            for (int k = 0; k < missingSegments; k++) {
                segments.add(memorySegmentPool.nextSegment());
            }
        }

        ArrayList<MemorySegment> writeSegments = new ArrayList<>(segments.subList(0, WRITE_MEMORY_NUM));
        ArrayList<MemorySegment> readSegments = new ArrayList<>(segments.subList(WRITE_MEMORY_NUM, segments.size()));

        // Intermediate merges: reduce the channel count until one final merge can open them all.
        List<ChannelWithMeta> channels = this.channelIDs;
        while (!this.closed && channels.size() > this.maxNumFileHandles) {
            channels = this.merger.mergeChannelList(channels, readSegments, writeSegments);
        }

        // Final merge: distribute the read segments over the remaining channels.
        ArrayList<List<MemorySegment>> segmentsPerReader = new ArrayList<>(channels.size());
        BinaryExternalMerger.getSegmentsForReaders(segmentsPerReader, readSegments, channels.size());
        ArrayList<FileIOChannel> openedChannels = new ArrayList<>();
        BinaryMergeIterator<Tuple2<BinaryRow, BinaryRow>> mergingIterator = this.merger.getMergingIterator(channels, segmentsPerReader, openedChannels);
        // Register the readers with the channel manager so close() can release them.
        this.channelManager.addOpenChannels(openedChannels);
        return mergingIterator;
    }

    /**
     * Sorts the records held in the given segments and writes them as one run to a new spill
     * channel, which is then remembered for later merging.
     *
     * <p>If writing fails with an {@link IOException} or an unchecked exception, the partially
     * written channel is closed and deleted before the failure is rethrown; a failure during
     * that cleanup is attached to the original exception as suppressed.
     *
     * @param arrayList segments containing the buffered records
     * @param j number of records, passed through to the in-memory sort buffer
     * @param memorySegmentPool pool that supplies the two write-behind segments and the page size
     * @throws IOException if creating or writing the spill channel fails
     */
    public void sortAndSpill(ArrayList<MemorySegment> arrayList, long j, MemorySegmentPool memorySegmentPool) throws IOException {
        BinaryKVInMemorySortBuffer sortBuffer = BinaryKVInMemorySortBuffer.createBuffer(this.nKeyComputer, this.keySerializer, this.valueSerializer, this.comparator, arrayList, j, memorySegmentPool);
        this.sorter.sort(sortBuffer);

        FileIOChannel.ID channel = this.enumerator.next();
        // Registered before writing so the channel manager knows about the file even on failure.
        this.channelManager.addChannel(channel);

        BlockChannelWriter<MemorySegment> writer = null;
        try {
            writer = this.ioManager.createBlockChannelWriter(channel);
            HeaderlessChannelWriterOutputView output = new HeaderlessChannelWriterOutputView(writer, Arrays.asList(memorySegmentPool.nextSegment(), memorySegmentPool.nextSegment()), memorySegmentPool.pageSize());
            sortBuffer.writeToOutput(output);
            LOG.info("here spill the kv external buffer data with {} bytes", writer.getSize());
            // Argument order is significant: the block count is read before close() is called.
            this.channelIDs.add(new ChannelWithMeta(channel, output.getBlockCount(), output.close()));
        } catch (IOException | RuntimeException e) {
            if (writer != null) {
                try {
                    writer.closeAndDelete();
                } catch (IOException cleanupFailure) {
                    // Keep the original failure as the primary exception.
                    e.addSuppressed(cleanupFailure);
                }
            }
            throw e;
        }
    }

    /**
     * Closes the merger and the spill channel manager. Calls after the first one are no-ops.
     * The channel manager is closed even if closing the merger throws.
     */
    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        try {
            this.merger.close();
        } finally {
            this.channelManager.close();
        }
    }
}
