/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.CombiningRecordComparisonMerger;
import org.apache.flink.runtime.operators.sort.ExceptionHandler;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.MergeIterator;
import org.apache.flink.runtime.operators.sort.SortedDataFile;
import org.apache.flink.runtime.operators.sort.SortedDataFileFactory;
import org.apache.flink.runtime.operators.sort.SortedDataFileMerger;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.sort.WriterCollector;
import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.TraversableOnceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link UnilateralSortMerger} that additionally carries a {@link GroupCombineFunction}.
 *
 * <p>The combine function is used in two places visible in this class: it is handed to the
 * {@link CombiningRecordComparisonMerger} that is passed to the superclass as its merger, and it
 * is invoked by {@link CombiningSpillingThread} on runs of adjacent records that compare as
 * equal before a sorted buffer is written to a spill file.
 *
 * @param <E> the type of the records being sorted and combined
 */
public class CombiningUnilateralSortMerger<E>
extends UnilateralSortMerger<E> {
    // Logger shared by this class and its nested classes.
    private static final Logger LOG = LoggerFactory.getLogger(CombiningUnilateralSortMerger.class);
    // The combine function; opened, invoked and closed by CombiningSpillingThread.go().
    private final GroupCombineFunction<E, E> combineStub;
    // Configuration passed to the combine function when it is opened. May be null, in which
    // case go() substitutes a fresh, empty Configuration.
    private Configuration udfConfig;

    /**
     * Creates a combining sort-merger without an explicit number of sort buffers.
     *
     * <p>This is a convenience overload: it delegates to the full constructor and passes
     * {@code -1} as {@code numSortBuffers}. All other arguments are forwarded unchanged.
     *
     * @param sortedDataFileFactory factory for the sorted spill files
     * @param combineStub the combine function applied to runs of equal-keyed records
     * @param memoryManager forwarded to the full constructor
     * @param ioManager forwarded to the full constructor
     * @param input the records to sort
     * @param parentTask forwarded to the full constructor
     * @param serializerFactory factory for the record serializer
     * @param comparator comparator for the records
     * @param memoryFraction forwarded to the full constructor
     * @param maxNumFileHandles forwarded to the full constructor
     * @param inMemoryResultEnabled forwarded to the full constructor
     * @param startSpillingFraction forwarded to the full constructor
     * @param handleLargeRecords forwarded to the full constructor
     * @param objectReuseEnabled forwarded to the full constructor
     * @throws IOException if the delegated constructor fails with an I/O error
     * @throws MemoryAllocationException if the delegated constructor cannot obtain its memory
     */
    public CombiningUnilateralSortMerger(
            SortedDataFileFactory<E> sortedDataFileFactory,
            GroupCombineFunction<E, E> combineStub,
            MemoryManager memoryManager,
            IOManager ioManager,
            MutableObjectIterator<E> input,
            AbstractInvokable parentTask,
            TypeSerializerFactory<E> serializerFactory,
            TypeComparator<E> comparator,
            double memoryFraction,
            int maxNumFileHandles,
            boolean inMemoryResultEnabled,
            float startSpillingFraction,
            boolean handleLargeRecords,
            boolean objectReuseEnabled) throws IOException, MemoryAllocationException {
        this(
                sortedDataFileFactory,
                combineStub,
                memoryManager,
                ioManager,
                input,
                parentTask,
                serializerFactory,
                comparator,
                memoryFraction,
                /* numSortBuffers */ -1,
                maxNumFileHandles,
                inMemoryResultEnabled,
                startSpillingFraction,
                handleLargeRecords,
                objectReuseEnabled);
    }

    /**
     * Creates a combining sort-merger.
     *
     * <p>A {@link CombiningRecordComparisonMerger} is built from the combine function and handed
     * to the superclass as its merger; the combine function is also kept in this instance for use
     * by the spilling thread.
     *
     * @param sortedDataFileFactory factory for the sorted spill files; also given to the merger
     * @param combineStub the combine function applied to runs of equal-keyed records
     * @param memoryManager forwarded to the superclass constructor
     * @param ioManager forwarded to the superclass constructor and to the merger
     * @param input the records to sort
     * @param parentTask forwarded to the superclass constructor
     * @param serializerFactory factory for the record serializer; one serializer is obtained
     *        from it here for the merger
     * @param comparator comparator for the records; also given to the merger
     * @param memoryFraction forwarded to the superclass constructor
     * @param numSortBuffers forwarded to the superclass constructor
     * @param maxNumFileHandles forwarded to the superclass constructor and to the merger
     * @param inMemoryResultEnabled forwarded to the superclass constructor
     * @param startSpillingFraction forwarded to the superclass constructor
     * @param handleLargeRecords forwarded to the superclass constructor
     * @param objectReuseEnabled forwarded to the superclass constructor and to the merger
     * @throws IOException if the superclass constructor fails with an I/O error
     * @throws MemoryAllocationException if the superclass constructor cannot obtain its memory
     */
    public CombiningUnilateralSortMerger(
            SortedDataFileFactory<E> sortedDataFileFactory,
            GroupCombineFunction<E, E> combineStub,
            MemoryManager memoryManager,
            IOManager ioManager,
            MutableObjectIterator<E> input,
            AbstractInvokable parentTask,
            TypeSerializerFactory<E> serializerFactory,
            TypeComparator<E> comparator,
            double memoryFraction,
            int numSortBuffers,
            int maxNumFileHandles,
            boolean inMemoryResultEnabled,
            float startSpillingFraction,
            boolean handleLargeRecords,
            boolean objectReuseEnabled) throws IOException, MemoryAllocationException {
        super(
                sortedDataFileFactory,
                new CombiningRecordComparisonMerger<E>(
                        combineStub,
                        sortedDataFileFactory,
                        ioManager,
                        serializerFactory.getSerializer(),
                        comparator,
                        maxNumFileHandles,
                        objectReuseEnabled),
                memoryManager,
                ioManager,
                input,
                parentTask,
                serializerFactory,
                comparator,
                memoryFraction,
                numSortBuffers,
                maxNumFileHandles,
                inMemoryResultEnabled,
                startSpillingFraction,
                // NOTE(review): fixed flag whose meaning is defined by the superclass
                // constructor, which is not visible from this file -- confirm there.
                false,
                handleLargeRecords,
                objectReuseEnabled);
        // NOTE(review): assigned only after the superclass constructor returns; if that
        // constructor already starts the spilling thread, the thread must not read this
        // field before this point -- confirm against UnilateralSortMerger.
        this.combineStub = combineStub;
    }

    /**
     * Sets the configuration that is passed to the combine function when the spilling thread
     * opens it.
     *
     * <p>If this is never called, or is called with {@code null}, the spilling thread opens the
     * combine function with a fresh, empty {@link Configuration} instead.
     *
     * @param config the configuration for the combine function; may be {@code null}
     */
    public void setUdfConfiguration(Configuration config) {
        this.udfConfig = config;
    }

    /**
     * Creates the spilling thread for this sorter: a {@link CombiningSpillingThread}.
     *
     * <p>A serializer is obtained from {@code serializerFactory} for the new thread, and this
     * sorter's {@code objectReuseEnabled} setting is passed along; every other argument is
     * forwarded unchanged.
     *
     * @return a new, not yet started {@link CombiningSpillingThread}
     */
    @Override
    protected UnilateralSortMerger.ThreadBase<E> getSpillingThread(
            SortedDataFileFactory<E> sortedDataFileFactory,
            BlockingQueue<UnilateralSortMerger.SortedDataFileElement<E>> spilledFiles,
            SortedDataFileMerger<E> merger,
            ExceptionHandler<IOException> exceptionHandler,
            UnilateralSortMerger.CircularQueues<E> queues,
            AbstractInvokable parentTask,
            MemoryManager memoryManager,
            IOManager ioManager,
            TypeSerializerFactory<E> serializerFactory,
            TypeComparator<E> comparator,
            List<MemorySegment> sortReadMemory,
            List<MemorySegment> writeMemory,
            int maxFileHandles) {
        final TypeSerializer<E> threadSerializer = serializerFactory.getSerializer();
        return new CombiningSpillingThread(
                sortedDataFileFactory,
                spilledFiles,
                merger,
                exceptionHandler,
                queues,
                parentTask,
                memoryManager,
                ioManager,
                threadSerializer,
                comparator,
                sortReadMemory,
                writeMemory,
                maxFileHandles,
                this.objectReuseEnabled);
    }

    private static final class CombineValueIterator<E>
    implements Iterator<E>,
    Iterable<E> {
        private final InMemorySorter<E> buffer;
        private E recordReuse;
        private final boolean objectReuseEnabled;
        private int last;
        private int position;
        private boolean iteratorAvailable;

        public CombineValueIterator(InMemorySorter<E> buffer, E instance, boolean objectReuseEnabled) {
            this.buffer = buffer;
            this.recordReuse = instance;
            this.objectReuseEnabled = objectReuseEnabled;
        }

        public void set(int first, int last) {
            this.last = last;
            this.position = first;
            this.iteratorAvailable = true;
        }

        @Override
        public boolean hasNext() {
            return this.position <= this.last;
        }

        @Override
        public E next() {
            if (this.position <= this.last) {
                try {
                    E record = this.objectReuseEnabled ? this.buffer.getRecord(this.recordReuse, this.position) : this.buffer.getRecord(this.position);
                    ++this.position;
                    return record;
                }
                catch (IOException ioex) {
                    LOG.error("Error retrieving a value from a buffer.", (Throwable)ioex);
                    throw new RuntimeException("Could not load the next value: " + ioex.getMessage(), ioex);
                }
            }
            throw new NoSuchElementException();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Iterator<E> iterator() {
            if (this.iteratorAvailable) {
                this.iteratorAvailable = false;
                return this;
            }
            throw new TraversableOnceException();
        }
    }

    /**
     * Spilling thread that runs the combine function over every sorted buffer before the buffer
     * is written to a spill file.
     *
     * <p>If all input arrives before spilling is requested, the buffers are merged in memory and
     * the combine function is not used by this thread at all.
     */
    protected class CombiningSpillingThread
    extends UnilateralSortMerger.SpillingThread {
        // Duplicate of the comparator passed to the constructor. Not referenced anywhere else
        // in this class.
        private final TypeComparator<E> comparator2;
        // Passed to every CombineValueIterator created in go().
        private final boolean objectReuseEnabled;

        /**
         * Creates the spilling thread.
         *
         * <p>All arguments except {@code objectReuseEnabled} are forwarded to the superclass
         * constructor; in addition, a duplicate of {@code comparator} is stored.
         *
         * @param objectReuseEnabled whether records are read from the sort buffers into a
         *        reused instance while combining
         */
        public CombiningSpillingThread(SortedDataFileFactory<E> sortedDataFileFactory, BlockingQueue<UnilateralSortMerger.SortedDataFileElement<E>> spilledFiles, SortedDataFileMerger<E> merger, ExceptionHandler<IOException> exceptionHandler, UnilateralSortMerger.CircularQueues<E> queues, AbstractInvokable parentTask, MemoryManager memManager, IOManager ioManager, TypeSerializer<E> serializer, TypeComparator<E> comparator, List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxNumFileHandles, boolean objectReuseEnabled) {
            // NOTE(review): the leading enclosing-instance argument looks like the decompiler's
            // rendering of the implicit outer reference -- confirm against the superclass.
            super(CombiningUnilateralSortMerger.this, sortedDataFileFactory, spilledFiles, merger, exceptionHandler, queues, parentTask, memManager, ioManager, serializer, comparator, sortReadMemory, writeMemory, maxNumFileHandles);
            this.comparator2 = comparator.duplicate();
            this.objectReuseEnabled = objectReuseEnabled;
        }

        /**
         * Entry point of the spilling thread.
         *
         * <p>The method proceeds in phases:
         * <ol>
         *   <li>Sorted buffers are taken from the spill queue and cached until either the
         *       spilling marker or the end marker arrives.</li>
         *   <li>If the end marker arrived first, the cached buffers are merged in memory, the
         *       result is published and the method returns without touching the combine
         *       function.</li>
         *   <li>Otherwise the combine function is opened and every buffer (cached ones included)
         *       is combined and written to its own spill file, until the end marker arrives.</li>
         *   <li>The sort buffers are disposed, the combine function is closed, the memory of this
         *       thread is handed over for merging and the merging marker is enqueued.</li>
         * </ol>
         *
         * <p>The method returns early, without completing the remaining phases, whenever the
         * thread is found to be no longer running.
         *
         * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
         *
         * @throws IOException if opening, invoking or closing the combine function fails, or if
         *         an I/O operation of this method fails
         */
        @Override
        public void go() throws IOException {
            UnilateralSortMerger.CircularElement element;
            // Buffers received before it is known whether spilling is required.
            ArrayDeque cache = new ArrayDeque();
            // Phase 1: collect buffers until the spilling marker or the end marker shows up.
            while (this.isRunning()) {
                try {
                    element = this.queues.spill.take();
                }
                catch (InterruptedException iex) {
                    // An interrupt while still running is logged and the take is retried;
                    // an interrupt after shutdown ends the thread.
                    if (this.isRunning()) {
                        LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                        continue;
                    }
                    return;
                }
                if (element == UnilateralSortMerger.spillingMarker()) break;
                if (element == UnilateralSortMerger.endMarker()) {
                    // End of input reached before any spilling marker: everything is cached.
                    CombiningUnilateralSortMerger.this.cacheOnly.set(true);
                    break;
                }
                cache.add(element);
            }
            if (!this.isRunning()) {
                return;
            }
            // Phase 2: in-memory result. The combine function is neither opened nor invoked here.
            if (CombiningUnilateralSortMerger.this.cacheOnly.get()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Initiating in memory merge.");
                }
                // One iterator per cached sort buffer.
                ArrayList iterators = new ArrayList(cache.size());
                for (UnilateralSortMerger.CircularElement circularElement : cache) {
                    iterators.add(circularElement.buffer.getIterator());
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Releasing unused sort-buffer memory.");
                }
                this.disposeSortBuffers(true);
                // No buffers: empty iterator. One buffer: its own iterator. Otherwise: a merge
                // over all buffer iterators.
                MergeIterator resIter = iterators.isEmpty() ? EmptyMutableObjectIterator.get() : (iterators.size() == 1 ? (MutableObjectIterator)iterators.get(0) : new MergeIterator(iterators, this.comparator));
                CombiningUnilateralSortMerger.this.setResult(new ArrayList(), resIter);
                this.spilledFiles.add(UnilateralSortMerger.mergingMarker());
                return;
            }
            // Phase 3: spilling with combining. Open the combine function first, falling back
            // to an empty configuration if none was set.
            GroupCombineFunction combineStub = CombiningUnilateralSortMerger.this.combineStub;
            try {
                Configuration conf = CombiningUnilateralSortMerger.this.udfConfig;
                FunctionUtils.openFunction((Function)combineStub, (Configuration)(conf == null ? new Configuration() : conf));
            }
            catch (Throwable t) {
                throw new IOException("The user-defined combiner failed in its 'open()' method.", t);
            }
            while (this.isRunning()) {
                int i;
                try {
                    // takeNext is inherited; presumably it serves the cached buffers before
                    // blocking on the queue -- confirm in UnilateralSortMerger.SpillingThread.
                    element = this.takeNext(this.queues.spill, cache);
                }
                catch (InterruptedException iex) {
                    if (this.isRunning()) {
                        LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                        continue;
                    }
                    return;
                }
                if (!this.isRunning()) {
                    return;
                }
                if (element == UnilateralSortMerger.endMarker()) break;
                // Create the spill file for this buffer and register its channel both for
                // deletion and as currently open.
                SortedDataFile output = this.sortedDataFileFactory.createFile(this.writeMemory);
                CombiningUnilateralSortMerger.this.channelDeleteRegistry.registerChannelToBeDelete(output.getChannelID());
                CombiningUnilateralSortMerger.this.channelDeleteRegistry.registerOpenChannel(output.getWriteChannel());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Creating temp file " + output.getChannelID().toString() + '.');
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Combining buffer " + element.id + '.');
                }
                InMemorySorter inMemorySorter = element.buffer;
                CombineValueIterator<Object> iter = new CombineValueIterator<Object>(inMemorySorter, this.serializer.createInstance(), this.objectReuseEnabled);
                WriterCollector collector = new WriterCollector(output);
                // Index of the last record in the buffer (-1 for an empty buffer).
                int stop2 = inMemorySorter.size() - 1;
                try {
                    for (i = 0; i < stop2; ++i) {
                        int seqStart = i;
                        // Advance i to the last record of the run of records that compare as
                        // equal to the record at seqStart.
                        while (i < stop2 && 0 == inMemorySorter.compare(i, i + 1)) {
                            ++i;
                        }
                        if (i == seqStart) {
                            // Run of length one: nothing to combine, copy the record as is.
                            inMemorySorter.writeToOutput(output, seqStart, 1);
                            continue;
                        }
                        // Run of at least two records: hand the range [seqStart, i] to the
                        // combine function, which emits through the collector.
                        iter.set(seqStart, i);
                        combineStub.combine(iter, collector);
                    }
                }
                catch (Exception ex) {
                    // Any exception raised inside the loop is reported with this message, not
                    // only those thrown by the combine function itself.
                    throw new IOException("An error occurred in the combiner user code.", ex);
                }
                // The loop leaves i == stop2 exactly when the final record was not part of the
                // preceding run (this includes a buffer holding a single record); it then still
                // has to be written.
                if (i == stop2) {
                    inMemorySorter.writeToOutput(output, stop2, 1);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Combined and spilled buffer " + element.id + ".");
                }
                output.finishWriting();
                CombiningUnilateralSortMerger.this.channelDeleteRegistry.unregisterOpenChannel(output.getWriteChannel());
                this.spilledFiles.add(new UnilateralSortMerger.SortedDataFileElement(output));
                // Recycle the buffer: reset it and hand it back through the empty queue.
                element.buffer.reset();
                this.queues.empty.add(element);
            }
            // Phase 4: wrap up. Reached when the end marker was seen or the thread stopped running.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Spilling done.");
                LOG.debug("Releasing sort-buffer memory.");
            }
            this.disposeSortBuffers(false);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Closing combiner user code.");
            }
            try {
                FunctionUtils.closeFunction((Function)combineStub);
            }
            catch (Throwable t) {
                throw new IOException("The user-defined combiner failed in its 'close()' method.", t);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("User code closed.");
            }
            // Hand this thread's read and write memory over to the merging lists, under the
            // merge-memory lock, and empty the thread's own lists.
            Object object = CombiningUnilateralSortMerger.this.mergeMemoryLock;
            synchronized (object) {
                CombiningUnilateralSortMerger.this.readMemoryForMerging.addAll(this.mergeReadMemory);
                CombiningUnilateralSortMerger.this.writeMemoryForMerging.addAll(this.writeMemory);
                this.writeMemory.clear();
                this.mergeReadMemory.clear();
            }
            CombiningUnilateralSortMerger.this.largeRecords = null;
            // Signal that spilling has finished.
            this.spilledFiles.add(UnilateralSortMerger.mergingMarker());
            LOG.info("Spilling thread done.");
        }
    }
}

