/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.util;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.ACLProvider;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.BackgroundPathable;
import org.apache.flink.shaded.curator.org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.flink.shaded.curator.org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooDefs;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.data.ACL;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperUtils {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);

    public static CuratorFramework startCuratorFramework(Configuration configuration) {
        ACLProvider aclProvider;
        boolean isAuthEnabled;
        Preconditions.checkNotNull((Object)configuration, (String)"configuration");
        String zkQuorum = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);
        if (zkQuorum == null || StringUtils.isBlank((CharSequence)zkQuorum)) {
            throw new RuntimeException("No valid ZooKeeper quorum has been specified. You can specify the quorum via the configuration key '" + HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM.key() + "'.");
        }
        int sessionTimeout = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT);
        int connectionTimeout = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT);
        int retryWait = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_RETRY_WAIT);
        int maxRetryAttempts = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_MAX_RETRY_ATTEMPTS);
        String root = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT);
        String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
        boolean disableSaslClient = configuration.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
        ZkClientACLMode aclMode = ZkClientACLMode.fromConfig(configuration);
        boolean enableAclClient = false;
        String aclScheme = null;
        String aclAuth = null;
        if (disableSaslClient) {
            aclScheme = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_ACL_SCHEME);
            aclAuth = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_ACL_AUTH);
            if (aclScheme != null && !aclScheme.isEmpty()) {
                enableAclClient = true;
                LOG.info("Access Zookeeper using {} ACL authentication.", (Object)aclScheme);
            }
        } else {
            LOG.info("Access Zookeeper using SASL authentication.");
        }
        boolean bl = isAuthEnabled = !disableSaslClient || enableAclClient;
        if (!isAuthEnabled && aclMode == ZkClientACLMode.CREATOR) {
            String errorMessage = "Cannot set ACL role to " + (Object)((Object)aclMode) + " since authentication is not enabled.";
            LOG.warn(errorMessage);
            throw new IllegalConfigurationException(errorMessage);
        }
        if (aclMode == ZkClientACLMode.CREATOR) {
            LOG.info("Enforcing creator for ZK connections");
            aclProvider = new SecureAclProvider();
        } else {
            LOG.info("Enforcing default ACL for ZK connections");
            aclProvider = new DefaultACLProvider();
        }
        String rootWithNamespace = ZooKeeperUtils.generateZookeeperPath(root, namespace);
        LOG.info("Using '{}' as Zookeeper namespace.", (Object)rootWithNamespace);
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectString(zkQuorum).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout).retryPolicy(new ExponentialBackoffRetry(retryWait, maxRetryAttempts)).namespace(rootWithNamespace.startsWith("/") ? rootWithNamespace.substring(1) : rootWithNamespace).aclProvider(aclProvider);
        CuratorFramework cf = enableAclClient ? builder.authorization(aclScheme, aclAuth.getBytes()).build() : builder.build();
        cf.start();
        if (enableAclClient) {
            try {
                ((BackgroundPathable)cf.setACL().withACL(builder.getAclProvider().getDefaultAcl())).forPath("/");
            }
            catch (Exception e) {
                throw new RuntimeException("Set ACL for the namespace path '" + cf.getNamespace() + "' failed.", e);
            }
        }
        return cf;
    }

    public static boolean isZooKeeperRecoveryMode(Configuration flinkConf) {
        return HighAvailabilityMode.fromConfig(flinkConf).equals((Object)HighAvailabilityMode.ZOOKEEPER);
    }

    public static String getZooKeeperEnsemble(Configuration flinkConf) throws IllegalConfigurationException {
        String zkQuorum = flinkConf.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);
        if (zkQuorum == null || StringUtils.isBlank((CharSequence)zkQuorum)) {
            throw new IllegalConfigurationException("No ZooKeeper quorum specified in config.");
        }
        zkQuorum = zkQuorum.replaceAll("\\s+", "");
        return zkQuorum;
    }

    public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(CuratorFramework client, Configuration configuration) throws Exception {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, "");
    }

    public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(CuratorFramework client, Configuration configuration, String pathSuffix) {
        String leaderPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
        return new ZooKeeperLeaderRetrievalService(client, leaderPath);
    }

    public static ZooKeeperLeaderElectionService createLeaderElectionService(CuratorFramework client, Configuration configuration) throws Exception {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, "");
    }

    public static ZooKeeperLeaderElectionService createLeaderElectionService(CuratorFramework client, Configuration configuration, String pathSuffix) {
        String latchPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix;
        String leaderPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
        return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
    }

    public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(CuratorFramework client, Configuration configuration, Executor executor) throws Exception {
        Preconditions.checkNotNull((Object)configuration, (String)"Configuration");
        FileSystemStateStorageHelper<SubmittedJobGraph> stateStorage = ZooKeeperUtils.createFileSystemStateStorage(configuration, "submittedJobGraph");
        String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
        return new ZooKeeperSubmittedJobGraphStore(client, zooKeeperSubmittedJobsPath, stateStorage, executor);
    }

    public static CompletedCheckpointStore createCompletedCheckpoints(CuratorFramework client, Configuration configuration, JobID jobId, int maxNumberOfCheckpointsToRetain, Executor executor) throws Exception {
        Preconditions.checkNotNull((Object)configuration, (String)"Configuration");
        String checkpointsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH);
        FileSystemStateStorageHelper<CompletedCheckpoint> stateStorage = ZooKeeperUtils.createFileSystemStateStorage(configuration, "completedCheckpoint");
        checkpointsPath = checkpointsPath + ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
        return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, client, checkpointsPath, stateStorage, executor);
    }

    public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter(CuratorFramework client, Configuration configuration, JobID jobId) {
        String checkpointIdCounterPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
        checkpointIdCounterPath = checkpointIdCounterPath + ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
        return new ZooKeeperCheckpointIDCounter(client, checkpointIdCounterPath);
    }

    public static <T extends Serializable> FileSystemStateStorageHelper<T> createFileSystemStateStorage(Configuration configuration, String prefix) throws IOException {
        String rootPath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
        if (rootPath == null || StringUtils.isBlank((CharSequence)rootPath)) {
            throw new IllegalConfigurationException("Missing high-availability storage path for metadata. Specify via configuration key '" + HighAvailabilityOptions.HA_STORAGE_PATH + "'.");
        }
        return new FileSystemStateStorageHelper(rootPath, prefix);
    }

    public static String generateZookeeperPath(String root, String namespace) {
        if (!namespace.startsWith("/")) {
            namespace = '/' + namespace;
        }
        if (namespace.endsWith("/")) {
            namespace = namespace.substring(0, namespace.length() - 1);
        }
        if (root.endsWith("/")) {
            root = root.substring(0, root.length() - 1);
        }
        return root + namespace;
    }

    private ZooKeeperUtils() {
        throw new RuntimeException();
    }

    public static enum ZkClientACLMode {
        CREATOR,
        OPEN;


        public static ZkClientACLMode fromConfig(Configuration config) {
            String aclMode = config.getString(HighAvailabilityOptions.ZOOKEEPER_CLIENT_ACL);
            if (aclMode == null || aclMode.equalsIgnoreCase(OPEN.name())) {
                return OPEN;
            }
            if (aclMode.equalsIgnoreCase(CREATOR.name())) {
                return CREATOR;
            }
            String message = "Unsupported ACL option: [" + aclMode + "] provided";
            LOG.error(message);
            throw new IllegalConfigurationException(message);
        }
    }

    public static class SecureAclProvider
    implements ACLProvider {
        @Override
        public List<ACL> getDefaultAcl() {
            return ZooDefs.Ids.CREATOR_ALL_ACL;
        }

        @Override
        public List<ACL> getAclForPath(String path) {
            return ZooDefs.Ids.CREATOR_ALL_ACL;
        }
    }
}

