package org.apache.flink.runtime.io.network.partition.external;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.external.LocalResultPartitionResolver;
import org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/YarnLocalResultPartitionResolver.class */
public class YarnLocalResultPartitionResolver extends LocalResultPartitionResolver {
    private static final Logger LOG = LoggerFactory.getLogger(YarnLocalResultPartitionResolver.class);
    /** Initial capacity of {@link #resultPartitionMap}. */
    private static final int RESULT_PARTITION_MAP_INITIAL_CAPACITY = 2000;
    /** Initial capacity of {@link #appIdToUser}. */
    private static final int APP_ID_MAP_INITIAL_CAPACITY = 100;
    /** File system taken from the shuffle service configuration; used to inspect partition directories and files. */
    private final FileSystem fileSystem;

    /** Maps an application id to the user it runs as; filled by initializeApplication, cleared by stopApplication. */
    @VisibleForTesting
    protected final ConcurrentHashMap<String, String> appIdToUser;

    /** Result partitions currently known to this resolver, keyed by partition id. */
    @VisibleForTesting
    protected final ConcurrentHashMap<ResultPartitionID, YarnResultPartitionFileInfo> resultPartitionMap;
    /** Single-threaded scheduler that periodically runs doDiskScan(). */
    private final ScheduledExecutorService diskScannerExecutorService;
    /** Start time of the previous disk scan, or -1 before the first scan has finished. */
    private long lastDiskScanTimestamp;
    /** Path of the container-executor binary that is invoked to delete partition directories. */
    private final String containerExecutorExecutablePath;
    /** Value of "yarn.nodemanager.linux-container-executor.nonsecure-mode.limit-users"; consulted by getRunAsUser. */
    private final Boolean containerLimitUsers;
    /** Value of "yarn.nodemanager.linux-container-executor.nonsecure-mode.local-user"; consulted by getRunAsUser. */
    private final String nonsecureLocalUser;

    /**
     * Bookkeeping record for one result partition directory on local disk.
     *
     * <p>A non-positive {@code fileInfoTimestamp} is used as the "marked for deletion" flag; a positive
     * value is the time at which the partition directory was last seen or touched.
     */
    @VisibleForTesting
    public static class YarnResultPartitionFileInfo implements LocalResultPartitionResolver.ResultPartitionFileInfo {
        private final String appId;
        /** f0 is the root directory, f1 the partition directory. */
        private final Tuple2<String, String> rootDirAndPartitionDir;
        private volatile boolean readyToBeConsumed;
        private volatile boolean consumed;
        private volatile long partitionReadyTime;
        private volatile long fileInfoTimestamp;
        private volatile long consumedPartitionTTL;
        private volatile long partialConsumedPartitionTTL;
        private volatile long unconsumedPartitionTTL;
        private volatile long unfinishedPartitionTTL;
        /** Set once the TTLs have been replaced through {@link #updateTTLConfig}. */
        private boolean configLoaded;

        /**
         * @param str application id owning the partition
         * @param tuple2 (root directory, partition directory)
         * @param z initial "ready to be consumed" flag
         * @param z2 initial "consumed" flag
         * @param j time at which the partition became ready
         * @param j2 initial file info timestamp
         * @param externalBlockShuffleServiceConfiguration supplies the default TTL values
         */
        YarnResultPartitionFileInfo(String str, Tuple2<String, String> tuple2, boolean z, boolean z2, long j, long j2, ExternalBlockShuffleServiceConfiguration externalBlockShuffleServiceConfiguration) {
            appId = str;
            rootDirAndPartitionDir = tuple2;
            readyToBeConsumed = z;
            consumed = z2;
            partitionReadyTime = j;
            fileInfoTimestamp = j2;
            // Start from the service-wide defaults; updateTTLConfig may override them later.
            consumedPartitionTTL = externalBlockShuffleServiceConfiguration.getDefaultConsumedPartitionTTL().longValue();
            partialConsumedPartitionTTL = externalBlockShuffleServiceConfiguration.getDefaultPartialConsumedPartitionTTL().longValue();
            unconsumedPartitionTTL = externalBlockShuffleServiceConfiguration.getDefaultUnconsumedPartitionTTL().longValue();
            unfinishedPartitionTTL = externalBlockShuffleServiceConfiguration.getDefaultUnfinishedPartitionTTL().longValue();
        }

        String getAppId() {
            return appId;
        }

        Tuple2<String, String> getRootDirAndPartitionDir() {
            return rootDirAndPartitionDir;
        }

        boolean isReadyToBeConsumed() {
            return readyToBeConsumed;
        }

        boolean isConsumed() {
            return consumed;
        }

        long getPartitionReadyTime() {
            return partitionReadyTime;
        }

        long getFileInfoTimestamp() {
            return fileInfoTimestamp;
        }

        long getUnconsumedPartitionTTL() {
            return unconsumedPartitionTTL;
        }

        long getUnfinishedPartitionTTL() {
            return unfinishedPartitionTTL;
        }

        boolean isConfigLoaded() {
            return configLoaded;
        }

        /** Returns the first component of the directory pair. */
        @Override
        public String getRootDir() {
            return rootDirAndPartitionDir.f0;
        }

        /** Returns the second component of the directory pair. */
        @Override
        public String getPartitionDir() {
            return rootDirAndPartitionDir.f1;
        }

        @Override
        public long getConsumedPartitionTTL() {
            return consumedPartitionTTL;
        }

        @Override
        public long getPartialConsumedPartitionTTL() {
            return partialConsumedPartitionTTL;
        }

        /**
         * Replaces all four TTL values and records that the configuration has been loaded.
         *
         * @param j consumed partition TTL
         * @param j2 partially consumed partition TTL
         * @param j3 unconsumed partition TTL
         * @param j4 unfinished partition TTL
         */
        void updateTTLConfig(long j, long j2, long j3, long j4) {
            consumedPartitionTTL = j;
            partialConsumedPartitionTTL = j2;
            unconsumedPartitionTTL = j3;
            unfinishedPartitionTTL = j4;
            configLoaded = true;
        }

        /** Records the ready time first, then flips the ready flag. */
        void setReadyToBeConsumed(long j) {
            partitionReadyTime = j;
            readyToBeConsumed = true;
        }

        void setConsumed() {
            consumed = true;
        }

        /** Refreshes the timestamp unless the partition is currently marked for deletion. */
        void updateFileInfoTimestamp(long j) {
            if (fileInfoTimestamp <= 0) {
                return;
            }
            fileInfoTimestamp = j;
        }

        void markToDelete() {
            fileInfoTimestamp = -1L;
        }

        void cancelDeletion() {
            fileInfoTimestamp = System.currentTimeMillis();
        }

        boolean needToDelete() {
            return fileInfoTimestamp <= 0;
        }

        /** Revives a partition that was marked for deletion and flags it as consumed. */
        void updateOnConsumption() {
            if (needToDelete()) {
                cancelDeletion();
            }
            if (!isConsumed()) {
                setConsumed();
            }
        }
    }

    /**
     * Creates the resolver, resolves the container-executor binary and starts the periodic disk scan.
     *
     * <p>The first scan is scheduled without delay; subsequent scans run with a fixed delay of
     * {@code getDiskScanIntervalInMS()} milliseconds on a dedicated single thread.
     *
     * @param externalBlockShuffleServiceConfiguration configuration of the external shuffle service
     * @throws IllegalArgumentException if the container-executor binary cannot be found
     */
    YarnLocalResultPartitionResolver(ExternalBlockShuffleServiceConfiguration externalBlockShuffleServiceConfiguration) {
        super(externalBlockShuffleServiceConfiguration);
        this.appIdToUser = new ConcurrentHashMap<>(APP_ID_MAP_INITIAL_CAPACITY);
        this.resultPartitionMap = new ConcurrentHashMap<>(RESULT_PARTITION_MAP_INITIAL_CAPACITY);
        this.lastDiskScanTimestamp = -1L;
        this.fileSystem = externalBlockShuffleServiceConfiguration.getFileSystem();
        this.containerExecutorExecutablePath = parseContainerExecutorExecutablePath(externalBlockShuffleServiceConfiguration, this.fileSystem);
        this.nonsecureLocalUser = externalBlockShuffleServiceConfiguration.getConfiguration().getString("yarn.nodemanager.linux-container-executor.nonsecure-mode.local-user", "nobody");
        this.containerLimitUsers = Boolean.valueOf(externalBlockShuffleServiceConfiguration.getConfiguration().getBoolean("yarn.nodemanager.linux-container-executor.nonsecure-mode.limit-users", true));
        this.diskScannerExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.diskScannerExecutorService.scheduleWithFixedDelay(() -> {
            // An exception escaping a periodic task cancels every later execution, which would
            // silently stop all partition recycling. Contain it so the next round still runs.
            try {
                doDiskScan();
            } catch (RuntimeException e) {
                LOG.warn("Unexpected exception occurs during disk scan, will retry in the next round.", e);
            }
        }, 0L, externalBlockShuffleServiceConfiguration.getDiskScanIntervalInMS().longValue(), TimeUnit.MILLISECONDS);
    }

    /**
     * Registers an application so that its partition directories are picked up by lookups and disk scans.
     * An already registered application keeps its existing user.
     *
     * @param str user stored as the map value
     * @param str2 application id used as the map key
     */
    @Override
    public void initializeApplication(String str, String str2) {
        final String user = str;
        final String appId = str2;
        this.appIdToUser.putIfAbsent(appId, user);
    }

    /**
     * Forgets an application and every result partition recorded for it.
     *
     * @param str id of the application to stop
     * @return ids of the result partitions that were removed from the partition map
     */
    @Override
    public Set<ResultPartitionID> stopApplication(String str) {
        Set<ResultPartitionID> removedPartitions = new HashSet<>();
        for (Iterator<Map.Entry<ResultPartitionID, YarnResultPartitionFileInfo>> partitions = this.resultPartitionMap.entrySet().iterator(); partitions.hasNext(); ) {
            Map.Entry<ResultPartitionID, YarnResultPartitionFileInfo> partition = partitions.next();
            if (!partition.getValue().getAppId().equals(str)) {
                continue;
            }
            removedPartitions.add(partition.getKey());
            partitions.remove();
        }
        this.appIdToUser.remove(str);
        return removedPartitions;
    }

    /**
     * Resolves the on-disk location of a result partition for a consumer.
     *
     * <p>A partition already present in the map is re-checked for its finished file if it was not yet
     * ready, and is then marked as consumed. An unknown partition is searched for on disk.
     *
     * @param resultPartitionID the requested partition
     * @return file info of the partition
     * @throws PartitionNotFoundException if the partition is unknown or not finished yet
     * @throws IOException on other file system failures
     */
    @Override
    public LocalResultPartitionResolver.ResultPartitionFileInfo getResultPartitionDir(ResultPartitionID resultPartitionID) throws IOException {
        YarnResultPartitionFileInfo knownInfo = this.resultPartitionMap.get(resultPartitionID);
        if (knownInfo == null) {
            // Not seen by the disk scanner yet: look it up directly.
            YarnResultPartitionFileInfo foundInfo = searchResultPartitionDir(resultPartitionID);
            if (foundInfo != null) {
                return foundInfo;
            }
            throw new PartitionNotFoundException(resultPartitionID);
        }
        if (!knownInfo.isReadyToBeConsumed()) {
            updateUnfinishedResultPartition(resultPartitionID, knownInfo);
        }
        knownInfo.updateOnConsumption();
        return knownInfo;
    }

    /**
     * Marks a known result partition for deletion; the directory itself is removed by a later disk scan.
     * Unknown partitions are ignored.
     *
     * @param resultPartitionID the partition to recycle
     */
    @Override
    public void recycleResultPartition(ResultPartitionID resultPartitionID) {
        YarnResultPartitionFileInfo fileInfo = this.resultPartitionMap.get(resultPartitionID);
        if (fileInfo == null) {
            return;
        }
        fileInfo.markToDelete();
    }

    /** Shuts down the disk scanner; failures during shutdown are logged and not propagated. */
    @Override
    public void stop() {
        LOG.warn("stop YarnLocalResultPartitionResolver.");
        try {
            diskScannerExecutorService.shutdownNow();
        } catch (Throwable t) {
            LOG.error("Exception occurs when stopping YarnLocalResultPartitionResolver", t);
        }
    }

    /**
     * Builds the application directory relative to a local root directory:
     * {@code usercache/<user>/appcache/<appId>}.
     *
     * @param str user name
     * @param str2 application id
     * @return the relative application directory, without leading or trailing separator
     */
    public static String generateRelativeLocalAppDir(String str, String str2) {
        return String.join("/", "usercache", str, "appcache", str2);
    }

    /**
     * Re-checks the finished file of a partition that was not ready yet and marks the partition as
     * ready, using the finished file's modification time, once the file exists.
     *
     * @param resultPartitionID id reported in the exception when the finished file is missing
     * @param yarnResultPartitionFileInfo the not-yet-ready partition to refresh
     * @throws PartitionNotFoundException if the finished file does not exist
     * @throws IOException on other file system failures
     */
    private void updateUnfinishedResultPartition(ResultPartitionID resultPartitionID, YarnResultPartitionFileInfo yarnResultPartitionFileInfo) throws IOException {
        try {
            String partitionDir = yarnResultPartitionFileInfo.getRootDirAndPartitionDir().f1;
            Path finishedFile = new Path(ExternalBlockShuffleUtils.generateFinishedPath(partitionDir));
            FileStatus finishedFileStatus = this.fileSystem.getFileStatus(finishedFile);
            if (finishedFileStatus == null) {
                return;
            }
            yarnResultPartitionFileInfo.setReadyToBeConsumed(finishedFileStatus.getModificationTime());
        } catch (FileNotFoundException e) {
            // No finished file yet: the producer is still writing.
            throw new PartitionNotFoundException(resultPartitionID);
        }
    }

    /**
     * Searches every registered application and every configured root directory for the directory of
     * the given result partition and registers it in the partition map.
     *
     * @param resultPartitionID the partition to look for
     * @return file info of a finished partition, or {@code null} if no matching directory exists
     * @throws PartitionNotFoundException if the directory exists but the partition is not finished
     * @throws IOException if the finished file cannot be inspected for reasons other than absence
     */
    private YarnResultPartitionFileInfo searchResultPartitionDir(ResultPartitionID resultPartitionID) throws IOException {
        for (Map.Entry<String, String> appAndUser : this.appIdToUser.entrySet()) {
            String appId = appAndUser.getKey();
            String relativeAppDir = generateRelativeLocalAppDir(appAndUser.getValue(), appId);
            String relativePartitionDir = ExternalBlockShuffleUtils.generatePartitionRootPath(relativeAppDir, resultPartitionID.getProducerId().toString(), resultPartitionID.getPartitionId().toString());

            for (String rootDir : this.shuffleServiceConfiguration.getDirToDiskType().keySet()) {
                String partitionDir = rootDir + relativePartitionDir;
                String finishedFilePath = ExternalBlockShuffleUtils.generateFinishedPath(partitionDir);

                // Any failure to stat the directory is treated as "not on this disk".
                FileStatus dirStatus;
                try {
                    dirStatus = this.fileSystem.getFileStatus(new Path(partitionDir));
                } catch (IOException e) {
                    dirStatus = null;
                }
                if (dirStatus == null || !dirStatus.isDir()) {
                    continue;
                }

                // Only a missing finished file means "unfinished"; other I/O errors propagate.
                FileStatus finishedFileStatus;
                try {
                    finishedFileStatus = this.fileSystem.getFileStatus(new Path(finishedFilePath));
                } catch (FileNotFoundException e) {
                    finishedFileStatus = null;
                }

                if (finishedFileStatus == null) {
                    YarnResultPartitionFileInfo existing = this.resultPartitionMap.putIfAbsent(resultPartitionID, new YarnResultPartitionFileInfo(appId, new Tuple2<>(rootDir, partitionDir), false, false, -1L, System.currentTimeMillis(), this.shuffleServiceConfiguration));
                    // A concurrently registered entry may already be ready even though the stat above missed it.
                    if (existing != null && existing.isReadyToBeConsumed()) {
                        existing.updateOnConsumption();
                        return existing;
                    }
                    throw new PartitionNotFoundException(resultPartitionID);
                }

                YarnResultPartitionFileInfo created = new YarnResultPartitionFileInfo(appId, new Tuple2<>(rootDir, partitionDir), true, true, finishedFileStatus.getModificationTime(), System.currentTimeMillis(), this.shuffleServiceConfiguration);
                YarnResultPartitionFileInfo existing = this.resultPartitionMap.putIfAbsent(resultPartitionID, created);
                if (existing == null) {
                    return created;
                }
                if (!existing.isReadyToBeConsumed()) {
                    existing.setReadyToBeConsumed(finishedFileStatus.getModificationTime());
                }
                existing.updateOnConsumption();
                return existing;
            }
        }
        return null;
    }

    /**
     * Runs one round of the periodic disk scan.
     *
     * <p>Phase 1 lists the application directory of every registered application under every
     * configured root directory and refreshes the partition map from what is found. Phase 2 walks the
     * partition map and drops entries that are marked for deletion, were not seen since the previous
     * scan, or are ready but unconsumed for longer than their TTL; for the first and last case the
     * partition directory is deleted as well.
     */
    @VisibleForTesting
    void doDiskScan() {
        final long scanStartTime = System.currentTimeMillis();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Start to do disk scan, currTime: " + scanStartTime);
        }

        // Phase 1: refresh the partition map from the directories on disk.
        Set<String> rootDirs = this.shuffleServiceConfiguration.getDirToDiskType().keySet();
        for (Map.Entry<String, String> appAndUser : this.appIdToUser.entrySet()) {
            String user = appAndUser.getValue();
            String appId = appAndUser.getKey();
            String relativeAppDir = generateRelativeLocalAppDir(user, appId);
            for (String rootDir : rootDirs) {
                String appDir = rootDir + relativeAppDir + ZKPaths.PATH_SEPARATOR;
                try {
                    FileStatus[] children = this.fileSystem.listStatus(new Path(appDir));
                    if (children == null) {
                        continue;
                    }
                    for (FileStatus child : children) {
                        if (!child.isDir()) {
                            continue;
                        }
                        ResultPartitionID partitionId = ExternalBlockShuffleUtils.convertRelativeDirToResultPartitionID(child.getPath().getName());
                        if (partitionId != null) {
                            updateResultPartitionFileInfoByFileStatus(scanStartTime, appId, partitionId, child, rootDir, appDir);
                        }
                    }
                } catch (Exception e) {
                    // Best effort: one unreadable directory must not stop the scan of the others.
                    // Logged at debug level only, so a failure repeated every round does not flood the log.
                    LOG.debug("Fail to scan application directory {}, skip it in this round.", appDir, e);
                }
            }
        }

        // Phase 2: expire entries and delete the directories that are due.
        Iterator<Map.Entry<ResultPartitionID, YarnResultPartitionFileInfo>> partitions = this.resultPartitionMap.entrySet().iterator();
        while (partitions.hasNext()) {
            YarnResultPartitionFileInfo fileInfo = partitions.next().getValue();
            boolean forget = false;
            if (fileInfo.needToDelete()) {
                forget = true;
                removeResultPartition(new Path(fileInfo.getPartitionDir()), "FETCHED_PARTITION_TTL_TIMEOUT", -1L, false);
            } else if (fileInfo.getFileInfoTimestamp() <= this.lastDiskScanTimestamp) {
                // Not refreshed by phase 1 of this round: the directory is gone from disk.
                forget = true;
            } else if (fileInfo.isReadyToBeConsumed() && !fileInfo.isConsumed()) {
                long partitionReadyTime = fileInfo.getPartitionReadyTime();
                if (scanStartTime - partitionReadyTime > fileInfo.getUnconsumedPartitionTTL()) {
                    forget = true;
                    removeResultPartition(new Path(fileInfo.getPartitionDir()), "UNCONSUMED_PARTITION_TTL_TIMEOUT", partitionReadyTime, true);
                }
            }
            if (forget) {
                partitions.remove();
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Finish disk scan, cost " + (System.currentTimeMillis() - scanStartTime) + " in ms.");
        }
        this.lastDiskScanTimestamp = scanStartTime;
    }

    /**
     * Refreshes (or creates) the map entry of one partition directory found by the disk scan.
     *
     * <p>An entry that is not ready yet becomes ready once its finished file exists. A partition that
     * is still unfinished and whose directory has not been modified for longer than the unfinished
     * partition TTL is deleted and removed from the map.
     *
     * @param j start time of the current disk scan
     * @param str application id owning the partition
     * @param resultPartitionID id parsed from the directory name
     * @param fileStatus status of the partition directory
     * @param str2 root directory the partition lives under
     * @param str3 application directory (with trailing separator) containing the partition directory
     */
    private void updateResultPartitionFileInfoByFileStatus(long j, String str, ResultPartitionID resultPartitionID, FileStatus fileStatus, String str2, String str3) {
        YarnResultPartitionFileInfo fileInfo = this.resultPartitionMap.get(resultPartitionID);
        if (fileInfo != null) {
            fileInfo.updateFileInfoTimestamp(j);
            if (!fileInfo.isConfigLoaded()) {
                tryUpdateTTLByConfigFile(fileInfo);
            }
            if (fileInfo.isReadyToBeConsumed()) {
                // Nothing else can change for a partition that is already ready.
                return;
            }
        }

        String partitionDir = str3 + fileStatus.getPath().getName() + ZKPaths.PATH_SEPARATOR;
        FileStatus finishedFileStatus;
        try {
            finishedFileStatus = this.fileSystem.getFileStatus(new Path(ExternalBlockShuffleUtils.generateFinishedPath(partitionDir)));
        } catch (Exception e) {
            // Any failure is treated as "finished file not present".
            finishedFileStatus = null;
        }
        final boolean finished = finishedFileStatus != null;

        if (fileInfo == null) {
            long readyTime = finished ? finishedFileStatus.getModificationTime() : -1L;
            YarnResultPartitionFileInfo created = new YarnResultPartitionFileInfo(str, new Tuple2<>(str2, partitionDir), finished, false, readyTime, j, this.shuffleServiceConfiguration);
            YarnResultPartitionFileInfo raced = this.resultPartitionMap.putIfAbsent(resultPartitionID, created);
            fileInfo = raced != null ? raced : created;
        }
        if (!fileInfo.isConfigLoaded()) {
            tryUpdateTTLByConfigFile(fileInfo);
        }
        if (fileInfo.isReadyToBeConsumed()) {
            return;
        }
        if (finished) {
            fileInfo.setReadyToBeConsumed(finishedFileStatus.getModificationTime());
            return;
        }

        // Still unfinished: expire it when the directory has been idle for too long.
        long lastModified = fileStatus.getModificationTime();
        if (j - lastModified > fileInfo.getUnfinishedPartitionTTL()) {
            removeResultPartition(fileStatus.getPath(), "UNFINISHED_PARTITION_TTL_TIMEOUT", lastModified, true);
            this.resultPartitionMap.remove(resultPartitionID);
        }
    }

    /**
     * Tries to replace the default TTLs of a partition with the values stored in its config file.
     *
     * <p>The config file is expected to hold four consecutive {@code long} values: consumed,
     * partially consumed, unconsumed and unfinished partition TTL. A missing or unreadable file
     * leaves the partition untouched, so the load is attempted again on a later scan.
     *
     * @param yarnResultPartitionFileInfo the partition whose TTLs should be loaded
     */
    private void tryUpdateTTLByConfigFile(YarnResultPartitionFileInfo yarnResultPartitionFileInfo) {
        FSDataInputStream configIn = null;
        String configPath = null;
        try {
            configPath = ExternalBlockShuffleUtils.generateConfigPath(yarnResultPartitionFileInfo.getRootDirAndPartitionDir().f1);
            Path configFile = new Path(configPath);
            if (!this.fileSystem.exists(configFile)) {
                return;
            }
            configIn = this.fileSystem.open(configFile);
            if (configIn != null) {
                DataInputViewStreamWrapper configView = new DataInputViewStreamWrapper(configIn);
                // Read all four values before applying any, so a truncated file changes nothing.
                long consumedTTL = configView.readLong();
                long partialConsumedTTL = configView.readLong();
                long unconsumedTTL = configView.readLong();
                long unfinishedTTL = configView.readLong();
                yarnResultPartitionFileInfo.updateTTLConfig(consumedTTL, partialConsumedTTL, unconsumedTTL, unfinishedTTL);
            }
        } catch (IOException e) {
            // Best effort: keep the current TTLs and retry on the next scan.
            LOG.debug("Fail to load the config file {}, keep the current TTL configuration.", configPath, e);
        } finally {
            if (configIn != null) {
                try {
                    configIn.close();
                } catch (IOException e) {
                    LOG.warn("Exception throws when trying to close the config file " + configPath, e);
                }
            }
        }
    }

    /**
     * Resolves the path of the container-executor binary used to delete partition directories.
     *
     * <p>The path is read from {@code yarn.nodemanager.linux-container-executor.path}; the default is
     * {@code <HADOOP_YARN_HOME>/bin/container-executor}.
     *
     * @param externalBlockShuffleServiceConfiguration configuration to read the path from
     * @param fileSystem file system used to verify that the binary exists
     * @return the path of an existing container-executor binary
     * @throws IllegalArgumentException if the binary does not exist or its existence cannot be checked;
     *     in the latter case the underlying {@link IOException} is attached as the cause
     */
    @VisibleForTesting
    static String parseContainerExecutorExecutablePath(ExternalBlockShuffleServiceConfiguration externalBlockShuffleServiceConfiguration, FileSystem fileSystem) {
        String defaultExecutablePath = new File(new File(System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key()), "bin"), "container-executor").getAbsolutePath();
        String executablePath = externalBlockShuffleServiceConfiguration.getConfiguration().getString("yarn.nodemanager.linux-container-executor.path", defaultExecutablePath);
        try {
            if (fileSystem.exists(new Path(executablePath))) {
                LOG.info("Use container-executor to do recycling, file path: " + executablePath);
                return executablePath;
            }
        } catch (IOException e) {
            // Keep the I/O failure as the cause so it can be told apart from a missing file.
            throw new IllegalArgumentException("Invalid container-executor configuration: " + executablePath, e);
        }
        throw new IllegalArgumentException("Invalid container-executor configuration: " + executablePath);
    }

    /**
     * Deletes a partition directory by invoking the container-executor binary and waits up to
     * 30 seconds for it to finish. Failures are logged and never propagated.
     *
     * <p>The user name is taken from the third ancestor of {@code path} and mapped through
     * {@link #getRunAsUser(String)}.
     *
     * @param path the partition directory to delete
     * @param str reason of the deletion, used for logging only
     * @param j last active time of the partition, used for logging only
     * @param z whether a successful deletion should be logged
     */
    @VisibleForTesting
    void removeResultPartition(Path path, String str, long j, boolean z) {
        try {
            String runAsUser = getRunAsUser(path.getParent().getParent().getParent().getName());
            String[] command = {this.containerExecutorExecutablePath, runAsUser, runAsUser, "3", path.toString()};
            Process deletion = Runtime.getRuntime().exec(command);
            if (!deletion.waitFor(30L, TimeUnit.SECONDS)) {
                LOG.warn("Delete partition's directory for more than 30 seconds: {}, user: {}, reason: {}, lastActiveTime: {}.", path, runAsUser, str, j);
                return;
            }
            int exitValue = deletion.exitValue();
            if (exitValue != 0) {
                LOG.warn("Fail to delete partition's directory: {}, user: {}, reason: {}, lastActiveTime: {}, exitCode: {}.", path, runAsUser, str, j, exitValue);
            } else if (z) {
                LOG.info("Delete partition's directory: {}, user: {}, reason: {}, lastActiveTime: {}.", path, runAsUser, str, j);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption stays visible to the calling thread.
            Thread.currentThread().interrupt();
            LOG.warn("Fail to delete partition's directory: {}, exception:", path, e);
        } catch (Exception e) {
            LOG.warn("Fail to delete partition's directory: {}, exception:", path, e);
        }
    }

    /**
     * Chooses the user the container-executor runs as.
     *
     * @param str user owning the partition directory
     * @return {@code str} when security is enabled or users are not limited; otherwise the configured
     *     non-secure local user
     */
    private String getRunAsUser(String str) {
        boolean useLocalUser = !UserGroupInformation.isSecurityEnabled() && this.containerLimitUsers.booleanValue();
        if (useLocalUser) {
            return this.nonsecureLocalUser;
        }
        return str;
    }
}
