package org.apache.flink.runtime.io.network.partition.external;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/external/ExternalBlockShuffleServiceConfiguration.class */
/**
 * Immutable bundle of all settings used by the external block shuffle service.
 *
 * <p>Instances are created only through {@link #fromConfiguration(Configuration)}, which reads the
 * relevant {@link ExternalBlockShuffleServiceOptions}, validates them and derives dependent values
 * (netty memory, send buffer count, disk scan interval, heap shrink threshold).
 *
 * <p>NOTE(review): this source was recovered by a decompiler, whose notes indicate that most of the
 * accessors below were originally package-private; they are kept {@code public} here so existing
 * callers keep compiling.
 */
public class ExternalBlockShuffleServiceConfiguration {
    /** Disk type assigned to a directory that is configured without a {@code [TYPE]} prefix. */
    public static final String DEFAULT_DISK_TYPE = "HDD";

    /** Minimal number of send buffers that must fit into the direct memory left after netty. */
    private static final int MIN_BUFFER_NUMBER = 16;

    /** Every parsed directory is normalized to end with this separator. */
    private static final String PATH_SEPARATOR = "/";

    private static final Logger LOG = LoggerFactory.getLogger(ExternalBlockShuffleServiceConfiguration.class);

    /** Matches {@code [TYPE]dir} or plain {@code dir}: group 2 is the disk type, group 3 the directory. */
    private static final Pattern DISK_TYPE_REGEX = Pattern.compile("^(\\[(\\w*)\\])?(.+)$");

    private final Configuration configuration;
    private final NettyConfig nettyConfig;
    private final FileSystem fileSystem;
    private final Map<String, String> dirToDiskType;
    private final Map<String, Integer> diskTypeToIOThreadNum;
    private final Integer bufferNumber;
    private final Integer memorySizePerBufferInBytes;
    private final Long waitCreditDelay;
    private final Long defaultConsumedPartitionTTL;
    private final Long defaultPartialConsumedPartitionTTL;
    private final Long defaultUnconsumedPartitionTTL;
    private final Long defaultUnfinishedPartitionTTL;
    private final Long diskScanIntervalInMS;
    private final Class<?> subpartitionViewSchedulerClass;
    private final OsCachePolicy osCachePolicy;
    private final Long maxReadAheadLengthInBytes;
    private final Long selfCheckIntervalInMS;
    private final Long memoryShrinkageIntervalInMS;
    private final Long objectMinIdleIntervalToShrinkInMS;
    private final Long heapMemoryThresholdInBytes;

    /**
     * Stores the already validated values; use {@link #fromConfiguration(Configuration)} to build
     * an instance.
     */
    private ExternalBlockShuffleServiceConfiguration(
            Configuration configuration,
            NettyConfig nettyConfig,
            FileSystem fileSystem,
            Map<String, String> dirToDiskType,
            Map<String, Integer> diskTypeToIOThreadNum,
            Integer bufferNumber,
            Integer memorySizePerBufferInBytes,
            Long waitCreditDelay,
            Long defaultConsumedPartitionTTL,
            Long defaultPartialConsumedPartitionTTL,
            Long defaultUnconsumedPartitionTTL,
            Long defaultUnfinishedPartitionTTL,
            Long diskScanIntervalInMS,
            Class<?> subpartitionViewSchedulerClass,
            OsCachePolicy osCachePolicy,
            Long maxReadAheadLengthInBytes,
            Long selfCheckIntervalInMS,
            Long memoryShrinkageIntervalInMS,
            Long objectMinIdleIntervalToShrinkInMS,
            Long heapMemoryThresholdInBytes) {
        this.configuration = Preconditions.checkNotNull(configuration);
        this.nettyConfig = Preconditions.checkNotNull(nettyConfig);
        this.fileSystem = Preconditions.checkNotNull(fileSystem);
        this.dirToDiskType = Preconditions.checkNotNull(dirToDiskType);
        this.diskTypeToIOThreadNum = Preconditions.checkNotNull(diskTypeToIOThreadNum);
        this.bufferNumber = bufferNumber;
        this.memorySizePerBufferInBytes = memorySizePerBufferInBytes;
        this.waitCreditDelay = waitCreditDelay;
        this.defaultConsumedPartitionTTL = defaultConsumedPartitionTTL;
        this.defaultPartialConsumedPartitionTTL = defaultPartialConsumedPartitionTTL;
        this.defaultUnconsumedPartitionTTL = defaultUnconsumedPartitionTTL;
        this.defaultUnfinishedPartitionTTL = defaultUnfinishedPartitionTTL;
        this.diskScanIntervalInMS = diskScanIntervalInMS;
        this.subpartitionViewSchedulerClass = Preconditions.checkNotNull(subpartitionViewSchedulerClass);
        this.osCachePolicy = osCachePolicy;
        this.maxReadAheadLengthInBytes = maxReadAheadLengthInBytes;
        this.selfCheckIntervalInMS = selfCheckIntervalInMS;
        this.memoryShrinkageIntervalInMS = memoryShrinkageIntervalInMS;
        this.objectMinIdleIntervalToShrinkInMS = objectMinIdleIntervalToShrinkInMS;
        this.heapMemoryThresholdInBytes = heapMemoryThresholdInBytes;
    }

    /** Returns the underlying configuration (the same object that was passed to the factory). */
    public Configuration getConfiguration() {
        return this.configuration;
    }

    /** Returns the netty configuration derived in {@link #fromConfiguration(Configuration)}. */
    public NettyConfig getNettyConfig() {
        return this.nettyConfig;
    }

    /** Returns the file system handed to the constructor (the local file system). */
    public FileSystem getFileSystem() {
        return this.fileSystem;
    }

    /** Returns a read-only view of the mapping from local directory to its disk type. */
    public Map<String, String> getDirToDiskType() {
        return Collections.unmodifiableMap(this.dirToDiskType);
    }

    /** Returns a read-only view of the mapping from disk type to IO thread number per disk. */
    public Map<String, Integer> getDiskTypeToIOThreadNum() {
        return Collections.unmodifiableMap(this.diskTypeToIOThreadNum);
    }

    /** Returns the sum, over all configured directories, of the IO thread number of their disk type. */
    Integer getTotalIOThreadNum() {
        return sumIOThreads(this.dirToDiskType, this.diskTypeToIOThreadNum);
    }

    /** Returns the number of send buffers that fit into the direct memory not used by netty. */
    public Integer getBufferNumber() {
        return this.bufferNumber;
    }

    /** Returns the configured size of a single buffer in bytes. */
    public Integer getMemorySizePerBufferInBytes() {
        return this.memorySizePerBufferInBytes;
    }

    /** Returns the value of {@code WAIT_CREDIT_DELAY_IN_MS}. */
    public Long getWaitCreditDelay() {
        return this.waitCreditDelay;
    }

    /** Returns the TTL of consumed partitions in milliseconds. */
    public Long getDefaultConsumedPartitionTTL() {
        return this.defaultConsumedPartitionTTL;
    }

    /** Returns the TTL of partially consumed partitions in milliseconds. */
    public Long getDefaultPartialConsumedPartitionTTL() {
        return this.defaultPartialConsumedPartitionTTL;
    }

    /** Returns the TTL of unconsumed partitions in milliseconds. */
    public Long getDefaultUnconsumedPartitionTTL() {
        return this.defaultUnconsumedPartitionTTL;
    }

    /** Returns the TTL of unfinished partitions in milliseconds. */
    public Long getDefaultUnfinishedPartitionTTL() {
        return this.defaultUnfinishedPartitionTTL;
    }

    /** Returns the disk scan interval: the configured value capped by the smallest partition TTL. */
    public Long getDiskScanIntervalInMS() {
        return this.diskScanIntervalInMS;
    }

    /**
     * Instantiates the configured subpartition view scheduler through its no-argument constructor.
     *
     * @return a new scheduler, or {@code null} if instantiation fails (the failure is logged)
     */
    public ExternalBlockSubpartitionViewScheduler newSubpartitionViewScheduler() {
        try {
            // Class#newInstance is deprecated; go through the no-arg constructor instead.
            return (ExternalBlockSubpartitionViewScheduler)
                this.subpartitionViewSchedulerClass.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            LOG.warn("Failed to new ExternalSubpartitionViewScheduler " + this.subpartitionViewSchedulerClass + ", exception:", e);
            return null;
        }
    }

    /** Returns the OS cache policy resolved from the configuration. */
    public OsCachePolicy getOsCachePolicy() {
        return this.osCachePolicy;
    }

    /** Returns the value of {@code MAX_READ_AHEAD_LENGTH_IN_BYTES}. */
    public Long getMaxReadAheadLengthInBytes() {
        return this.maxReadAheadLengthInBytes;
    }

    /** Returns the value of {@code SELF_CHECK_INTERVAL_IN_MS}. */
    public Long getSelfCheckIntervalInMS() {
        return this.selfCheckIntervalInMS;
    }

    /** Returns the memory shrinkage interval, which is never smaller than the self check interval. */
    public Long getMemoryShrinkageIntervalInMS() {
        return this.memoryShrinkageIntervalInMS;
    }

    /** Returns the value of {@code OBJECT_MIN_IDLE_INTERVAL_TO_SHRINK_IN_MS}. */
    public Long getObjectMinIdleIntervalToShrinkInMS() {
        return this.objectMinIdleIntervalToShrinkInMS;
    }

    /** Returns the heap memory limit scaled by the (at most 100%) shrinking threshold percentage. */
    public Long getHeapMemoryThresholdInBytes() {
        return this.heapMemoryThresholdInBytes;
    }

    /** Sums, for every directory, the IO thread number registered for that directory's disk type. */
    private static int sumIOThreads(Map<String, String> dirToDiskType, Map<String, Integer> diskTypeToIOThreadNum) {
        int total = 0;
        for (String diskType : dirToDiskType.values()) {
            total += diskTypeToIOThreadNum.get(diskType);
        }
        return total;
    }

    /**
     * Builds a {@link NettyConfig} bound to the configured shuffle service port.
     *
     * @throws IllegalArgumentException if the port is outside {@code 1..65535}
     */
    private static NettyConfig createNettyConfig(Configuration configuration) {
        final int port = configuration.getInteger(ExternalBlockShuffleServiceOptions.FLINK_SHUFFLE_SERVICE_PORT_KEY);
        Preconditions.checkArgument(port > 0 && port < 65536, "Invalid port number for ExternalBlockShuffleService: " + port);
        final InetSocketAddress bindAddress = new InetSocketAddress(port);
        return new NettyConfig(
            bindAddress.getAddress(),
            bindAddress.getPort(),
            configuration.getInteger(ExternalBlockShuffleServiceOptions.MEMORY_SIZE_PER_BUFFER_IN_BYTES),
            Integer.MAX_VALUE,
            configuration);
    }

    /**
     * Reads, validates and derives all shuffle service settings from the given configuration.
     *
     * <p>Side effect: the passed configuration is modified — the netty server thread number and the
     * netty memory size are written back into it before the final {@link NettyConfig} is created.
     *
     * @param configuration the configuration to read from (and write derived netty settings to)
     * @return the validated shuffle service configuration
     * @throws IllegalArgumentException if a value is invalid or the memory budget is insufficient
     * @throws Exception if the subpartition view scheduler class cannot be loaded
     */
    public static ExternalBlockShuffleServiceConfiguration fromConfiguration(Configuration configuration) throws Exception {
        // ---- disks and IO threads ----
        final Map<String, String> dirToDiskType = parseDirToDiskType(configuration);
        final Map<String, Integer> diskTypeToIOThreadNum = parseDiskTypeToIOThreadNum(configuration);
        validateDiskTypeConfiguration(dirToDiskType, diskTypeToIOThreadNum);

        final int totalIOThreadNum = sumIOThreads(dirToDiskType, diskTypeToIOThreadNum);
        Preconditions.checkArgument(totalIOThreadNum > 0, "DiskIOThreadNum should be greater than 0, actual value: " + totalIOThreadNum);

        // A non-positive server thread number means "use one netty thread per IO thread".
        int serverThreadNum = configuration.getInteger(ExternalBlockShuffleServiceOptions.SERVER_THREAD_NUM);
        if (serverThreadNum <= 0) {
            serverThreadNum = totalIOThreadNum;
        }
        configuration.setInteger(NettyConfig.NUM_THREADS_SERVER.key(), serverThreadNum);

        // ---- direct memory split between netty and the send buffer pool ----
        // Widen to long BEFORE shifting, otherwise limits of 2048 MB and above overflow int.
        final long directMemoryLimitInBytes =
            ((long) configuration.getInteger(ExternalBlockShuffleServiceOptions.FLINK_SHUFFLE_SERVICE_DIRECT_MEMORY_LIMIT_IN_MB)) << 20;
        final long configuredNettyMemoryInBytes =
            ((long) configuration.getInteger(ExternalBlockShuffleServiceOptions.NETTY_MEMORY_IN_MB)) << 20;

        // First NettyConfig is only used to learn the thread number and chunk size.
        final NettyConfig probeNettyConfig = createNettyConfig(configuration);
        final long nettyMemoryUpperBound = (probeNettyConfig.getServerNumThreads() + 1L) * probeNettyConfig.getChunkSize();
        final long nettyMemoryInBytes = configuredNettyMemoryInBytes > 0
            ? Math.min(configuredNettyMemoryInBytes, nettyMemoryUpperBound)
            : Math.min(directMemoryLimitInBytes / 2, nettyMemoryUpperBound);
        Preconditions.checkArgument(
            nettyMemoryInBytes < directMemoryLimitInBytes,
            "The configured Netty memory size should be less than the total direct memory size, netty size is "
                + (nettyMemoryInBytes >> 20) + "MB, total direct memory size is " + (directMemoryLimitInBytes >> 20) + "MB");

        final int nettyMemoryInMB = (int) (nettyMemoryInBytes >> 20);
        configuration.setInteger(TaskManagerOptions.TASK_MANAGER_PROCESS_NETTY_MEMORY, nettyMemoryInMB);

        // Second NettyConfig picks up the thread number and netty memory written above.
        final NettyConfig nettyConfig = createNettyConfig(configuration);
        Preconditions.checkArgument(
            nettyConfig.getNumberOfArenas() >= 1,
            "Direct memory left for netty (" + nettyMemoryInMB + "MB) is not enough for at least one arena, please increase the total direct memory size or both the total direct memory size and netty memory size if netty memory size is configured explicitly.");

        final int memorySizePerBufferInBytes = configuration.getInteger(ExternalBlockShuffleServiceOptions.MEMORY_SIZE_PER_BUFFER_IN_BYTES);
        // Guards the division below.
        Preconditions.checkArgument(memorySizePerBufferInBytes > 0, "MemorySizePerBufferInBytes should be greater than 0, actual value: " + memorySizePerBufferInBytes);
        final int bufferNumber = (int) ((directMemoryLimitInBytes - nettyMemoryInBytes) / memorySizePerBufferInBytes);
        Preconditions.checkArgument(
            bufferNumber >= MIN_BUFFER_NUMBER,
            "Direct memory left for the send buffer pool is less than the minimal value (" + MIN_BUFFER_NUMBER + "), please increase the total direct memory size or decrease the netty memory size.");

        // ---- partition TTLs (configured in seconds, kept in milliseconds) ----
        // Multiply as long so that large TTLs do not overflow int.
        final long consumedPartitionTTL = configuration.getInteger(ExternalBlockShuffleServiceOptions.CONSUMED_PARTITION_TTL_IN_SECONDS) * 1000L;
        final long partialConsumedPartitionTTL = configuration.getInteger(ExternalBlockShuffleServiceOptions.PARTIAL_CONSUMED_PARTITION_TTL_IN_SECONDS) * 1000L;
        final long unconsumedPartitionTTL = configuration.getInteger(ExternalBlockShuffleServiceOptions.UNCONSUMED_PARTITION_TTL_IN_SECONDS) * 1000L;
        final long unfinishedPartitionTTL = configuration.getInteger(ExternalBlockShuffleServiceOptions.UNFINISHED_PARTITION_TTL_IN_SECONDS) * 1000L;
        Preconditions.checkArgument(
            consumedPartitionTTL <= partialConsumedPartitionTTL,
            "ConsumedPartitionTTL should be less than PartialConsumedPartitionTTL, ConsumedPartitionTTL: "
                + consumedPartitionTTL + " ms, PartialConsumedPartitionTTL: " + partialConsumedPartitionTTL + " ms.");

        // The scan interval is capped by the smallest TTL.
        final long smallestTTL = Math.min(
            Math.min(consumedPartitionTTL, partialConsumedPartitionTTL),
            Math.min(unconsumedPartitionTTL, unfinishedPartitionTTL));
        final long diskScanIntervalInMS = Math.min(
            smallestTTL,
            configuration.getLong(ExternalBlockShuffleServiceOptions.DISK_SCAN_INTERVAL_IN_MS));

        // ---- subpartition view scheduler ----
        String schedulerClassName = configuration.getString(ExternalBlockShuffleServiceOptions.SUBPARTITION_VIEW_SCHEDULER_CLASS).trim();
        if (schedulerClassName.isEmpty()) {
            schedulerClassName = ExternalBlockShuffleServiceOptions.SUBPARTITION_VIEW_SCHEDULER_CLASS.defaultValue();
        }
        final Class<?> schedulerClass;
        try {
            schedulerClass = Class.forName(schedulerClassName);
        } catch (Exception e) {
            // Only class loading is wrapped so that unrelated failures are not reported as scheduler errors.
            LOG.error("Failed to new ExternalBlockSubpartitionViewScheduler " + schedulerClassName + ", exception: ", e);
            throw e;
        }

        // ---- remaining plain options ----
        final long waitCreditDelay = configuration.getLong(ExternalBlockShuffleServiceOptions.WAIT_CREDIT_DELAY_IN_MS);
        final OsCachePolicy osCachePolicy = OsCachePolicy.getOsCachePolicyFromConfiguration(configuration, LOG);
        final long maxReadAheadLengthInBytes = configuration.getLong(ExternalBlockShuffleServiceOptions.MAX_READ_AHEAD_LENGTH_IN_BYTES);
        final long selfCheckIntervalInMS = configuration.getLong(ExternalBlockShuffleServiceOptions.SELF_CHECK_INTERVAL_IN_MS);

        long memoryShrinkageIntervalInMS = configuration.getLong(ExternalBlockShuffleServiceOptions.MEMORY_SHRINKAGE_INTERVAL_IN_MS);
        if (memoryShrinkageIntervalInMS < selfCheckIntervalInMS) {
            LOG.warn("memoryShrinkageIntervalInMS: " + memoryShrinkageIntervalInMS + " should be no less than selfCheckIntervalInMS: " + selfCheckIntervalInMS + ", use selfCheckIntervalInMS instead.");
            memoryShrinkageIntervalInMS = selfCheckIntervalInMS;
        }

        final long objectMinIdleIntervalToShrinkInMS = configuration.getLong(ExternalBlockShuffleServiceOptions.OBJECT_MIN_IDLE_INTERVAL_TO_SHRINK_IN_MS);

        // ---- heap shrinking threshold ----
        final long heapMemoryLimitInBytes =
            ((long) configuration.getInteger(ExternalBlockShuffleServiceOptions.FLINK_SHUFFLE_SERVICE_HEAP_MEMORY_LIMIT_IN_MB)) << 20;
        int heapThresholdInPercentage = configuration.getInteger(ExternalBlockShuffleServiceOptions.HEAP_MEMORY_THRESHOLD_TO_START_SHRINKING_IN_PERCENTAGE);
        if (heapThresholdInPercentage > 100) {
            LOG.warn("heapMemoryToStartShrinkingInPercentage: " + heapThresholdInPercentage + " should be no more than 100, use 100% instead.");
            heapThresholdInPercentage = 100;
        }
        final long heapMemoryThresholdInBytes = (heapMemoryLimitInBytes * heapThresholdInPercentage) / 100;

        return new ExternalBlockShuffleServiceConfiguration(
            configuration,
            nettyConfig,
            FileSystem.getLocalFileSystem(),
            dirToDiskType,
            diskTypeToIOThreadNum,
            bufferNumber,
            memorySizePerBufferInBytes,
            waitCreditDelay,
            consumedPartitionTTL,
            partialConsumedPartitionTTL,
            unconsumedPartitionTTL,
            unfinishedPartitionTTL,
            diskScanIntervalInMS,
            schedulerClass,
            osCachePolicy,
            maxReadAheadLengthInBytes,
            selfCheckIntervalInMS,
            memoryShrinkageIntervalInMS,
            objectMinIdleIntervalToShrinkInMS,
            heapMemoryThresholdInBytes);
    }

    /** Renders all settings, followed by one {@code [type]dir: ioThreadNum} entry per directory. */
    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder();
        sb.append("Configurations for ExternalBlockShuffleService: { ShuffleServicePort: ")
            .append(this.configuration.getInteger(ExternalBlockShuffleServiceOptions.FLINK_SHUFFLE_SERVICE_PORT_KEY))
            .append(", BufferNumber: ").append(this.bufferNumber)
            .append(", MemorySizePerBufferInBytes: ").append(this.memorySizePerBufferInBytes)
            .append(", NettyThreadNum: ").append(this.configuration.getInteger(NettyConfig.NUM_THREADS_SERVER))
            .append(", NettyArenasNum: ").append(this.configuration.getInteger(NettyConfig.NUM_ARENAS))
            .append(", WaitCreditDelay: ").append(this.waitCreditDelay)
            .append(", ConsumedPartitionTTL: ").append(this.defaultConsumedPartitionTTL)
            .append(", PartialConsumedPartitionTTL: ").append(this.defaultPartialConsumedPartitionTTL)
            .append(", UnconsumedPartitionTTL: ").append(this.defaultUnconsumedPartitionTTL)
            .append(", UnfinishedPartitionTTL: ").append(this.defaultUnfinishedPartitionTTL)
            .append(", DiskScanIntervalInMS: ").append(this.diskScanIntervalInMS)
            .append(", OsCachePolicy: ").append(this.osCachePolicy)
            .append(", MaxReadAheadLengthInBytes: ").append(this.maxReadAheadLengthInBytes)
            .append(", SelfCheckIntervalInMS: ").append(this.selfCheckIntervalInMS)
            .append(", MemoryShrinkageIntervalInMS: ").append(this.memoryShrinkageIntervalInMS)
            .append(", ObjectMinIdleIntervalToShrinkInMS: ").append(this.objectMinIdleIntervalToShrinkInMS)
            .append(", HeapMemoryThresholdInBytes: ").append(this.heapMemoryThresholdInBytes)
            .append(", SubpartitionViewSchedulerClass: ").append(this.subpartitionViewSchedulerClass.getCanonicalName())
            .append(", ");
        for (Map.Entry<String, String> dirAndType : this.dirToDiskType.entrySet()) {
            final String diskType = dirAndType.getValue();
            sb.append("[").append(diskType).append("]").append(dirAndType.getKey())
                .append(": ").append(this.diskTypeToIOThreadNum.get(diskType)).append(", ");
        }
        sb.append("}");
        return sb.toString();
    }

    /**
     * Parses the IO thread number per disk type.
     *
     * <p>The result always contains {@link #DEFAULT_DISK_TYPE} mapped to the default per-disk thread
     * number. Further entries come from a comma separated list of {@code type:number} pairs; items
     * that do not consist of exactly two {@code ':'}-separated parts are ignored, and a later entry
     * for the same type overrides an earlier one.
     *
     * @throws NumberFormatException if the number part of a pair is not an integer
     */
    @VisibleForTesting
    protected static Map<String, Integer> parseDiskTypeToIOThreadNum(Configuration configuration) {
        final Map<String, Integer> diskTypeToIOThreadNum = new HashMap<>();
        diskTypeToIOThreadNum.put(
            DEFAULT_DISK_TYPE,
            configuration.getInteger(ExternalBlockShuffleServiceOptions.DEFAULT_IO_THREAD_NUM_PER_DISK));

        final String rawPairs = configuration.getString(ExternalBlockShuffleServiceOptions.IO_THREAD_NUM_FOR_DISK_TYPE);
        if (rawPairs == null) {
            // Nothing configured beyond the default disk type.
            return diskTypeToIOThreadNum;
        }
        for (String pair : rawPairs.split(",")) {
            final String[] typeAndNum = pair.split(":");
            if (typeAndNum.length == 2) {
                diskTypeToIOThreadNum.put(typeAndNum[0].trim(), Integer.valueOf(typeAndNum[1].trim()));
            }
        }
        return diskTypeToIOThreadNum;
    }

    /** Parses the directory to disk type mapping from the {@code LOCAL_DIRS} option. */
    @VisibleForTesting
    protected static Map<String, String> parseDirToDiskType(Configuration configuration) {
        return parseDirToDiskType(configuration.getString(ExternalBlockShuffleServiceOptions.LOCAL_DIRS));
    }

    /**
     * Parses a comma separated list of {@code [TYPE]dir} or {@code dir} items.
     *
     * <p>Each directory is trimmed and normalized to end with {@code "/"}; items without a type
     * prefix (or with an empty one) get {@link #DEFAULT_DISK_TYPE}.
     *
     * @param str the raw list; {@code null} or blank yields an empty map
     * @return mapping from normalized directory to disk type
     */
    public static Map<String, String> parseDirToDiskType(String str) {
        final Map<String, String> dirToDiskType = new HashMap<>();
        for (String item : splitDiskConfigList(str)) {
            final Matcher matcher = DISK_TYPE_REGEX.matcher(item);
            if (!matcher.matches()) {
                continue;
            }
            final String diskType = matcher.group(2);
            final String rawDir = matcher.group(3);
            final String dir = rawDir == null ? "" : rawDir.trim();
            if (dir.isEmpty()) {
                continue;
            }
            final String normalizedDir = dir.endsWith(PATH_SEPARATOR) ? dir : dir.concat(PATH_SEPARATOR);
            final boolean hasDiskType = diskType != null && !diskType.isEmpty();
            dirToDiskType.put(normalizedDir, hasDiskType ? diskType.trim() : DEFAULT_DISK_TYPE);
        }
        return dirToDiskType;
    }

    /**
     * Splits a comma separated list into its trimmed, non-empty items.
     *
     * @param str the raw list; {@code null} is treated like an empty string
     * @return the items in their original order, never {@code null}
     */
    public static List<String> splitDiskConfigList(String str) {
        final List<String> items = new ArrayList<>();
        if (str == null) {
            return items;
        }
        for (String rawItem : str.split(",")) {
            final String item = rawItem.trim();
            if (!item.isEmpty()) {
                items.add(item);
            }
        }
        return items;
    }

    /**
     * Verifies that every directory's disk type has an IO thread number configured.
     *
     * <p>Every offending directory is logged before the check fails.
     *
     * @throws IllegalArgumentException if at least one disk type has no IO thread number
     */
    private static void validateDiskTypeConfiguration(Map<String, String> dirToDiskType, Map<String, Integer> diskTypeToIOThreadNum) throws Exception {
        boolean allDiskTypesKnown = true;
        for (Map.Entry<String, String> dirAndType : dirToDiskType.entrySet()) {
            if (!diskTypeToIOThreadNum.containsKey(dirAndType.getValue())) {
                // SLF4J placeholders are "{}", not MessageFormat-style "{0}".
                LOG.error("Invalid configuration: Require IO thread num for dir [{}] with disk type [{}].", dirAndType.getKey(), dirAndType.getValue());
                allDiskTypesKnown = false;
            }
        }
        Preconditions.checkArgument(
            allDiskTypesKnown,
            "Invalid disk configuration for ExternalBlockShuffleService, "
                + ExternalBlockShuffleServiceOptions.IO_THREAD_NUM_FOR_DISK_TYPE.key() + " : " + diskTypeToIOThreadNum
                + ", " + ExternalBlockShuffleServiceOptions.LOCAL_DIRS.key() + " : " + dirToDiskType);
    }
}
