/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.external;

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.FixedLengthBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockResultPartitionMeta;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleServiceConfiguration;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockSubpartitionView;
import org.apache.flink.runtime.io.network.partition.external.LocalResultPartitionResolver;
import org.apache.flink.runtime.io.network.partition.external.LocalResultPartitionResolverFactory;
import org.apache.flink.runtime.io.network.partition.external.OsCachePolicy;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.hadoop.io.ReadaheadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExternalBlockResultPartitionManager
implements ResultPartitionProvider {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalBlockResultPartitionManager.class);
    private final ExternalBlockShuffleServiceConfiguration shuffleServiceConfiguration;
    private final LocalResultPartitionResolver resultPartitionResolver;
    @VisibleForTesting
    final Map<String, ThreadPoolExecutor> dirToThreadPool = new HashMap<String, ThreadPoolExecutor>();
    @VisibleForTesting
    final ConcurrentHashMap<ResultPartitionID, ExternalBlockResultPartitionMeta> resultPartitionMetaMap = new ConcurrentHashMap();
    @VisibleForTesting
    final FixedLengthBufferPool bufferPool;
    private final ScheduledExecutorService resultPartitionRecyclerExecutorService;
    private final AtomicBoolean isStopped = new AtomicBoolean(false);
    private final ReadaheadPool readaheadPool;

    public ExternalBlockResultPartitionManager(ExternalBlockShuffleServiceConfiguration shuffleServiceConfiguration) throws Exception {
        this.shuffleServiceConfiguration = shuffleServiceConfiguration;
        this.resultPartitionResolver = LocalResultPartitionResolverFactory.create(shuffleServiceConfiguration);
        this.bufferPool = new FixedLengthBufferPool(shuffleServiceConfiguration.getBufferNumber(), shuffleServiceConfiguration.getMemorySizePerBufferInBytes(), MemoryType.OFF_HEAP);
        this.constructThreadPools();
        this.readaheadPool = OsCachePolicy.READ_AHEAD.equals((Object)shuffleServiceConfiguration.getOsCachePolicy()) ? ReadaheadPool.getInstance() : null;
        this.resultPartitionRecyclerExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.resultPartitionRecyclerExecutorService.scheduleWithFixedDelay(() -> this.recycleResultPartitions(), 0L, shuffleServiceConfiguration.getDiskScanIntervalInMS(), TimeUnit.MILLISECONDS);
        LOG.info("Final configurations: " + shuffleServiceConfiguration);
    }

    @Override
    public ResultSubpartitionView createSubpartitionView(ResultPartitionID resultPartitionId, int index, BufferAvailabilityListener availabilityListener) throws IOException {
        if (this.isStopped.get()) {
            throw new IOException("ExternalBlockResultPartitionManager has already been stopped.");
        }
        ExternalBlockResultPartitionMeta resultPartitionMeta = this.resultPartitionMetaMap.get(resultPartitionId);
        if (resultPartitionMeta == null) {
            LocalResultPartitionResolver.ResultPartitionFileInfo fileInfo = this.resultPartitionResolver.getResultPartitionDir(resultPartitionId);
            resultPartitionMeta = new ExternalBlockResultPartitionMeta(resultPartitionId, this.shuffleServiceConfiguration.getFileSystem(), fileInfo, this.shuffleServiceConfiguration.getOsCachePolicy(), this.shuffleServiceConfiguration.getMaxReadAheadLengthInBytes());
            ExternalBlockResultPartitionMeta prevResultPartitionMeta = this.resultPartitionMetaMap.putIfAbsent(resultPartitionId, resultPartitionMeta);
            if (prevResultPartitionMeta != null) {
                resultPartitionMeta = prevResultPartitionMeta;
            }
        }
        ExternalBlockSubpartitionView subpartitionView = new ExternalBlockSubpartitionView(resultPartitionMeta, index, this.dirToThreadPool.get(resultPartitionMeta.getRootDir()), resultPartitionId, this.bufferPool, this.shuffleServiceConfiguration.getWaitCreditDelay(), availabilityListener, this.readaheadPool);
        resultPartitionMeta.notifySubpartitionStartConsuming(index);
        return subpartitionView;
    }

    public void initializeApplication(String user, String appId) {
        this.resultPartitionResolver.initializeApplication(user, appId);
    }

    public void stopApplication(String appId) {
        Set<ResultPartitionID> resultPartitionIDS = this.resultPartitionResolver.stopApplication(appId);
        if (!resultPartitionIDS.isEmpty()) {
            resultPartitionIDS.forEach(resultPartitionID -> this.resultPartitionMetaMap.remove(resultPartitionID));
        }
    }

    public void stop() {
        LOG.warn("Stop ExternalBlockResultPartitionManager, probably ShuffleService is stopped");
        try {
            boolean succ = this.isStopped.compareAndSet(false, true);
            if (!succ) {
                LOG.info("ExternalBlockResultPartitionManager has already been stopped.");
                return;
            }
            for (Map.Entry<String, ThreadPoolExecutor> entry : this.dirToThreadPool.entrySet()) {
                entry.getValue().shutdownNow();
            }
            this.resultPartitionRecyclerExecutorService.shutdownNow();
            this.resultPartitionResolver.stop();
            this.bufferPool.lazyDestroy();
            this.resultPartitionMetaMap.clear();
        }
        catch (Throwable e) {
            LOG.error("Exception occurs when stopping ExternalBlockResultPartitionManager", e);
        }
    }

    private void constructThreadPools() {
        ThreadGroup threadGroup = new ThreadGroup("Disk IO Thread Group");
        this.shuffleServiceConfiguration.getDirToDiskType().forEach((dir, diskType) -> {
            Integer threadNum = this.shuffleServiceConfiguration.getDiskTypeToIOThreadNum().get(diskType);
            Comparator subpartitionViewComparator = this.shuffleServiceConfiguration.newSubpartitionViewComparator();
            PriorityBlockingQueue<Runnable> blockingQueue = subpartitionViewComparator != null ? new PriorityBlockingQueue(200, subpartitionViewComparator) : new LinkedBlockingQueue();
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor((int)threadNum, (int)threadNum, 0L, TimeUnit.MILLISECONDS, blockingQueue, new DispatcherThreadFactory(threadGroup, "IO thread [" + diskType + "] [" + dir + "]"));
            this.dirToThreadPool.put((String)dir, threadPool);
        });
    }

    @VisibleForTesting
    void recycleResultPartitions() {
        long currTime = System.currentTimeMillis();
        HashMap<ResultPartitionID, ExternalBlockResultPartitionMeta> consumedPartitionsToRemove = new HashMap<ResultPartitionID, ExternalBlockResultPartitionMeta>();
        HashMap<ResultPartitionID, ExternalBlockResultPartitionMeta> partialConsumedPartitionsToRemove = new HashMap<ResultPartitionID, ExternalBlockResultPartitionMeta>();
        for (Map.Entry<ResultPartitionID, ExternalBlockResultPartitionMeta> partitionEntry : this.resultPartitionMetaMap.entrySet()) {
            long lastActiveTimeInMs;
            int refCnt;
            ResultPartitionID resultPartitionID = partitionEntry.getKey();
            ExternalBlockResultPartitionMeta resultPartitionMeta = partitionEntry.getValue();
            if (!resultPartitionMeta.hasInitialized() || (refCnt = resultPartitionMeta.getReferenceCount()) > 0) continue;
            int unconsumedSubpartitionCount = resultPartitionMeta.getUnconsumedSubpartitionCount();
            if (unconsumedSubpartitionCount <= 0) {
                lastActiveTimeInMs = resultPartitionMeta.getLastActiveTimeInMs();
                if (currTime - lastActiveTimeInMs <= resultPartitionMeta.getConsumedPartitionTTL()) continue;
                consumedPartitionsToRemove.put(resultPartitionID, resultPartitionMeta);
                continue;
            }
            lastActiveTimeInMs = resultPartitionMeta.getLastActiveTimeInMs();
            if (currTime - lastActiveTimeInMs <= resultPartitionMeta.getPartialConsumedPartitionTTL()) continue;
            partialConsumedPartitionsToRemove.put(resultPartitionID, resultPartitionMeta);
        }
        this.removeResultPartitionAndMeta(consumedPartitionsToRemove, "CONSUMED_PARTITION_TTL_TIMEOUT", LOG.isDebugEnabled());
        this.removeResultPartitionAndMeta(partialConsumedPartitionsToRemove, "PARTIAL_CONSUMED_PARTITION_TTL_TIMEOUT", true);
    }

    private void removeResultPartitionAndMeta(HashMap<ResultPartitionID, ExternalBlockResultPartitionMeta> partitionsToRemove, String recycleReason, boolean printLog) {
        if (partitionsToRemove.isEmpty()) {
            return;
        }
        Iterator<Map.Entry<ResultPartitionID, ExternalBlockResultPartitionMeta>> iterator = partitionsToRemove.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<ResultPartitionID, ExternalBlockResultPartitionMeta> entry = iterator.next();
            ResultPartitionID resultPartitionID = entry.getKey();
            ExternalBlockResultPartitionMeta meta = entry.getValue();
            if (meta.getReferenceCount() > 0) {
                iterator.remove();
                continue;
            }
            this.resultPartitionMetaMap.remove(resultPartitionID);
            this.resultPartitionResolver.recycleResultPartition(resultPartitionID);
            if (!printLog) continue;
            LOG.info("Delete partition's directory: {}, reason: {}, lastActiveTime: {}", new Object[]{meta.getResultPartitionDir(), recycleReason, meta.getLastActiveTimeInMs()});
        }
    }
}

