/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.ExceptionHandler;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.LargeRecordHandler;
import org.apache.flink.runtime.operators.sort.SortedDataFileFactory;
import org.apache.flink.runtime.operators.sort.SortedDataFileMerger;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PushedUnilateralSortMerger<E>
extends UnilateralSortMerger<E> {
    private static final Logger LOG = LoggerFactory.getLogger(PushedUnilateralSortMerger.class);
    private boolean firstRecord = true;
    private long bytesUntilSpilling;
    private UnilateralSortMerger.CircularElement<E> currentBuffer;
    private boolean addingDone = false;

    public PushedUnilateralSortMerger(SortedDataFileFactory<E> sortedDataFileFactory, SortedDataFileMerger<E> merger, MemoryManager memoryManager, List<MemorySegment> memory, IOManager ioManager, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, int numSortBuffers, int maxNumFileHandles, boolean inMemoryResultEnabled, float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords, boolean objectReuseEnabled, boolean enableAsyncMerging) throws IOException {
        super(sortedDataFileFactory, merger, memoryManager, memory, ioManager, null, parentTask, serializerFactory, comparator, numSortBuffers, maxNumFileHandles, inMemoryResultEnabled, startSpillingFraction, noSpillingMemory, handleLargeRecords, objectReuseEnabled, enableAsyncMerging);
    }

    @Override
    protected UnilateralSortMerger.ThreadBase<E> getReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> reader, UnilateralSortMerger.CircularQueues<E> queues, LargeRecordHandler<E> largeRecordHandler, AbstractInvokable parentTask, TypeSerializer<E> serializer, long startSpillingBytes) {
        return null;
    }

    public void add(E current) throws IOException {
        block20: {
            Preconditions.checkArgument((!this.addingDone ? 1 : 0) != 0, (Object)"Adding already done!");
            if (this.unhandledException != null) {
                throw this.unhandledException;
            }
            try {
                UnilateralSortMerger.CircularElement SPILLING_MARKER;
                InMemorySorter buffer;
                if (this.firstRecord) {
                    this.bytesUntilSpilling = this.startSpillingBytes;
                    if (this.bytesUntilSpilling < 1L) {
                        this.bytesUntilSpilling = 0L;
                        this.circularQueues.sort.add(PushedUnilateralSortMerger.spillingMarker());
                    }
                    this.firstRecord = false;
                }
                while (true) {
                    if (this.currentBuffer == null) {
                        try {
                            this.currentBuffer = this.circularQueues.empty.poll(500L, TimeUnit.MILLISECONDS);
                            if (this.unhandledException != null) {
                                throw this.unhandledException;
                            }
                            if (this.currentBuffer == null) continue;
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Retrieved empty read buffer " + this.currentBuffer.id + ".");
                            }
                            if (this.currentBuffer.buffer.isEmpty()) continue;
                            throw new IOException("New buffer is not empty.");
                        }
                        catch (InterruptedException iex) {
                            throw new IOException(iex);
                        }
                    }
                    buffer = this.currentBuffer.buffer;
                    if (buffer.write(current)) break;
                    if (buffer.isEmpty()) {
                        if (this.largeRecordHandler != null) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Large record did not fit into a fresh sort buffer. Putting into large record store.");
                            }
                        } else {
                            throw new IOException("The record exceeds the maximum size of a sort buffer (current maximum: " + buffer.getCapacity() + " bytes).");
                        }
                        this.largeRecordHandler.addRecord(current);
                        buffer.reset();
                        break block20;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Emitting full buffer: " + this.currentBuffer.id + ".");
                    }
                    if (this.bytesUntilSpilling > 0L) {
                        this.bytesUntilSpilling -= buffer.getCapacity();
                        if (this.bytesUntilSpilling <= 0L) {
                            this.bytesUntilSpilling = 0L;
                            SPILLING_MARKER = PushedUnilateralSortMerger.spillingMarker();
                            this.circularQueues.sort.add(SPILLING_MARKER);
                        }
                    }
                    this.circularQueues.sort.add(this.currentBuffer);
                    this.currentBuffer = null;
                }
                if (this.bytesUntilSpilling > 0L && this.bytesUntilSpilling - buffer.getOccupancy() <= 0L) {
                    this.bytesUntilSpilling = 0L;
                    SPILLING_MARKER = PushedUnilateralSortMerger.spillingMarker();
                    this.circularQueues.sort.add(SPILLING_MARKER);
                }
            }
            catch (Throwable e) {
                if (this.unhandledException != null) {
                    LOG.warn("Record add failed.", e);
                    throw this.unhandledException;
                }
                throw new IOException(e);
            }
        }
    }

    public void finishAdding() {
        if (!this.addingDone) {
            if (this.currentBuffer != null) {
                this.circularQueues.sort.add(this.currentBuffer);
            }
            UnilateralSortMerger.CircularElement EOF_MARKER = PushedUnilateralSortMerger.endMarker();
            this.circularQueues.sort.add(EOF_MARKER);
            LOG.info("Sending done.");
            this.addingDone = true;
        }
    }
}

