/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.ChannelDeleteRegistry;
import org.apache.flink.runtime.operators.sort.ExceptionHandler;
import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.IndexedSorter;
import org.apache.flink.runtime.operators.sort.LargeRecordHandler;
import org.apache.flink.runtime.operators.sort.MergeIterator;
import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.runtime.operators.sort.SortedDataFile;
import org.apache.flink.runtime.operators.sort.SortedDataFileFactory;
import org.apache.flink.runtime.operators.sort.SortedDataFileMerger;
import org.apache.flink.runtime.operators.sort.Sorter;
import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnilateralSortMerger<E>
implements Sorter<E> {
    private static final Logger LOG = LoggerFactory.getLogger(UnilateralSortMerger.class);
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
    protected static final int MIN_NUM_WRITE_BUFFERS = 2;
    protected static final int MAX_NUM_WRITE_BUFFERS = 4;
    protected static final int MIN_NUM_SORT_MEM_SEGMENTS = 10;
    protected static final int MIN_NUM_ASYNC_MERGE_READ_MEM_SEGMENTS = 4;
    protected static final double ASYNC_MERGE_MEMORY_RATIO = 0.1;
    private final ThreadBase<E> readThread;
    private final ThreadBase<E> sortThread;
    private final ThreadBase<E> spillThread;
    private final ThreadBase<E> mergingThread;
    private final SortedDataFileFactory<E> sortedDataFileFactory;
    private final SortedDataFileMerger<E> merger;
    protected final boolean inMemoryResultEnabled;
    protected final List<MemorySegment> sortReadMemory;
    protected final List<MemorySegment> writeMemoryForSpilling;
    protected final Object mergeMemoryLock = new Object();
    @GuardedBy(value="mergeMemoryLock")
    protected final List<MemorySegment> writeMemoryForMerging;
    @GuardedBy(value="mergeMemoryLock")
    protected final List<MemorySegment> readMemoryForMerging;
    protected final MemoryManager memoryManager;
    protected final CircularQueues<E> circularQueues;
    protected final long startSpillingBytes;
    protected final LargeRecordHandler<E> largeRecordHandler;
    protected volatile MutableObjectIterator<E> largeRecords;
    protected final ChannelDeleteRegistry<E> channelDeleteRegistry;
    protected final Object iteratorLock = new Object();
    protected volatile List<SortedDataFile<E>> remainingSortedDataFiles;
    protected volatile MutableObjectIterator<E> iterator;
    protected volatile IOException unhandledException;
    protected volatile boolean closed;
    protected final boolean objectReuseEnabled;
    protected final AtomicBoolean cacheOnly;
    private static final CircularElement<Object> EOF_MARKER = new CircularElement();
    private static final CircularElement<Object> SPILLING_MARKER = new CircularElement();
    private static final SortedDataFileElement<Object> MERGING_MARKER = new SortedDataFileElement();

    public UnilateralSortMerger(SortedDataFileFactory<E> sortedDataFileFactory, SortedDataFileMerger<E> merger, MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int maxNumFileHandles, boolean inMemoryResultEnabled, float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException, MemoryAllocationException {
        this(sortedDataFileFactory, merger, memoryManager, ioManager, input, parentTask, serializerFactory, comparator, memoryFraction, -1, maxNumFileHandles, inMemoryResultEnabled, startSpillingFraction, handleLargeRecords, objectReuseEnabled);
    }

    public UnilateralSortMerger(SortedDataFileFactory<E> sortedDataFileFactory, SortedDataFileMerger<E> merger, MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, boolean inMemoryResultEnabled, float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException, MemoryAllocationException {
        this(sortedDataFileFactory, merger, memoryManager, ioManager, input, parentTask, serializerFactory, comparator, memoryFraction, numSortBuffers, maxNumFileHandles, inMemoryResultEnabled, startSpillingFraction, false, handleLargeRecords, objectReuseEnabled);
    }

    public UnilateralSortMerger(SortedDataFileFactory<E> sortedDataFileFactory, SortedDataFileMerger<E> merger, MemoryManager memoryManager, List<MemorySegment> memory, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, int numSortBuffers, int maxNumFileHandles, boolean inMemoryResultEnabled, float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException {
        this(sortedDataFileFactory, merger, memoryManager, memory, ioManager, input, parentTask, serializerFactory, comparator, numSortBuffers, maxNumFileHandles, inMemoryResultEnabled, startSpillingFraction, false, handleLargeRecords, objectReuseEnabled, false);
    }

    protected UnilateralSortMerger(SortedDataFileFactory<E> sortedDataFileFactory, SortedDataFileMerger<E> merger, MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, boolean inMemoryResultEnabled, float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException, MemoryAllocationException {
        this(sortedDataFileFactory, merger, memoryManager, memoryManager.allocatePages(parentTask, memoryManager.computeNumberOfPages(memoryFraction)), ioManager, input, parentTask, serializerFactory, comparator, numSortBuffers, maxNumFileHandles, inMemoryResultEnabled, startSpillingFraction, noSpillingMemory, handleLargeRecords, objectReuseEnabled, false);
    }

    protected UnilateralSortMerger(SortedDataFileFactory<E> sortedDataFileFactory, SortedDataFileMerger<E> merger, MemoryManager memoryManager, List<MemorySegment> memory, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, int numSortBuffers, int maxNumFileHandles, boolean inMemoryResultEnabled, float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords, boolean objectReuseEnabled, boolean enableAsyncMerging) throws IOException {
        int i;
        int i2;
        int numLargeRecordBuffers;
        int numWriteBuffers;
        if (sortedDataFileFactory == null | merger == null | memoryManager == null | (ioManager == null && !noSpillingMemory) | serializerFactory == null | comparator == null) {
            throw new NullPointerException();
        }
        if (parentTask == null) {
            throw new NullPointerException("Parent Task must not be null.");
        }
        if (maxNumFileHandles < 2) {
            throw new IllegalArgumentException("Merger cannot work with less than two file handles.");
        }
        this.sortedDataFileFactory = sortedDataFileFactory;
        this.merger = merger;
        this.memoryManager = memoryManager;
        this.objectReuseEnabled = objectReuseEnabled;
        this.cacheOnly = new AtomicBoolean(false);
        int numPagesTotal = memory.size();
        int minPagesNeeded = 12;
        if (enableAsyncMerging) {
            minPagesNeeded += 6;
        }
        if (numPagesTotal < minPagesNeeded) {
            throw new IllegalArgumentException("Too little memory provided to sorter to perform task. Required are at least " + minPagesNeeded + " pages. Current page size is " + memoryManager.getPageSize() + " bytes.");
        }
        if (noSpillingMemory && !handleLargeRecords) {
            numWriteBuffers = 0;
            numLargeRecordBuffers = 0;
        } else {
            int numWriteBufferConsumers = (noSpillingMemory ? 0 : 1) + (handleLargeRecords ? 2 : 0) + (enableAsyncMerging ? 1 : 0);
            int minBuffersForMerging = maxNumFileHandles + numWriteBufferConsumers * 2;
            if (minBuffersForMerging > numPagesTotal) {
                numWriteBuffers = noSpillingMemory ? 0 : 2;
                numLargeRecordBuffers = handleLargeRecords ? 4 : 0;
                maxNumFileHandles = numPagesTotal - numWriteBufferConsumers * 2;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reducing maximal merge fan-in to " + maxNumFileHandles + " due to limited memory availability during merge");
                }
            } else {
                int fractionalAuxBuffers = numPagesTotal / (numWriteBufferConsumers * 100);
                if (fractionalAuxBuffers >= 4) {
                    numWriteBuffers = noSpillingMemory ? 0 : 4;
                    numLargeRecordBuffers = handleLargeRecords ? 8 : 0;
                } else {
                    numWriteBuffers = noSpillingMemory ? 0 : Math.max(2, fractionalAuxBuffers);
                    numLargeRecordBuffers = handleLargeRecords ? Math.max(4, fractionalAuxBuffers) : 0;
                }
            }
        }
        int sortMemPages = numPagesTotal - numWriteBuffers - numLargeRecordBuffers;
        int numMergeReadBuffers = 0;
        if (enableAsyncMerging) {
            numMergeReadBuffers = Math.max((int)((double)(sortMemPages -= numWriteBuffers) * 0.1), 4);
            sortMemPages -= numMergeReadBuffers;
        }
        long sortMemory = (long)sortMemPages * (long)memoryManager.getPageSize();
        if (numSortBuffers < 1) {
            numSortBuffers = sortMemory > 0x6400000L ? 2 : 1;
        }
        int numSegmentsPerSortBuffer = sortMemPages / numSortBuffers;
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Instantiating sorter with %d pages of sorting memory (=%d bytes total) divided over %d sort buffers (%d pages per buffer). Using %d buffers for writing sorted results and merging maximally %d streams at once. Using %d memory segments for large record spilling.", sortMemPages, sortMemory, numSortBuffers, numSegmentsPerSortBuffer, numWriteBuffers, maxNumFileHandles, numLargeRecordBuffers));
        }
        this.sortReadMemory = memory;
        this.writeMemoryForSpilling = new ArrayList<MemorySegment>(numWriteBuffers);
        TypeSerializer serializer = serializerFactory.getSerializer();
        for (i2 = 0; i2 < numWriteBuffers; ++i2) {
            this.writeMemoryForSpilling.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1));
        }
        this.readMemoryForMerging = new ArrayList<MemorySegment>();
        this.writeMemoryForMerging = new ArrayList<MemorySegment>(numWriteBuffers);
        if (enableAsyncMerging) {
            for (i2 = 0; i2 < numWriteBuffers; ++i2) {
                this.writeMemoryForMerging.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1));
            }
            for (i2 = 0; i2 < numMergeReadBuffers; ++i2) {
                this.readMemoryForMerging.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1));
            }
        }
        if (numLargeRecordBuffers > 0) {
            ArrayList<MemorySegment> mem = new ArrayList<MemorySegment>();
            for (i = 0; i < numLargeRecordBuffers; ++i) {
                mem.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1));
            }
            this.largeRecordHandler = new LargeRecordHandler(serializer, comparator.duplicate(), ioManager, memoryManager, mem, parentTask, maxNumFileHandles);
        } else {
            this.largeRecordHandler = null;
        }
        this.circularQueues = new CircularQueues();
        Iterator<MemorySegment> segments = this.sortReadMemory.iterator();
        for (i = 0; i < numSortBuffers; ++i) {
            int k;
            ArrayList<MemorySegment> sortSegments = new ArrayList<MemorySegment>(numSegmentsPerSortBuffer);
            int n = k = i == numSortBuffers - 1 ? Integer.MAX_VALUE : numSegmentsPerSortBuffer;
            while (k > 0 && segments.hasNext()) {
                sortSegments.add(segments.next());
                --k;
            }
            TypeComparator comp = comparator.duplicate();
            InMemorySorter buffer = comp.supportsSerializationWithKeyNormalization() && serializer.getLength() > 0 && serializer.getLength() <= 32 ? new FixedLengthRecordSorter(serializerFactory.getSerializer(), comp, sortSegments) : new NormalizedKeySorter(serializerFactory.getSerializer(), comp, sortSegments);
            CircularElement element = new CircularElement(i, buffer, sortSegments);
            this.circularQueues.empty.add(element);
        }
        ExceptionHandler<IOException> exceptionHandler = new ExceptionHandler<IOException>(){

            @Override
            public void handleException(IOException exception) {
                if (!UnilateralSortMerger.this.closed) {
                    UnilateralSortMerger.this.setResultException(exception);
                    UnilateralSortMerger.this.close();
                }
            }
        };
        this.channelDeleteRegistry = new ChannelDeleteRegistry();
        this.inMemoryResultEnabled = inMemoryResultEnabled;
        if (!inMemoryResultEnabled) {
            startSpillingFraction = 0.0f;
        }
        LinkedBlockingQueue<SortedDataFileElement<E>> spilledFiles = new LinkedBlockingQueue<SortedDataFileElement<E>>();
        this.startSpillingBytes = (long)(startSpillingFraction * (float)sortMemory);
        this.readThread = this.getReadingThread(exceptionHandler, input, this.circularQueues, this.largeRecordHandler, parentTask, serializer, this.startSpillingBytes);
        this.sortThread = this.getSortingThread(exceptionHandler, this.circularQueues, parentTask);
        this.spillThread = this.getSpillingThread(sortedDataFileFactory, spilledFiles, merger, exceptionHandler, this.circularQueues, parentTask, memoryManager, ioManager, serializerFactory, comparator, this.sortReadMemory, this.writeMemoryForSpilling, maxNumFileHandles);
        this.mergingThread = this.getMergingThread(merger, memoryManager, exceptionHandler, parentTask, spilledFiles);
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        if (contextLoader != null) {
            if (this.readThread != null) {
                this.readThread.setContextClassLoader(contextLoader);
            }
            if (this.sortThread != null) {
                this.sortThread.setContextClassLoader(contextLoader);
            }
            if (this.spillThread != null) {
                this.spillThread.setContextClassLoader(contextLoader);
            }
            if (this.mergingThread != null) {
                this.mergingThread.setContextClassLoader(contextLoader);
            }
        }
        this.startThreads();
    }

    protected void startThreads() {
        if (this.readThread != null) {
            this.readThread.start();
        }
        if (this.sortThread != null) {
            this.sortThread.start();
        }
        if (this.spillThread != null) {
            this.spillThread.start();
        }
        if (this.mergingThread != null) {
            this.mergingThread.start();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        Object object = this;
        synchronized (object) {
            if (this.closed) {
                return;
            }
            this.closed = true;
        }
        try {
            object = this.iteratorLock;
            synchronized (object) {
                if (this.unhandledException == null) {
                    this.unhandledException = new IOException("The sorter has been closed.");
                    this.iteratorLock.notifyAll();
                }
            }
            if (this.readThread != null) {
                try {
                    this.readThread.shutdown();
                }
                catch (Throwable t) {
                    LOG.error("Error shutting down reader thread: " + t.getMessage(), t);
                }
            }
            if (this.sortThread != null) {
                try {
                    this.sortThread.shutdown();
                }
                catch (Throwable t) {
                    LOG.error("Error shutting down sorter thread: " + t.getMessage(), t);
                }
            }
            if (this.spillThread != null) {
                try {
                    this.spillThread.shutdown();
                }
                catch (Throwable t) {
                    LOG.error("Error shutting down spilling thread: " + t.getMessage(), t);
                }
            }
            if (this.mergingThread != null) {
                try {
                    this.mergingThread.shutdown();
                }
                catch (Throwable t) {
                    LOG.error("Error shutting down merging thread: " + t.getMessage(), t);
                }
            }
            try {
                if (this.readThread != null) {
                    this.readThread.join();
                }
                if (this.sortThread != null) {
                    this.sortThread.join();
                }
                if (this.spillThread != null) {
                    this.spillThread.join();
                }
                if (this.mergingThread != null) {
                    this.mergingThread.join();
                }
            }
            catch (InterruptedException iex) {
                LOG.debug("Closing of sort/merger was interrupted. The reading/sorting/spilling threads may still be working.", (Throwable)iex);
            }
        }
        finally {
            try {
                if (!this.writeMemoryForSpilling.isEmpty()) {
                    this.memoryManager.release(this.writeMemoryForSpilling);
                }
                this.writeMemoryForSpilling.clear();
                if (!this.writeMemoryForMerging.isEmpty()) {
                    this.memoryManager.release(this.writeMemoryForMerging);
                }
                this.writeMemoryForMerging.clear();
            }
            catch (Throwable throwable) {}
            try {
                if (!this.sortReadMemory.isEmpty()) {
                    this.memoryManager.release(this.sortReadMemory);
                }
                this.sortReadMemory.clear();
                if (!this.readMemoryForMerging.isEmpty()) {
                    this.memoryManager.release(this.readMemoryForMerging);
                }
                this.readMemoryForMerging.clear();
            }
            catch (Throwable throwable) {}
            this.channelDeleteRegistry.clearOpenFiles();
            this.channelDeleteRegistry.clearFiles();
            try {
                if (this.largeRecordHandler != null) {
                    this.largeRecordHandler.close();
                }
            }
            catch (Throwable throwable) {}
        }
    }

    protected ThreadBase<E> getReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> reader, CircularQueues<E> queues, LargeRecordHandler<E> largeRecordHandler, AbstractInvokable parentTask, TypeSerializer<E> serializer, long startSpillingBytes) {
        return new ReadingThread<Object>(exceptionHandler, reader, queues, largeRecordHandler, serializer.createInstance(), parentTask, startSpillingBytes);
    }

    protected ThreadBase<E> getSortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, AbstractInvokable parentTask) {
        return new SortingThread<E>(exceptionHandler, queues, parentTask);
    }

    protected ThreadBase<E> getSpillingThread(SortedDataFileFactory<E> sortedDataFileFactory, BlockingQueue<SortedDataFileElement<E>> spilledFiles, SortedDataFileMerger<E> merger, ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, AbstractInvokable parentTask, MemoryManager memoryManager, IOManager ioManager, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles) {
        return new SpillingThread(sortedDataFileFactory, spilledFiles, merger, exceptionHandler, queues, parentTask, memoryManager, ioManager, serializerFactory.getSerializer(), comparator, sortReadMemory, writeMemory, maxFileHandles);
    }

    protected ThreadBase<E> getMergingThread(SortedDataFileMerger<E> merger, MemoryManager memoryManager, ExceptionHandler<IOException> exceptionHandler, AbstractInvokable parentTask, BlockingQueue<SortedDataFileElement<E>> spilledFiles) {
        return new MergingThread(merger, memoryManager, spilledFiles, exceptionHandler, parentTask);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<SortedDataFile<E>> getRemainingSortedDataFiles() throws InterruptedException {
        Object object = this.iteratorLock;
        synchronized (object) {
            while (this.remainingSortedDataFiles == null && this.unhandledException == null) {
                this.iteratorLock.wait();
            }
            if (this.unhandledException != null) {
                throw new RuntimeException("Error obtaining the sorted input: " + this.unhandledException.getMessage(), this.unhandledException);
            }
            return this.remainingSortedDataFiles;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public MutableObjectIterator<E> getIterator() throws InterruptedException {
        Object object = this.iteratorLock;
        synchronized (object) {
            while (this.iterator == null && this.unhandledException == null) {
                this.iteratorLock.wait();
            }
            if (this.unhandledException != null) {
                throw new RuntimeException("Error obtaining the sorted input: " + this.unhandledException.getMessage(), this.unhandledException);
            }
            return this.iterator;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final void setResult(List<SortedDataFile<E>> mergedDataFiles, MutableObjectIterator<E> iterator) {
        Object object = this.iteratorLock;
        synchronized (object) {
            if (this.unhandledException == null) {
                this.remainingSortedDataFiles = mergedDataFiles;
                this.iterator = iterator;
                this.iteratorLock.notifyAll();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final void setResultException(IOException ioex) {
        Object object = this.iteratorLock;
        synchronized (object) {
            if (this.unhandledException == null) {
                this.unhandledException = ioex;
                this.iteratorLock.notifyAll();
                LOG.error("Unhandled exception occurs.", (Throwable)this.unhandledException);
            }
        }
    }

    protected static <T> CircularElement<T> endMarker() {
        CircularElement<Object> c = EOF_MARKER;
        return c;
    }

    protected static <T> CircularElement<T> spillingMarker() {
        CircularElement<Object> c = SPILLING_MARKER;
        return c;
    }

    protected static <T> SortedDataFileElement<T> mergingMarker() {
        SortedDataFileElement<Object> s = MERGING_MARKER;
        return s;
    }

    protected class MergingThread
    extends ThreadBase<E> {
        private final SortedDataFileMerger<E> merger;
        private final MemoryManager memManager;
        private final BlockingQueue<SortedDataFileElement<E>> spilledFiles;

        public MergingThread(SortedDataFileMerger<E> merger, MemoryManager memManager, BlockingQueue<SortedDataFileElement<E>> spilledFiles, ExceptionHandler<IOException> exceptionHandler, AbstractInvokable parentTask) {
            super(exceptionHandler, "SortMerger merging thread", null, parentTask);
            this.merger = merger;
            this.memManager = memManager;
            this.spilledFiles = spilledFiles;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void go() throws IOException {
            Object writeMemory;
            ArrayList sortedDataFiles = new ArrayList();
            while (this.isRunning()) {
                try {
                    SortedDataFileElement sortedDataFile = this.spilledFiles.poll(200L, TimeUnit.MILLISECONDS);
                    if (sortedDataFile == null) continue;
                    writeMemory = new ArrayList<MemorySegment>();
                    ArrayList<MemorySegment> mergeReadMemory = new ArrayList<MemorySegment>();
                    Object object = UnilateralSortMerger.this.mergeMemoryLock;
                    synchronized (object) {
                        writeMemory.addAll(UnilateralSortMerger.this.writeMemoryForMerging);
                        mergeReadMemory.addAll(UnilateralSortMerger.this.readMemoryForMerging);
                    }
                    if (sortedDataFile == MERGING_MARKER) {
                        sortedDataFiles = this.merger.finishMerging((List<MemorySegment>)writeMemory, mergeReadMemory, UnilateralSortMerger.this.channelDeleteRegistry, this.getRunningFlag());
                        LOG.info("Finish merging.");
                        break;
                    }
                    if (sortedDataFile == null) continue;
                    this.merger.notifyNewSortedDataFile(sortedDataFile.sortedDataFile, (List<MemorySegment>)writeMemory, mergeReadMemory, UnilateralSortMerger.this.channelDeleteRegistry, this.getRunningFlag());
                    LOG.info("Notified new file {}.", (Object)sortedDataFile.sortedDataFile.getChannelID().getPath());
                }
                catch (InterruptedException e) {
                    LOG.warn("Interrupted when polling spilled files.", (Throwable)e);
                }
            }
            Object e = UnilateralSortMerger.this.mergeMemoryLock;
            synchronized (e) {
                this.memManager.release(UnilateralSortMerger.this.writeMemoryForMerging);
                UnilateralSortMerger.this.writeMemoryForMerging.clear();
            }
            if (UnilateralSortMerger.this.cacheOnly.get()) {
                return;
            }
            if (sortedDataFiles.isEmpty()) {
                if (!UnilateralSortMerger.this.inMemoryResultEnabled || UnilateralSortMerger.this.largeRecords == null) {
                    UnilateralSortMerger.this.setResult(sortedDataFiles, EmptyMutableObjectIterator.get());
                } else {
                    UnilateralSortMerger.this.setResult(sortedDataFiles, UnilateralSortMerger.this.largeRecords);
                }
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Beginning final merge.");
                }
                ArrayList<MemorySegment> mergeReadMemory = new ArrayList<MemorySegment>();
                writeMemory = UnilateralSortMerger.this.mergeMemoryLock;
                synchronized (writeMemory) {
                    mergeReadMemory.addAll(UnilateralSortMerger.this.readMemoryForMerging);
                }
                MutableObjectIterator finalResultIterator = this.merger.getMergingIterator(sortedDataFiles, mergeReadMemory, UnilateralSortMerger.this.largeRecords, UnilateralSortMerger.this.channelDeleteRegistry);
                UnilateralSortMerger.this.setResult(sortedDataFiles, finalResultIterator);
            }
            LOG.info("Merging thread done.");
        }
    }

    protected class SpillingThread
    extends ThreadBase<E> {
        protected final SortedDataFileFactory<E> sortedDataFileFactory;
        protected final SortedDataFileMerger<E> merger;
        protected final MemoryManager memManager;
        protected final IOManager ioManager;
        protected final TypeSerializer<E> serializer;
        protected final TypeComparator<E> comparator;
        protected final List<MemorySegment> writeMemory;
        protected final List<MemorySegment> mergeReadMemory;
        protected final BlockingQueue<SortedDataFileElement<E>> spilledFiles;
        protected final int mergeFactor;
        protected final int numWriteBuffersToCluster;
        protected int numSpilledFiles;

        public SpillingThread(SortedDataFileFactory<E> sortedDataFileFactory, BlockingQueue<SortedDataFileElement<E>> spilledFiles, SortedDataFileMerger<E> merger, ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, AbstractInvokable parentTask, MemoryManager memManager, IOManager ioManager, TypeSerializer<E> serializer, TypeComparator<E> comparator, List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxNumFileHandles) {
            super(exceptionHandler, "SortMerger spilling thread", queues, parentTask);
            this.sortedDataFileFactory = (SortedDataFileFactory)Preconditions.checkNotNull(sortedDataFileFactory);
            this.spilledFiles = (BlockingQueue)Preconditions.checkNotNull(spilledFiles);
            this.merger = (SortedDataFileMerger)Preconditions.checkNotNull(merger);
            this.memManager = (MemoryManager)Preconditions.checkNotNull((Object)memManager);
            this.ioManager = (IOManager)Preconditions.checkNotNull((Object)ioManager);
            this.serializer = (TypeSerializer)Preconditions.checkNotNull(serializer);
            this.comparator = (TypeComparator)Preconditions.checkNotNull(comparator);
            this.mergeReadMemory = (List)Preconditions.checkNotNull(sortReadMemory);
            this.writeMemory = (List)Preconditions.checkNotNull(writeMemory);
            this.mergeFactor = maxNumFileHandles;
            this.numWriteBuffersToCluster = writeMemory.size() >= 4 ? writeMemory.size() / 2 : 1;
            this.numSpilledFiles = 0;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void go() throws IOException {
            CircularElement element;
            ArrayDeque cache = new ArrayDeque();
            while (this.isRunning()) {
                try {
                    element = this.queues.spill.take();
                }
                catch (InterruptedException iex) {
                    throw new IOException("The spilling thread was interrupted.");
                }
                if (element == SPILLING_MARKER) break;
                if (element == EOF_MARKER) {
                    UnilateralSortMerger.this.cacheOnly.set(true);
                    break;
                }
                cache.add(element);
            }
            if (!this.isRunning()) {
                return;
            }
            if (UnilateralSortMerger.this.cacheOnly.get() && UnilateralSortMerger.this.largeRecordHandler != null && UnilateralSortMerger.this.largeRecordHandler.hasData()) {
                Object circElement;
                ArrayList<MemorySegment> memoryForLargeRecordSorting = new ArrayList<MemorySegment>();
                while ((circElement = (CircularElement)this.queues.empty.poll()) != null) {
                    ((CircularElement)circElement).buffer.dispose();
                    memoryForLargeRecordSorting.addAll(((CircularElement)circElement).memory);
                }
                if (memoryForLargeRecordSorting.isEmpty()) {
                    UnilateralSortMerger.this.cacheOnly.set(false);
                    LOG.debug("Going to disk-based merge because of large records.");
                } else {
                    LOG.debug("Sorting large records, to add them to in-memory merge.");
                    UnilateralSortMerger.this.largeRecords = UnilateralSortMerger.this.largeRecordHandler.finishWriteAndSortKeys(memoryForLargeRecordSorting);
                }
            }
            if (UnilateralSortMerger.this.cacheOnly.get()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Initiating in memory merge.");
                }
                ArrayList iterators = new ArrayList(cache.size() + 1);
                for (CircularElement circularElement : cache) {
                    iterators.add(circularElement.buffer.getIterator());
                }
                if (UnilateralSortMerger.this.largeRecords != null) {
                    iterators.add(UnilateralSortMerger.this.largeRecords);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Releasing unused sort-buffer memory.");
                }
                this.disposeSortBuffers(true);
                UnilateralSortMerger.this.setResult(new ArrayList(), iterators.isEmpty() ? EmptyMutableObjectIterator.get() : (iterators.size() == 1 ? (MutableObjectIterator)iterators.get(0) : new MergeIterator(iterators, this.comparator)));
                this.spilledFiles.add(UnilateralSortMerger.mergingMarker());
                return;
            }
            while (this.isRunning()) {
                try {
                    element = this.takeNext(this.queues.spill, cache);
                }
                catch (InterruptedException iex) {
                    if (this.isRunning()) {
                        LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                        continue;
                    }
                    return;
                }
                if (!this.isRunning()) {
                    return;
                }
                if (element == EOF_MARKER) break;
                SortedDataFile output = this.sortedDataFileFactory.createFile(this.writeMemory);
                UnilateralSortMerger.this.channelDeleteRegistry.registerChannelToBeDelete(output.getWriteChannel().getChannelID());
                UnilateralSortMerger.this.channelDeleteRegistry.registerOpenChannel(output.getWriteChannel());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Spilling buffer " + element.id + ".");
                }
                element.buffer.writeToOutput(output, UnilateralSortMerger.this.largeRecordHandler);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Spilled buffer " + element.id + ".");
                }
                output.finishWriting();
                UnilateralSortMerger.this.channelDeleteRegistry.unregisterOpenChannel(output.getWriteChannel());
                if (output.getBytesWritten() > 0L) {
                    this.spilledFiles.add(new SortedDataFileElement(output));
                    ++this.numSpilledFiles;
                }
                element.buffer.reset();
                this.queues.empty.add(element);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Spilling done.");
                LOG.debug("Releasing sort-buffer memory.");
            }
            this.disposeSortBuffers(false);
            List<MemorySegment> mergeReadMemory = null;
            List<MemorySegment> longRecMem = null;
            if (UnilateralSortMerger.this.largeRecordHandler != null && UnilateralSortMerger.this.largeRecordHandler.hasData()) {
                if (this.numSpilledFiles == 0) {
                    longRecMem = this.mergeReadMemory;
                    mergeReadMemory = new ArrayList<MemorySegment>();
                } else {
                    int i;
                    int n = Math.min(this.mergeFactor, this.numSpilledFiles);
                    int pagesPerStream = Math.max(2, Math.min(4, this.mergeReadMemory.size() / 2 / n));
                    int totalMergeReadMemory = n * pagesPerStream;
                    mergeReadMemory = new ArrayList<MemorySegment>(totalMergeReadMemory);
                    for (i = 0; i < totalMergeReadMemory; ++i) {
                        mergeReadMemory.add(this.mergeReadMemory.get(i));
                    }
                    longRecMem = new ArrayList<MemorySegment>();
                    for (i = totalMergeReadMemory; i < this.mergeReadMemory.size(); ++i) {
                        longRecMem.add(this.mergeReadMemory.get(i));
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sorting keys for large records.");
                }
                UnilateralSortMerger.this.largeRecords = UnilateralSortMerger.this.largeRecordHandler.finishWriteAndSortKeys(longRecMem);
            } else {
                mergeReadMemory = this.mergeReadMemory;
            }
            if (UnilateralSortMerger.this.largeRecords != null && !UnilateralSortMerger.this.inMemoryResultEnabled) {
                SortedDataFile sortedDataFile = this.sortedDataFileFactory.createFile(this.writeMemory);
                if (!UnilateralSortMerger.this.objectReuseEnabled) {
                    Object rec;
                    while ((rec = UnilateralSortMerger.this.largeRecords.next()) != null) {
                        sortedDataFile.writeRecord(rec);
                    }
                } else {
                    Object rec = this.serializer.createInstance();
                    while ((rec = UnilateralSortMerger.this.largeRecords.next(rec)) != null) {
                        sortedDataFile.writeRecord(rec);
                    }
                }
                sortedDataFile.finishWriting();
                this.spilledFiles.add(new SortedDataFileElement(sortedDataFile));
                UnilateralSortMerger.this.largeRecords = null;
            }
            Object object = UnilateralSortMerger.this.mergeMemoryLock;
            synchronized (object) {
                UnilateralSortMerger.this.readMemoryForMerging.addAll(mergeReadMemory);
                UnilateralSortMerger.this.writeMemoryForMerging.addAll(this.writeMemory);
                this.writeMemory.clear();
                mergeReadMemory.clear();
                if (longRecMem != null && !UnilateralSortMerger.this.inMemoryResultEnabled) {
                    mergeReadMemory.addAll(longRecMem);
                    longRecMem.clear();
                }
            }
            this.spilledFiles.add(UnilateralSortMerger.mergingMarker());
            LOG.info("Spilling thread done.");
        }

        protected final void disposeSortBuffers(boolean releaseMemory) {
            while (!this.queues.empty.isEmpty()) {
                try {
                    CircularElement element = this.queues.empty.take();
                    element.buffer.dispose();
                    if (!releaseMemory) continue;
                    this.memManager.release(element.memory);
                }
                catch (InterruptedException iex) {
                    if (this.isRunning()) {
                        LOG.error("Spilling thread was interrupted (without being shut down) while collecting empty buffers to release them. Retrying to collect buffers...");
                        continue;
                    }
                    return;
                }
            }
        }

        protected final CircularElement<E> takeNext(BlockingQueue<CircularElement<E>> queue, Queue<CircularElement<E>> cache) throws InterruptedException {
            return cache.isEmpty() ? queue.take() : cache.poll();
        }
    }

    protected static class SortingThread<E>
    extends ThreadBase<E> {
        private final IndexedSorter sorter = new QuickSort();

        public SortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, AbstractInvokable parentTask) {
            super(exceptionHandler, "SortMerger sorting thread", queues, parentTask);
        }

        @Override
        public void go() throws IOException {
            boolean alive = true;
            while (this.isRunning() && alive) {
                CircularElement element = null;
                try {
                    element = this.queues.sort.take();
                }
                catch (InterruptedException iex) {
                    if (this.isRunning()) {
                        if (!LOG.isErrorEnabled()) continue;
                        LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                        continue;
                    }
                    return;
                }
                if (element != EOF_MARKER && element != SPILLING_MARKER) {
                    if (element.buffer.size() == 0) {
                        element.buffer.reset();
                        this.queues.empty.add(element);
                        continue;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sorting buffer " + element.id + ".");
                    }
                    this.sorter.sort(element.buffer);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sorted buffer " + element.id + ".");
                    }
                } else if (element == EOF_MARKER) {
                    LOG.info("Sorting thread done.");
                    alive = false;
                }
                this.queues.spill.add(element);
            }
        }
    }

    protected static class ReadingThread<E>
    extends ThreadBase<E> {
        private final MutableObjectIterator<E> reader;
        private final LargeRecordHandler<E> largeRecords;
        private final long startSpillingBytes;
        private final E readTarget;

        public ReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> reader, CircularQueues<E> queues, LargeRecordHandler<E> largeRecordsHandler, E readTarget, AbstractInvokable parentTask, long startSpillingBytes) {
            super(exceptionHandler, "SortMerger Reading Thread", queues, parentTask);
            this.reader = reader;
            this.readTarget = readTarget;
            this.startSpillingBytes = startSpillingBytes;
            this.largeRecords = largeRecordsHandler;
        }

        @Override
        public void go() throws IOException {
            MutableObjectIterator<E> reader = this.reader;
            Object current = this.readTarget;
            Object leftoverRecord = null;
            CircularElement element = null;
            long bytesUntilSpilling = this.startSpillingBytes;
            boolean done = false;
            if (bytesUntilSpilling < 1L) {
                bytesUntilSpilling = 0L;
                this.queues.sort.add(UnilateralSortMerger.spillingMarker());
            }
            while (!done && this.isRunning()) {
                while (element == null) {
                    try {
                        element = this.queues.empty.take();
                    }
                    catch (InterruptedException iex) {
                        throw new IOException(iex);
                    }
                }
                InMemorySorter buffer = element.buffer;
                if (!buffer.isEmpty()) {
                    throw new IOException("New buffer is not empty.");
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Retrieved empty read buffer " + element.id + ".");
                }
                if (leftoverRecord != null) {
                    if (!buffer.write(leftoverRecord)) {
                        if (this.largeRecords != null) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Large record did not fit into a fresh sort buffer. Putting into large record store.");
                            }
                        } else {
                            throw new IOException("The record exceeds the maximum size of a sort buffer (current maximum: " + buffer.getCapacity() + " bytes).");
                        }
                        this.largeRecords.addRecord(leftoverRecord);
                        buffer.reset();
                    }
                    leftoverRecord = null;
                }
                boolean available = true;
                if (bytesUntilSpilling > 0L && buffer.getCapacity() >= bytesUntilSpilling) {
                    CircularElement SPILLING_MARKER;
                    Object newCurrent;
                    boolean fullBuffer = false;
                    while (this.isRunning() && (available = (newCurrent = reader.next(current)) != null)) {
                        current = newCurrent;
                        if (!buffer.write(current)) {
                            leftoverRecord = current;
                            fullBuffer = true;
                            break;
                        }
                        if (bytesUntilSpilling - buffer.getOccupancy() > 0L) continue;
                        bytesUntilSpilling = 0L;
                        SPILLING_MARKER = UnilateralSortMerger.spillingMarker();
                        this.queues.sort.add(SPILLING_MARKER);
                        break;
                    }
                    if (fullBuffer) {
                        if (bytesUntilSpilling > 0L && (bytesUntilSpilling -= buffer.getCapacity()) <= 0L) {
                            bytesUntilSpilling = 0L;
                            SPILLING_MARKER = UnilateralSortMerger.spillingMarker();
                            this.queues.sort.add(SPILLING_MARKER);
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Emitting full buffer from reader thread: " + element.id + ".");
                        }
                        this.queues.sort.add(element);
                        element = null;
                        continue;
                    }
                } else if (bytesUntilSpilling > 0L && (bytesUntilSpilling -= buffer.getCapacity()) <= 0L) {
                    bytesUntilSpilling = 0L;
                    CircularElement SPILLING_MARKER = UnilateralSortMerger.spillingMarker();
                    this.queues.sort.add(SPILLING_MARKER);
                }
                if (available) {
                    Object newCurrent;
                    while (this.isRunning() && (newCurrent = reader.next(current)) != null) {
                        current = newCurrent;
                        if (buffer.write(current)) continue;
                        leftoverRecord = current;
                        break;
                    }
                }
                if (leftoverRecord != null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Emitting full buffer from reader thread: " + element.id + ".");
                    }
                } else {
                    done = true;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Emitting final buffer from reader thread: " + element.id + ".");
                    }
                }
                if (!buffer.isEmpty()) {
                    this.queues.sort.add(element);
                } else {
                    buffer.reset();
                    this.queues.empty.add(element);
                }
                element = null;
            }
            if (!this.isRunning()) {
                return;
            }
            CircularElement EOF_MARKER = UnilateralSortMerger.endMarker();
            this.queues.sort.add(EOF_MARKER);
            LOG.info("Reading thread done.");
        }
    }

    protected static abstract class ThreadBase<E>
    extends Thread
    implements Thread.UncaughtExceptionHandler {
        protected final CircularQueues<E> queues;
        private final ExceptionHandler<IOException> exceptionHandler;
        private final AtomicBoolean alive;

        protected ThreadBase(ExceptionHandler<IOException> exceptionHandler, String name, CircularQueues<E> queues, AbstractInvokable parentTask) {
            super(name);
            this.setDaemon(true);
            this.exceptionHandler = exceptionHandler;
            this.setUncaughtExceptionHandler(this);
            this.queues = queues;
            this.alive = new AtomicBoolean(true);
        }

        @Override
        public void run() {
            try {
                this.go();
            }
            catch (Throwable t) {
                this.internalHandleException(new IOException("Thread '" + this.getName() + "' terminated due to an exception: " + t.getMessage(), t));
            }
        }

        protected abstract void go() throws IOException;

        public boolean isRunning() {
            return this.alive.get();
        }

        public AtomicBoolean getRunningFlag() {
            return this.alive;
        }

        public void shutdown() {
            this.alive.set(false);
            this.interrupt();
        }

        protected final void internalHandleException(IOException ioex) {
            if (!this.isRunning()) {
                return;
            }
            if (this.exceptionHandler != null) {
                try {
                    this.exceptionHandler.handleException(ioex);
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
            }
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            this.internalHandleException(new IOException("Thread '" + t.getName() + "' terminated due to an uncaught exception: " + e.getMessage(), e));
        }
    }

    protected static final class CircularQueues<E> {
        final BlockingQueue<CircularElement<E>> empty;
        final BlockingQueue<CircularElement<E>> sort;
        final BlockingQueue<CircularElement<E>> spill;

        public CircularQueues() {
            this.empty = new LinkedBlockingQueue<CircularElement<E>>();
            this.sort = new LinkedBlockingQueue<CircularElement<E>>();
            this.spill = new LinkedBlockingQueue<CircularElement<E>>();
        }

        public CircularQueues(int numElements) {
            this.empty = new ArrayBlockingQueue<CircularElement<E>>(numElements);
            this.sort = new ArrayBlockingQueue<CircularElement<E>>(numElements);
            this.spill = new ArrayBlockingQueue<CircularElement<E>>(numElements);
        }
    }

    protected static final class CircularElement<E> {
        final int id;
        final InMemorySorter<E> buffer;
        final List<MemorySegment> memory;

        public CircularElement() {
            this.id = -1;
            this.buffer = null;
            this.memory = null;
        }

        public CircularElement(int id, InMemorySorter<E> buffer, List<MemorySegment> memory) {
            this.id = id;
            this.buffer = buffer;
            this.memory = memory;
        }
    }

    protected static final class SortedDataFileElement<E> {
        final SortedDataFile<E> sortedDataFile;

        public SortedDataFileElement() {
            this.sortedDataFile = null;
        }

        public SortedDataFileElement(SortedDataFile<E> sortedDataFile) {
            this.sortedDataFile = sortedDataFile;
        }
    }
}

