/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.external.writer;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.FixedLengthBufferPool;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleUtils;
import org.apache.flink.runtime.io.network.partition.external.PartitionIndex;
import org.apache.flink.runtime.io.network.partition.external.PersistentFileType;
import org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PersistentFileWriter} that keeps one output file per partition: every record is
 * serialized once and its bytes are appended to the file of each partition it is routed to.
 *
 * <p>Data is staged in buffers taken from a {@link FixedLengthBufferPool} built on the memory
 * segments handed to the constructor. Each partition holds at most one in-progress buffer; a
 * buffer is written to the partition's file when it becomes full or when {@link #finish()} is
 * called.
 *
 * <p>The class performs no synchronization of its own.
 *
 * @param <T> type of the records written
 */
public class PartitionHashFileWriter<T> implements PersistentFileWriter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionHashFileWriter.class);

    private final int numPartitions;
    private final RecordSerializer<IOReadableWritable> recordSerializer;
    private final SerializationDelegate<T> serializationDelegate;
    private final FixedLengthBufferPool bufferPool;

    /** One file writer per partition, indexed by partition id. */
    private final BufferFileWriter[] fileWriters;

    /** The in-progress buffer of each partition, or {@code null} if the partition has none. */
    private final BufferBuilder[] currentBufferBuilders;

    /** Number of bytes handed to each partition's file writer so far. */
    private final long[] bytesWritten;

    /** Optional metric counters; either may be {@code null}, in which case it is not updated. */
    private final Counter numBytesOut;
    private final Counter numBuffersOut;

    /**
     * Creates a writer that does not report any metrics.
     *
     * @see #PartitionHashFileWriter(int, String, List, IOManager, TypeSerializer, Counter, Counter)
     */
    public PartitionHashFileWriter(int numPartitions, String partitionDataRootPath, List<MemorySegment> memory, IOManager ioManager, TypeSerializer<T> serializer) throws IOException {
        this(numPartitions, partitionDataRootPath, memory, ioManager, serializer, null, null);
    }

    /**
     * Creates the writer and opens one file writer per partition.
     *
     * @param numPartitions number of partitions; must be positive
     * @param partitionDataRootPath root path from which the per-partition data paths are derived
     * @param memory memory segments backing the buffer pool; at least {@code numPartitions} are
     *     required
     * @param ioManager used to create the file channels and their writers
     * @param serializer serializer for the records
     * @param numBytesOut counter incremented by the size of every buffer written, may be
     *     {@code null}
     * @param numBuffersOut counter incremented once per buffer written, may be {@code null}
     * @throws IOException if a file writer cannot be created; writers opened before the failure
     *     are closed again
     * @throws IllegalArgumentException if {@code numPartitions} is not positive or there are
     *     fewer memory segments than partitions
     */
    public PartitionHashFileWriter(int numPartitions, String partitionDataRootPath, List<MemorySegment> memory, IOManager ioManager, TypeSerializer<T> serializer, Counter numBytesOut, Counter numBuffersOut) throws IOException {
        Preconditions.checkArgument(numPartitions > 0, "The number of subpartitions should be larger than 0, but actually is: " + numPartitions);
        this.numPartitions = numPartitions;
        Preconditions.checkArgument(memory.size() >= numPartitions, "The number of memory segments should be no less than that of subpartitions, but actually numMemory: " + memory.size() + ", numPartitions: " + numPartitions);

        this.recordSerializer = new SpanningRecordSerializer<>();
        this.serializationDelegate = new SerializationDelegate<>(serializer);
        this.bufferPool = new FixedLengthBufferPool(memory, false);
        this.fileWriters = new BufferFileWriter[numPartitions];
        this.currentBufferBuilders = new BufferBuilder[numPartitions];
        // Array elements start at zero, so no explicit initialisation is needed.
        this.bytesWritten = new long[numPartitions];
        this.numBytesOut = numBytesOut;
        this.numBuffersOut = numBuffersOut;

        boolean allOpened = false;
        try {
            for (int i = 0; i < numPartitions; ++i) {
                String path = ExternalBlockShuffleUtils.generateDataPath(partitionDataRootPath, i);
                this.fileWriters[i] = ioManager.createStreamFileWriter(ioManager.createChannel(new File(path)));
            }
            allOpened = true;
        } finally {
            if (!allOpened) {
                // Construction is failing: release the writers that were already opened so
                // they are not leaked together with the half-built instance.
                closeOpenedWritersQuietly();
            }
        }
    }

    /**
     * Serializes {@code record} and appends it to the file of {@code targetPartition}.
     *
     * @throws IOException if serialization or writing a full buffer fails
     * @throws InterruptedException if interrupted while waiting for a free buffer
     */
    @Override
    public void add(T record, int targetPartition) throws IOException, InterruptedException {
        serialize(record);
        copyToTargetFile(targetPartition);
    }

    /**
     * Serializes {@code record} once and appends the serialized bytes to the file of every
     * partition listed in {@code targetPartitions}.
     *
     * @throws IOException if serialization or writing a full buffer fails
     * @throws InterruptedException if interrupted while waiting for a free buffer
     */
    @Override
    public void add(T record, int[] targetPartitions) throws IOException, InterruptedException {
        serialize(record);
        for (int partition : targetPartitions) {
            copyToTargetFile(partition);
        }
    }

    /**
     * Writes out the in-progress buffer of every partition and closes every file writer.
     *
     * <p>An {@link IOException} on one partition does not stop the remaining partitions from
     * being flushed and closed. The first such exception is rethrown once all partitions have
     * been processed, with later ones attached as suppressed exceptions.
     *
     * @throws IOException if flushing or closing any partition failed
     */
    @Override
    public void finish() throws IOException, InterruptedException {
        IOException failure = null;
        for (int i = 0; i < numPartitions; ++i) {
            try {
                tryFinishCurrentBufferBuilder(i);
            } catch (IOException e) {
                failure = chain(failure, e);
            }
            try {
                fileWriters[i].close();
            } catch (IOException e) {
                failure = chain(failure, e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Describes the data written so far: a single list containing one {@link PartitionIndex}
     * per partition, each created with the partition id, {@code 0} and the number of bytes
     * written for that partition.
     */
    @Override
    public List<List<PartitionIndex>> generatePartitionIndices() throws IOException, InterruptedException {
        List<PartitionIndex> partitionIndex = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; ++i) {
            partitionIndex.add(new PartitionIndex(i, 0L, bytesWritten[i]));
        }
        return Collections.singletonList(partitionIndex);
    }

    /** Destroys the buffer pool. The file writers are not touched; see {@link #finish()}. */
    @Override
    public void clear() throws IOException {
        bufferPool.lazyDestroy();
    }

    @Override
    public PersistentFileType getExternalFileType() {
        return PersistentFileType.HASH_PARTITION_FILE;
    }

    /** Loads {@code record} into the shared serializer so it can be copied to target files. */
    private void serialize(T record) throws IOException {
        serializationDelegate.setInstance(record);
        recordSerializer.serializeRecord(serializationDelegate);
    }

    /**
     * Copies the currently serialized record into the buffers of {@code partition}, writing out
     * each buffer that becomes full and requesting a new one until the whole record is copied.
     */
    private void copyToTargetFile(int partition) throws IOException, InterruptedException {
        // NOTE(review): reset() presumably rewinds the serializer so the same record can be
        // copied again for another partition - confirm against RecordSerializer.
        recordSerializer.reset();

        RecordSerializer.SerializationResult result =
                recordSerializer.copyToBufferBuilder(getCurrentBufferBuilder(partition));
        while (result.isFullBuffer()) {
            tryFinishCurrentBufferBuilder(partition);
            if (result.isFullRecord()) {
                break;
            }
            result = recordSerializer.copyToBufferBuilder(getCurrentBufferBuilder(partition));
        }
        Preconditions.checkState(!recordSerializer.hasSerializedData(), "All data should be written at once");
    }

    /**
     * Returns the in-progress buffer builder of {@code partition}, requesting a new one from
     * the pool (blocking) if the partition currently has none.
     */
    private BufferBuilder getCurrentBufferBuilder(int partition) throws InterruptedException, IOException {
        BufferBuilder builder = currentBufferBuilders[partition];
        if (builder == null) {
            builder = bufferPool.requestBufferBuilderBlocking();
            Preconditions.checkState(builder != null, "Failed to request a buffer.");
            currentBufferBuilders[partition] = builder;
        }
        return builder;
    }

    /**
     * Finishes the in-progress buffer of {@code partition}, if there is one, and hands it to
     * the partition's file writer. Byte and metric counters are updated only after the writer
     * accepted the buffer.
     */
    private void tryFinishCurrentBufferBuilder(int partition) throws IOException {
        BufferBuilder builder = currentBufferBuilders[partition];
        if (builder == null) {
            return;
        }
        // Detach up front: even if the write below fails, the already finished builder must
        // not be picked up again by a later add() or finish().
        currentBufferBuilders[partition] = null;

        builder.finish();
        BufferConsumer consumer = builder.createBufferConsumer();
        try {
            Buffer buffer = consumer.build();
            // Read the size before handing the buffer over; the buffer is not inspected again
            // once the writer has it.
            long size = buffer.getSize();
            fileWriters[partition].writeBlock(buffer);

            bytesWritten[partition] += size;
            if (numBytesOut != null) {
                numBytesOut.inc(size);
            }
            if (numBuffersOut != null) {
                numBuffersOut.inc();
            }
        } finally {
            // Close the consumer on the failure path as well so it is not left open.
            consumer.close();
        }
    }

    /** Best-effort close of every writer opened so far; failures are logged, not thrown. */
    private void closeOpenedWritersQuietly() {
        for (BufferFileWriter writer : fileWriters) {
            if (writer == null) {
                continue;
            }
            try {
                writer.close();
            } catch (IOException e) {
                LOG.warn("Failed to close a partition file writer during cleanup.", e);
            }
        }
    }

    /** Returns the exception to eventually throw, attaching {@code next} to an earlier one. */
    private static IOException chain(IOException first, IOException next) {
        if (first == null) {
            return next;
        }
        // A writer may rethrow the very same stored exception instance from several calls;
        // Throwable#addSuppressed rejects self-suppression.
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }
}

