package org.apache.flink.runtime.io.network.partition.external.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DuplicateOnlySerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.serialization.SerializerManager;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleUtils;
import org.apache.flink.runtime.io.network.partition.external.PartitionIndex;
import org.apache.flink.runtime.io.network.partition.external.PersistentFileType;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.PushedUnilateralSortMerger;
import org.apache.flink.runtime.operators.sort.SortedDataFile;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/writer/PartitionMergeFileWriter.class */
/**
 * A {@link PersistentFileWriter} that tags each record with its target subpartition index, pushes
 * the tagged records into a {@link PushedUnilateralSortMerger} whose comparator orders on that
 * index (tuple field 0, ascending), and on {@link #finish()} renames the sorter's remaining files
 * to the data paths generated under the partition's root directory.
 *
 * <p>A single holder tuple is reused for every {@code add} call, so concurrent calls on the same
 * instance would overwrite each other's fields.
 *
 * @param <T> type of the records written
 */
public class PartitionMergeFileWriter<T> implements PersistentFileWriter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionMergeFileWriter.class);

    /** Root directory under which the final data files are placed by {@link #finish()}. */
    private final String partitionDataRootPath;

    /** Serializer for the user records; a duplicate of it is used inside the tuple serializer. */
    private final TypeSerializer<T> typeSerializer;

    /** Memory segments handed to the sort merger. */
    private final List<MemorySegment> allMemory;

    /** Reused (subpartition index, record) holder passed to the sort merger on every add. */
    private final Tuple2<Integer, T> reuse;

    /** Sorter/merger receiving the tagged records. */
    private final PushedUnilateralSortMerger<Tuple2<Integer, T>> sortMerger;

    /**
     * Creates a writer without metric counters; delegates to the full constructor with {@code null}
     * for both counters.
     *
     * @see #PartitionMergeFileWriter(int, String, int, boolean, boolean, MemoryManager, List,
     *     IOManager, TypeSerializer, SerializerManager, AbstractInvokable, Counter, Counter)
     */
    public PartitionMergeFileWriter(int i, String str, int i2, boolean z, boolean z2, MemoryManager memoryManager, List<MemorySegment> list, IOManager iOManager, TypeSerializer<T> typeSerializer, SerializerManager<SerializationDelegate<T>> serializerManager, AbstractInvokable abstractInvokable) throws IOException, MemoryAllocationException {
        this(i, str, i2, z, z2, memoryManager, list, iOManager, typeSerializer, serializerManager, abstractInvokable, null, null);
    }

    /**
     * Creates the writer and the underlying sort merger.
     *
     * @param i number of subpartitions; must be larger than 0
     * @param str root path of the partition data; also handed to the file factory and file merger
     * @param i2 merge factor; must be at least 2
     * @param z flag forwarded to {@code ConcatPartitionedFileMerger} and, as the last argument, to
     *     the sort merger (NOTE(review): looks like an async-merge switch; confirm against those
     *     classes)
     * @param z2 flag forwarded to {@code ConcatPartitionedFileMerger} (NOTE(review): meaning not
     *     visible here; confirm)
     * @param memoryManager memory manager handed to the sort merger
     * @param list memory segments handed to the sort merger
     * @param iOManager I/O manager handed to the file factory, file merger and sort merger
     * @param typeSerializer serializer for the records
     * @param serializerManager serializer manager handed to the file factory
     * @param abstractInvokable owning task, handed to the sort merger
     * @param counter first counter forwarded to {@code BufferSortedDataFileFactory}; the shorter
     *     constructor passes {@code null}
     * @param counter2 second counter forwarded to {@code BufferSortedDataFileFactory}; the shorter
     *     constructor passes {@code null}
     * @throws IllegalArgumentException if {@code i <= 0} or {@code i2 < 2}
     */
    // The generic signatures of the same-package factory/merger classes are not visible from this
    // file, so they are instantiated raw; the suppression is limited to this constructor.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public PartitionMergeFileWriter(int i, String str, int i2, boolean z, boolean z2, MemoryManager memoryManager, List<MemorySegment> list, IOManager iOManager, TypeSerializer<T> typeSerializer, SerializerManager<SerializationDelegate<T>> serializerManager, AbstractInvokable abstractInvokable, Counter counter, Counter counter2) throws IOException, MemoryAllocationException {
        this.reuse = new Tuple2<>();
        Preconditions.checkArgument(i > 0, "The number of subpartitions should be larger than 0, but actually is: " + i);
        Preconditions.checkArgument(i2 >= 2, "Illegal merge factor: " + i2);
        this.partitionDataRootPath = str;
        this.typeSerializer = typeSerializer;
        this.allMemory = list;

        // Field 0 is the subpartition index, field 1 the user record.
        final TypeSerializer<?>[] fieldSerializers = {IntSerializer.INSTANCE, typeSerializer.duplicate()};

        this.sortMerger = new PushedUnilateralSortMerger<>(
                new PartitionedBufferSortedDataFileFactory(
                        new BufferSortedDataFileFactory(str, this.typeSerializer, iOManager, serializerManager, counter, counter2),
                        i),
                new ConcatPartitionedFileMerger(i, str, i2, z, z2, iOManager),
                memoryManager,
                this.allMemory,
                iOManager,
                abstractInvokable,
                new DuplicateOnlySerializerFactory(new TupleSerializer(Tuple2.class, fieldSerializers)),
                // Compare on tuple field 0 only (the subpartition index), ascending.
                new TupleComparator(new int[] {0}, new TypeComparator<?>[] {new IntComparator(true)}, fieldSerializers),
                // NOTE(review): the meaning of the remaining positional arguments is defined by
                // PushedUnilateralSortMerger and is not visible here; values are kept unchanged.
                0,
                i2,
                false,
                0.0f,
                false,
                true,
                true,
                z);
        LOG.info("External result partition writer initialized.");
    }

    /**
     * Adds one record destined for a single subpartition.
     *
     * @param t the record
     * @param i index of the target subpartition
     */
    @Override // org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter
    public void add(T t, int i) throws IOException {
        this.reuse.f1 = t;
        this.reuse.f0 = Integer.valueOf(i);
        this.sortMerger.add(this.reuse);
    }

    /**
     * Adds one record to several subpartitions; the record is handed to the sort merger once per
     * entry of {@code iArr}.
     *
     * @param t the record
     * @param iArr indices of the target subpartitions
     */
    @Override // org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter
    public void add(T t, int[] iArr) throws IOException {
        this.reuse.f1 = t;
        for (int subpartition : iArr) {
            this.reuse.f0 = Integer.valueOf(subpartition);
            this.sortMerger.add(this.reuse);
        }
    }

    /**
     * Signals the end of input to the sort merger and renames each remaining sorted file to the
     * data path generated for its position (0, 1, 2, ...) under the partition root path.
     *
     * @throws IOException if the sort merger fails or a file cannot be renamed
     */
    @Override // org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter
    public void finish() throws IOException, InterruptedException {
        this.sortMerger.finishAdding();
        final List<SortedDataFile<Tuple2<Integer, T>>> remainingFiles = this.sortMerger.getRemainingSortedDataFiles();
        final FileSystem localFileSystem = FileSystem.getLocalFileSystem();
        int nextFileId = 0;
        for (SortedDataFile<Tuple2<Integer, T>> file : remainingFiles) {
            final Path source = new Path(file.getChannelID().getPath());
            final Path target = new Path(ExternalBlockShuffleUtils.generateDataPath(this.partitionDataRootPath, nextFileId++));
            // rename() reports failure through its return value; ignoring it would leave the
            // partition without its data file at the expected path and no error raised.
            if (!localFileSystem.rename(source, target)) {
                throw new IOException("Failed to rename sorted data file " + source + " to " + target);
            }
        }
        LOG.info("Finish external result partition writing.");
    }

    /**
     * Collects the partition index list of every remaining sorted file, in the order the sort
     * merger returns the files.
     *
     * @return one index list per remaining file
     * @throws IllegalStateException if a remaining file is not a {@code PartitionedSortedDataFile}
     */
    @Override // org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter
    public List<List<PartitionIndex>> generatePartitionIndices() throws IOException, InterruptedException {
        final List<SortedDataFile<Tuple2<Integer, T>>> remainingFiles = this.sortMerger.getRemainingSortedDataFiles();
        final List<List<PartitionIndex>> partitionIndices = new ArrayList<>(remainingFiles.size());
        for (SortedDataFile<Tuple2<Integer, T>> file : remainingFiles) {
            if (!(file instanceof PartitionedSortedDataFile)) {
                throw new IllegalStateException("Unexpected file type.");
            }
            // Raw cast: the type parameters of PartitionedSortedDataFile are not visible here.
            @SuppressWarnings({"unchecked", "rawtypes"})
            final List<PartitionIndex> indices = ((PartitionedSortedDataFile) file).getPartitionIndexList();
            partitionIndices.add(indices);
        }
        return partitionIndices;
    }

    /**
     * Does nothing in this implementation.
     *
     * <p>NOTE(review): neither the sort merger nor the memory segments are released here;
     * presumably their owner takes care of that. Confirm against the caller.
     */
    @Override // org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter
    public void clear() throws IOException {
    }

    /** Returns {@link PersistentFileType#MERGED_PARTITION_FILE}. */
    @Override // org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter
    public PersistentFileType getExternalFileType() {
        return PersistentFileType.MERGED_PARTITION_FILE;
    }
}
