package org.apache.flink.runtime.io.network.partition.external.writer;

import java.io.File;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleUtils;
import org.apache.flink.runtime.io.network.partition.external.PartitionIndex;
import org.apache.flink.runtime.operators.sort.ChannelDeleteRegistry;
import org.apache.flink.runtime.operators.sort.DataFileInfo;
import org.apache.flink.runtime.operators.sort.DefaultFileMergePolicy;
import org.apache.flink.runtime.operators.sort.MergePolicy;
import org.apache.flink.runtime.operators.sort.PartialOrderPriorityQueue;
import org.apache.flink.runtime.operators.sort.SortedDataFile;
import org.apache.flink.runtime.operators.sort.SortedDataFileMerger;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/writer/ConcatPartitionedFileMerger.class */
public class ConcatPartitionedFileMerger<T> implements SortedDataFileMerger<Tuple2<Integer, T>> {
    private static final Logger LOG;
    private final int numberOfSubpartitions;
    private final String partitionDataRootPath;
    private final IOManager ioManager;
    private final MergePolicy<SortedDataFile<Tuple2<Integer, T>>> mergePolicy;
    private int mergeFileIndex = Integer.MAX_VALUE;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/writer/ConcatPartitionedFileMerger$PartitionIndexStream.class */
    public static final class PartitionIndexStream {
        private final AsynchronousPartitionedStreamFileReaderDelegate reader;
        private final List<PartitionIndex> partitionIndices;
        private int offset = 0;

        public PartitionIndexStream(AsynchronousPartitionedStreamFileReaderDelegate asynchronousPartitionedStreamFileReaderDelegate, List<PartitionIndex> list) {
            this.reader = asynchronousPartitionedStreamFileReaderDelegate;
            this.partitionIndices = list;
        }

        public PartitionIndex getCurrentPartitionIndex() {
            return this.partitionIndices.get(this.offset);
        }

        public AsynchronousPartitionedStreamFileReaderDelegate getReader() {
            return this.reader;
        }

        public boolean advance() {
            if (this.offset >= this.partitionIndices.size() - 1) {
                return false;
            }
            this.offset++;
            return true;
        }

        public String toString() {
            return "PartitionIndexStream{partitionIndices=" + this.partitionIndices.size() + ", offset=" + this.offset + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/writer/ConcatPartitionedFileMerger$PartitionIndexStreamComparator.class */
    public static final class PartitionIndexStreamComparator implements Comparator<PartitionIndexStream> {
        private PartitionIndexStreamComparator() {
        }

        @Override // java.util.Comparator
        public int compare(PartitionIndexStream partitionIndexStream, PartitionIndexStream partitionIndexStream2) {
            int partition = partitionIndexStream.getCurrentPartitionIndex().getPartition();
            int partition2 = partitionIndexStream2.getCurrentPartitionIndex().getPartition();
            return partition != partition2 ? partition < partition2 ? -1 : 1 : Long.compare(partitionIndexStream.getCurrentPartitionIndex().getStartOffset(), partitionIndexStream2.getCurrentPartitionIndex().getStartOffset());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConcatPartitionedFileMerger(int i, String str, int i2, boolean z, boolean z2, IOManager iOManager) {
        Preconditions.checkArgument(i > 0, "Illegal subpartition number: " + i);
        Preconditions.checkArgument(i2 > 0, "Illegal merge factor: " + i2);
        this.numberOfSubpartitions = i;
        this.partitionDataRootPath = (String) Preconditions.checkNotNull(str);
        this.ioManager = (IOManager) Preconditions.checkNotNull(iOManager);
        this.mergePolicy = new DefaultFileMergePolicy(i2, z, z2);
    }

    @Override // org.apache.flink.runtime.operators.sort.SortedDataFileMerger
    public MutableObjectIterator<Tuple2<Integer, T>> getMergingIterator(List<SortedDataFile<Tuple2<Integer, T>>> list, List<MemorySegment> list2, MutableObjectIterator<Tuple2<Integer, T>> mutableObjectIterator, ChannelDeleteRegistry<Tuple2<Integer, T>> channelDeleteRegistry) throws IOException {
        return new MutableObjectIterator<Tuple2<Integer, T>>() { // from class: org.apache.flink.runtime.io.network.partition.external.writer.ConcatPartitionedFileMerger.1
            public Tuple2<Integer, T> next(Tuple2<Integer, T> tuple2) throws IOException {
                return null;
            }

            /* renamed from: next, reason: merged with bridge method [inline-methods] */
            public Tuple2<Integer, T> m1957next() throws IOException {
                return null;
            }
        };
    }

    @Override // org.apache.flink.runtime.operators.sort.SortedDataFileMerger
    public void notifyNewSortedDataFile(SortedDataFile<Tuple2<Integer, T>> sortedDataFile, List<MemorySegment> list, List<MemorySegment> list2, ChannelDeleteRegistry<Tuple2<Integer, T>> channelDeleteRegistry, AtomicBoolean atomicBoolean) throws IOException {
        if (!(sortedDataFile instanceof PartitionedBufferSortedDataFile)) {
            throw new IllegalArgumentException("Only PartitionedBufferSortedDataFile is supported: " + sortedDataFile.getClass().getName());
        }
        this.mergePolicy.addNewCandidate(new DataFileInfo<>(sortedDataFile.getBytesWritten(), 0, this.numberOfSubpartitions, sortedDataFile));
        mergeIfPossible(list2, channelDeleteRegistry, atomicBoolean);
    }

    @Override // org.apache.flink.runtime.operators.sort.SortedDataFileMerger
    public List<SortedDataFile<Tuple2<Integer, T>>> finishMerging(List<MemorySegment> list, List<MemorySegment> list2, ChannelDeleteRegistry<Tuple2<Integer, T>> channelDeleteRegistry, AtomicBoolean atomicBoolean) throws IOException {
        this.mergePolicy.startFinalMerge();
        mergeIfPossible(list2, channelDeleteRegistry, atomicBoolean);
        return this.mergePolicy.getFinalMergeResult();
    }

    private void mergeIfPossible(List<MemorySegment> list, ChannelDeleteRegistry<Tuple2<Integer, T>> channelDeleteRegistry, AtomicBoolean atomicBoolean) throws IOException {
        List<DataFileInfo<SortedDataFile<Tuple2<Integer, T>>>> selectMergeCandidates = this.mergePolicy.selectMergeCandidates(list.size());
        while (true) {
            List<DataFileInfo<SortedDataFile<Tuple2<Integer, T>>>> list2 = selectMergeCandidates;
            if (list2 == null || !atomicBoolean.get()) {
                return;
            }
            int i = 0;
            LinkedList linkedList = new LinkedList();
            for (DataFileInfo<SortedDataFile<Tuple2<Integer, T>>> dataFileInfo : list2) {
                i = Math.max(i, dataFileInfo.getMergeRound());
                linkedList.add((PartitionedSortedDataFile) dataFileInfo.getDataFile());
            }
            LOG.info("Start merging {} files to one file.", Integer.valueOf(linkedList.size()));
            try {
                int i2 = this.mergeFileIndex;
                this.mergeFileIndex = i2 - 1;
                ConcatPartitionedBufferSortedDataFile<T> mergeToOutput = mergeToOutput(linkedList, list, channelDeleteRegistry, i2);
                this.mergePolicy.addNewCandidate(new DataFileInfo<>(mergeToOutput.getBytesWritten(), i + 1, this.numberOfSubpartitions, mergeToOutput));
                selectMergeCandidates = this.mergePolicy.selectMergeCandidates(list.size());
            } catch (InterruptedException e) {
                throw new RuntimeException("Merge was interrupted.", e);
            }
        }
    }

    private ConcatPartitionedBufferSortedDataFile<T> mergeToOutput(List<PartitionedSortedDataFile<T>> list, List<MemorySegment> list2, ChannelDeleteRegistry<Tuple2<Integer, T>> channelDeleteRegistry, int i) throws IOException, InterruptedException {
        FileIOChannel.ID createChannel = this.ioManager.createChannel(new File(ExternalBlockShuffleUtils.generateMergePath(this.partitionDataRootPath, i)));
        ConcatPartitionedBufferSortedDataFile<T> concatPartitionedBufferSortedDataFile = new ConcatPartitionedBufferSortedDataFile<>(this.numberOfSubpartitions, createChannel, i, this.ioManager);
        channelDeleteRegistry.registerChannelToBeDelete(createChannel);
        channelDeleteRegistry.registerOpenChannel(concatPartitionedBufferSortedDataFile.getWriteChannel());
        List partition = Lists.partition(list2, list2.size() / list.size());
        PartialOrderPriorityQueue partialOrderPriorityQueue = new PartialOrderPriorityQueue(new PartitionIndexStreamComparator(), list.size());
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < list.size(); i2++) {
            AsynchronousPartitionedStreamFileReaderDelegate asynchronousPartitionedStreamFileReaderDelegate = new AsynchronousPartitionedStreamFileReaderDelegate(this.ioManager, list.get(i2).getChannelID(), (List) partition.get(i2), list.get(i2).getPartitionIndexList());
            partialOrderPriorityQueue.add(new PartitionIndexStream(asynchronousPartitionedStreamFileReaderDelegate, list.get(i2).getPartitionIndexList()));
            hashSet.add(asynchronousPartitionedStreamFileReaderDelegate);
            channelDeleteRegistry.registerOpenChannel(asynchronousPartitionedStreamFileReaderDelegate.getReader());
        }
        while (partialOrderPriorityQueue.size() > 0) {
            PartitionIndexStream partitionIndexStream = (PartitionIndexStream) partialOrderPriorityQueue.peek();
            PartitionIndex currentPartitionIndex = partitionIndexStream.getCurrentPartitionIndex();
            if (partitionIndexStream.advance()) {
                partialOrderPriorityQueue.adjustTop();
            } else {
                partialOrderPriorityQueue.poll();
            }
            long j = 0;
            while (j < currentPartitionIndex.getLength()) {
                j += r0.getSize();
                concatPartitionedBufferSortedDataFile.writeBuffer(currentPartitionIndex.getPartition(), partitionIndexStream.getReader().getNextBufferBlocking());
            }
            if (!$assertionsDisabled && j != currentPartitionIndex.getLength()) {
                throw new AssertionError();
            }
        }
        concatPartitionedBufferSortedDataFile.finishWriting();
        channelDeleteRegistry.unregisterOpenChannel(concatPartitionedBufferSortedDataFile.getWriteChannel());
        clearMerged(channelDeleteRegistry, hashSet);
        return concatPartitionedBufferSortedDataFile;
    }

    private void clearMerged(ChannelDeleteRegistry<Tuple2<Integer, T>> channelDeleteRegistry, Set<AsynchronousPartitionedStreamFileReaderDelegate> set) throws IOException {
        for (AsynchronousPartitionedStreamFileReaderDelegate asynchronousPartitionedStreamFileReaderDelegate : set) {
            asynchronousPartitionedStreamFileReaderDelegate.close();
            channelDeleteRegistry.unregisterOpenChannel(asynchronousPartitionedStreamFileReaderDelegate.getReader());
            asynchronousPartitionedStreamFileReaderDelegate.getReader().deleteChannel();
            channelDeleteRegistry.unregisterChannelToBeDelete(asynchronousPartitionedStreamFileReaderDelegate.getReader().getChannelID());
        }
        set.clear();
    }

    static {
        $assertionsDisabled = !ConcatPartitionedFileMerger.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(ConcatPartitionedFileMerger.class);
    }
}
