/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.external;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleServiceOptions;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockSubpartitionViewScheduler;
import org.apache.flink.runtime.io.network.partition.external.OsCachePolicy;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holder for the settings used by the external block shuffle service.
 *
 * <p>Instances are created through {@link #fromConfiguration(Configuration)}, which parses the
 * directory / disk-type layout, sizes the netty and send-buffer memory out of the direct memory
 * limit, and validates the resulting values before building the object.
 */
public class ExternalBlockShuffleServiceConfiguration {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalBlockShuffleServiceConfiguration.class);

    /** Disk type assigned to a directory that is configured without a {@code [TYPE]} prefix. */
    public static final String DEFAULT_DISK_TYPE = "HDD";

    /** Smallest number of send buffers that {@link #fromConfiguration(Configuration)} accepts. */
    private static final int MIN_BUFFER_NUMBER = 16;

    /**
     * Matches one directory entry: an optional {@code [TYPE]} prefix (group 2 holds the type)
     * followed by the directory path (group 3).
     */
    private static final Pattern DISK_TYPE_REGEX = Pattern.compile("^(\\[(\\w*)\\])?(.+)$");

    private final Configuration configuration;
    private final NettyConfig nettyConfig;
    private final FileSystem fileSystem;
    /** Directory path (always ending with "/") to disk type. */
    private final Map<String, String> dirToDiskType;
    /** Disk type to the number of IO threads used for each directory of that type. */
    private final Map<String, Integer> diskTypeToIOThreadNum;
    private final Integer bufferNumber;
    private final Integer memorySizePerBufferInBytes;
    private final Long waitCreditDelay;
    /** Partition TTLs, in milliseconds (converted from the configured seconds). */
    private final Long defaultConsumedPartitionTTL;
    private final Long defaultPartialConsumedPartitionTTL;
    private final Long defaultUnconsumedPartitionTTL;
    private final Long defaultUnfinishedPartitionTTL;
    private final Long diskScanIntervalInMS;
    private final Class<?> subpartitionViewSchedulerClass;
    private final OsCachePolicy osCachePolicy;
    private final Long maxReadAheadLengthInBytes;
    private final Long selfCheckIntervalInMS;
    private final Long memoryShrinkageIntervalInMS;
    private final Long objectMinIdleIntervalToShrinkInMS;
    private final Long heapMemoryThresholdInBytes;

    /**
     * Stores the already-validated values. The configuration, netty config, file system, both
     * maps and the scheduler class must be non-null; the remaining values are stored as given.
     */
    private ExternalBlockShuffleServiceConfiguration(Configuration configuration, NettyConfig nettyConfig, FileSystem fileSystem, Map<String, String> dirToDiskType, Map<String, Integer> diskTypeToIOThreadNum, Integer bufferNumber, Integer memorySizePerBufferInBytes, Long waitCreditDelay, Long defaultConsumedPartitionTTL, Long defaultPartialConsumedPartitionTTL, Long defaultUnconsumedPartitionTTL, Long defaultUnfinishedPartitionTTL, Long diskScanIntervalInMS, Class<?> subpartitionViewSchedulerClass, OsCachePolicy osCachePolicy, Long maxReadAheadLengthInBytes, Long selfCheckIntervalInMS, Long memoryShrinkageIntervalInMS, Long objectMinIdleIntervalToShrinkInMS, Long heapMemoryThresholdInBytes) {
        this.configuration = Preconditions.checkNotNull(configuration);
        this.nettyConfig = Preconditions.checkNotNull(nettyConfig);
        this.fileSystem = Preconditions.checkNotNull(fileSystem);
        this.dirToDiskType = Preconditions.checkNotNull(dirToDiskType);
        this.diskTypeToIOThreadNum = Preconditions.checkNotNull(diskTypeToIOThreadNum);
        this.bufferNumber = bufferNumber;
        this.memorySizePerBufferInBytes = memorySizePerBufferInBytes;
        this.waitCreditDelay = waitCreditDelay;
        this.defaultConsumedPartitionTTL = defaultConsumedPartitionTTL;
        this.defaultPartialConsumedPartitionTTL = defaultPartialConsumedPartitionTTL;
        this.defaultUnconsumedPartitionTTL = defaultUnconsumedPartitionTTL;
        this.defaultUnfinishedPartitionTTL = defaultUnfinishedPartitionTTL;
        this.diskScanIntervalInMS = diskScanIntervalInMS;
        this.subpartitionViewSchedulerClass = Preconditions.checkNotNull(subpartitionViewSchedulerClass);
        this.osCachePolicy = osCachePolicy;
        this.maxReadAheadLengthInBytes = maxReadAheadLengthInBytes;
        this.selfCheckIntervalInMS = selfCheckIntervalInMS;
        this.memoryShrinkageIntervalInMS = memoryShrinkageIntervalInMS;
        this.objectMinIdleIntervalToShrinkInMS = objectMinIdleIntervalToShrinkInMS;
        this.heapMemoryThresholdInBytes = heapMemoryThresholdInBytes;
    }

    /** Returns the underlying configuration object (the same instance passed to the factory). */
    public Configuration getConfiguration() {
        return this.configuration;
    }

    /** Returns the netty configuration built by {@link #fromConfiguration(Configuration)}. */
    public NettyConfig getNettyConfig() {
        return this.nettyConfig;
    }

    FileSystem getFileSystem() {
        return this.fileSystem;
    }

    /** Returns a read-only view of the directory to disk-type mapping. */
    Map<String, String> getDirToDiskType() {
        return Collections.unmodifiableMap(this.dirToDiskType);
    }

    /** Returns a read-only view of the disk-type to IO-thread-count mapping. */
    Map<String, Integer> getDiskTypeToIOThreadNum() {
        return Collections.unmodifiableMap(this.diskTypeToIOThreadNum);
    }

    /** Returns the IO thread count summed over every configured directory. */
    Integer getTotalIOThreadNum() {
        return sumIOThreadNum(this.dirToDiskType, this.diskTypeToIOThreadNum);
    }

    Integer getBufferNumber() {
        return this.bufferNumber;
    }

    Integer getMemorySizePerBufferInBytes() {
        return this.memorySizePerBufferInBytes;
    }

    Long getWaitCreditDelay() {
        return this.waitCreditDelay;
    }

    Long getDefaultConsumedPartitionTTL() {
        return this.defaultConsumedPartitionTTL;
    }

    Long getDefaultPartialConsumedPartitionTTL() {
        return this.defaultPartialConsumedPartitionTTL;
    }

    Long getDefaultUnconsumedPartitionTTL() {
        return this.defaultUnconsumedPartitionTTL;
    }

    Long getDefaultUnfinishedPartitionTTL() {
        return this.defaultUnfinishedPartitionTTL;
    }

    Long getDiskScanIntervalInMS() {
        return this.diskScanIntervalInMS;
    }

    /**
     * Instantiates the configured subpartition view scheduler through its no-arg constructor.
     *
     * @return a new scheduler, or {@code null} when instantiation fails (the failure is logged)
     */
    ExternalBlockSubpartitionViewScheduler newSubpartitionViewScheduler() {
        try {
            return (ExternalBlockSubpartitionViewScheduler) this.subpartitionViewSchedulerClass.newInstance();
        } catch (Exception e) {
            LOG.warn("Failed to new ExternalSubpartitionViewScheduler {}, exception:", this.subpartitionViewSchedulerClass, e);
            return null;
        }
    }

    OsCachePolicy getOsCachePolicy() {
        return this.osCachePolicy;
    }

    Long getMaxReadAheadLengthInBytes() {
        return this.maxReadAheadLengthInBytes;
    }

    Long getSelfCheckIntervalInMS() {
        return this.selfCheckIntervalInMS;
    }

    Long getMemoryShrinkageIntervalInMS() {
        return this.memoryShrinkageIntervalInMS;
    }

    Long getObjectMinIdleIntervalToShrinkInMS() {
        return this.objectMinIdleIntervalToShrinkInMS;
    }

    Long getHeapMemoryThresholdInBytes() {
        return this.heapMemoryThresholdInBytes;
    }

    /**
     * Builds a {@link NettyConfig} bound to the configured shuffle service port, using the
     * configured per-buffer memory size as the netty memory segment size.
     *
     * @throws IllegalArgumentException if the configured port is outside (0, 65536)
     */
    private static NettyConfig createNettyConfig(Configuration configuration) {
        int port = configuration.getInteger(ExternalBlockShuffleServiceOptions.FLINK_SHUFFLE_SERVICE_PORT_KEY);
        Preconditions.checkArgument(port > 0 && port < 65536, "Invalid port number for ExternalBlockShuffleService: " + port);
        InetSocketAddress shuffleServiceInetSocketAddress = new InetSocketAddress(port);
        int memorySizePerBufferInBytes = configuration.getInteger(ExternalBlockShuffleServiceOptions.MEMORY_SIZE_PER_BUFFER_IN_BYTES);
        return new NettyConfig(shuffleServiceInetSocketAddress.getAddress(), shuffleServiceInetSocketAddress.getPort(), memorySizePerBufferInBytes, Integer.MAX_VALUE, configuration);
    }

    /**
     * Parses and validates every shuffle service setting from {@code configuration}.
     *
     * <p>Note that this method writes back into {@code configuration}: it sets the netty server
     * thread number and the netty memory size that were derived here, so that the
     * {@link NettyConfig} created from the same object observes them.
     *
     * @param configuration the configuration to read from (and to update, see above)
     * @return the validated shuffle service configuration
     * @throws IllegalArgumentException if any validated setting is out of range
     * @throws Exception if the configured subpartition view scheduler class cannot be loaded or
     *     instantiated
     */
    static ExternalBlockShuffleServiceConfiguration fromConfiguration(Configuration configuration) throws Exception {
        // 1. Disk layout and IO threads.
        Map<String, String> dirToDiskType = parseDirToDiskType(configuration);
        Map<String, Integer> diskTypeToIOThreadNum = parseDiskTypeToIOThreadNum(configuration);
        validateDiskTypeConfiguration(dirToDiskType, diskTypeToIOThreadNum);
        int diskIOThreadNum = sumIOThreadNum(dirToDiskType, diskTypeToIOThreadNum);
        Preconditions.checkArgument(diskIOThreadNum > 0, "DiskIOThreadNum should be greater than 0, actual value: " + diskIOThreadNum);

        // 2. Netty server threads: fall back to the disk IO thread count when not configured.
        int nettyThreadNum = configuration.getInteger(ExternalBlockShuffleServiceOptions.SERVER_THREAD_NUM);
        if (nettyThreadNum <= 0) {
            nettyThreadNum = diskIOThreadNum;
        }
        configuration.setInteger(NettyConfig.NUM_THREADS_SERVER.key(), nettyThreadNum);

        // 3. Split the direct memory between netty and the send buffer pool.
        long directMemoryLimitInBytes = (long) configuration.getInteger(ExternalBlockShuffleServiceOptions.FLINK_SHUFFLE_SERVICE_DIRECT_MEMORY_LIMIT_IN_MB) << 20;
        long nettyMemorySizeInBytes = (long) configuration.getInteger(ExternalBlockShuffleServiceOptions.NETTY_MEMORY_IN_MB) << 20;
        // A preliminary netty config is only needed to learn the thread count and chunk size.
        NettyConfig nettyConfigWithoutTotalMemory = createNettyConfig(configuration);
        long maxNettyMemorySizeInBytes = (long) (nettyConfigWithoutTotalMemory.getServerNumThreads() + 1) * (long) nettyConfigWithoutTotalMemory.getChunkSize();
        if (nettyMemorySizeInBytes > 0L) {
            nettyMemorySizeInBytes = Math.min(nettyMemorySizeInBytes, maxNettyMemorySizeInBytes);
        } else {
            // Not configured explicitly: use at most half of the direct memory.
            nettyMemorySizeInBytes = Math.min(directMemoryLimitInBytes / 2L, maxNettyMemorySizeInBytes);
        }
        Preconditions.checkArgument(nettyMemorySizeInBytes < directMemoryLimitInBytes, "The configured Netty memory size should be less than the total direct memory size, netty size is " + (nettyMemorySizeInBytes >> 20) + "MB, total direct memory size is " + (directMemoryLimitInBytes >> 20) + "MB");
        int nettyDirectMemorySizeInMB = (int) (nettyMemorySizeInBytes >> 20);
        configuration.setInteger(TaskManagerOptions.TASK_MANAGER_PROCESS_NETTY_MEMORY, nettyDirectMemorySizeInMB);
        NettyConfig nettyConfig = createNettyConfig(configuration);
        Preconditions.checkArgument(nettyConfig.getNumberOfArenas() >= 1, "Direct memory left for netty (" + nettyDirectMemorySizeInMB + "MB) is not enough for at least one arena, please increase the total direct memory size or both the total direct memory size and netty memory size if netty memory size is configured explicitly.");

        int memorySizePerBufferInBytes = configuration.getInteger(ExternalBlockShuffleServiceOptions.MEMORY_SIZE_PER_BUFFER_IN_BYTES);
        int bufferNum = (int) ((directMemoryLimitInBytes - nettyMemorySizeInBytes) / (long) memorySizePerBufferInBytes);
        Preconditions.checkArgument(bufferNum >= MIN_BUFFER_NUMBER, "Direct memory left for the send buffer pool is less than the minimal value (" + MIN_BUFFER_NUMBER + "), please increase the total direct memory size or decrease the netty memory size.");

        // 4. Partition TTLs. Multiply by a long literal so that large second values do not
        //    overflow in int arithmetic before being widened.
        long defaultConsumedPartitionTTL = configuration.getInteger(ExternalBlockShuffleServiceOptions.CONSUMED_PARTITION_TTL_IN_SECONDS) * 1000L;
        long defaultPartialConsumedPartitionTTL = configuration.getInteger(ExternalBlockShuffleServiceOptions.PARTIAL_CONSUMED_PARTITION_TTL_IN_SECONDS) * 1000L;
        long defaultUnconsumedPartitionTTL = configuration.getInteger(ExternalBlockShuffleServiceOptions.UNCONSUMED_PARTITION_TTL_IN_SECONDS) * 1000L;
        long defaultUnfinishedPartitionTTL = configuration.getInteger(ExternalBlockShuffleServiceOptions.UNFINISHED_PARTITION_TTL_IN_SECONDS) * 1000L;
        Preconditions.checkArgument(defaultConsumedPartitionTTL <= defaultPartialConsumedPartitionTTL, "ConsumedPartitionTTL should be no greater than PartialConsumedPartitionTTL, ConsumedPartitionTTL: " + defaultConsumedPartitionTTL + " ms, PartialConsumedPartitionTTL: " + defaultPartialConsumedPartitionTTL + " ms.");
        // The disk scan interval never exceeds the shortest TTL.
        long shortestTTL = Math.min(Math.min(defaultConsumedPartitionTTL, defaultPartialConsumedPartitionTTL), Math.min(defaultUnconsumedPartitionTTL, defaultUnfinishedPartitionTTL));
        long diskScanIntervalInMS = Math.min(shortestTTL, configuration.getLong(ExternalBlockShuffleServiceOptions.DISK_SCAN_INTERVAL_IN_MS));

        // 5. Subpartition view scheduler: resolve the class and fail fast if it is unusable.
        String schedulerName = configuration.getString(ExternalBlockShuffleServiceOptions.SUBPARTITION_VIEW_SCHEDULER_CLASS).trim();
        if (schedulerName.isEmpty()) {
            schedulerName = ExternalBlockShuffleServiceOptions.SUBPARTITION_VIEW_SCHEDULER_CLASS.defaultValue();
        }
        Class<?> subpartitionViewSchedulerClass;
        try {
            subpartitionViewSchedulerClass = Class.forName(schedulerName);
            // Trial instantiation; the instance itself is discarded. The cast verifies the type.
            ExternalBlockSubpartitionViewScheduler probe = (ExternalBlockSubpartitionViewScheduler) subpartitionViewSchedulerClass.newInstance();
        } catch (Exception e) {
            LOG.error("Failed to new ExternalBlockSubpartitionViewScheduler {}, exception: ", schedulerName, e);
            throw e;
        }

        // 6. Remaining scalar settings.
        long waitCreditDelay = configuration.getLong(ExternalBlockShuffleServiceOptions.WAIT_CREDIT_DELAY_IN_MS);
        OsCachePolicy osCachePolicy = OsCachePolicy.getOsCachePolicyFromConfiguration(configuration, LOG);
        long maxReadAheadLengthInBytes = configuration.getLong(ExternalBlockShuffleServiceOptions.MAX_READ_AHEAD_LENGTH_IN_BYTES);
        long selfCheckIntervalInMS = configuration.getLong(ExternalBlockShuffleServiceOptions.SELF_CHECK_INTERVAL_IN_MS);
        long memoryShrinkageIntervalInMS = configuration.getLong(ExternalBlockShuffleServiceOptions.MEMORY_SHRINKAGE_INTERVAL_IN_MS);
        if (memoryShrinkageIntervalInMS < selfCheckIntervalInMS) {
            LOG.warn("memoryShrinkageIntervalInMS: {} should be no less than selfCheckIntervalInMS: {}, use selfCheckIntervalInMS instead.", memoryShrinkageIntervalInMS, selfCheckIntervalInMS);
            memoryShrinkageIntervalInMS = selfCheckIntervalInMS;
        }
        long objectMinIdleIntervalToShrinkInMS = configuration.getLong(ExternalBlockShuffleServiceOptions.OBJECT_MIN_IDLE_INTERVAL_TO_SHRINK_IN_MS);
        long heapMemoryLimitInBytes = (long) configuration.getInteger(ExternalBlockShuffleServiceOptions.FLINK_SHUFFLE_SERVICE_HEAP_MEMORY_LIMIT_IN_MB) << 20;
        int heapMemoryToStartShrinkingInPercentage = configuration.getInteger(ExternalBlockShuffleServiceOptions.HEAP_MEMORY_THRESHOLD_TO_START_SHRINKING_IN_PERCENTAGE);
        if (heapMemoryToStartShrinkingInPercentage > 100) {
            LOG.warn("heapMemoryToStartShrinkingInPercentage: {} should be no more than 100, use 100% instead.", heapMemoryToStartShrinkingInPercentage);
            heapMemoryToStartShrinkingInPercentage = 100;
        }
        long heapMemoryThresholdInBytes = heapMemoryLimitInBytes * (long) heapMemoryToStartShrinkingInPercentage / 100L;

        return new ExternalBlockShuffleServiceConfiguration(configuration, nettyConfig, FileSystem.getLocalFileSystem(), dirToDiskType, diskTypeToIOThreadNum, bufferNum, memorySizePerBufferInBytes, waitCreditDelay, defaultConsumedPartitionTTL, defaultPartialConsumedPartitionTTL, defaultUnconsumedPartitionTTL, defaultUnfinishedPartitionTTL, diskScanIntervalInMS, subpartitionViewSchedulerClass, osCachePolicy, maxReadAheadLengthInBytes, selfCheckIntervalInMS, memoryShrinkageIntervalInMS, objectMinIdleIntervalToShrinkInMS, heapMemoryThresholdInBytes);
    }

    /** Renders the effective settings, including one entry per configured directory. */
    @Override
    public String toString() {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("Configurations for ExternalBlockShuffleService: { ShuffleServicePort: ")
            .append(this.configuration.getInteger(ExternalBlockShuffleServiceOptions.FLINK_SHUFFLE_SERVICE_PORT_KEY))
            .append(", BufferNumber: ").append(this.bufferNumber)
            .append(", MemorySizePerBufferInBytes: ").append(this.memorySizePerBufferInBytes)
            .append(", NettyThreadNum: ").append(this.configuration.getInteger(NettyConfig.NUM_THREADS_SERVER))
            .append(", NettyArenasNum: ").append(this.configuration.getInteger(NettyConfig.NUM_ARENAS))
            .append(", WaitCreditDelay: ").append(this.waitCreditDelay)
            .append(", ConsumedPartitionTTL: ").append(this.defaultConsumedPartitionTTL)
            .append(", PartialConsumedPartitionTTL: ").append(this.defaultPartialConsumedPartitionTTL)
            .append(", UnconsumedPartitionTTL: ").append(this.defaultUnconsumedPartitionTTL)
            .append(", UnfinishedPartitionTTL: ").append(this.defaultUnfinishedPartitionTTL)
            .append(", DiskScanIntervalInMS: ").append(this.diskScanIntervalInMS)
            .append(", OsCachePolicy: ").append(this.osCachePolicy)
            .append(", MaxReadAheadLengthInBytes: ").append(this.maxReadAheadLengthInBytes)
            .append(", SelfCheckIntervalInMS: ").append(this.selfCheckIntervalInMS)
            .append(", MemoryShrinkageIntervalInMS: ").append(this.memoryShrinkageIntervalInMS)
            .append(", ObjectMinIdleIntervalToShrinkInMS: ").append(this.objectMinIdleIntervalToShrinkInMS)
            .append(", HeapMemoryThresholdInBytes: ").append(this.heapMemoryThresholdInBytes)
            .append(", SubpartitionViewSchedulerClass: ").append(this.subpartitionViewSchedulerClass.getCanonicalName())
            .append(", ");
        this.dirToDiskType.forEach((dir, diskType) -> stringBuilder.append("[").append(diskType).append("]").append(dir).append(": ").append(this.diskTypeToIOThreadNum.get(diskType)).append(", "));
        stringBuilder.append("}");
        return stringBuilder.toString();
    }

    /**
     * Parses the disk-type to IO-thread-count mapping.
     *
     * <p>The result always contains {@link #DEFAULT_DISK_TYPE} mapped to the configured default
     * thread number; entries of the comma separated {@code type:num} list are added on top (and
     * may override the default type). Entries that do not have exactly one {@code ':'} separated
     * pair are skipped with a warning.
     *
     * @throws NumberFormatException if a thread number is not a valid integer
     */
    @VisibleForTesting
    protected static Map<String, Integer> parseDiskTypeToIOThreadNum(Configuration configuration) {
        Map<String, Integer> diskTypeToIOThread = new HashMap<>();
        int defaultIOThreadNum = configuration.getInteger(ExternalBlockShuffleServiceOptions.DEFAULT_IO_THREAD_NUM_PER_DISK);
        diskTypeToIOThread.put(DEFAULT_DISK_TYPE, defaultIOThreadNum);

        String strConfig = configuration.getString(ExternalBlockShuffleServiceOptions.IO_THREAD_NUM_FOR_DISK_TYPE);
        if (strConfig == null) {
            return diskTypeToIOThread;
        }
        for (String strDiskConfig : strConfig.split(",")) {
            if (strDiskConfig.trim().isEmpty()) {
                continue;
            }
            String[] kv = strDiskConfig.split(":");
            if (kv.length != 2) {
                LOG.warn("Ignore malformed disk type configuration [{}], expected format is <diskType>:<ioThreadNum>.", strDiskConfig);
                continue;
            }
            diskTypeToIOThread.put(kv[0].trim(), Integer.valueOf(kv[1].trim()));
        }
        return diskTypeToIOThread;
    }

    /** Parses the directory to disk-type mapping from the configured local dirs string. */
    @VisibleForTesting
    protected static Map<String, String> parseDirToDiskType(Configuration configuration) {
        String strConfig = configuration.getString(ExternalBlockShuffleServiceOptions.LOCAL_DIRS);
        return parseDirToDiskType(strConfig);
    }

    /**
     * Parses a comma separated list of {@code [TYPE]dir} entries.
     *
     * <p>Each directory is trimmed and normalised to end with {@code "/"}. An entry without a
     * {@code [TYPE]} prefix, or with an empty one, is mapped to {@link #DEFAULT_DISK_TYPE}.
     *
     * @param strConfig the raw list; {@code null} yields an empty map
     * @return a mutable map from normalised directory to disk type
     */
    public static Map<String, String> parseDirToDiskType(String strConfig) {
        Map<String, String> dirToDiskType = new HashMap<>();
        for (String strDirConfig : splitDiskConfigList(strConfig)) {
            Matcher matcher = DISK_TYPE_REGEX.matcher(strDirConfig);
            if (!matcher.matches()) {
                continue;
            }
            String diskType = matcher.group(2);
            String dir = matcher.group(3);
            if (dir == null) {
                continue;
            }
            dir = dir.trim();
            if (dir.isEmpty()) {
                continue;
            }
            if (!dir.endsWith("/")) {
                dir = dir.concat("/");
            }
            boolean hasExplicitType = diskType != null && !diskType.isEmpty();
            dirToDiskType.put(dir, hasExplicitType ? diskType.trim() : DEFAULT_DISK_TYPE);
        }
        return dirToDiskType;
    }

    /**
     * Splits a comma separated list into its trimmed, non-empty entries.
     *
     * @param strConfig the raw list; {@code null} yields an empty list
     * @return a mutable list of the trimmed entries, in their original order
     */
    public static List<String> splitDiskConfigList(String strConfig) {
        List<String> nonEmptyDirConfigs = new ArrayList<>();
        if (strConfig == null) {
            return nonEmptyDirConfigs;
        }
        for (String strDirConfig : strConfig.split(",")) {
            String trimmed = strDirConfig.trim();
            if (!trimmed.isEmpty()) {
                nonEmptyDirConfigs.add(trimmed);
            }
        }
        return nonEmptyDirConfigs;
    }

    /**
     * Checks that every directory's disk type has an IO thread number configured, logging each
     * offending directory.
     *
     * @throws IllegalArgumentException if at least one directory uses an unknown disk type
     */
    private static void validateDiskTypeConfiguration(Map<String, String> dirToDiskType, Map<String, Integer> diskTypeToIOThreadNum) {
        Set<String> diskTypes = diskTypeToIOThreadNum.keySet();
        boolean success = true;
        for (Map.Entry<String, String> dirEntry : dirToDiskType.entrySet()) {
            if (!diskTypes.contains(dirEntry.getValue())) {
                LOG.error("Invalid configuration: Require IO thread num for dir [{}] with disk type [{}].", dirEntry.getKey(), dirEntry.getValue());
                success = false;
            }
        }
        Preconditions.checkArgument(success, "Invalid disk configuration for ExternalBlockShuffleService, " + ExternalBlockShuffleServiceOptions.IO_THREAD_NUM_FOR_DISK_TYPE.key() + " : " + diskTypeToIOThreadNum + ", " + ExternalBlockShuffleServiceOptions.LOCAL_DIRS.key() + " : " + dirToDiskType);
    }

    /** Sums, over all directories, the IO thread number configured for each directory's disk type. */
    private static int sumIOThreadNum(Map<String, String> dirToDiskType, Map<String, Integer> diskTypeToIOThreadNum) {
        int total = 0;
        for (String diskType : dirToDiskType.values()) {
            total += diskTypeToIOThreadNum.get(diskType);
        }
        return total;
    }
}

