/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.external;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.FixedLengthBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockResultPartitionMeta;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleServiceConfiguration;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockSubpartitionView;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockSubpartitionViewSchedulerDelegate;
import org.apache.flink.runtime.io.network.partition.external.LocalResultPartitionResolver;
import org.apache.flink.runtime.io.network.partition.external.LocalResultPartitionResolverFactory;
import org.apache.flink.runtime.io.network.partition.external.OsCachePolicy;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.hadoop.io.ReadaheadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExternalBlockResultPartitionManager
implements ResultPartitionProvider {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalBlockResultPartitionManager.class);
    private final ExternalBlockShuffleServiceConfiguration shuffleServiceConfiguration;
    private final LocalResultPartitionResolver resultPartitionResolver;
    @VisibleForTesting
    final Map<String, ThreadPoolExecutor> dirToThreadPool = new HashMap<String, ThreadPoolExecutor>();
    @VisibleForTesting
    final ConcurrentHashMap<ResultPartitionID, ExternalBlockResultPartitionMeta> resultPartitionMetaMap = new ConcurrentHashMap();
    @VisibleForTesting
    final FixedLengthBufferPool bufferPool;
    private final ScheduledExecutorService resultPartitionRecyclerExecutorService;
    private final AtomicBoolean isStopped = new AtomicBoolean(false);
    private final ReadaheadPool readaheadPool;
    private final ScheduledExecutorService selfCheckExecutorService;

    public ExternalBlockResultPartitionManager(ExternalBlockShuffleServiceConfiguration shuffleServiceConfiguration) throws Exception {
        this.shuffleServiceConfiguration = shuffleServiceConfiguration;
        this.resultPartitionResolver = LocalResultPartitionResolverFactory.create(shuffleServiceConfiguration);
        this.bufferPool = new FixedLengthBufferPool(shuffleServiceConfiguration.getBufferNumber(), shuffleServiceConfiguration.getMemorySizePerBufferInBytes(), MemoryType.OFF_HEAP);
        this.constructThreadPools();
        this.readaheadPool = OsCachePolicy.READ_AHEAD.equals((Object)shuffleServiceConfiguration.getOsCachePolicy()) ? ReadaheadPool.getInstance() : null;
        DispatcherThreadFactory recyclerThreadFactory = new DispatcherThreadFactory(new ThreadGroup("FlinkShuffleService"), "ResultPartitionRecycler");
        this.resultPartitionRecyclerExecutorService = Executors.newSingleThreadScheduledExecutor(recyclerThreadFactory);
        this.resultPartitionRecyclerExecutorService.scheduleWithFixedDelay(() -> this.recycleResultPartitions(), 0L, shuffleServiceConfiguration.getDiskScanIntervalInMS(), TimeUnit.MILLISECONDS);
        DispatcherThreadFactory selfCheckThreadFactory = new DispatcherThreadFactory(new ThreadGroup("FlinkShuffleService"), "SelfCheckThread");
        this.selfCheckExecutorService = Executors.newSingleThreadScheduledExecutor(selfCheckThreadFactory);
        this.selfCheckExecutorService.scheduleAtFixedRate(new SelfCheckTask(), shuffleServiceConfiguration.getSelfCheckIntervalInMS(), shuffleServiceConfiguration.getSelfCheckIntervalInMS(), TimeUnit.MILLISECONDS);
        LOG.info("Final configurations: " + shuffleServiceConfiguration);
    }

    @Override
    public ResultSubpartitionView createSubpartitionView(ResultPartitionID resultPartitionId, int index, BufferAvailabilityListener availabilityListener) throws IOException {
        if (this.isStopped.get()) {
            throw new IOException("ExternalBlockResultPartitionManager has already been stopped.");
        }
        ExternalBlockResultPartitionMeta resultPartitionMeta = this.resultPartitionMetaMap.get(resultPartitionId);
        if (resultPartitionMeta == null) {
            LocalResultPartitionResolver.ResultPartitionFileInfo fileInfo = this.resultPartitionResolver.getResultPartitionDir(resultPartitionId);
            resultPartitionMeta = new ExternalBlockResultPartitionMeta(resultPartitionId, this.shuffleServiceConfiguration.getFileSystem(), fileInfo, this.shuffleServiceConfiguration.getOsCachePolicy(), this.shuffleServiceConfiguration.getMaxReadAheadLengthInBytes());
            ExternalBlockResultPartitionMeta prevResultPartitionMeta = this.resultPartitionMetaMap.putIfAbsent(resultPartitionId, resultPartitionMeta);
            if (prevResultPartitionMeta != null) {
                resultPartitionMeta = prevResultPartitionMeta;
            }
        }
        ExternalBlockSubpartitionView subpartitionView = new ExternalBlockSubpartitionView(resultPartitionMeta, index, this.dirToThreadPool.get(resultPartitionMeta.getRootDir()), resultPartitionId, this.bufferPool, this.shuffleServiceConfiguration.getWaitCreditDelay(), availabilityListener, this.readaheadPool);
        resultPartitionMeta.notifySubpartitionStartConsuming(index);
        return subpartitionView;
    }

    public void initializeApplication(String user, String appId) {
        this.resultPartitionResolver.initializeApplication(user, appId);
    }

    public void stopApplication(String appId) {
        Set<ResultPartitionID> resultPartitionIDS = this.resultPartitionResolver.stopApplication(appId);
        if (!resultPartitionIDS.isEmpty()) {
            resultPartitionIDS.forEach(resultPartitionID -> this.resultPartitionMetaMap.remove(resultPartitionID));
        }
    }

    public void stop() {
        LOG.warn("Stop ExternalBlockResultPartitionManager, probably ShuffleService is stopped");
        try {
            boolean succ = this.isStopped.compareAndSet(false, true);
            if (!succ) {
                LOG.info("ExternalBlockResultPartitionManager has already been stopped.");
                return;
            }
            for (Map.Entry<String, ThreadPoolExecutor> entry : this.dirToThreadPool.entrySet()) {
                entry.getValue().shutdownNow();
            }
            this.resultPartitionRecyclerExecutorService.shutdownNow();
            this.resultPartitionResolver.stop();
            this.bufferPool.lazyDestroy();
            this.resultPartitionMetaMap.clear();
        }
        catch (Throwable e) {
            LOG.error("Exception occurs when stopping ExternalBlockResultPartitionManager", e);
        }
    }

    private void constructThreadPools() {
        ThreadGroup threadGroup = new ThreadGroup("Disk IO Thread Group");
        this.shuffleServiceConfiguration.getDirToDiskType().forEach((dir, diskType) -> {
            Integer threadNum = this.shuffleServiceConfiguration.getDiskTypeToIOThreadNum().get(diskType);
            ExternalBlockSubpartitionViewSchedulerDelegate blockingQueue = new ExternalBlockSubpartitionViewSchedulerDelegate(this.shuffleServiceConfiguration.newSubpartitionViewScheduler());
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor((int)threadNum, (int)threadNum, 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>)blockingQueue, new DispatcherThreadFactory(threadGroup, "IO thread [" + diskType + "] [" + dir + "]"));
            this.dirToThreadPool.put((String)dir, threadPool);
        });
    }

    @VisibleForTesting
    void recycleResultPartitions() {
        long currTime = System.currentTimeMillis();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Start to recycle result partitions, currTime: " + currTime);
        }
        HashMap<ResultPartitionID, ExternalBlockResultPartitionMeta> consumedPartitionsToRemove = new HashMap<ResultPartitionID, ExternalBlockResultPartitionMeta>();
        HashMap<ResultPartitionID, ExternalBlockResultPartitionMeta> partialConsumedPartitionsToRemove = new HashMap<ResultPartitionID, ExternalBlockResultPartitionMeta>();
        for (Map.Entry<ResultPartitionID, ExternalBlockResultPartitionMeta> partitionEntry : this.resultPartitionMetaMap.entrySet()) {
            long lastActiveTimeInMs;
            int refCnt;
            ResultPartitionID resultPartitionID = partitionEntry.getKey();
            ExternalBlockResultPartitionMeta resultPartitionMeta = partitionEntry.getValue();
            if (!resultPartitionMeta.hasInitialized() || (refCnt = resultPartitionMeta.getReferenceCount()) > 0) continue;
            int unconsumedSubpartitionCount = resultPartitionMeta.getUnconsumedSubpartitionCount();
            if (unconsumedSubpartitionCount <= 0) {
                lastActiveTimeInMs = resultPartitionMeta.getLastActiveTimeInMs();
                if (currTime - lastActiveTimeInMs <= resultPartitionMeta.getConsumedPartitionTTL()) continue;
                consumedPartitionsToRemove.put(resultPartitionID, resultPartitionMeta);
                continue;
            }
            lastActiveTimeInMs = resultPartitionMeta.getLastActiveTimeInMs();
            if (currTime - lastActiveTimeInMs <= resultPartitionMeta.getPartialConsumedPartitionTTL()) continue;
            partialConsumedPartitionsToRemove.put(resultPartitionID, resultPartitionMeta);
        }
        this.removeResultPartitionAndMeta(consumedPartitionsToRemove, "CONSUMED_PARTITION_TTL_TIMEOUT", LOG.isDebugEnabled());
        this.removeResultPartitionAndMeta(partialConsumedPartitionsToRemove, "PARTIAL_CONSUMED_PARTITION_TTL_TIMEOUT", true);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Finish recycling result partitions, cost " + (System.currentTimeMillis() - currTime) + " ms.");
        }
    }

    private void removeResultPartitionAndMeta(HashMap<ResultPartitionID, ExternalBlockResultPartitionMeta> partitionsToRemove, String recycleReason, boolean printLog) {
        if (partitionsToRemove.isEmpty()) {
            return;
        }
        Iterator<Map.Entry<ResultPartitionID, ExternalBlockResultPartitionMeta>> iterator = partitionsToRemove.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<ResultPartitionID, ExternalBlockResultPartitionMeta> entry = iterator.next();
            ResultPartitionID resultPartitionID = entry.getKey();
            ExternalBlockResultPartitionMeta meta = entry.getValue();
            if (meta.getReferenceCount() > 0) {
                iterator.remove();
                continue;
            }
            this.resultPartitionMetaMap.remove(resultPartitionID);
            this.resultPartitionResolver.recycleResultPartition(resultPartitionID);
            if (!printLog) continue;
            LOG.info("Delete partition's directory: {}, reason: {}, lastActiveTime: {}", new Object[]{meta.getResultPartitionDir(), recycleReason, meta.getLastActiveTimeInMs()});
        }
    }

    private final class SelfCheckTask
    implements Runnable {
        private static final String LOG_PREFIX = "FlinkShuffleService-SelfCheck-";
        private final int totalBufferNum;
        private int usedBufferNum = 0;
        private long lastCheckMemoryFootprintTimestamp;

        SelfCheckTask() {
            this.totalBufferNum = ExternalBlockResultPartitionManager.this.bufferPool.getNumBuffers();
            this.usedBufferNum = ExternalBlockResultPartitionManager.this.bufferPool.bestEffortGetNumOfUsedBuffers();
            this.lastCheckMemoryFootprintTimestamp = System.currentTimeMillis();
        }

        @Override
        public void run() {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Start to do self check, startTimeInMS: " + System.currentTimeMillis());
            }
            this.checkBufferUsage();
            this.shrinkResultPartitionMetaIfNecessary();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Finish self check, endTimeInMS: " + System.currentTimeMillis());
            }
        }

        private void checkBufferUsage() {
            StringBuilder stringBuilder = new StringBuilder("FlinkShuffleService-SelfCheck--BufferUsage");
            LogLevel logLevel = LogLevel.INFO;
            int tmpUsedBufferNum = ExternalBlockResultPartitionManager.this.bufferPool.bestEffortGetNumOfUsedBuffers();
            stringBuilder.append("TotalBufferNum: ").append(this.totalBufferNum);
            if (tmpUsedBufferNum > 0) {
                stringBuilder.append(", UsedBufferNum: ").append(tmpUsedBufferNum);
                if (this.usedBufferNum == this.totalBufferNum) {
                    logLevel = LogLevel.WARN;
                    stringBuilder.append(", BuffersHasBeenUsedUp");
                } else if (tmpUsedBufferNum == this.usedBufferNum) {
                    stringBuilder.append(", UsedBufferNumUnchanged");
                } else {
                    stringBuilder.append(", PreviousUsedBufferNum: ").append(this.usedBufferNum);
                }
            } else {
                stringBuilder.append(", TotalBuffersUnused");
                logLevel = LogLevel.DEBUG;
            }
            this.usedBufferNum = tmpUsedBufferNum;
            this.printReport(stringBuilder.toString(), logLevel);
        }

        private void shrinkResultPartitionMetaIfNecessary() {
            long currentTimestamp = System.currentTimeMillis();
            if (currentTimestamp - this.lastCheckMemoryFootprintTimestamp < ExternalBlockResultPartitionManager.this.shuffleServiceConfiguration.getMemoryShrinkageIntervalInMS()) {
                return;
            }
            this.lastCheckMemoryFootprintTimestamp = currentTimestamp;
            StringBuilder logBuilder = new StringBuilder(LOG_PREFIX).append("-MemoryGC ");
            long shrinkableMemoryFootprint = 0L;
            for (Map.Entry<ResultPartitionID, ExternalBlockResultPartitionMeta> partitionEntry : ExternalBlockResultPartitionManager.this.resultPartitionMetaMap.entrySet()) {
                shrinkableMemoryFootprint += partitionEntry.getValue().getShrinkableMemoryFootprint();
            }
            if (shrinkableMemoryFootprint <= ExternalBlockResultPartitionManager.this.shuffleServiceConfiguration.getHeapMemoryThresholdInBytes()) {
                logBuilder.append("SkipMemoryGC, detail: memory is sufficient").append(", heapMemoryThresholdInBytes: ").append(ExternalBlockResultPartitionManager.this.shuffleServiceConfiguration.getHeapMemoryThresholdInBytes()).append(", shrinkableHeapMemoryFootprint : ").append(shrinkableMemoryFootprint);
                this.printReport(logBuilder.toString(), LogLevel.DEBUG);
                return;
            }
            long expectedMemoryToShrink = shrinkableMemoryFootprint - ExternalBlockResultPartitionManager.this.shuffleServiceConfiguration.getHeapMemoryThresholdInBytes();
            long actualMemoryToShrink = 0L;
            boolean partitionLeftToBeShrunk = true;
            for (long expirationInterval = 86400000L; partitionLeftToBeShrunk && expirationInterval >= ExternalBlockResultPartitionManager.this.shuffleServiceConfiguration.getObjectMinIdleIntervalToShrinkInMS(); expirationInterval /= 2L) {
                partitionLeftToBeShrunk = false;
                for (Map.Entry<ResultPartitionID, ExternalBlockResultPartitionMeta> partitionEntry : ExternalBlockResultPartitionManager.this.resultPartitionMetaMap.entrySet()) {
                    ExternalBlockResultPartitionMeta meta = partitionEntry.getValue();
                    if (meta.getReferenceCount() < 1) continue;
                    if (currentTimestamp - meta.getLastActiveTimeInMs() > expirationInterval) {
                        if ((actualMemoryToShrink += meta.shrinkMemoryFootprint()) < expectedMemoryToShrink) continue;
                        break;
                    }
                    partitionLeftToBeShrunk = true;
                }
                if (actualMemoryToShrink >= expectedMemoryToShrink) break;
            }
            if (actualMemoryToShrink >= expectedMemoryToShrink) {
                logBuilder.append("MemoryGCSuccess");
            } else {
                logBuilder.append("MemoryGCFailure");
            }
            logBuilder.append(", shrinkableMemoryFootprint : ").append(shrinkableMemoryFootprint).append(" -> ").append(shrinkableMemoryFootprint - actualMemoryToShrink).append(", actualMemoryToShrink: ").append(actualMemoryToShrink).append(", expectedMemoryToShrink: ").append(expectedMemoryToShrink);
            this.printReport(logBuilder.toString(), LogLevel.WARN);
        }

        private void printReport(String log, LogLevel logLevel) {
            if (logLevel == LogLevel.WARN) {
                LOG.warn(log);
            } else if (logLevel == LogLevel.ERROR) {
                LOG.error(log);
            } else if (logLevel == LogLevel.INFO) {
                LOG.info(log);
            } else {
                LOG.debug(log);
            }
        }
    }

    private static enum LogLevel {
        DEBUG,
        INFO,
        WARN,
        ERROR;

    }
}

