/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.external.writer;

import java.io.File;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleUtils;
import org.apache.flink.runtime.io.network.partition.external.PartitionIndex;
import org.apache.flink.runtime.io.network.partition.external.writer.AsynchronousPartitionedStreamFileReaderDelegate;
import org.apache.flink.runtime.io.network.partition.external.writer.ConcatPartitionedBufferSortedDataFile;
import org.apache.flink.runtime.io.network.partition.external.writer.PartitionedBufferSortedDataFile;
import org.apache.flink.runtime.io.network.partition.external.writer.PartitionedSortedDataFile;
import org.apache.flink.runtime.operators.sort.ChannelDeleteRegistry;
import org.apache.flink.runtime.operators.sort.DataFileInfo;
import org.apache.flink.runtime.operators.sort.DefaultFileMergePolicy;
import org.apache.flink.runtime.operators.sort.MergePolicy;
import org.apache.flink.runtime.operators.sort.PartialOrderPriorityQueue;
import org.apache.flink.runtime.operators.sort.SortedDataFile;
import org.apache.flink.runtime.operators.sort.SortedDataFileMerger;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConcatPartitionedFileMerger<T>
implements SortedDataFileMerger<Tuple2<Integer, T>> {
    private static final Logger LOG = LoggerFactory.getLogger(ConcatPartitionedFileMerger.class);
    private final int numberOfSubpartitions;
    private final String partitionDataRootPath;
    private final IOManager ioManager;
    private final MergePolicy<SortedDataFile<Tuple2<Integer, T>>> mergePolicy;
    private int mergeFileIndex = Integer.MAX_VALUE;

    ConcatPartitionedFileMerger(int numberOfSubpartitions, String partitionDataRootPath, int mergeFactor, boolean enableAsyncMerging, boolean mergeToOneFile, IOManager ioManager) {
        Preconditions.checkArgument((numberOfSubpartitions > 0 ? 1 : 0) != 0, (Object)("Illegal subpartition number: " + numberOfSubpartitions));
        Preconditions.checkArgument((mergeFactor > 0 ? 1 : 0) != 0, (Object)("Illegal merge factor: " + mergeFactor));
        this.numberOfSubpartitions = numberOfSubpartitions;
        this.partitionDataRootPath = (String)Preconditions.checkNotNull((Object)partitionDataRootPath);
        this.ioManager = (IOManager)Preconditions.checkNotNull((Object)ioManager);
        this.mergePolicy = new DefaultFileMergePolicy<SortedDataFile<Tuple2<Integer, T>>>(mergeFactor, enableAsyncMerging, mergeToOneFile);
    }

    @Override
    public MutableObjectIterator<Tuple2<Integer, T>> getMergingIterator(List<SortedDataFile<Tuple2<Integer, T>>> sortedDataFiles, List<MemorySegment> mergeReadMemory, MutableObjectIterator<Tuple2<Integer, T>> largeRecords, ChannelDeleteRegistry<Tuple2<Integer, T>> channelDeleteRegistry) throws IOException {
        return new MutableObjectIterator<Tuple2<Integer, T>>(){

            public Tuple2<Integer, T> next(Tuple2<Integer, T> reuse) throws IOException {
                return null;
            }

            public Tuple2<Integer, T> next() throws IOException {
                return null;
            }
        };
    }

    @Override
    public void notifyNewSortedDataFile(SortedDataFile<Tuple2<Integer, T>> sortedDataFile, List<MemorySegment> writeMemory, List<MemorySegment> mergeReadMemory, ChannelDeleteRegistry<Tuple2<Integer, T>> channelDeleteRegistry, AtomicBoolean aliveFlag) throws IOException {
        if (!(sortedDataFile instanceof PartitionedBufferSortedDataFile)) {
            throw new IllegalArgumentException("Only PartitionedBufferSortedDataFile is supported: " + sortedDataFile.getClass().getName());
        }
        DataFileInfo<SortedDataFile<Tuple2<Integer, T>>> dataFileInfo = new DataFileInfo<SortedDataFile<Tuple2<Integer, T>>>(sortedDataFile.getBytesWritten(), 0, this.numberOfSubpartitions, sortedDataFile);
        this.mergePolicy.addNewCandidate(dataFileInfo);
        this.mergeIfPossible(mergeReadMemory, channelDeleteRegistry, aliveFlag);
    }

    @Override
    public List<SortedDataFile<Tuple2<Integer, T>>> finishMerging(List<MemorySegment> writeMemory, List<MemorySegment> mergeReadMemory, ChannelDeleteRegistry<Tuple2<Integer, T>> channelDeleteRegistry, AtomicBoolean aliveFlag) throws IOException {
        this.mergePolicy.startFinalMerge();
        this.mergeIfPossible(mergeReadMemory, channelDeleteRegistry, aliveFlag);
        return this.mergePolicy.getFinalMergeResult();
    }

    private void mergeIfPossible(List<MemorySegment> mergeReadMemory, ChannelDeleteRegistry<Tuple2<Integer, T>> channelDeleteRegistry, AtomicBoolean aliveFlag) throws IOException {
        List<DataFileInfo<SortedDataFile<Tuple2<Integer, T>>>> mergeCandidates = this.mergePolicy.selectMergeCandidates(mergeReadMemory.size());
        while (mergeCandidates != null && aliveFlag.get()) {
            int maxMergeRound = 0;
            LinkedList<PartitionedSortedDataFile<T>> toBeMerged = new LinkedList<PartitionedSortedDataFile<T>>();
            for (DataFileInfo<SortedDataFile<Tuple2<Integer, T>>> mergeCandidate : mergeCandidates) {
                maxMergeRound = Math.max(maxMergeRound, mergeCandidate.getMergeRound());
                PartitionedSortedDataFile partitionedSortedDataFile = (PartitionedSortedDataFile)mergeCandidate.getDataFile();
                toBeMerged.add(partitionedSortedDataFile);
            }
            LOG.info("Start merging {} files to one file.", (Object)toBeMerged.size());
            try {
                ConcatPartitionedBufferSortedDataFile<T> mergedFile = this.mergeToOutput(toBeMerged, mergeReadMemory, channelDeleteRegistry, this.mergeFileIndex--);
                DataFileInfo<ConcatPartitionedBufferSortedDataFile<T>> mergedFileInfo = new DataFileInfo<ConcatPartitionedBufferSortedDataFile<T>>(mergedFile.getBytesWritten(), maxMergeRound + 1, this.numberOfSubpartitions, mergedFile);
                this.mergePolicy.addNewCandidate(mergedFileInfo);
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Merge was interrupted.", e);
            }
            mergeCandidates = this.mergePolicy.selectMergeCandidates(mergeReadMemory.size());
        }
    }

    private ConcatPartitionedBufferSortedDataFile<T> mergeToOutput(List<PartitionedSortedDataFile<T>> toBeMerged, List<MemorySegment> mergeReadMemory, ChannelDeleteRegistry<Tuple2<Integer, T>> channelDeleteRegistry, int fileId) throws IOException, InterruptedException {
        FileIOChannel.ID channel = this.ioManager.createChannel(new File(ExternalBlockShuffleUtils.generateMergePath(this.partitionDataRootPath, fileId)));
        ConcatPartitionedBufferSortedDataFile writer = new ConcatPartitionedBufferSortedDataFile(this.numberOfSubpartitions, channel, fileId, this.ioManager);
        channelDeleteRegistry.registerChannelToBeDelete(channel);
        channelDeleteRegistry.registerOpenChannel(writer.getWriteChannel());
        List segments = Lists.partition(mergeReadMemory, (int)(mergeReadMemory.size() / toBeMerged.size()));
        PartialOrderPriorityQueue<PartitionIndexStream> heap = new PartialOrderPriorityQueue<PartitionIndexStream>(new PartitionIndexStreamComparator(), toBeMerged.size());
        HashSet<AsynchronousPartitionedStreamFileReaderDelegate> allReaders = new HashSet<AsynchronousPartitionedStreamFileReaderDelegate>();
        for (int i = 0; i < toBeMerged.size(); ++i) {
            AsynchronousPartitionedStreamFileReaderDelegate readerDelegate = new AsynchronousPartitionedStreamFileReaderDelegate(this.ioManager, toBeMerged.get(i).getChannelID(), (List)segments.get(i), toBeMerged.get(i).getPartitionIndexList());
            heap.add(new PartitionIndexStream(readerDelegate, toBeMerged.get(i).getPartitionIndexList()));
            allReaders.add(readerDelegate);
            channelDeleteRegistry.registerOpenChannel(readerDelegate.getReader());
        }
        while (heap.size() > 0) {
            long readLength;
            Buffer buffer;
            PartitionIndexStream headStream = heap.peek();
            PartitionIndex partitionIndex = headStream.getCurrentPartitionIndex();
            if (!headStream.advance()) {
                heap.poll();
            } else {
                heap.adjustTop();
            }
            for (readLength = 0L; readLength < partitionIndex.getLength(); readLength += (long)buffer.getSize()) {
                buffer = headStream.getReader().getNextBufferBlocking();
                writer.writeBuffer(partitionIndex.getPartition(), buffer);
            }
            assert (readLength == partitionIndex.getLength());
        }
        writer.finishWriting();
        channelDeleteRegistry.unregisterOpenChannel(writer.getWriteChannel());
        this.clearMerged(channelDeleteRegistry, allReaders);
        return writer;
    }

    private void clearMerged(ChannelDeleteRegistry<Tuple2<Integer, T>> channelDeleteRegistry, Set<AsynchronousPartitionedStreamFileReaderDelegate> allReaders) throws IOException {
        for (AsynchronousPartitionedStreamFileReaderDelegate reader : allReaders) {
            reader.close();
            channelDeleteRegistry.unregisterOpenChannel(reader.getReader());
            reader.getReader().deleteChannel();
            channelDeleteRegistry.unregisterChannelToBeDelete(reader.getReader().getChannelID());
        }
        allReaders.clear();
    }

    private static final class PartitionIndexStreamComparator
    implements Comparator<PartitionIndexStream> {
        private PartitionIndexStreamComparator() {
        }

        @Override
        public int compare(PartitionIndexStream first, PartitionIndexStream second) {
            int secondPartition;
            int firstPartition = first.getCurrentPartitionIndex().getPartition();
            if (firstPartition != (secondPartition = second.getCurrentPartitionIndex().getPartition())) {
                return firstPartition < secondPartition ? -1 : 1;
            }
            long firstStart = first.getCurrentPartitionIndex().getStartOffset();
            long secondStart = second.getCurrentPartitionIndex().getStartOffset();
            return Long.compare(firstStart, secondStart);
        }
    }

    private static final class PartitionIndexStream {
        private final AsynchronousPartitionedStreamFileReaderDelegate reader;
        private final List<PartitionIndex> partitionIndices;
        private int offset;

        public PartitionIndexStream(AsynchronousPartitionedStreamFileReaderDelegate reader, List<PartitionIndex> partitionIndices) {
            this.reader = reader;
            this.partitionIndices = partitionIndices;
            this.offset = 0;
        }

        public PartitionIndex getCurrentPartitionIndex() {
            return this.partitionIndices.get(this.offset);
        }

        public AsynchronousPartitionedStreamFileReaderDelegate getReader() {
            return this.reader;
        }

        public boolean advance() {
            if (this.offset < this.partitionIndices.size() - 1) {
                ++this.offset;
                return true;
            }
            return false;
        }

        public String toString() {
            return "PartitionIndexStream{partitionIndices=" + this.partitionIndices.size() + ", offset=" + this.offset + '}';
        }
    }
}

