/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.external;

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.serialization.SerializerManager;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleServiceOptions;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleUtils;
import org.apache.flink.runtime.io.network.partition.external.PartitionIndex;
import org.apache.flink.runtime.io.network.partition.external.writer.PartitionHashFileWriter;
import org.apache.flink.runtime.io.network.partition.external.writer.PartitionMergeFileWriter;
import org.apache.flink.runtime.io.network.partition.external.writer.PersistentFileWriter;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExternalResultPartition<T>
extends ResultPartition<T> {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalResultPartition.class);
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final String partitionRootPath;
    private final int hashMaxSubpartitions;
    private final int mergeFactor;
    private final boolean enableAsyncMerging;
    private final boolean mergeToOneFile;
    private final double shuffleMemory;
    private final int numPages;
    private final SerializerManager<SerializationDelegate<T>> serializerManager;
    private final long consumedPartitionTTL;
    private final long partialConsumedPartitionTTL;
    private final long unconsumedPartitionTTL;
    private final long unfinishedPartitionTTL;
    private PersistentFileWriter<T> fileWriter;
    private volatile boolean initialized;

    public ExternalResultPartition(Configuration taskManagerConfiguration, String owningTaskName, JobID jobId, ResultPartitionID partitionId, ResultPartitionType partitionType, int numberOfSubpartitions, int numTargetKeyGroups, MemoryManager memoryManager, IOManager ioManager) {
        super(owningTaskName, jobId, partitionId, partitionType, numberOfSubpartitions, numTargetKeyGroups);
        Preconditions.checkNotNull((Object)taskManagerConfiguration);
        this.memoryManager = (MemoryManager)Preconditions.checkNotNull((Object)memoryManager);
        this.ioManager = (IOManager)Preconditions.checkNotNull((Object)ioManager);
        this.partitionRootPath = ExternalBlockShuffleUtils.generatePartitionRootPath(this.getSpillRootPath(taskManagerConfiguration, jobId.toString(), partitionId.toString()), partitionId.getProducerId().toString(), partitionId.getPartitionId().toString());
        this.hashMaxSubpartitions = taskManagerConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_OUTPUT_HASH_MAX_SUBPARTITIONS);
        this.mergeFactor = taskManagerConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_OUTPUT_MERGE_FACTOR);
        this.enableAsyncMerging = taskManagerConfiguration.getBoolean(TaskManagerOptions.TASK_MANAGER_OUTPUT_ENABLE_ASYNC_MERGE);
        this.mergeToOneFile = taskManagerConfiguration.getBoolean(TaskManagerOptions.TASK_MANAGER_OUTPUT_MERGE_TO_ONE_FILE);
        this.shuffleMemory = taskManagerConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_OUTPUT_MEMORY_MB);
        this.numPages = (int)(this.shuffleMemory * 1024.0 * 1024.0 / (double)memoryManager.getPageSize());
        Preconditions.checkArgument((this.hashMaxSubpartitions > 0 ? 1 : 0) != 0, (Object)("The max allowed number of subpartitions should be larger than 0, but actually is: " + this.hashMaxSubpartitions));
        Preconditions.checkArgument((this.mergeFactor > 0 ? 1 : 0) != 0, (Object)("The merge factor should be larger than 0, but actually is: " + this.mergeFactor));
        Preconditions.checkArgument((this.shuffleMemory > 0.0 ? 1 : 0) != 0, (Object)("The shuffle memory should be larger than 0, but actually is: " + this.shuffleMemory));
        Preconditions.checkArgument((this.numPages > 0 ? 1 : 0) != 0, (Object)("The number of pages should be larger than 0, but actually is: " + this.numPages));
        this.serializerManager = new SerializerManager(ResultPartitionType.BLOCKING, taskManagerConfiguration);
        this.consumedPartitionTTL = taskManagerConfiguration.getInteger(TaskManagerOptions.TASK_EXTERNAL_SHUFFLE_CONSUMED_PARTITION_TTL_IN_SECONDS) * 1000;
        this.partialConsumedPartitionTTL = taskManagerConfiguration.getInteger(TaskManagerOptions.TASK_EXTERNAL_SHUFFLE_PARTIAL_CONSUMED_PARTITION_TTL_IN_SECONDS) * 1000;
        this.unconsumedPartitionTTL = taskManagerConfiguration.getInteger(TaskManagerOptions.TASK_EXTERNAL_SHUFFLE_UNCONSUMED_PARTITION_TTL_IN_SECONDS) * 1000;
        this.unfinishedPartitionTTL = taskManagerConfiguration.getInteger(TaskManagerOptions.TASK_EXTERNAL_SHUFFLE_UNFINISHED_PARTITION_TTL_IN_SECONDS) * 1000;
    }

    private void initialize() {
        Preconditions.checkNotNull((Object)this.typeSerializer);
        Preconditions.checkNotNull((Object)this.parentTask);
        try {
            Path tmpPartitionRootPath = new Path(this.partitionRootPath);
            FileSystem fs = FileSystem.getLocalFileSystem();
            if (fs.exists(tmpPartitionRootPath)) {
                fs.delete(tmpPartitionRootPath, true);
            }
            int maxRetryCnt = 100;
            do {
                try {
                    fs.mkdirs(tmpPartitionRootPath);
                }
                catch (IOException e) {
                    if (maxRetryCnt-- > 0) {
                        LOG.error("Fail to create partition root path: " + this.partitionRootPath + ", left retry times: " + maxRetryCnt);
                        continue;
                    }
                    LOG.error("Reach retry limit, fail to create partition root path: " + this.partitionRootPath);
                    throw e;
                }
            } while (!fs.exists(tmpPartitionRootPath));
            this.writeConfigFile(fs);
            List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, this.numPages);
            this.fileWriter = this.numberOfSubpartitions <= this.hashMaxSubpartitions && this.numberOfSubpartitions <= memory.size() && !this.serializerManager.useCompression() ? new PartitionHashFileWriter(this.numberOfSubpartitions, this.partitionRootPath, memory, this.ioManager, this.typeSerializer, this.numBytesOut, this.numBuffersOut) : new PartitionMergeFileWriter<T>(this.numberOfSubpartitions, this.partitionRootPath, this.mergeFactor, this.enableAsyncMerging, this.mergeToOneFile, this.memoryManager, memory, this.ioManager, this.typeSerializer, this.serializerManager, this.parentTask, this.numBytesOut, this.numBuffersOut);
            this.initialized = true;
            LOG.info(this.toString() + " initialized successfully.");
        }
        catch (Throwable t) {
            this.deletePartitionDirOnFailure();
            throw new RuntimeException(t);
        }
    }

    @VisibleForTesting
    void writeConfigFile(FileSystem fileSystem) throws IOException {
        String configPath = ExternalBlockShuffleUtils.generateConfigPath(this.partitionRootPath);
        try (FSDataOutputStream configOut = fileSystem.create(new Path(configPath), FileSystem.WriteMode.OVERWRITE);){
            DataOutputViewStreamWrapper configView = new DataOutputViewStreamWrapper((OutputStream)configOut);
            configView.writeLong(this.consumedPartitionTTL);
            configView.writeLong(this.partialConsumedPartitionTTL);
            configView.writeLong(this.unconsumedPartitionTTL);
            configView.writeLong(this.unfinishedPartitionTTL);
        }
        catch (IOException e) {
            LOG.error("Write the config file " + configPath + " fail.", (Throwable)e);
            throw e;
        }
    }

    @Override
    public void emitRecord(T record, int[] targetChannels, boolean isBroadcast, boolean flushAlways) throws IOException, InterruptedException {
        if (!this.initialized) {
            this.initialize();
        }
        try {
            this.checkInProduceState();
            this.fileWriter.add(record, targetChannels);
        }
        catch (Throwable e) {
            this.deletePartitionDirOnFailure();
            throw e;
        }
    }

    @Override
    public void emitRecord(T record, int targetChannel, boolean isBroadcast, boolean flushAlways) throws IOException, InterruptedException {
        if (!this.initialized) {
            this.initialize();
        }
        try {
            this.checkInProduceState();
            this.fileWriter.add(record, targetChannel);
        }
        catch (Throwable e) {
            this.deletePartitionDirOnFailure();
            throw e;
        }
    }

    @Override
    public void broadcastEvent(AbstractEvent event, boolean flushAlways) throws IOException {
        throw new UnsupportedOperationException("Event is not supported in external result partition.");
    }

    @Override
    public void clearBuffers() {
    }

    @Override
    public void flushAll() {
    }

    @Override
    public void flush(int subpartitionIndex) {
    }

    @Override
    protected void releaseInternal() {
        try {
            if (this.fileWriter != null) {
                this.fileWriter.clear();
                this.fileWriter = null;
            }
        }
        catch (IOException e) {
            LOG.error("Fail to clear external shuffler", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void finish() throws IOException {
        try {
            if (!this.initialized) {
                this.initialize();
                LOG.warn("The result partition {} has no data before finish.", (Object)this.partitionId);
            }
            if (this.isReleased.get()) {
                LOG.warn("The result partition {} has been released already before finish.", (Object)this.partitionId);
                this.deletePartitionDirOnFailure();
                return;
            }
            this.checkInProduceState();
            if (!this.initialized) {
                this.initialize();
            }
            FileSystem fs = FileSystem.get((URI)new Path(this.partitionRootPath).toUri());
            this.fileWriter.finish();
            List<List<PartitionIndex>> indicesList = this.fileWriter.generatePartitionIndices();
            for (int i = 0; i < indicesList.size(); ++i) {
                String indexPath = ExternalBlockShuffleUtils.generateIndexPath(this.partitionRootPath, i);
                try (FSDataOutputStream indexOut = fs.create(new Path(indexPath), FileSystem.WriteMode.OVERWRITE);){
                    DataOutputViewStreamWrapper indexView = new DataOutputViewStreamWrapper((OutputStream)indexOut);
                    ExternalBlockShuffleUtils.serializeIndices(indicesList.get(i), (DataOutputView)indexView);
                    continue;
                }
            }
            String finishedPath = ExternalBlockShuffleUtils.generateFinishedPath(this.partitionRootPath);
            try (FSDataOutputStream finishedOut = fs.create(new Path(finishedPath), FileSystem.WriteMode.OVERWRITE);){
                DataOutputViewStreamWrapper finishedView = new DataOutputViewStreamWrapper((OutputStream)finishedOut);
                finishedView.writeInt(1);
                String externalFileType = this.fileWriter.getExternalFileType().name();
                finishedView.writeInt(externalFileType.length());
                finishedView.write(externalFileType.getBytes());
                finishedView.writeInt(indicesList.size());
                finishedView.writeInt(this.numberOfSubpartitions);
            }
        }
        catch (Throwable e) {
            this.deletePartitionDirOnFailure();
            ExceptionUtils.rethrow((Throwable)e);
        }
        finally {
            this.releaseInternal();
        }
        this.isFinished = true;
    }

    private void deletePartitionDirOnFailure() {
        FileSystem fileSystem = FileSystem.getLocalFileSystem();
        boolean deleteSuccess = false;
        try {
            deleteSuccess = fileSystem.delete(new Path(this.partitionRootPath), true);
        }
        catch (Throwable e) {
            LOG.error("Exception occurred on deletePartitionDirOnFailure.", e);
        }
        if (!deleteSuccess) {
            LOG.error("Failed to delete dirty data, directory path " + this.partitionRootPath);
        }
    }

    private String getSpillRootPath(Configuration configuration, String jobIdStr, String partitionIdStr) {
        String localShuffleDirs = configuration.getString(TaskManagerOptions.TASK_MANAGER_OUTPUT_LOCAL_OUTPUT_DIRS);
        if (localShuffleDirs.isEmpty()) {
            throw new IllegalStateException("The root dir for external result partition is not properly set. Please check " + ExternalBlockShuffleServiceOptions.LOCAL_DIRS + " in hadoop configuration.");
        }
        Object[] dirs = localShuffleDirs.split(",");
        Arrays.sort(dirs);
        int hashCode2 = ExternalBlockShuffleUtils.hashPartitionToDisk(jobIdStr, partitionIdStr);
        return dirs[hashCode2 % dirs.length];
    }

    @VisibleForTesting
    String getPartitionRootPath() {
        return this.partitionRootPath;
    }

    public String toString() {
        return "External Result Partition: {partitionId = " + this.partitionId + ", fileWriter = " + this.fileWriter.getClass().getName() + ", rootPath = " + this.partitionRootPath + ", numberOfSubpartitions = " + this.numberOfSubpartitions + ", hashMaxSubpartitions = " + this.hashMaxSubpartitions + ", mergeFactor = " + this.mergeFactor + ", shuffleMemory = " + this.shuffleMemory + ", numPages = " + this.numPages + ", enableAsyncMerging = " + this.enableAsyncMerging + ", mergeToOneFile = " + this.mergeToOneFile + ", consumedPartitionTTL" + this.consumedPartitionTTL + ", partialConsumedPartitionTTL" + this.partialConsumedPartitionTTL + ", unconsumedPartitionTTL" + this.unconsumedPartitionTTL + ", unfinishedPartitionTTL" + this.unfinishedPartitionTTL + "}";
    }
}

