package org.apache.flink.runtime.operators.chaining;

import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.class */
/**
 * Chained driver that buffers incoming records in an in-memory sorter and, whenever the buffer
 * is full or the driver is closed, sorts the buffered records and hands each key group to a
 * {@link GroupCombineFunction}, which emits its results into {@code outputCollector}.
 *
 * @param <IN> type of the records collected by this driver
 * @param <OUT> type of the records emitted by the combine function
 */
public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(SynchronousChainedCombineDriver.class);

    /**
     * Upper bound (inclusive) on the serialized record length for which a
     * {@link FixedLengthRecordSorter} is chosen instead of a {@link NormalizedKeySorter}.
     */
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

    /** Buffer holding the records collected since the last combine pass. */
    private InMemorySorter<IN> sorter;

    /** User function applied to every key group of the sorted buffer. */
    private GroupCombineFunction<IN, OUT> combiner;

    private TypeSerializer<IN> serializer;

    /** Comparator used to detect key-group boundaries in the sorted buffer. */
    private TypeComparator<IN> groupingComparator;

    private AbstractInvokable parent;

    /** Memory pages backing the sorter; allocated in {@link #openTask()}. */
    private List<MemorySegment> memory;

    private final QuickSort sortAlgo = new QuickSort();

    /** Cleared by {@link #cancelTask()}; checked by the combine loops and by {@link #closeTask()}. */
    private volatile boolean running = true;

    /**
     * Remembers the parent task, instantiates the user's combine function and hands it the UDF
     * runtime context.
     *
     * @param abstractInvokable the task this driver is chained into
     */
    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void setup(AbstractInvokable abstractInvokable) {
        this.parent = abstractInvokable;
        @SuppressWarnings("unchecked") // the configured user code is expected to be a GroupCombineFunction<IN, OUT>
        final GroupCombineFunction<IN, OUT> userCombiner =
                (GroupCombineFunction<IN, OUT>) BatchTask.instantiateUserCode(this.config, this.userCodeClassLoader, GroupCombineFunction.class);
        this.combiner = userCombiner;
        FunctionUtils.setFunctionRuntimeContext(userCombiner, getUdfRuntimeContext());
    }

    /**
     * Opens the user function, creates the serializer and the sorting/grouping comparators,
     * allocates the driver's memory pages and builds the sorter on top of them.
     *
     * @throws Exception if opening the user code or setting up the sorter fails
     */
    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void openTask() throws Exception {
        BatchTask.openUserCode(this.combiner, this.config.getStubParameters());

        // Driver comparator 0 orders the buffer, driver comparator 1 delimits the key groups.
        final TypeSerializerFactory<IN> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
        final TypeComparatorFactory<IN> sortingComparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
        final TypeComparatorFactory<IN> groupingComparatorFactory = this.config.getDriverComparator(1, this.userCodeClassLoader);
        this.serializer = serializerFactory.getSerializer();
        final TypeComparator<IN> sortingComparator = sortingComparatorFactory.createComparator();
        this.groupingComparator = groupingComparatorFactory.createComparator();

        final MemoryManager memoryManager = this.parent.getEnvironment().getMemoryManager();
        final int numMemoryPages = memoryManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
        this.memory = memoryManager.allocatePages(this.parent, numMemoryPages);

        // Short fixed-length records whose comparator supports serialization with key
        // normalization get the fixed-length sorter; everything else the normalized-key sorter.
        if (sortingComparator.supportsSerializationWithKeyNormalization()
                && this.serializer.getLength() > 0
                && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING) {
            this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, sortingComparator.duplicate(), this.memory);
        } else {
            this.sorter = new NormalizedKeySorter<IN>(this.serializer, sortingComparator.duplicate(), this.memory);
        }

        // NOTE(review): SCOPE_SEPARATOR only serves as the trailing punctuation of this message;
        // presumably a constant-resolution artifact - confirm and replace with a plain literal.
        LOG.debug("SynchronousChainedCombineDriver object reuse: {}{}",
                this.objectReuseEnabled ? "ENABLED" : "DISABLED", ScopeFormat.SCOPE_SEPARATOR);
    }

    /**
     * Disposes the sorter, returns the memory pages to the memory manager and, unless the task
     * was canceled, closes the user function.
     *
     * @throws Exception if disposing the sorter or closing the user code fails
     */
    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void closeTask() throws Exception {
        try {
            this.sorter.dispose();
        } finally {
            // Hand the pages back even when disposing the sorter fails.
            this.parent.getEnvironment().getMemoryManager().release(this.memory);
        }
        if (this.running) {
            BatchTask.closeUserCode(this.combiner);
        }
    }

    /**
     * Marks the driver as no longer running, disposes the sorter on a best-effort basis and
     * releases the memory pages.
     */
    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void cancelTask() {
        this.running = false;
        try {
            // The sorter only exists once openTask() has created it.
            if (this.sorter != null) {
                this.sorter.dispose();
            }
        } catch (Exception e) {
            // Best effort: cancellation must still go on to release the memory below.
            LOG.debug("Error while disposing the sorter during cancellation of {}", this.taskName, e);
        }
        this.parent.getEnvironment().getMemoryManager().release(this.memory);
    }

    /** Returns the user's combine function. */
    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    /* renamed from: getStub */
    public Function mo2414getStub() {
        return this.combiner;
    }

    /** Returns the name of this chained task. */
    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public String getTaskName() {
        return this.taskName;
    }

    /**
     * Adds a record to the sort buffer. When the buffer cannot take the record, the buffered
     * records are sorted and combined, the buffer is reset and the write is retried once.
     *
     * @param in the record to buffer
     * @throws ExceptionInChainedStubException wrapping any failure while writing or combining,
     *         including the case that the record does not fit into an empty buffer
     */
    @Override // org.apache.flink.runtime.operators.chaining.ChainedDriver
    public void collect(IN in) {
        this.numRecordsIn.inc();

        // First attempt: the record usually fits into the current buffer.
        try {
            if (this.sorter.write(in)) {
                return;
            }
        } catch (IOException e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }

        // Buffer full: combine what has been collected so far and start over.
        try {
            sortAndCombine();
            this.sorter.reset();
        } catch (Exception e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }

        // Second attempt on the fresh buffer. Kept outside the block above so that a failure
        // here is wrapped exactly once.
        try {
            if (!this.sorter.write(in)) {
                throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
            }
        } catch (IOException e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }
    }

    /**
     * Combines the records still held in the sort buffer and then closes the output collector.
     *
     * @throws ExceptionInChainedStubException wrapping any failure of either step
     */
    public void close() {
        try {
            sortAndCombine();
            this.outputCollector.close();
        } catch (Exception e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }
    }

    /**
     * Sorts the buffered records and calls the combine function once per key group, emitting
     * into {@code outputCollector}. Does nothing when the buffer is empty; stops between key
     * groups once the driver is no longer running.
     *
     * @throws Exception if the combine function or reading from the sorter fails
     */
    private void sortAndCombine() throws Exception {
        final InMemorySorter<IN> buffer = this.sorter;
        if (buffer.isEmpty()) {
            return;
        }
        this.sortAlgo.sort(buffer);

        final GroupCombineFunction<IN, OUT> userCombiner = this.combiner;
        final Collector<OUT> output = this.outputCollector;

        if (this.objectReuseEnabled) {
            final ReusingKeyGroupedIterator<IN> keyGroups =
                    new ReusingKeyGroupedIterator<IN>(buffer.getIterator(), this.serializer, this.groupingComparator);
            while (this.running && keyGroups.nextKey()) {
                userCombiner.combine(keyGroups.getValues(), output);
            }
        } else {
            final NonReusingKeyGroupedIterator<IN> keyGroups =
                    new NonReusingKeyGroupedIterator<IN>(buffer.getIterator(), this.groupingComparator);
            while (this.running && keyGroups.nextKey()) {
                userCombiner.combine(keyGroups.getValues(), output);
            }
        }
    }
}
