/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.external;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleServiceConfiguration;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleUtils;
import org.apache.flink.runtime.io.network.partition.external.LocalResultPartitionResolver;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class YarnLocalResultPartitionResolver
extends LocalResultPartitionResolver {
    private static final Logger LOG = LoggerFactory.getLogger(YarnLocalResultPartitionResolver.class);
    private static final int RESULT_PARTITION_MAP_INITIAL_CAPACITY = 2000;
    private static final int APP_ID_MAP_INITIAL_CAPACITY = 100;
    private final FileSystem fileSystem;
    @VisibleForTesting
    protected final ConcurrentHashMap<String, String> appIdToUser = new ConcurrentHashMap(100);
    @VisibleForTesting
    protected final ConcurrentHashMap<ResultPartitionID, YarnResultPartitionFileInfo> resultPartitionMap = new ConcurrentHashMap(2000);
    private final ScheduledExecutorService diskScannerExecutorService;
    private long lastDiskScanTimestamp = -1L;
    private final String containerExecutorExecutablePath;
    private final Boolean containerLimitUsers;
    private final String nonsecureLocalUser;

    YarnLocalResultPartitionResolver(ExternalBlockShuffleServiceConfiguration shuffleServiceConfiguration) {
        super(shuffleServiceConfiguration);
        this.fileSystem = shuffleServiceConfiguration.getFileSystem();
        this.containerExecutorExecutablePath = YarnLocalResultPartitionResolver.parseContainerExecutorExecutablePath(shuffleServiceConfiguration, this.fileSystem);
        this.nonsecureLocalUser = shuffleServiceConfiguration.getConfiguration().getString("yarn.nodemanager.linux-container-executor.nonsecure-mode.local-user", "nobody");
        this.containerLimitUsers = shuffleServiceConfiguration.getConfiguration().getBoolean("yarn.nodemanager.linux-container-executor.nonsecure-mode.limit-users", true);
        DispatcherThreadFactory diskScannerThreadFactory = new DispatcherThreadFactory(new ThreadGroup("FlinkShuffleService"), "DiskScanner");
        this.diskScannerExecutorService = Executors.newSingleThreadScheduledExecutor(diskScannerThreadFactory);
        this.diskScannerExecutorService.scheduleWithFixedDelay(() -> this.doDiskScan(), 0L, shuffleServiceConfiguration.getDiskScanIntervalInMS(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void initializeApplication(String user, String appId) {
        this.appIdToUser.putIfAbsent(appId, user);
    }

    @Override
    public Set<ResultPartitionID> stopApplication(String appId) {
        HashSet<ResultPartitionID> toRemove = new HashSet<ResultPartitionID>();
        Iterator<Map.Entry<ResultPartitionID, YarnResultPartitionFileInfo>> partitionIterator = this.resultPartitionMap.entrySet().iterator();
        while (partitionIterator.hasNext()) {
            Map.Entry<ResultPartitionID, YarnResultPartitionFileInfo> entry = partitionIterator.next();
            if (!entry.getValue().getAppId().equals(appId)) continue;
            toRemove.add(entry.getKey());
            partitionIterator.remove();
        }
        this.appIdToUser.remove(appId);
        return toRemove;
    }

    @Override
    public LocalResultPartitionResolver.ResultPartitionFileInfo getResultPartitionDir(ResultPartitionID resultPartitionID) throws IOException {
        YarnResultPartitionFileInfo fileInfo = this.resultPartitionMap.get(resultPartitionID);
        if (fileInfo != null) {
            if (!fileInfo.isReadyToBeConsumed()) {
                this.updateUnfinishedResultPartition(resultPartitionID, fileInfo);
            }
            fileInfo.updateOnConsumption();
            return fileInfo;
        }
        fileInfo = this.searchResultPartitionDir(resultPartitionID);
        if (fileInfo == null) {
            throw new PartitionNotFoundException(resultPartitionID);
        }
        return fileInfo;
    }

    @Override
    public void recycleResultPartition(ResultPartitionID resultPartitionID) {
        YarnResultPartitionFileInfo fileInfo = this.resultPartitionMap.get(resultPartitionID);
        if (fileInfo != null) {
            fileInfo.markToDelete();
        }
    }

    @Override
    public void stop() {
        LOG.warn("stop YarnLocalResultPartitionResolver.");
        try {
            this.diskScannerExecutorService.shutdownNow();
        }
        catch (Throwable e) {
            LOG.error("Exception occurs when stopping YarnLocalResultPartitionResolver", e);
        }
    }

    public static String generateRelativeLocalAppDir(String user, String appId) {
        return "usercache/" + user + "/appcache/" + appId;
    }

    private void updateUnfinishedResultPartition(ResultPartitionID resultPartitionID, YarnResultPartitionFileInfo fileInfo) throws IOException {
        String finishedFilePath = ExternalBlockShuffleUtils.generateFinishedPath((String)fileInfo.getRootDirAndPartitionDir().f1);
        try {
            FileStatus fileStatus = this.fileSystem.getFileStatus(new Path(finishedFilePath));
            if (fileStatus != null) {
                fileInfo.setReadyToBeConsumed(fileStatus.getModificationTime());
            }
        }
        catch (FileNotFoundException e) {
            throw new PartitionNotFoundException(resultPartitionID);
        }
    }

    private YarnResultPartitionFileInfo searchResultPartitionDir(ResultPartitionID resultPartitionID) throws IOException {
        for (Map.Entry<String, String> appIdAndUser : this.appIdToUser.entrySet()) {
            String appId = appIdAndUser.getKey();
            String user = appIdAndUser.getValue();
            String relativePartitionDir = ExternalBlockShuffleUtils.generatePartitionRootPath(YarnLocalResultPartitionResolver.generateRelativeLocalAppDir(user, appId), resultPartitionID.getProducerId().toString(), resultPartitionID.getPartitionId().toString());
            for (String rootDir : this.shuffleServiceConfiguration.getDirToDiskType().keySet()) {
                FileStatus fileStatus;
                FileStatus partitionDirStatus;
                String partitionDir = rootDir + relativePartitionDir;
                String finishedFilePath = ExternalBlockShuffleUtils.generateFinishedPath(partitionDir);
                try {
                    partitionDirStatus = this.fileSystem.getFileStatus(new Path(partitionDir));
                }
                catch (IOException e) {
                    partitionDirStatus = null;
                }
                if (partitionDirStatus == null || !partitionDirStatus.isDir()) continue;
                try {
                    fileStatus = this.fileSystem.getFileStatus(new Path(finishedFilePath));
                }
                catch (FileNotFoundException e) {
                    fileStatus = null;
                }
                catch (IOException e) {
                    throw e;
                }
                if (fileStatus != null) {
                    YarnResultPartitionFileInfo fileInfo = new YarnResultPartitionFileInfo(appId, (Tuple2<String, String>)new Tuple2((Object)rootDir, (Object)partitionDir), true, true, fileStatus.getModificationTime(), System.currentTimeMillis(), this.shuffleServiceConfiguration);
                    YarnResultPartitionFileInfo prevFileInfo = this.resultPartitionMap.putIfAbsent(resultPartitionID, fileInfo);
                    if (prevFileInfo != null) {
                        if (!prevFileInfo.isReadyToBeConsumed()) {
                            prevFileInfo.setReadyToBeConsumed(fileStatus.getModificationTime());
                        }
                        prevFileInfo.updateOnConsumption();
                        fileInfo = prevFileInfo;
                    }
                    return fileInfo;
                }
                YarnResultPartitionFileInfo fileInfo = new YarnResultPartitionFileInfo(appId, (Tuple2<String, String>)new Tuple2((Object)rootDir, (Object)partitionDir), false, false, -1L, System.currentTimeMillis(), this.shuffleServiceConfiguration);
                YarnResultPartitionFileInfo prevFileInfo = this.resultPartitionMap.putIfAbsent(resultPartitionID, fileInfo);
                if (prevFileInfo != null && prevFileInfo.isReadyToBeConsumed()) {
                    prevFileInfo.updateOnConsumption();
                    return prevFileInfo;
                }
                throw new PartitionNotFoundException(resultPartitionID);
            }
        }
        return null;
    }

    @VisibleForTesting
    void doDiskScan() {
        long currTime = System.currentTimeMillis();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Start to do disk scan, currTime: " + currTime);
        }
        Set<String> rootDirs = this.shuffleServiceConfiguration.getDirToDiskType().keySet();
        for (Map.Entry<String, String> userAndAppId : this.appIdToUser.entrySet()) {
            String user = userAndAppId.getValue();
            String appId = userAndAppId.getKey();
            String relativeAppDir = YarnLocalResultPartitionResolver.generateRelativeLocalAppDir(user, appId);
            for (String rootDir : rootDirs) {
                String appDir = rootDir + relativeAppDir + "/";
                FileStatus[] appDirStatuses = null;
                try {
                    appDirStatuses = this.fileSystem.listStatus(new Path(appDir));
                }
                catch (Exception e) {
                    continue;
                }
                if (appDirStatuses == null) continue;
                for (FileStatus partitionDirStatus : appDirStatuses) {
                    ResultPartitionID resultPartitionID;
                    if (!partitionDirStatus.isDir() || (resultPartitionID = ExternalBlockShuffleUtils.convertRelativeDirToResultPartitionID(partitionDirStatus.getPath().getName())) == null) continue;
                    this.updateResultPartitionFileInfoByFileStatus(currTime, appId, resultPartitionID, partitionDirStatus, rootDir, appDir);
                }
            }
        }
        Iterator<Map.Entry<ResultPartitionID, YarnResultPartitionFileInfo>> partitionIterator = this.resultPartitionMap.entrySet().iterator();
        while (partitionIterator.hasNext()) {
            long lastActiveTime;
            Map.Entry<ResultPartitionID, YarnResultPartitionFileInfo> entry = partitionIterator.next();
            YarnResultPartitionFileInfo fileInfo = entry.getValue();
            boolean needToRemoveFileInfo = false;
            if (fileInfo.needToDelete()) {
                needToRemoveFileInfo = true;
                this.removeResultPartition(new Path((String)((YarnResultPartitionFileInfo)fileInfo).rootDirAndPartitionDir.f1), "FETCHED_PARTITION_TTL_TIMEOUT", -1L, false);
            } else if (fileInfo.getFileInfoTimestamp() <= this.lastDiskScanTimestamp) {
                needToRemoveFileInfo = true;
            } else if (fileInfo.isReadyToBeConsumed() && !fileInfo.isConsumed() && currTime - (lastActiveTime = fileInfo.getPartitionReadyTime()) > fileInfo.getUnconsumedPartitionTTL()) {
                needToRemoveFileInfo = true;
                this.removeResultPartition(new Path((String)((YarnResultPartitionFileInfo)fileInfo).rootDirAndPartitionDir.f1), "UNCONSUMED_PARTITION_TTL_TIMEOUT", lastActiveTime, true);
            }
            if (!needToRemoveFileInfo) continue;
            partitionIterator.remove();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Finish disk scan, cost " + (System.currentTimeMillis() - currTime) + " ms.");
        }
        this.lastDiskScanTimestamp = currTime;
    }

    private void updateResultPartitionFileInfoByFileStatus(long currTime, String appId, ResultPartitionID resultPartitionID, FileStatus partitionDirStatus, String rootDir, String appDir) {
        YarnResultPartitionFileInfo fileInfo = this.resultPartitionMap.get(resultPartitionID);
        if (fileInfo != null) {
            fileInfo.updateFileInfoTimestamp(currTime);
            if (!fileInfo.configLoaded) {
                this.tryUpdateTTLByConfigFile(fileInfo);
            }
            if (fileInfo.isReadyToBeConsumed()) {
                return;
            }
        }
        FileStatus finishFileStatus = null;
        String partitionDir = appDir + partitionDirStatus.getPath().getName() + "/";
        String finishFilePath = ExternalBlockShuffleUtils.generateFinishedPath(partitionDir);
        try {
            finishFileStatus = this.fileSystem.getFileStatus(new Path(finishFilePath));
        }
        catch (Exception e) {
            finishFileStatus = null;
        }
        if (finishFileStatus != null) {
            YarnResultPartitionFileInfo prevFileInfo;
            if (fileInfo == null && (prevFileInfo = this.resultPartitionMap.putIfAbsent(resultPartitionID, fileInfo = new YarnResultPartitionFileInfo(appId, (Tuple2<String, String>)new Tuple2((Object)rootDir, (Object)partitionDir), true, false, finishFileStatus.getModificationTime(), currTime, this.shuffleServiceConfiguration))) != null) {
                fileInfo = prevFileInfo;
            }
            if (!fileInfo.isConfigLoaded()) {
                this.tryUpdateTTLByConfigFile(fileInfo);
            }
            if (!fileInfo.isReadyToBeConsumed()) {
                fileInfo.setReadyToBeConsumed(finishFileStatus.getModificationTime());
            }
        } else {
            long lastActiveTime;
            YarnResultPartitionFileInfo prevFileInfo;
            if (fileInfo == null && (prevFileInfo = this.resultPartitionMap.putIfAbsent(resultPartitionID, fileInfo = new YarnResultPartitionFileInfo(appId, (Tuple2<String, String>)new Tuple2((Object)rootDir, (Object)partitionDir), false, false, -1L, currTime, this.shuffleServiceConfiguration))) != null) {
                fileInfo = prevFileInfo;
            }
            if (!fileInfo.isConfigLoaded()) {
                this.tryUpdateTTLByConfigFile(fileInfo);
            }
            if (!fileInfo.isReadyToBeConsumed() && currTime - (lastActiveTime = partitionDirStatus.getModificationTime()) > fileInfo.getUnfinishedPartitionTTL()) {
                this.removeResultPartition(partitionDirStatus.getPath(), "UNFINISHED_PARTITION_TTL_TIMEOUT", lastActiveTime, true);
                this.resultPartitionMap.remove(resultPartitionID);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void tryUpdateTTLByConfigFile(YarnResultPartitionFileInfo fileInfo) {
        FSDataInputStream configIn = null;
        String configFilePathStr = null;
        try {
            configFilePathStr = ExternalBlockShuffleUtils.generateConfigPath((String)fileInfo.getRootDirAndPartitionDir().f1);
            Path configFilePath = new Path(configFilePathStr);
            if (!this.fileSystem.exists(configFilePath)) {
                return;
            }
            configIn = this.fileSystem.open(new Path(configFilePathStr));
            if (configIn != null) {
                DataInputViewStreamWrapper configView = new DataInputViewStreamWrapper((InputStream)configIn);
                long consumedPartitionTTL = configView.readLong();
                long partialConsumedPartitionTTL = configView.readLong();
                long unconsumedPartitionTTL = configView.readLong();
                long unfinishedPartitionTTL = configView.readLong();
                fileInfo.updateTTLConfig(consumedPartitionTTL, partialConsumedPartitionTTL, unconsumedPartitionTTL, unfinishedPartitionTTL);
            }
        }
        catch (IOException e) {
        }
        finally {
            if (configIn != null) {
                try {
                    configIn.close();
                }
                catch (IOException e) {
                    LOG.warn("Exception throws when trying to close the config file " + configFilePathStr, (Throwable)e);
                }
            }
        }
    }

    @VisibleForTesting
    static String parseContainerExecutorExecutablePath(ExternalBlockShuffleServiceConfiguration shuffleServiceConfiguration, FileSystem fileSystem) {
        String yarnHomeEnvVar = System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
        File hadoopBin = new File(yarnHomeEnvVar, "bin");
        String defaultContainerExecutorPath = new File(hadoopBin, "container-executor").getAbsolutePath();
        String containerExecutorPath = shuffleServiceConfiguration.getConfiguration().getString("yarn.nodemanager.linux-container-executor.path", defaultContainerExecutorPath);
        try {
            if (fileSystem.exists(new Path(containerExecutorPath))) {
                LOG.info("Use container-executor to do recycling, file path: " + containerExecutorPath);
                return containerExecutorPath;
            }
        }
        catch (IOException iOException) {
            // empty catch block
        }
        throw new IllegalArgumentException("Invalid container-executor configuration: " + containerExecutorPath);
    }

    @VisibleForTesting
    void removeResultPartition(Path partitionDir, String recycleReason, long lastActiveTime, boolean printLog) {
        try {
            String user = this.getRunAsUser(partitionDir.getParent().getParent().getParent().getName());
            Process process = Runtime.getRuntime().exec(new String[]{this.containerExecutorExecutablePath, user, user, "3", partitionDir.toString()});
            boolean processExit = process.waitFor(30L, TimeUnit.SECONDS);
            if (processExit) {
                int exitCode = process.exitValue();
                if (exitCode != 0) {
                    LOG.warn("Fail to delete partition's directory: {}, user: {}, reason: {}, lastActiveTime: {}, exitCode: {}.", new Object[]{partitionDir, user, recycleReason, lastActiveTime, exitCode});
                } else if (printLog) {
                    LOG.info("Delete partition's directory: {}, user: {}, reason: {}, lastActiveTime: {}.", new Object[]{partitionDir, user, recycleReason, lastActiveTime});
                }
            } else {
                LOG.warn("Delete partition's directory for more than 30 seconds: {}, user: {}, reason: {}, lastActiveTime: {}.", new Object[]{partitionDir, user, recycleReason, lastActiveTime});
            }
        }
        catch (Exception e) {
            LOG.warn("Fail to delete partition's directory: {}, exception:", (Object)partitionDir, (Object)e);
        }
    }

    private String getRunAsUser(String user) {
        if (UserGroupInformation.isSecurityEnabled() || !this.containerLimitUsers.booleanValue()) {
            return user;
        }
        return this.nonsecureLocalUser;
    }

    @VisibleForTesting
    static class YarnResultPartitionFileInfo
    implements LocalResultPartitionResolver.ResultPartitionFileInfo {
        private final String appId;
        private final Tuple2<String, String> rootDirAndPartitionDir;
        private volatile boolean readyToBeConsumed;
        private volatile boolean consumed;
        private volatile long partitionReadyTime;
        private volatile long fileInfoTimestamp;
        private volatile long consumedPartitionTTL;
        private volatile long partialConsumedPartitionTTL;
        private volatile long unconsumedPartitionTTL;
        private volatile long unfinishedPartitionTTL;
        private boolean configLoaded;

        YarnResultPartitionFileInfo(String appId, Tuple2<String, String> rootDirAndPartitionDir, boolean readyToBeConsumed, boolean consumed, long partitionReadyTime, long fileInfoTimestamp, ExternalBlockShuffleServiceConfiguration shuffleServiceConfiguration) {
            this.appId = appId;
            this.rootDirAndPartitionDir = rootDirAndPartitionDir;
            this.readyToBeConsumed = readyToBeConsumed;
            this.consumed = consumed;
            this.partitionReadyTime = partitionReadyTime;
            this.fileInfoTimestamp = fileInfoTimestamp;
            this.consumedPartitionTTL = shuffleServiceConfiguration.getDefaultConsumedPartitionTTL();
            this.partialConsumedPartitionTTL = shuffleServiceConfiguration.getDefaultPartialConsumedPartitionTTL();
            this.unconsumedPartitionTTL = shuffleServiceConfiguration.getDefaultUnconsumedPartitionTTL();
            this.unfinishedPartitionTTL = shuffleServiceConfiguration.getDefaultUnfinishedPartitionTTL();
        }

        String getAppId() {
            return this.appId;
        }

        Tuple2<String, String> getRootDirAndPartitionDir() {
            return this.rootDirAndPartitionDir;
        }

        boolean isReadyToBeConsumed() {
            return this.readyToBeConsumed;
        }

        boolean isConsumed() {
            return this.consumed;
        }

        long getPartitionReadyTime() {
            return this.partitionReadyTime;
        }

        long getFileInfoTimestamp() {
            return this.fileInfoTimestamp;
        }

        long getUnconsumedPartitionTTL() {
            return this.unconsumedPartitionTTL;
        }

        long getUnfinishedPartitionTTL() {
            return this.unfinishedPartitionTTL;
        }

        boolean isConfigLoaded() {
            return this.configLoaded;
        }

        @Override
        public String getRootDir() {
            return (String)this.rootDirAndPartitionDir.f0;
        }

        @Override
        public String getPartitionDir() {
            return (String)this.rootDirAndPartitionDir.f1;
        }

        @Override
        public long getConsumedPartitionTTL() {
            return this.consumedPartitionTTL;
        }

        @Override
        public long getPartialConsumedPartitionTTL() {
            return this.partialConsumedPartitionTTL;
        }

        void updateTTLConfig(long consumedPartitionTTL, long partialConsumedPartitionTTL, long unconsumedPartitionTTL, long unfinishedPartitionTTL) {
            this.consumedPartitionTTL = consumedPartitionTTL;
            this.partialConsumedPartitionTTL = partialConsumedPartitionTTL;
            this.unconsumedPartitionTTL = unconsumedPartitionTTL;
            this.unfinishedPartitionTTL = unfinishedPartitionTTL;
            this.configLoaded = true;
        }

        void setReadyToBeConsumed(long partitionReadyTime) {
            this.partitionReadyTime = partitionReadyTime;
            this.readyToBeConsumed = true;
        }

        void setConsumed() {
            this.consumed = true;
        }

        void updateFileInfoTimestamp(long fileInfoTimestamp) {
            if (this.fileInfoTimestamp > 0L) {
                this.fileInfoTimestamp = fileInfoTimestamp;
            }
        }

        void markToDelete() {
            this.fileInfoTimestamp = -1L;
        }

        void cancelDeletion() {
            this.fileInfoTimestamp = System.currentTimeMillis();
        }

        boolean needToDelete() {
            return this.fileInfoTimestamp <= 0L;
        }

        void updateOnConsumption() {
            if (this.needToDelete()) {
                this.cancelDeletion();
            }
            if (!this.isConsumed()) {
                this.setConsumed();
            }
        }
    }
}

