/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.external.writer;

import java.io.IOException;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.ChannelBackendMutableObjectIterator;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.network.partition.external.PartitionIndex;
import org.apache.flink.runtime.io.network.partition.external.writer.BufferSortedDataFile;
import org.apache.flink.runtime.io.network.partition.external.writer.PartitionIndexGenerator;
import org.apache.flink.runtime.io.network.partition.external.writer.PartitionedSortedDataFile;

public class PartitionedBufferSortedDataFile<T>
implements PartitionedSortedDataFile<T> {
    private final BufferSortedDataFile<T> backendFile;
    private final PartitionIndexGenerator partitionIndexGenerator;
    private int currentPartition;
    private int numRecordWritten;

    public PartitionedBufferSortedDataFile(int numberOfSubpartitions, BufferSortedDataFile<T> backendFile) {
        this.backendFile = backendFile;
        this.partitionIndexGenerator = new PartitionIndexGenerator(numberOfSubpartitions);
    }

    @Override
    public FileIOChannel getWriteChannel() {
        return this.backendFile.getWriteChannel();
    }

    @Override
    public FileIOChannel.ID getChannelID() {
        return this.backendFile.getChannelID();
    }

    @Override
    public void writeRecord(Tuple2<Integer, T> record) throws IOException {
        if ((Integer)record.f0 != this.currentPartition) {
            this.backendFile.flush();
            this.currentPartition = (Integer)record.f0;
        }
        this.partitionIndexGenerator.updatePartitionIndexBeforeWriting((Integer)record.f0, this.backendFile.getBytesWritten(), this.numRecordWritten);
        this.backendFile.writeRecord(record.f1);
        ++this.numRecordWritten;
    }

    @Override
    public void copyRecord(DataInputView serializedRecord) throws IOException {
        int partitionIndex = serializedRecord.readInt();
        if (partitionIndex != this.currentPartition) {
            this.backendFile.flush();
            this.currentPartition = partitionIndex;
        }
        this.partitionIndexGenerator.updatePartitionIndexBeforeWriting(partitionIndex, this.backendFile.getBytesWritten(), this.numRecordWritten);
        this.backendFile.copyRecord(serializedRecord);
        ++this.numRecordWritten;
    }

    @Override
    public long getBytesWritten() throws IOException {
        return this.backendFile.getBytesWritten();
    }

    @Override
    public void finishWriting() throws IOException {
        this.backendFile.finishWriting();
        this.partitionIndexGenerator.finishWriting(this.backendFile.getBytesWritten(), this.numRecordWritten);
    }

    @Override
    public ChannelBackendMutableObjectIterator<Tuple2<Integer, T>> createReader(List<MemorySegment> readMemory) throws IOException {
        ChannelBackendMutableObjectIterator<T> recordIterator = this.backendFile.createReader(readMemory);
        return new PartitionedRecordsIterator<T>(recordIterator, this.partitionIndexGenerator.getPartitionIndices());
    }

    @Override
    public List<PartitionIndex> getPartitionIndexList() {
        return this.partitionIndexGenerator.getPartitionIndices();
    }

    public int getFileId() {
        return this.backendFile.getFileId();
    }

    private static class PartitionedRecordsIterator<T>
    implements ChannelBackendMutableObjectIterator<Tuple2<Integer, T>> {
        private final ChannelBackendMutableObjectIterator<T> recordIterator;
        private final List<PartitionIndex> partitionIndices;
        private long numReadRecords;
        private int currentPartition;
        private long currentPartitionRemainRecords;

        public PartitionedRecordsIterator(ChannelBackendMutableObjectIterator<T> recordIterator, List<PartitionIndex> partitionIndices) {
            this.recordIterator = recordIterator;
            this.partitionIndices = partitionIndices;
        }

        public Tuple2<Integer, T> next(Tuple2<Integer, T> reuse) throws IOException {
            Object rec = this.recordIterator.next(reuse.f1);
            if (rec == null) {
                return null;
            }
            reuse.f0 = this.currentPartition;
            --this.currentPartitionRemainRecords;
            while (this.currentPartitionRemainRecords == 0L) {
                ++this.currentPartition;
                this.currentPartitionRemainRecords = this.partitionIndices.get(this.currentPartition).getNumRecords();
            }
            return reuse;
        }

        public Tuple2<Integer, T> next() throws IOException {
            Object rec = this.recordIterator.next();
            if (rec == null) {
                return null;
            }
            Tuple2 result = new Tuple2((Object)this.currentPartition, rec);
            --this.currentPartitionRemainRecords;
            while (this.currentPartitionRemainRecords == 0L) {
                ++this.currentPartition;
                this.currentPartitionRemainRecords = this.partitionIndices.get(this.currentPartition).getNumRecords();
            }
            return result;
        }

        @Override
        public FileIOChannel getReaderChannel() {
            return this.recordIterator.getReaderChannel();
        }
    }
}

