/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.ChannelBackendMutableObjectIterator;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.operators.sort.ChannelDeleteRegistry;
import org.apache.flink.runtime.operators.sort.MergeIterator;
import org.apache.flink.runtime.operators.sort.SortedDataFile;
import org.apache.flink.runtime.operators.sort.SortedDataFileFactory;
import org.apache.flink.runtime.operators.sort.SortedDataFileMerger;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecordComparisonMerger<T>
implements SortedDataFileMerger<T> {
    private static final Logger LOG = LoggerFactory.getLogger(RecordComparisonMerger.class);
    private List<SortedDataFile<T>> sortedDataFiles;
    protected final SortedDataFileFactory<T> sortedDataFileFactory;
    protected final IOManager ioManager;
    protected final TypeSerializer<T> typeSerializer;
    protected final TypeComparator<T> typeComparator;
    private final int maxFileHandlesPerMerge;
    protected final boolean objectReuseEnabled;

    public RecordComparisonMerger(SortedDataFileFactory<T> sortedDataFileFactory, IOManager ioManager, TypeSerializer<T> typeSerializer, TypeComparator<T> typeComparator, int maxFileHandlesPerMerge, boolean objectReuseEnabled) {
        this.sortedDataFileFactory = sortedDataFileFactory;
        this.ioManager = ioManager;
        this.typeSerializer = typeSerializer;
        this.typeComparator = typeComparator;
        this.maxFileHandlesPerMerge = maxFileHandlesPerMerge;
        this.objectReuseEnabled = objectReuseEnabled;
        this.sortedDataFiles = new ArrayList<SortedDataFile<T>>();
    }

    private void merge(List<MemorySegment> writeMemory, List<MemorySegment> mergeReadMemory, ChannelDeleteRegistry channelDeleteRegistry, AtomicBoolean aliveFlag) throws IOException {
        int maxFanIn = Math.min(this.maxFileHandlesPerMerge, mergeReadMemory.size());
        while (aliveFlag.get() && this.sortedDataFiles.size() > maxFanIn) {
            this.sortedDataFiles = this.mergeChannelList(this.sortedDataFiles, mergeReadMemory, writeMemory, channelDeleteRegistry, maxFanIn);
        }
    }

    @Override
    public MutableObjectIterator<T> getMergingIterator(List<SortedDataFile<T>> channels, List<MemorySegment> mergeReadMemory, MutableObjectIterator<T> largeRecords, ChannelDeleteRegistry channelDeleteRegistry) throws IOException {
        List<List<MemorySegment>> segmentedReadMemory = this.distributeReadMemory(mergeReadMemory, channels.size());
        return this.getMergingIteratorWithSegmentedMemory(channels, segmentedReadMemory, null, largeRecords, channelDeleteRegistry);
    }

    @Override
    public void notifyNewSortedDataFile(SortedDataFile<T> sortedDataFile, List<MemorySegment> writeMemory, List<MemorySegment> mergeReadMemory, ChannelDeleteRegistry channelDeleteRegistry, AtomicBoolean aliveFlag) throws IOException {
        this.sortedDataFiles.add(sortedDataFile);
    }

    @Override
    public List<SortedDataFile<T>> finishMerging(List<MemorySegment> writeMemory, List<MemorySegment> mergeReadMemory, ChannelDeleteRegistry channelDeleteRegistry, AtomicBoolean aliveFlag) throws IOException {
        this.merge(writeMemory, mergeReadMemory, channelDeleteRegistry, aliveFlag);
        return this.sortedDataFiles;
    }

    protected final MergeIterator<T> getMergingIteratorWithSegmentedMemory(List<SortedDataFile<T>> files, List<List<MemorySegment>> inputSegments, List<FileIOChannel> channelAccessed, MutableObjectIterator<T> largeRecords, ChannelDeleteRegistry channelDeleteRegistry) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Performing merge of " + files.size() + " sorted streams.");
        }
        ArrayList<MutableObjectIterator<MutableObjectIterator<T>>> iterators = new ArrayList<MutableObjectIterator<MutableObjectIterator<T>>>(files.size() + 1);
        for (int i = 0; i < files.size(); ++i) {
            List<MemorySegment> segsForChannel = inputSegments.get(i);
            ChannelBackendMutableObjectIterator<T> channelBackendMutableObjectIterator = files.get(i).createReader(segsForChannel);
            if (channelAccessed != null) {
                channelAccessed.add(channelBackendMutableObjectIterator.getReaderChannel());
            }
            channelDeleteRegistry.registerOpenChannel(channelBackendMutableObjectIterator.getReaderChannel());
            channelDeleteRegistry.registerChannelToBeDelete(channelBackendMutableObjectIterator.getReaderChannel().getChannelID());
            iterators.add(channelBackendMutableObjectIterator);
        }
        if (largeRecords != null) {
            iterators.add(largeRecords);
        }
        return new MergeIterator(iterators, this.typeComparator);
    }

    protected final List<SortedDataFile<T>> mergeChannelList(List<SortedDataFile<T>> files, List<MemorySegment> allReadBuffers, List<MemorySegment> writeBuffers, ChannelDeleteRegistry channelDeleteRegistry, int maxFanIn) throws IOException {
        double scale = Math.ceil(Math.log(files.size()) / Math.log(maxFanIn)) - 1.0;
        int numStart = files.size();
        int numEnd = (int)Math.pow(maxFanIn, scale);
        int numMerges = (int)Math.ceil((double)(numStart - numEnd) / (double)(maxFanIn - 1));
        int numNotMerged = numEnd - numMerges;
        int numToMerge = numStart - numNotMerged;
        ArrayList<SortedDataFile<T>> mergedFiles = new ArrayList<SortedDataFile<T>>(numEnd);
        mergedFiles.addAll(files.subList(0, numNotMerged));
        int channelsToMergePerStep = (int)Math.ceil((double)numToMerge / (double)numMerges);
        List<List<MemorySegment>> segmentedFileChannels = this.distributeReadMemory(allReadBuffers, channelsToMergePerStep);
        ArrayList<SortedDataFile<T>> channelsToMergeThisStep = new ArrayList<SortedDataFile<T>>(channelsToMergePerStep);
        int channelNum = numNotMerged;
        while (channelNum < files.size()) {
            channelsToMergeThisStep.clear();
            for (int i = 0; i < channelsToMergePerStep && channelNum < files.size(); ++i, ++channelNum) {
                channelsToMergeThisStep.add(files.get(channelNum));
            }
            mergedFiles.add(this.mergeToNewFile(channelsToMergeThisStep, segmentedFileChannels, writeBuffers, channelDeleteRegistry));
        }
        return mergedFiles;
    }

    protected SortedDataFile<T> mergeToNewFile(List<SortedDataFile<T>> files, List<List<MemorySegment>> readBuffers, List<MemorySegment> writeBuffers, ChannelDeleteRegistry channelDeleteRegistry) throws IOException {
        Object rec;
        ArrayList<FileIOChannel> channelAccesses = new ArrayList<FileIOChannel>(files.size());
        MergeIterator<T> mergeIterator = this.getMergingIteratorWithSegmentedMemory(files, readBuffers, channelAccesses, null, channelDeleteRegistry);
        SortedDataFile<Object> writer = this.sortedDataFileFactory.createFile(writeBuffers);
        channelDeleteRegistry.registerChannelToBeDelete(writer.getChannelID());
        channelDeleteRegistry.registerOpenChannel(writer.getWriteChannel());
        if (this.objectReuseEnabled) {
            rec = this.typeSerializer.createInstance();
            while ((rec = mergeIterator.next(rec)) != null) {
                writer.writeRecord(rec);
            }
        } else {
            while ((rec = mergeIterator.next()) != null) {
                writer.writeRecord(rec);
            }
        }
        writer.finishWriting();
        channelDeleteRegistry.unregisterOpenChannel(writer.getWriteChannel());
        for (int i = 0; i < channelAccesses.size(); ++i) {
            FileIOChannel access = (FileIOChannel)channelAccesses.get(i);
            access.closeAndDelete();
            channelDeleteRegistry.unregisterOpenChannel(access);
        }
        return writer;
    }

    protected final List<List<MemorySegment>> distributeReadMemory(List<MemorySegment> memory, int numChannels) {
        ArrayList<List<MemorySegment>> target = new ArrayList<List<MemorySegment>>(numChannels);
        int numBuffers = memory.size();
        int buffersPerChannelLowerBound = numBuffers / numChannels;
        int numChannelsWithOneMore = numBuffers % numChannels;
        Iterator<MemorySegment> segments = memory.iterator();
        for (int i = 0; i < numChannels; ++i) {
            int toAssign = i < numChannelsWithOneMore ? buffersPerChannelLowerBound + 1 : buffersPerChannelLowerBound;
            ArrayList<MemorySegment> segs = new ArrayList<MemorySegment>(toAssign);
            target.add(segs);
            for (int j = 0; j < toAssign; ++j) {
                segs.add(segments.next());
            }
        }
        return target;
    }
}

