/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.external.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DuplicateOnlySerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.serialization.SerializerManager;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleUtils;
import org.apache.flink.runtime.io.network.partition.external.PartitionIndex;
import org.apache.flink.runtime.io.network.partition.external.PersistentFileType;
import org.apache.flink.runtime.io.network.partition.external.writer.BufferSortedDataFileFactory;
import org.apache.flink.runtime.io.network.partition.external.writer.ConcatPartitionedFileMerger;
import org.apache.flink.runtime.io.network.partition.external.writer.PartitionedBufferSortedDataFileFactory;
import org.apache.flink.runtime.io.network.partition.external.writer.PartitionedSortedDataFile;
import org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.PushedUnilateralSortMerger;
import org.apache.flink.runtime.operators.sort.SortedDataFile;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionMergeFileWriter<T>
implements PersistentFileWriter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionMergeFileWriter.class);
    private final String partitionDataRootPath;
    private final TypeSerializer<T> typeSerializer;
    private final List<MemorySegment> allMemory;
    private final Tuple2<Integer, T> reuse = new Tuple2();
    private final PushedUnilateralSortMerger<Tuple2<Integer, T>> sortMerger;

    public PartitionMergeFileWriter(int numPartitions, String partitionDataRootPath, int mergeFactor, boolean enableAsyncMerging, boolean mergeToOneFile, MemoryManager memoryManager, List<MemorySegment> memory, IOManager ioManager, TypeSerializer<T> serializer, SerializerManager<SerializationDelegate<T>> serializerManager, AbstractInvokable parentTask) throws IOException, MemoryAllocationException {
        this(numPartitions, partitionDataRootPath, mergeFactor, enableAsyncMerging, mergeToOneFile, memoryManager, memory, ioManager, serializer, serializerManager, parentTask, null, null);
    }

    public PartitionMergeFileWriter(int numPartitions, String partitionDataRootPath, int mergeFactor, boolean enableAsyncMerging, boolean mergeToOneFile, MemoryManager memoryManager, List<MemorySegment> memory, IOManager ioManager, TypeSerializer<T> serializer, SerializerManager<SerializationDelegate<T>> serializerManager, AbstractInvokable parentTask, Counter numBytesOut, Counter numBuffersOut) throws IOException, MemoryAllocationException {
        Preconditions.checkArgument((numPartitions > 0 ? 1 : 0) != 0, (Object)("The number of subpartitions should be larger than 0, but actually is: " + numPartitions));
        Preconditions.checkArgument((mergeFactor >= 2 ? 1 : 0) != 0, (Object)("Illegal merge factor: " + mergeFactor));
        this.partitionDataRootPath = partitionDataRootPath;
        this.typeSerializer = serializer;
        this.allMemory = memory;
        Class<Tuple2> typedTuple = Tuple2.class;
        TypeSerializer[] serializers = new TypeSerializer[]{IntSerializer.INSTANCE, serializer.duplicate()};
        TupleSerializer tuple2Serializer = new TupleSerializer(typedTuple, serializers);
        DuplicateOnlySerializerFactory serializerFactory = new DuplicateOnlySerializerFactory((TypeSerializer)tuple2Serializer);
        int[] keyPositions = new int[]{0};
        TypeComparator[] comparators = new TypeComparator[]{new IntComparator(true)};
        TupleComparator tuple2Comparator = new TupleComparator(keyPositions, comparators, serializers);
        BufferSortedDataFileFactory<T> sortedDataFileFactory = new BufferSortedDataFileFactory<T>(partitionDataRootPath, this.typeSerializer, ioManager, serializerManager, numBytesOut, numBuffersOut);
        PartitionedBufferSortedDataFileFactory<T> partitionedBufferSortedDataFileFactory = new PartitionedBufferSortedDataFileFactory<T>(sortedDataFileFactory, numPartitions);
        ConcatPartitionedFileMerger merger = new ConcatPartitionedFileMerger(numPartitions, partitionDataRootPath, mergeFactor, enableAsyncMerging, mergeToOneFile, ioManager);
        this.sortMerger = new PushedUnilateralSortMerger<T>(partitionedBufferSortedDataFileFactory, merger, memoryManager, this.allMemory, ioManager, parentTask, serializerFactory, tuple2Comparator, 0, mergeFactor, false, 0.0f, false, true, true, enableAsyncMerging);
        LOG.info("External result partition writer initialized.");
    }

    @Override
    public void add(T record, int targetPartition) throws IOException {
        this.reuse.f1 = record;
        this.reuse.f0 = targetPartition;
        this.sortMerger.add(this.reuse);
    }

    @Override
    public void add(T record, int[] targetPartitions) throws IOException {
        this.reuse.f1 = record;
        for (int partition : targetPartitions) {
            this.reuse.f0 = partition;
            this.sortMerger.add(this.reuse);
        }
    }

    @Override
    public void finish() throws IOException, InterruptedException {
        this.sortMerger.finishAdding();
        List remainFiles = this.sortMerger.getRemainingSortedDataFiles();
        int nextFileId = 0;
        FileSystem localFileSystem = FileSystem.getLocalFileSystem();
        for (SortedDataFile file : remainFiles) {
            localFileSystem.rename(new Path(file.getChannelID().getPath()), new Path(ExternalBlockShuffleUtils.generateDataPath(this.partitionDataRootPath, nextFileId++)));
        }
        LOG.info("Finish external result partition writing.");
    }

    @Override
    public List<List<PartitionIndex>> generatePartitionIndices() throws IOException, InterruptedException {
        ArrayList<List<PartitionIndex>> partitionIndices = new ArrayList<List<PartitionIndex>>();
        List remainFiles = this.sortMerger.getRemainingSortedDataFiles();
        for (SortedDataFile file : remainFiles) {
            if (!(file instanceof PartitionedSortedDataFile)) {
                throw new IllegalStateException("Unexpected file type.");
            }
            partitionIndices.add(((PartitionedSortedDataFile)file).getPartitionIndexList());
        }
        return partitionIndices;
    }

    @Override
    public void clear() throws IOException {
    }

    @Override
    public PersistentFileType getExternalFileType() {
        return PersistentFileType.MERGED_PARTITION_FILE;
    }
}

