/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.sort;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.blockcompression.BlockCompressionFactory;
import org.apache.flink.api.common.io.blockcompression.BlockCompressionFactoryLoader;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.ExceptionHandler;
import org.apache.flink.runtime.operators.sort.IndexedSortable;
import org.apache.flink.runtime.operators.sort.IndexedSorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.runtime.operators.sort.SortedDataFile;
import org.apache.flink.runtime.operators.sort.Sorter;
import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
import org.apache.flink.table.api.TableConfigOptions;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.sort.BinaryExternalMerger;
import org.apache.flink.table.runtime.sort.BinaryInMemorySortBuffer;
import org.apache.flink.table.runtime.sort.BinaryMergeIterator;
import org.apache.flink.table.runtime.sort.NormalizedKeyComputer;
import org.apache.flink.table.runtime.sort.RecordComparator;
import org.apache.flink.table.runtime.sort.SpillChannelManager;
import org.apache.flink.table.runtime.util.AbstractChannelWriterOutputView;
import org.apache.flink.table.runtime.util.ChannelWithMeta;
import org.apache.flink.table.runtime.util.FileChannelUtil;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sorter for {@link BinaryRow} records that collects rows in in-memory sort buffers and passes
 * the buffers through three daemon worker threads (sorting, spilling, merging) connected by
 * blocking queues.
 *
 * <p>Rows are added with {@link #write(BaseRow)}. {@link #getIterator()} marks the end of the
 * input and blocks until either the sorted result iterator or a failure has been published.
 */
public class BinaryExternalSorter
implements Sorter<BinaryRow> {
    /** Minimum number of sort memory segments (not referenced elsewhere in this file). */
    protected static final int MIN_NUM_SORT_MEM_SEGMENTS = 10;
    /** Minimum reserved memory size; the constructor rejects smaller {@code reservedMemorySize} values. */
    public static final long SORTER_MIN_NUM_SORT_MEM = 327680L;
    private static final Logger LOG = LoggerFactory.getLogger(BinaryExternalSorter.class);
    /** Sentinel queued by {@link #getIterator()} after the last buffer to signal the end of the input. */
    private static final CircularElement EOF_MARKER = new CircularElement();
    /** Sentinel queued once the spilling threshold is reached; tells the spilling thread to start writing to disk. */
    private static final CircularElement SPILLING_MARKER = new CircularElement();
    /** Sentinel placed on the merge queue by the spilling thread when no more spill files will follow. */
    private static final ChannelWithMeta FINAL_MERGE_MARKER = new ChannelWithMeta(null, -1, -1);
    /** One list of memory segments per sort buffer, taken from the pages allocated in the constructor. */
    protected final List<List<MemorySegment>> sortReadMemory;
    /** The in-memory sort buffers created in the constructor. */
    protected final List<BinaryInMemorySortBuffer> sortBuffers;
    /** Number of segments returned by the initial page allocation. */
    protected final int fixedReadMemoryNum;
    protected final MemoryManager memoryManager;
    /** Monitor used to publish and wait for {@link #iterator} / {@link #iteratorException}. */
    protected final Object iteratorLock = new Object();
    private ThreadBase sortThread;
    private ThreadBase spillThread;
    private ThreadBase mergeThread;
    /** The sorted result, set by the spilling or merging thread. */
    protected volatile MutableObjectIterator<BinaryRow> iterator;
    /** The failure reported to callers of {@link #getIterator()}; also set when the sorter is closed. */
    protected volatile IOException iteratorException;
    /** Set to {@code true} by {@link #close()}. */
    protected volatile boolean closed;
    /** Handler given to the worker threads; it records the failure and closes the sorter. */
    private ExceptionHandler<IOException> exceptionHandler;
    private CircularQueues circularQueues;
    /** Bytes that may still be written before {@link #SPILLING_MARKER} is queued; {@code 0} once it has been queued. */
    private long bytesUntilSpilling;
    /** The buffer currently being filled by {@link #write(BaseRow)}, or {@code null} if none is held. */
    private CircularElement currWriteBuffer;
    /** Set by {@link #getIterator()}; further writes are rejected afterwards. */
    private boolean writingDone = false;
    /** Held by {@link #write(BaseRow)} while it fetches and fills the current write buffer. */
    private final Object writeLock = new Object();
    private final SpillChannelManager channelManager;
    private final BinaryExternalMerger merger;
    /** Page size reported by the memory manager at construction time. */
    private final int memorySegmentSize;
    private final boolean compressionEnable;
    /** Compression factory for spill files; {@code null} when compression is disabled. */
    private final BlockCompressionFactory compressionCodecFactory;
    private final int compressionBlockSize;
    /** When {@code true}, the merging thread merges spill files as soon as the maximum fan-in is reached. */
    private final boolean asyncMergeEnable;
    /** Duplicate of the serializer passed to the constructor. */
    private final BinaryRowSerializer serializer;
    /** Number of spill files started by the spilling thread. */
    private long numSpillFiles;
    /** Accumulated {@code getNumBytes()} of all spill output views. */
    private long spillInBytes;
    /** Accumulated {@code getNumCompressedBytes()} of all spill output views. */
    private long spillInCompressedBytes;

    /**
     * Creates a sorter with a start-spilling fraction of {@code 0.8f}.
     *
     * <p>All other arguments are forwarded unchanged to the constructor that takes an explicit
     * {@code startSpillingFraction}.
     */
    public BinaryExternalSorter(Object owner, MemoryManager memoryManager, long reservedMemorySize, long maxMemorySize, long perRequestMemorySize, IOManager ioManager, TypeSerializer<BaseRow> inputSerializer, BinaryRowSerializer serializer, NormalizedKeyComputer normalizedKeyComputer, RecordComparator comparator, Configuration conf) throws IOException {
        this(owner, memoryManager, reservedMemorySize, maxMemorySize, perRequestMemorySize, ioManager, inputSerializer, serializer, normalizedKeyComputer, comparator, conf, 0.8f);
    }

    /**
     * Creates a sorter, allocates its sort memory and creates (but does not start) the sorting,
     * spilling and merging threads. Call {@link #startThreads()} to start them.
     *
     * @param owner the owner passed to the memory manager when allocating pages
     * @param memoryManager the memory manager the sort memory is allocated from
     * @param reservedMemorySize memory allocated up front; must be at least
     *     {@link #SORTER_MIN_NUM_SORT_MEM}
     * @param maxMemorySize used together with {@code reservedMemorySize} to derive the number of
     *     additional pages handed to each sort buffer as its limit
     * @param perRequestMemorySize converted to a page count and handed to each sort buffer
     * @param ioManager the I/O manager used for spill files
     * @param inputSerializer serializer for the incoming rows
     * @param serializer serializer for binary rows; duplicated for each component that needs one
     * @param normalizedKeyComputer normalized key computer handed to the sort buffers
     * @param comparator record comparator handed to the sort buffers, the merger and the spilling thread
     * @param conf configuration providing the file handle limit, compression and async-merge options
     * @param startSpillingFraction fraction of the sort memory that may be written before the
     *     spilling marker is queued
     * @throws IllegalArgumentException if fewer than two file handles are configured or
     *     {@code reservedMemorySize} is below the minimum
     * @throws RuntimeException if the sort memory cannot be allocated
     */
    public BinaryExternalSorter(Object owner, MemoryManager memoryManager, long reservedMemorySize, long maxMemorySize, long perRequestMemorySize, IOManager ioManager, TypeSerializer<BaseRow> inputSerializer, BinaryRowSerializer serializer, NormalizedKeyComputer normalizedKeyComputer, RecordComparator comparator, Configuration conf, float startSpillingFraction) throws IOException {
        // Above this amount of memory (100 MiB) the sort memory is split across two buffers.
        final long twoBufferThreshold = 100L * 1024L * 1024L;

        int maxNumFileHandles = conf.getInteger(TableConfigOptions.SQL_EXEC_SORT_FILE_HANDLES_MAX_NUM);
        this.compressionEnable = conf.getBoolean(TableConfigOptions.SQL_EXEC_SPILL_COMPRESSION_ENABLED);
        this.compressionCodecFactory = this.compressionEnable ? BlockCompressionFactoryLoader.createBlockCompressionFactory(conf.getString(TableConfigOptions.SQL_EXEC_SPILL_COMPRESSION_CODEC), conf) : null;
        this.compressionBlockSize = conf.getInteger(TableConfigOptions.SQL_EXEC_SPILL_COMPRESSION_BLOCK_SIZE);
        this.asyncMergeEnable = conf.getBoolean(TableConfigOptions.SQL_EXEC_SORT_ASYNC_MERGE_ENABLED);

        Preconditions.checkArgument(maxNumFileHandles >= 2);
        Preconditions.checkNotNull(ioManager);
        Preconditions.checkNotNull(normalizedKeyComputer);
        Preconditions.checkNotNull(comparator);
        this.serializer = (BinaryRowSerializer) serializer.duplicate();
        this.memoryManager = Preconditions.checkNotNull(memoryManager);

        final int pageSize = memoryManager.getPageSize();
        this.memorySegmentSize = pageSize;

        if (reservedMemorySize < SORTER_MIN_NUM_SORT_MEM) {
            // The limit is a memory size in bytes, not a page count.
            throw new IllegalArgumentException("Too little memory provided to sorter to perform task. Required are at least " + SORTER_MIN_NUM_SORT_MEM + " bytes. Current page size is " + pageSize + " bytes.");
        }

        final int sortMemPages = (int) (reservedMemorySize / (long) pageSize);
        final long sortMemory = (long) sortMemPages * (long) pageSize;
        final long sortMaxMemSize = Math.max(maxMemorySize, reservedMemorySize);
        final int numSortBuffers = sortMaxMemSize > twoBufferThreshold ? 2 : 1;
        final int numSegmentsPerSortBuffer = sortMemPages / numSortBuffers;

        this.sortReadMemory = new ArrayList<List<MemorySegment>>();
        final List<MemorySegment> readMemory;
        try {
            readMemory = memoryManager.allocatePages(owner, sortMemPages);
        }
        catch (MemoryAllocationException e) {
            LOG.error("Can't allocate {} pages from fixed memory pool.", sortMemPages, e);
            throw new RuntimeException(e);
        }
        this.fixedReadMemoryNum = readMemory.size();

        final CircularQueues circularQueues = new CircularQueues();
        final Iterator<MemorySegment> segments = readMemory.iterator();
        final int perRequestBuffersNum = (int) (perRequestMemorySize / (long) pageSize);
        // NOTE(review): this is negative when maxMemorySize < reservedMemorySize; confirm that
        // BinaryInMemorySortBuffer.createBuffer tolerates a negative additional page limit.
        final int additionalLimitNumPages = (int) ((maxMemorySize - reservedMemorySize) / (long) pageSize);
        final int eachBufferAdditionalLimitNumPages = additionalLimitNumPages / numSortBuffers;

        LOG.info("BinaryExternalSorter with initial memory segments {},And the preferred memory {} segments, per request {} segments from floating memory pool, maxNumFileHandles({}), compressionEnable({}), compressionCodecFactory({}), compressionBlockSize({}).", new Object[]{sortMemPages, (int) (maxMemorySize / (long) pageSize), perRequestBuffersNum, maxNumFileHandles, this.compressionEnable, this.compressionEnable ? this.compressionCodecFactory.getClass() : null, this.compressionBlockSize});

        this.sortBuffers = new ArrayList<BinaryInMemorySortBuffer>();
        for (int i = 0; i < numSortBuffers; ++i) {
            ArrayList<MemorySegment> sortSegments = new ArrayList<MemorySegment>(numSegmentsPerSortBuffer);
            // The last buffer takes every remaining segment so that none is left unused.
            int remaining = i == numSortBuffers - 1 ? Integer.MAX_VALUE : numSegmentsPerSortBuffer;
            while (remaining > 0 && segments.hasNext()) {
                sortSegments.add(segments.next());
                --remaining;
            }
            this.sortReadMemory.add(sortSegments);
            BinaryInMemorySortBuffer buffer = BinaryInMemorySortBuffer.createBuffer(memoryManager, normalizedKeyComputer, inputSerializer, serializer, comparator, sortSegments, eachBufferAdditionalLimitNumPages, perRequestBuffersNum);
            circularQueues.empty.add(new CircularElement(i, buffer, sortSegments));
            this.sortBuffers.add(buffer);
        }

        // Any failure in a worker thread is published to getIterator() callers and closes the sorter.
        final ExceptionHandler<IOException> handler = exception -> {
            if (!this.closed) {
                this.setResultIteratorException(exception);
                this.close();
            }
        };
        this.exceptionHandler = handler;
        this.circularQueues = circularQueues;

        this.bytesUntilSpilling = (long) (startSpillingFraction * (float) sortMemory);
        if (this.bytesUntilSpilling < 1L) {
            // Nothing may be kept in memory: request spilling right away.
            this.bytesUntilSpilling = 0L;
            this.circularQueues.sort.add(SPILLING_MARKER);
        }

        this.channelManager = new SpillChannelManager();
        this.merger = new BinaryExternalMerger(ioManager, pageSize, maxNumFileHandles, this.channelManager, (BinaryRowSerializer) serializer.duplicate(), comparator, this.compressionEnable, this.compressionCodecFactory, this.compressionBlockSize);

        this.sortThread = this.getSortingThread(handler, circularQueues);
        this.spillThread = this.getSpillingThread(handler, circularQueues, ioManager, (BinaryRowSerializer) serializer.duplicate(), comparator);
        this.mergeThread = this.getMergingThread(handler, circularQueues, ioManager, maxNumFileHandles, this.merger);

        // Let the workers use the class loader of the constructing thread.
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        if (contextLoader != null) {
            if (this.sortThread != null) {
                this.sortThread.setContextClassLoader(contextLoader);
            }
            if (this.spillThread != null) {
                this.spillThread.setContextClassLoader(contextLoader);
            }
            if (this.mergeThread != null) {
                this.mergeThread.setContextClassLoader(contextLoader);
            }
        }
    }

    /**
     * Starts the sorting, spilling and merging threads, in that order. Threads that are
     * {@code null} are skipped.
     */
    public void startThreads() {
        final ThreadBase[] workers = {this.sortThread, this.spillThread, this.mergeThread};
        for (ThreadBase worker : workers) {
            if (worker != null) {
                worker.start();
            }
        }
    }

    /**
     * Closes the sorter: fails pending {@link #getIterator()} calls, shuts down and joins the
     * worker threads, then releases the sort memory, the merger and the spill channels.
     *
     * <p>Only the first call has an effect. If the calling thread is interrupted while joining
     * the workers, the remaining joins are skipped, the resources are still released, and the
     * interrupt status is restored before returning.
     */
    public void close() {
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;
        }

        boolean interrupted = false;
        try {
            // Wake up anyone blocked in getIterator() with a "closed" failure.
            synchronized (this.iteratorLock) {
                if (this.iteratorException == null) {
                    this.iteratorException = new IOException("The sorter has been closed.");
                    this.iteratorLock.notifyAll();
                }
            }

            this.shutdownWorker(this.sortThread, "sorter");
            this.shutdownWorker(this.spillThread, "spilling");
            this.shutdownWorker(this.mergeThread, "merging");

            try {
                if (this.sortThread != null) {
                    this.sortThread.join();
                    this.sortThread = null;
                }
                if (this.spillThread != null) {
                    this.spillThread.join();
                    this.spillThread = null;
                }
                if (this.mergeThread != null) {
                    this.mergeThread.join();
                    this.mergeThread = null;
                }
            }
            catch (InterruptedException iex) {
                // Remember the interrupt; it is restored only after the cleanup below has run.
                interrupted = true;
                LOG.debug("Closing of sort/merger was interrupted. The reading/sorting/spilling/merging threads may still be working.", (Throwable) iex);
            }
        }
        finally {
            try {
                this.releaseSortMemory();
                this.circularQueues = null;
                this.currWriteBuffer = null;
                this.iterator = null;
            }
            finally {
                try {
                    this.merger.close();
                }
                finally {
                    try {
                        // Runs even if releasing memory or closing the merger failed.
                        this.channelManager.close();
                    }
                    finally {
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        }
    }

    /**
     * Asks a worker thread to stop and logs, rather than propagates, any failure.
     *
     * @param worker the thread to shut down; may be {@code null}
     * @param role the thread's role as used in the log message
     */
    private void shutdownWorker(ThreadBase worker, String role) {
        if (worker == null) {
            return;
        }
        try {
            worker.shutdown();
        }
        catch (Throwable t) {
            LOG.error("Error shutting down " + role + " thread: " + t.getMessage(), t);
        }
    }

    /**
     * Disposes every sort buffer, then returns the core segments to the memory manager and
     * forgets them. A failure while disposing buffers is logged and does not prevent the
     * segment release.
     */
    private void releaseSortMemory() {
        try {
            for (BinaryInMemorySortBuffer sortBuffer : this.sortBuffers) {
                sortBuffer.dispose();
            }
            this.sortBuffers.clear();
        }
        catch (Throwable ignored) {
            LOG.info("error.", ignored);
        }
        this.releaseCoreSegments();
        this.sortReadMemory.clear();
    }

    /**
     * Collects all segments registered in {@link #sortReadMemory} and releases them to the
     * memory manager in one call. Failures are logged and otherwise ignored.
     */
    private void releaseCoreSegments() {
        final ArrayList<MemorySegment> allSegments = new ArrayList<MemorySegment>();
        this.sortReadMemory.forEach(allSegments::addAll);
        try {
            if (allSegments.size() > 0) {
                this.memoryManager.release(allSegments);
            }
            allSegments.clear();
        }
        catch (Throwable ignored) {
            LOG.info("error.", ignored);
        }
    }

    /**
     * Creates (but does not start) the sorting thread. Called from the constructor.
     *
     * @param exceptionHandler handler the thread reports failures to
     * @param queues the queues the thread communicates through
     * @return a new {@link SortingThread}
     */
    protected ThreadBase getSortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues queues) {
        return new SortingThread(exceptionHandler, queues);
    }

    /**
     * Creates (but does not start) the spilling thread. Called from the constructor.
     *
     * @param exceptionHandler handler the thread reports failures to
     * @param queues the queues the thread communicates through
     * @param ioManager the I/O manager used to create spill channels
     * @param serializer serializer used to create reusable rows for the in-memory merge
     * @param comparator comparator used for the in-memory merge
     * @return a new {@link SpillingThread}
     */
    protected SpillingThread getSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues queues, IOManager ioManager, BinaryRowSerializer serializer, RecordComparator comparator) {
        return new SpillingThread(exceptionHandler, queues, ioManager, serializer, comparator);
    }

    /**
     * Creates (but does not start) the merging thread. Called from the constructor.
     *
     * @param exceptionHandler handler the thread reports failures to
     * @param queues the queues the thread communicates through
     * @param ioManager the I/O manager stored by the thread
     * @param maxNumFileHandles the maximum merge fan-in
     * @param merger the merger used to combine spill files
     * @return a new {@link MergingThread}
     */
    protected MergingThread getMergingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues queues, IOManager ioManager, int maxNumFileHandles, BinaryExternalMerger merger) {
        return new MergingThread(exceptionHandler, queues, ioManager, maxNumFileHandles, merger);
    }

    /**
     * Adds one row to the sorter.
     *
     * <p>The row is written into the current write buffer. When that buffer is full it is handed
     * to the sorting thread and the row is retried on the next empty buffer. Once the configured
     * number of bytes has been written, or no empty buffer is left, the spilling marker is queued.
     *
     * @param current the row to add
     * @throws IllegalArgumentException if {@link #getIterator()} has already been called
     * @throws IOException if the sorter is closed, the row does not fit into an empty buffer,
     *     the thread is interrupted while waiting for a buffer, or any other failure occurs; the
     *     failure is also passed to the exception handler, which closes the sorter
     */
    public void write(BaseRow current) throws IOException {
        Preconditions.checkArgument(!this.writingDone, "Adding already done!");
        try {
            while (true) {
                if (this.closed) {
                    throw new IOException("Already closed!", this.iteratorException);
                }
                synchronized (this.writeLock) {
                    if (this.currWriteBuffer == null) {
                        try {
                            this.currWriteBuffer = this.circularQueues.empty.poll(1L, TimeUnit.SECONDS);
                        }
                        catch (InterruptedException iex) {
                            throw new IOException(iex);
                        }
                        if (this.currWriteBuffer == null) {
                            // Timed out: leave the lock and re-check whether the sorter was closed.
                            continue;
                        }
                        if (!this.currWriteBuffer.buffer.isEmpty()) {
                            throw new IOException("New buffer is not empty.");
                        }
                    }

                    final BinaryInMemorySortBuffer buffer = this.currWriteBuffer.buffer;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Retrieved empty read buffer " + this.currWriteBuffer.id + ".");
                    }

                    final long occupancyBefore = buffer.getOccupancy();
                    if (buffer.write(current)) {
                        // Row stored; check whether it crossed the spilling threshold.
                        if (this.bytesUntilSpilling > 0L) {
                            this.bytesUntilSpilling -= buffer.getOccupancy() - occupancyBefore;
                            if (this.bytesUntilSpilling <= 0L) {
                                this.bytesUntilSpilling = 0L;
                                this.circularQueues.sort.add(SPILLING_MARKER);
                            }
                        }
                        return;
                    }

                    if (buffer.isEmpty()) {
                        // The row did not even fit into an empty buffer.
                        throw new IOException("The record exceeds the maximum size of a sort buffer (current maximum: " + buffer.getCapacity() + " bytes).");
                    }

                    // The buffer is full: send it to the sorting thread and retry the row.
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Emitting full buffer: " + this.currWriteBuffer.id + ".");
                    }
                    this.circularQueues.sort.add(this.currWriteBuffer);
                    if (this.bytesUntilSpilling > 0L && this.circularQueues.empty.size() == 0) {
                        // No empty buffer left: force spilling so buffers are recycled.
                        this.bytesUntilSpilling = 0L;
                        this.circularQueues.sort.add(SPILLING_MARKER);
                    }
                    this.currWriteBuffer = null;
                }
            }
        }
        catch (Throwable e) {
            IOException ioe = new IOException(e);
            if (this.exceptionHandler != null) {
                this.exceptionHandler.handleException(ioe);
            }
            throw ioe;
        }
    }

    /**
     * Drains the given iterator into the sorter, writing each returned row with
     * {@link #write(BaseRow)} and offering it back to the iterator as the reuse object.
     *
     * @param iterator the source of rows; consumed until it returns {@code null}
     * @throws IOException if reading from the iterator or writing a row fails
     */
    @VisibleForTesting
    public void write(MutableObjectIterator<BinaryRow> iterator) throws IOException {
        final BinaryRow reuse = this.serializer.createInstance();
        for (BinaryRow next = iterator.next(reuse); next != null; next = iterator.next(next)) {
            this.write(next);
        }
    }

    /**
     * Not supported by this sorter.
     *
     * @return always {@code null}; callers must not assume a non-null list
     */
    public List<SortedDataFile<BinaryRow>> getRemainingSortedDataFiles() throws InterruptedException {
        return null;
    }

    /**
     * Ends the input and returns the sorted result.
     *
     * <p>On the first call the current write buffer, if any, and the end-of-input marker are
     * queued for sorting. The call then blocks until a worker thread publishes the result
     * iterator or a failure.
     *
     * @return the iterator over the sorted rows
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws RuntimeException if the sorter failed or was closed; the cause is the recorded
     *     {@link IOException}
     */
    public MutableObjectIterator<BinaryRow> getIterator() throws InterruptedException {
        if (!this.writingDone) {
            this.writingDone = true;
            final CircularElement pending = this.currWriteBuffer;
            if (pending != null) {
                this.circularQueues.sort.add(pending);
            }
            this.circularQueues.sort.add(EOF_MARKER);
            LOG.debug("Sending done.");
        }

        synchronized (this.iteratorLock) {
            for (;;) {
                final IOException failure = this.iteratorException;
                if (failure != null) {
                    throw new RuntimeException("Error obtaining the sorted input: " + failure.getMessage(), failure);
                }
                final MutableObjectIterator<BinaryRow> result = this.iterator;
                if (result != null) {
                    return result;
                }
                this.iteratorLock.wait();
            }
        }
    }

    /**
     * Publishes the sorted result and wakes up threads waiting in {@link #getIterator()}.
     * Does nothing if a failure has already been recorded.
     *
     * @param iterator the iterator over the sorted rows
     */
    protected final void setResultIterator(MutableObjectIterator<BinaryRow> iterator) {
        synchronized (this.iteratorLock) {
            if (this.iteratorException != null) {
                return;
            }
            this.iterator = iterator;
            this.iteratorLock.notifyAll();
        }
    }

    /**
     * Records a failure and wakes up threads waiting in {@link #getIterator()}.
     * Only the first recorded failure is kept.
     *
     * @param ioex the failure to report
     */
    protected final void setResultIteratorException(IOException ioex) {
        synchronized (this.iteratorLock) {
            if (this.iteratorException != null) {
                return;
            }
            this.iteratorException = ioex;
            this.iteratorLock.notifyAll();
        }
    }

    /**
     * Returns the summed occupancy of all sort buffers.
     *
     * @return the sum of {@code getOccupancy()} over {@link #sortBuffers}
     */
    public long getUsedMemoryInBytes() {
        return this.sortBuffers.stream()
                .mapToLong(BinaryInMemorySortBuffer::getOccupancy)
                .sum();
    }

    /**
     * Returns the number of spill files started so far.
     *
     * <p>NOTE(review): the counter is updated by the spilling thread and is neither volatile nor
     * read under a lock here, so the value may be stale; confirm whether callers only read it
     * after the sort has completed.
     */
    public long getNumSpillFiles() {
        return this.numSpillFiles;
    }

    /**
     * Returns the accumulated number of bytes reported by the spill output views.
     *
     * <p>NOTE(review): the counter is updated by the spilling thread and is neither volatile nor
     * read under a lock here, so the value may be stale; confirm whether callers only read it
     * after the sort has completed.
     */
    public long getSpillInBytes() {
        return this.spillInBytes;
    }

    /**
     * Worker that collects the spill files announced on the merge queue and, once the final
     * merge marker arrives, publishes an iterator that merges them.
     */
    protected class MergingThread
    extends ThreadBase {
        protected final IOManager ioManager;
        /** Maximum number of channels merged at once. */
        protected final int maxFanIn;
        private final BinaryExternalMerger merger;

        public MergingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues queues, IOManager ioManager, int maxNumFileHandles, BinaryExternalMerger merger) {
            super(exceptionHandler, "SortMerger merging thread", queues);
            this.ioManager = ioManager;
            this.maxFanIn = maxNumFileHandles;
            this.merger = merger;
        }

        /**
         * Takes channels from the merge queue until the final merge marker is seen. With async
         * merge enabled, every {@code maxFanIn} collected channels are merged eagerly. Afterwards
         * the channel list is reduced to at most {@code maxFanIn} entries and the merging
         * iterator over them is published as the result.
         */
        @Override
        protected void go() throws IOException {
            final List<ChannelWithMeta> pendingSpills = new ArrayList<ChannelWithMeta>();
            List<ChannelWithMeta> mergeInputs = new ArrayList<ChannelWithMeta>();

            while (this.isRunning()) {
                ChannelWithMeta next;
                try {
                    next = this.queues.merge.take();
                }
                catch (InterruptedException iex) {
                    if (!this.isRunning()) {
                        return;
                    }
                    LOG.error("Merging thread was interrupted (without being shut down) while grabbing a channel with meta. Retrying...");
                    continue;
                }
                if (!this.isRunning()) {
                    return;
                }

                if (next == FINAL_MERGE_MARKER) {
                    // No more spill files: smallest channels first for the final merge.
                    mergeInputs.addAll(pendingSpills);
                    pendingSpills.clear();
                    mergeInputs.sort(Comparator.comparingInt(ChannelWithMeta::getBlockCount));
                    break;
                }

                pendingSpills.add(next);
                if (BinaryExternalSorter.this.asyncMergeEnable && pendingSpills.size() >= this.maxFanIn) {
                    mergeInputs.addAll(this.merger.mergeChannelList(pendingSpills));
                    pendingSpills.clear();
                }
            }

            if (mergeInputs.isEmpty()) {
                // Nothing was spilled; only publish an empty result if none exists yet.
                if (BinaryExternalSorter.this.iterator == null) {
                    BinaryExternalSorter.this.setResultIterator(EmptyMutableObjectIterator.get());
                }
                return;
            }

            // Merge down until the remaining channels fit into one final merge.
            while (this.isRunning() && mergeInputs.size() > this.maxFanIn) {
                mergeInputs = this.merger.mergeChannelList(mergeInputs);
            }

            final ArrayList<FileIOChannel> openedChannels = new ArrayList<FileIOChannel>();
            final BinaryMergeIterator<BinaryRow> mergedResult = this.merger.getMergingIterator(mergeInputs, openedChannels);
            BinaryExternalSorter.this.channelManager.addOpenChannels(openedChannels);
            BinaryExternalSorter.this.setResultIterator(mergedResult);
        }
    }

    protected class SpillingThread
    extends ThreadBase {
        protected final IOManager ioManager;
        protected final BinaryRowSerializer serializer;
        protected final RecordComparator comparator;

        public SpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues queues, IOManager ioManager, BinaryRowSerializer serializer, RecordComparator comparator) {
            super(exceptionHandler, "SortMerger spilling thread", queues);
            this.ioManager = ioManager;
            this.serializer = serializer;
            this.comparator = comparator;
        }

        @Override
        public void go() throws IOException {
            CircularElement element;
            ArrayDeque<CircularElement> cache = new ArrayDeque<CircularElement>();
            boolean cacheOnly = false;
            while (this.isRunning()) {
                try {
                    element = this.queues.spill.take();
                }
                catch (InterruptedException iex) {
                    throw new IOException("The spilling thread was interrupted.");
                }
                if (element == SPILLING_MARKER) break;
                if (element == EOF_MARKER) {
                    cacheOnly = true;
                    break;
                }
                cache.add(element);
            }
            if (!this.isRunning()) {
                return;
            }
            if (cacheOnly) {
                ArrayList iterators = new ArrayList(cache.size());
                for (CircularElement cached : cache) {
                    iterators.add(cached.buffer.getIterator());
                }
                ArrayList<BinaryRow> reusableEntries = new ArrayList<BinaryRow>();
                for (int i = 0; i < iterators.size(); ++i) {
                    reusableEntries.add(this.serializer.createInstance());
                }
                BinaryExternalSorter.this.setResultIterator((MutableObjectIterator<BinaryRow>)(iterators.isEmpty() ? EmptyMutableObjectIterator.get() : (iterators.size() == 1 ? (MutableObjectIterator)iterators.get(0) : new BinaryMergeIterator(iterators, reusableEntries, this.comparator::compare))));
                this.releaseEmptyBuffers();
                this.queues.merge.add(FINAL_MERGE_MARKER);
                return;
            }
            FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
            while (this.isRunning()) {
                try {
                    element = cache.isEmpty() ? this.queues.spill.take() : (CircularElement)cache.poll();
                }
                catch (InterruptedException iex) {
                    if (this.isRunning()) {
                        LOG.error("Spilling thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                        continue;
                    }
                    return;
                }
                if (!this.isRunning()) {
                    return;
                }
                if (element == EOF_MARKER) break;
                if (element.buffer.getOccupancy() > 0L) {
                    int blockCount;
                    int bytesInLastBuffer;
                    FileIOChannel.ID channel = enumerator.next();
                    BinaryExternalSorter.this.channelManager.addChannel(channel);
                    AbstractChannelWriterOutputView output2 = null;
                    try {
                        BinaryExternalSorter.this.numSpillFiles++;
                        output2 = FileChannelUtil.createOutputView(this.ioManager, channel, BinaryExternalSorter.this.compressionEnable, BinaryExternalSorter.this.compressionCodecFactory, BinaryExternalSorter.this.compressionBlockSize, BinaryExternalSorter.this.memorySegmentSize);
                        element.buffer.writeToOutput(output2);
                        BinaryExternalSorter.this.spillInBytes = BinaryExternalSorter.this.spillInBytes + output2.getNumBytes();
                        BinaryExternalSorter.this.spillInCompressedBytes = BinaryExternalSorter.this.spillInCompressedBytes + output2.getNumCompressedBytes();
                        bytesInLastBuffer = output2.close();
                        blockCount = output2.getBlockCount();
                        LOG.info("here spill the {}th sort buffer data with {} bytes and {} compressed bytes", new Object[]{BinaryExternalSorter.this.numSpillFiles, BinaryExternalSorter.this.spillInBytes, BinaryExternalSorter.this.spillInCompressedBytes});
                    }
                    catch (IOException e2) {
                        if (output2 != null) {
                            output2.closeAndDelete();
                        }
                        throw e2;
                    }
                    this.queues.merge.add(new ChannelWithMeta(channel, blockCount, bytesInLastBuffer));
                }
                element.buffer.reset();
                this.queues.empty.add(element);
            }
            BinaryExternalSorter.this.releaseSortMemory();
            this.queues.merge.add(FINAL_MERGE_MARKER);
        }

        protected final void releaseEmptyBuffers() {
            while (!this.queues.empty.isEmpty()) {
                try {
                    CircularElement element = this.queues.empty.take();
                    element.buffer.dispose();
                }
                catch (InterruptedException iex) {
                    if (!this.isRunning()) break;
                    LOG.error("Spilling thread was interrupted (without being shut down) while collecting empty buffers to release them. Retrying to collect buffers...");
                }
            }
            BinaryExternalSorter.this.releaseCoreSegments();
        }
    }

    /**
     * Worker that takes full buffers from the sort queue, sorts them and forwards them to the
     * spill queue. Marker elements are forwarded unsorted.
     */
    protected static class SortingThread
    extends ThreadBase {
        private final IndexedSorter sorter = new QuickSort();

        public SortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues queues) {
            super(exceptionHandler, "SortMerger sorting thread", queues);
        }

        /**
         * Runs until shut down or until the end-of-input marker has been forwarded. Buffers that
         * contain no records are reset and returned to the empty queue instead of being forwarded.
         */
        @Override
        public void go() throws IOException {
            boolean eofSeen = false;
            while (this.isRunning() && !eofSeen) {
                CircularElement element;
                try {
                    element = this.queues.sort.take();
                }
                catch (InterruptedException iex) {
                    if (!this.isRunning()) {
                        return;
                    }
                    if (LOG.isErrorEnabled()) {
                        LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                    }
                    continue;
                }

                final boolean isMarker = element == EOF_MARKER || element == SPILLING_MARKER;
                if (isMarker) {
                    if (element == EOF_MARKER) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Sorting thread done.");
                        }
                        eofSeen = true;
                    }
                } else {
                    if (element.buffer.size() == 0) {
                        // Nothing to sort: recycle the buffer right away.
                        element.buffer.reset();
                        this.queues.empty.add(element);
                        continue;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sorting buffer " + element.id + ".");
                    }
                    this.sorter.sort((IndexedSortable) element.buffer);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sorted buffer " + element.id + ".");
                    }
                }

                // Markers and sorted buffers alike go on to the spilling thread.
                this.queues.spill.add(element);
            }
        }
    }

    /**
     * Base class for the sorter's daemon worker threads. It runs {@link #go()}, reports any
     * failure to the configured exception handler, and supports cooperative shutdown via
     * {@link #shutdown()}.
     */
    protected static abstract class ThreadBase
    extends Thread
    implements Thread.UncaughtExceptionHandler {
        /** The queues this worker communicates through. */
        protected final CircularQueues queues;
        private final ExceptionHandler<IOException> exceptionHandler;
        /** {@code true} until {@link #shutdown()} is called. */
        private volatile boolean alive;

        /**
         * @param exceptionHandler handler that receives failures of this thread; may be {@code null}
         * @param name the thread name
         * @param queues the queues this worker communicates through
         */
        protected ThreadBase(ExceptionHandler<IOException> exceptionHandler, String name, CircularQueues queues) {
            super(name);
            this.setDaemon(true);
            this.exceptionHandler = exceptionHandler;
            this.setUncaughtExceptionHandler(this);
            this.queues = queues;
            this.alive = true;
        }

        /** Runs {@link #go()} and forwards anything it throws, wrapped in an {@link IOException}. */
        @Override
        public void run() {
            try {
                this.go();
            }
            catch (Throwable t) {
                this.internalHandleException(new IOException("Thread '" + this.getName() + "' terminated due to an exception: " + t.getMessage(), t));
            }
        }

        /** The work performed by this thread. */
        protected abstract void go() throws IOException;

        /** Returns {@code true} until {@link #shutdown()} has been called. */
        public boolean isRunning() {
            return this.alive;
        }

        /** Marks the thread as no longer running and interrupts it. */
        public void shutdown() {
            this.alive = false;
            this.interrupt();
        }

        /**
         * Passes the failure to the exception handler unless the thread has been shut down.
         * A failure of the handler itself is logged and not propagated.
         *
         * @param ioex the failure to report
         */
        protected final void internalHandleException(IOException ioex) {
            if (!this.isRunning()) {
                // Failures after shutdown are expected side effects of the interrupt.
                return;
            }
            if (this.exceptionHandler != null) {
                try {
                    this.exceptionHandler.handleException(ioex);
                }
                catch (Throwable handlerFailure) {
                    LOG.warn("Exception handler of thread '" + this.getName() + "' failed while handling: " + ioex.getMessage(), handlerFailure);
                }
            }
        }

        @Override
        public void uncaughtException(Thread t, Throwable e2) {
            this.internalHandleException(new IOException("Thread '" + t.getName() + "' terminated due to an uncaught exception: " + e2.getMessage(), e2));
        }
    }

    /**
     * The blocking queues connecting the writer and the worker threads: empty buffers, buffers
     * to sort, buffers to spill, and spill channels to merge.
     */
    protected static final class CircularQueues {
        final BlockingQueue<CircularElement> empty;
        final BlockingQueue<CircularElement> sort;
        final BlockingQueue<CircularElement> spill;
        final BlockingQueue<ChannelWithMeta> merge;

        /** Creates four empty, unbounded queues. */
        protected CircularQueues() {
            this.empty = new LinkedBlockingQueue<CircularElement>();
            this.sort = new LinkedBlockingQueue<CircularElement>();
            this.spill = new LinkedBlockingQueue<CircularElement>();
            this.merge = new LinkedBlockingQueue<ChannelWithMeta>();
        }
    }

    /**
     * An element passed through the circular queues: a sort buffer with its id and memory
     * segments, or a marker element that carries no buffer.
     */
    protected static final class CircularElement {
        final int id;
        final BinaryInMemorySortBuffer buffer;
        final List<MemorySegment> memory;

        /** Creates a marker element with id {@code -1} and no buffer or memory. */
        public CircularElement() {
            this(-1, null, null);
        }

        /**
         * @param id the element's id
         * @param buffer the sort buffer carried by this element
         * @param memory the memory segments backing the buffer
         */
        public CircularElement(int id, BinaryInMemorySortBuffer buffer, List<MemorySegment> memory) {
            this.id = id;
            this.buffer = buffer;
            this.memory = memory;
        }
    }
}

