package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/sort/PushedUnilateralSortMerger.class */
/**
 * A {@link UnilateralSortMerger} variant whose records are pushed in by the caller through
 * {@link #add(Object)} instead of being pulled by a reading thread.
 *
 * <p>This class creates no reading thread ({@link #getReadingThread} returns {@code null}) and
 * passes {@code null} as the sixth argument of the super constructor (presumably the input
 * iterator — confirm against {@code UnilateralSortMerger}). The caller feeds records one at a time
 * with {@link #add(Object)} and signals the end of input with {@link #finishAdding()}.
 *
 * <p>The mutable state below is held in plain (non-volatile, unsynchronized) fields, so
 * {@code add} and {@code finishAdding} are presumably meant to be called from a single thread —
 * TODO confirm with callers.
 *
 * @param <E> the type of the records being sorted
 */
public class PushedUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
    private static final Logger LOG = LoggerFactory.getLogger(PushedUnilateralSortMerger.class);

    /** {@code true} until the first call to {@link #add(Object)} has initialized the spilling budget. */
    private boolean firstRecord;

    /**
     * Remaining byte budget before a spilling marker is enqueued; {@code 0} once the marker has
     * been sent.
     */
    private long bytesUntilSpilling;

    /** The sort buffer currently being filled, or {@code null} if a fresh one must be acquired. */
    private UnilateralSortMerger.CircularElement<E> currentBuffer;

    /** Set by {@link #finishAdding()}; once {@code true}, {@link #add(Object)} rejects records. */
    private boolean addingDone;

    /**
     * Creates the sorter. All arguments are forwarded unchanged to the super constructor, with
     * {@code null} inserted as the sixth argument.
     *
     * @throws IOException if the super constructor fails
     */
    public PushedUnilateralSortMerger(SortedDataFileFactory<E> sortedDataFileFactory, SortedDataFileMerger<E> sortedDataFileMerger, MemoryManager memoryManager, List<MemorySegment> list, IOManager iOManager, AbstractInvokable abstractInvokable, TypeSerializerFactory<E> typeSerializerFactory, TypeComparator<E> typeComparator, int i, int i2, boolean z, float f, boolean z2, boolean z3, boolean z4, boolean z5, boolean z6) throws IOException {
        super(sortedDataFileFactory, sortedDataFileMerger, memoryManager, list, iOManager, null, abstractInvokable, typeSerializerFactory, typeComparator, i, i2, z, f, z2, z3, z4, z5, z6);
        this.firstRecord = true;
        this.addingDone = false;
    }

    /**
     * Returns {@code null}: this sorter has no reading thread because records are supplied via
     * {@link #add(Object)}.
     */
    @Override // org.apache.flink.runtime.operators.sort.UnilateralSortMerger
    protected UnilateralSortMerger.ThreadBase<E> getReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> mutableObjectIterator, UnilateralSortMerger.CircularQueues<E> circularQueues, LargeRecordHandler<E> largeRecordHandler, AbstractInvokable abstractInvokable, TypeSerializer<E> typeSerializer, long j) {
        return null;
    }

    /**
     * Adds a single record to the sorter.
     *
     * <p>The record is written into the current sort buffer. If the buffer is full it is handed to
     * the sort queue and a fresh buffer is acquired (polling the empty queue in 500 ms steps)
     * before the write is retried. A record that does not fit even into an empty buffer is passed
     * to the large record handler, if one is configured.
     *
     * @param e the record to add
     * @throws IllegalArgumentException if {@link #finishAdding()} has already been called
     * @throws IOException the stored {@code unhandledException} if one is set; otherwise any
     *     failure while adding the record, wrapped in an {@code IOException}
     */
    public void add(E e) throws IOException {
        Preconditions.checkArgument(!this.addingDone, "Adding already done!");
        if (this.unhandledException != null) {
            throw this.unhandledException;
        }
        try {
            if (this.firstRecord) {
                this.bytesUntilSpilling = this.startSpillingBytes;
                if (this.bytesUntilSpilling < 1) {
                    // No in-memory budget at all: request spilling right away.
                    requestSpilling();
                }
                this.firstRecord = false;
            }
            while (true) {
                if (this.currentBuffer == null) {
                    // May still leave currentBuffer null on a poll timeout; the loop then retries.
                    pollEmptyBuffer();
                    continue;
                }
                InMemorySorter<E> sorter = this.currentBuffer.buffer;
                if (sorter.write(e)) {
                    if (this.bytesUntilSpilling > 0 && this.bytesUntilSpilling - sorter.getOccupancy() <= 0) {
                        requestSpilling();
                    }
                    // The record is stored exactly once. Without this return the loop would
                    // write the same record again and again and never terminate.
                    return;
                }
                if (!sorter.isEmpty()) {
                    // The buffer is full: hand it to the sort queue and retry with a fresh one.
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Emitting full buffer: " + this.currentBuffer.id + ScopeFormat.SCOPE_SEPARATOR);
                    }
                    if (this.bytesUntilSpilling > 0) {
                        this.bytesUntilSpilling -= sorter.getCapacity();
                        if (this.bytesUntilSpilling <= 0) {
                            requestSpilling();
                        }
                    }
                    this.circularQueues.sort.add(this.currentBuffer);
                    this.currentBuffer = null;
                    continue;
                }
                // The record did not fit into an empty buffer.
                if (this.largeRecordHandler == null) {
                    throw new IOException("The record exceeds the maximum size of a sort buffer (current maximum: " + sorter.getCapacity() + " bytes).");
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Large record did not fit into a fresh sort buffer. Putting into large record store.");
                }
                this.largeRecordHandler.addRecord(e);
                sorter.reset();
                // The record now lives in the large record store. Returning here prevents the
                // loop from retrying the write and storing the same record repeatedly.
                return;
            }
        } catch (Throwable th) {
            if (this.unhandledException == null) {
                throw new IOException(th);
            }
            // A stored unhandledException takes precedence; the local failure is only logged.
            LOG.warn("Record add failed.", th);
            throw this.unhandledException;
        }
    }

    /** Marks the spilling budget as used up and enqueues the spilling marker on the sort queue. */
    private void requestSpilling() {
        this.bytesUntilSpilling = 0L;
        this.circularQueues.sort.add(spillingMarker());
    }

    /**
     * Polls the empty queue once (waiting up to 500 ms) for a buffer to fill. Leaves
     * {@code currentBuffer} as {@code null} if the poll timed out.
     *
     * @throws IOException if an {@code unhandledException} is set, the polled buffer is not
     *     empty, or the wait was interrupted
     */
    private void pollEmptyBuffer() throws IOException {
        try {
            this.currentBuffer = this.circularQueues.empty.poll(500L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException interrupted) {
            // NOTE(review): the interrupt flag is not restored here; confirm callers do not
            // rely on it before changing that.
            throw new IOException(interrupted);
        }
        // Checked after every poll, including timeouts, so a stored failure surfaces promptly.
        if (this.unhandledException != null) {
            throw this.unhandledException;
        }
        if (this.currentBuffer == null) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Retrieved empty read buffer " + this.currentBuffer.id + ScopeFormat.SCOPE_SEPARATOR);
        }
        if (!this.currentBuffer.buffer.isEmpty()) {
            throw new IOException("New buffer is not empty.");
        }
    }

    /**
     * Signals that no more records will be added. Hands the buffer currently being filled (if any)
     * to the sort queue, followed by the end marker. Calling this method more than once has no
     * further effect.
     */
    public void finishAdding() {
        if (this.addingDone) {
            return;
        }
        if (this.currentBuffer != null) {
            this.circularQueues.sort.add(this.currentBuffer);
            // The buffer now belongs to the sort queue; drop our reference to it.
            this.currentBuffer = null;
        }
        this.circularQueues.sort.add(endMarker());
        LOG.info("Sending done.");
        this.addingDone = true;
    }
}
