package com.alibaba.blink.streaming.connector.zookeeper.utils;

import com.alibaba.blink.streaming.connectors.common.Constants;
import com.alibaba.blink.streaming.connectors.common.conf.BlinkOptions;
import com.alibaba.blink.streaming.connectors.common.reader.MonotonyIncreaseProgress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.flink.shaded.curator.org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/blink/streaming/connector/zookeeper/utils/ZKUtils.class */
public class ZKUtils {
    /** Flag value {@code "1"}. NOTE(review): presumably the upgrade-node content that signals an upgrade - confirm against callers. */
    public static final String IS_UPGRADE = "1";
    /** Flag value {@code "-1"}. NOTE(review): presumably written once a block signal has been received - confirm against callers. */
    public static final String RECEIVED_BLOCK_SIGNAL = "-1";
    /** ZooKeeper connect string handed to the Curator builder. */
    private final String zkQuorum;
    /** Root path of this job in ZooKeeper: {@code /<project>/<jobName>}. */
    private final String zkPath;
    /** Path of the job-wide source flag node: {@code <zkPath>/sourceFlag}. */
    private final String zkSourceFlag;
    /** Path of the upgrade node: {@code <zkPath>/upgrade}. */
    private final String zkUpgradePath;
    /** Path of the job type node: {@code <zkPath>/jobType}. */
    private final String zkJobTypePath;
    /** Per-table nodes registered through {@code initSourceTable} / {@code initSinkTable}. */
    private final Map<String, ZKNode> tableNameToZKNode = new ConcurrentHashMap<>();
    /** Curator client shared with every {@link ZKNode}; assigned in {@code init()}. */
    private CuratorFramework client = null;
    /**
     * Lazily created singleton. Declared {@code volatile} because {@code getInstance()} may read it
     * without holding {@link #lock}; without it a thread could observe a partially constructed object.
     */
    private static volatile ZKUtils instance;
    private static final Logger LOG = LoggerFactory.getLogger(ZKUtils.class);
    /** Guards creation of {@link #instance}. */
    private static final Lock lock = new ReentrantLock();
    /** Configuration the singleton's constructor reads; populated by {@code setConf}. */
    public static Configuration conf = new Configuration();

    /* loaded from: input_file:com/alibaba/blink/streaming/connector/zookeeper/utils/ZKUtils$ZKNode.class */
    public static class ZKNode {
        /** Name of the table this node was created for. */
        private String tableName;
        /** Path of the sink flag node: {@code <root>/sink/<table>_sinkFlag}. */
        private final String zkSinkFlag;
        /** Prefix of every offset node: {@code <root>/source/<table>_offset_}. */
        private final String zkSourceOffsetPath;
        /**
         * Cache from {@code <splitNumber>[/<partition>]} to the full offset-node path already prepared.
         * NOTE(review): plain HashMap - confirm a ZKNode is never used from several threads at once.
         */
        private final Map<String, String> inputSplitPaths = new HashMap<>();
        /** Curator client used for every ZooKeeper call; the flag and path operations return a failure value when it is null. */
        private CuratorFramework client;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:com/alibaba/blink/streaming/connector/zookeeper/utils/ZKUtils$ZKNode$CONNECTOR_TYPE.class */
        /** Role of the table a {@code ZKNode} is created for; only {@link #SINK} makes the constructor call {@code init()}. */
        public enum CONNECTOR_TYPE {
            /** Source table: the constructor does not call {@code init()}. */
            SOURCE,
            /** Sink table: the constructor calls {@code init()}, which creates the sink flag node. */
            SINK
        }

        public ZKNode(String str, String str2, CuratorFramework curatorFramework, CONNECTOR_TYPE connector_type) {
            this.tableName = str;
            this.zkSinkFlag = str2 + "/sink/" + str + "_sinkFlag";
            this.zkSourceOffsetPath = str2 + "/source/" + str + "_offset_";
            this.client = curatorFramework;
            if (connector_type == CONNECTOR_TYPE.SINK) {
                init();
            }
        }

        /** Ensures the sink flag node exists; the outcome reported by {@code createPath} is not inspected. */
        private void init() {
            final String sinkFlagPath = this.zkSinkFlag;
            createPath(sinkFlagPath);
        }

        /**
         * Writes the progress of every (split, progress) pair via {@code setData}.
         * Individual failures are not reported: the boolean returned by {@code setData} is ignored.
         *
         * @param list pairs of input split and its progress; must not be null
         */
        public void setDatas(List<Tuple2<InputSplit, MonotonyIncreaseProgress>> list) {
            Iterator<Tuple2<InputSplit, MonotonyIncreaseProgress>> pairs = list.iterator();
            while (pairs.hasNext()) {
                Tuple2<InputSplit, MonotonyIncreaseProgress> pair = pairs.next();
                InputSplit split = pair.f0;
                MonotonyIncreaseProgress progress = pair.f1;
                setData(split, progress);
            }
        }

        /**
         * Writes the offset of every partition of one input split to its own ZooKeeper node.
         *
         * <p>The node path is {@code zkSourceOffsetPath + <splitNumber>} for an empty partition name and
         * {@code zkSourceOffsetPath + <splitNumber>/<partition>} otherwise. Paths are cached in
         * {@code inputSplitPaths} after the first use.
         *
         * @param inputSplit               split whose number prefixes the node name
         * @param monotonyIncreaseProgress partitions and their offsets
         * @return {@code true} when every partition was written; {@code false} when an argument is null,
         *         an offset is missing, or any ZooKeeper call fails (remaining partitions are skipped)
         */
        public boolean setData(InputSplit inputSplit, MonotonyIncreaseProgress monotonyIncreaseProgress) {
            if (inputSplit == null || monotonyIncreaseProgress == null) {
                return false;
            }
            int splitNumber = inputSplit.getSplitNumber();
            for (String partition : monotonyIncreaseProgress.getPartitions()) {
                // Compare by value: a reference comparison against "" treats a non-interned empty
                // string as non-empty and produces a path that ends with '/'.
                String key = "".equals(partition) ? String.valueOf(splitNumber) : splitNumber + "/" + partition;
                String path = this.inputSplitPaths.get(key);
                if (path == null) {
                    path = this.zkSourceOffsetPath + key;
                    createPath(path);
                    this.inputSplitPaths.put(key, path);
                }
                try {
                    Object offset = monotonyIncreaseProgress.getPartitionToOffset().get(partition);
                    if (offset == null) {
                        ZKUtils.LOG.error("set data fail: no offset for partition {} (path {})", partition, path);
                        return false;
                    }
                    // Explicit charset so the stored bytes do not depend on the JVM default encoding.
                    byte[] payload = offset.toString().getBytes(StandardCharsets.UTF_8);
                    if (this.client.checkExists().forPath(path) == null) {
                        ((ACLBackgroundPathAndBytesable) this.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)).forPath(path, payload);
                    } else {
                        this.client.setData().forPath(path, payload);
                    }
                } catch (Throwable th) {
                    // Use the (String, Throwable) overload so the stack trace is always logged.
                    ZKUtils.LOG.error("set data fail for path " + path, th);
                    return false;
                }
            }
            return true;
        }

        /**
         * Creates a persistent node (and missing parents) holding {@code "0"} when the path does not exist yet.
         *
         * @param str absolute ZooKeeper path
         * @return {@code true} when the node exists afterwards; {@code false} when there is no client or a
         *         ZooKeeper call failed (the failure is logged)
         */
        private boolean createPath(String str) {
            final CuratorFramework zk = this.client;
            if (zk == null) {
                return false;
            }
            boolean ok;
            try {
                boolean missing = zk.checkExists().forPath(str) == null;
                if (missing) {
                    ((ACLBackgroundPathAndBytesable) zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)).forPath(str, "0".getBytes());
                }
                ok = true;
            } catch (Throwable th) {
                ZKUtils.LOG.error("create path failed", th);
                ok = false;
            }
            return ok;
        }

        /**
         * Overwrites the data of every cached offset node ({@code inputSplitPaths}) with the given flag.
         *
         * @param str flag value to store
         * @return {@code true} when all nodes were updated; {@code false} when the client or flag is null
         *         or a write fails (remaining nodes are left untouched)
         */
        public boolean setSourceFlag(String str) {
            if (this.client == null || str == null) {
                return false;
            }
            // Encode once, with an explicit charset, instead of once per node with the platform default.
            byte[] flag = str.getBytes(StandardCharsets.UTF_8);
            for (String path : this.inputSplitPaths.values()) {
                try {
                    this.client.setData().forPath(path, flag);
                } catch (Throwable th) {
                    // Use the (String, Throwable) overload so the stack trace is always logged.
                    ZKUtils.LOG.error("set source flag data error for path " + path, th);
                    return false;
                }
            }
            return true;
        }

        /**
         * Stores the flag in the sink flag node, creating the node (and parents) when it is missing.
         *
         * @param str flag value to store
         * @return {@code true} on success; {@code false} when the client or flag is null or ZooKeeper fails
         */
        public boolean setSinkFlag(String str) {
            if (this.client == null || str == null) {
                // A null flag used to surface as a caught NullPointerException; reject it up front.
                return false;
            }
            try {
                // Explicit charset so the stored bytes do not depend on the JVM default encoding.
                byte[] flag = str.getBytes(StandardCharsets.UTF_8);
                if (this.client.checkExists().forPath(this.zkSinkFlag) != null) {
                    this.client.setData().forPath(this.zkSinkFlag, flag);
                } else {
                    ((ACLBackgroundPathAndBytesable) this.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)).forPath(this.zkSinkFlag, flag);
                }
                return true;
            } catch (Throwable th) {
                // Use the (String, Throwable) overload so the stack trace is always logged.
                ZKUtils.LOG.error("set sink flag data error for path " + this.zkSinkFlag, th);
                return false;
            }
        }

        /**
         * Reads the sink flag node.
         *
         * @return the node content decoded as UTF-8, or {@code null} when there is no client, the node has
         *         no data, or the read fails (the failure is logged)
         */
        public String getSinkFlag() {
            if (this.client == null) {
                return null;
            }
            try {
                byte[] data = (byte[]) this.client.getData().forPath(this.zkSinkFlag);
                // Explicit charset so decoding does not depend on the JVM default encoding.
                return data == null ? null : new String(data, StandardCharsets.UTF_8);
            } catch (Throwable th) {
                // Use the (String, Throwable) overload so the stack trace is always logged.
                ZKUtils.LOG.error("get sink flag data error for path " + this.zkSinkFlag, th);
                return null;
            }
        }
    }

    /**
     * Reads project, job name and ZooKeeper quorum from {@code conf} (falling back to the built-in
     * defaults), derives the job's node paths from them, and starts the Curator client via {@code init()}.
     */
    private ZKUtils() {
        final String project = conf.getString(BlinkOptions.INNER_SAMPLE_PROJECT_RAW, "blink_user_daily_test");
        final String jobName = conf.getString(BlinkOptions.INNER_SAMPLE_JOB_NAME_RAW, "abtest_sls");
        this.zkQuorum = conf.getString(BlinkOptions.INNER_SAMPLE_ZK_QUORUM_RAW, "hadoop1026.et2sqa.tbsite.net:2181,hadoop1027.et2sqa.tbsite.net:2181,hadoop1028.et2sqa.tbsite.net:2181");

        // Every node of this job lives under /<project>/<jobName>.
        final String root = "/" + project + "/" + jobName;
        this.zkPath = root;
        this.zkUpgradePath = root + "/upgrade";
        this.zkJobTypePath = root + "/jobType";
        this.zkSourceFlag = root + "/sourceFlag";

        init();
    }

    /**
     * Registers (or replaces) the {@code ZKNode} of a source table under its name.
     *
     * @param str table name; also used as the registry key
     */
    public void initSourceTable(String str) {
        final ZKNode sourceNode = new ZKNode(str, this.zkPath, this.client, ZKNode.CONNECTOR_TYPE.SOURCE);
        this.tableNameToZKNode.put(str, sourceNode);
    }

    /**
     * Registers (or replaces) the {@code ZKNode} of a sink table under its name; constructing the node
     * with {@code SINK} also creates its sink flag node.
     *
     * @param str table name; also used as the registry key
     */
    public void initSinkTable(String str) {
        final ZKNode sinkNode = new ZKNode(str, this.zkPath, this.client, ZKNode.CONNECTOR_TYPE.SINK);
        this.tableNameToZKNode.put(str, sinkNode);
    }

    /**
     * Returns the process-wide {@code ZKUtils}, creating it on first use.
     *
     * <p>The instance is always read and written while holding {@code lock}. The previous unlocked
     * fast-path read is only safe when the field is {@code volatile}, so this method does not rely on it.
     * {@code lock()} is taken before the {@code try} so that {@code unlock()} only runs when the lock is held.
     *
     * @return the singleton; never null
     */
    public static ZKUtils getInstance() {
        lock.lock();
        try {
            if (instance == null) {
                instance = new ZKUtils();
            }
            return instance;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Copies every global job parameter of the runtime context into the static {@code conf}.
     * Logs an error and leaves {@code conf} untouched when the context, its execution config, the global
     * job parameters or their map view is null.
     *
     * @param runtimeContext context whose global job parameters are copied
     */
    public static void setConf(RuntimeContext runtimeContext) {
        if (runtimeContext == null
                || runtimeContext.getExecutionConfig() == null
                || runtimeContext.getExecutionConfig().getGlobalJobParameters() == null) {
            LOG.error("the enviroment is null");
            return;
        }
        // Materialise the map once and reuse it for both the null check and the copy loop.
        Map<String, String> parameters = runtimeContext.getExecutionConfig().getGlobalJobParameters().toMap();
        if (parameters == null) {
            LOG.error("the enviroment is null");
            return;
        }
        for (Map.Entry<String, String> parameter : parameters.entrySet()) {
            conf.setString(parameter.getKey(), parameter.getValue());
        }
    }

    /**
     * Builds the Curator client for {@code zkQuorum} and starts it.
     *
     * @return always {@code true}
     */
    private boolean init() {
        // NOTE(review): a constant named ...CONNECTION_TIMEOUT is passed as the retry policy's first
        // argument - confirm that is the intended value.
        final ExponentialBackoffRetry retryPolicy =
                new ExponentialBackoffRetry(Constants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT, 3);
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
        builder = builder.connectString(this.zkQuorum);
        builder = builder.sessionTimeoutMs(Constants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
        builder = builder.retryPolicy(retryPolicy);
        this.client = builder.build();
        this.client.start();
        return true;
    }

    /**
     * Writes the progress of the given splits for one registered table.
     * Does nothing when the client, table name or list is null; logs an error and returns when the table
     * was never registered through {@code initSourceTable} / {@code initSinkTable} (previously a
     * NullPointerException).
     *
     * @param str  table name
     * @param list pairs of input split and its progress
     */
    public void setDatas(String str, List<Tuple2<InputSplit, MonotonyIncreaseProgress>> list) {
        if (this.client == null || str == null || list == null) {
            return;
        }
        ZKNode node = this.tableNameToZKNode.get(str);
        if (node == null) {
            LOG.error("setDatas ignored: table {} has not been initialized", str);
            return;
        }
        node.setDatas(list);
    }

    /**
     * Writes a flag into every cached offset node of one registered table.
     * Does nothing when the client, table name or flag is null; logs an error and returns when the table
     * was never registered (previously a NullPointerException).
     *
     * @param str  table name
     * @param str2 flag value
     */
    public void setSourceFlag(String str, String str2) {
        if (this.client == null || str == null || str2 == null) {
            return;
        }
        ZKNode node = this.tableNameToZKNode.get(str);
        if (node == null) {
            LOG.error("setSourceFlag ignored: table {} has not been initialized", str);
            return;
        }
        node.setSourceFlag(str2);
    }

    /**
     * Reads the job-wide source flag node ({@code <zkPath>/sourceFlag}).
     *
     * @return the node content decoded as UTF-8, or {@code null} when there is no client, the node has
     *         no data, or the read fails (the failure is logged)
     */
    public String getSourceFlag() {
        if (this.client == null) {
            return null;
        }
        try {
            byte[] data = (byte[]) this.client.getData().forPath(this.zkSourceFlag);
            // Explicit charset so decoding does not depend on the JVM default encoding.
            return data == null ? null : new String(data, StandardCharsets.UTF_8);
        } catch (Throwable th) {
            // Use the (String, Throwable) overload so the stack trace is always logged.
            LOG.error("get source flag Data failed for path " + this.zkSourceFlag, th);
            return null;
        }
    }

    /**
     * Stores the job-wide source flag, creating the node (and parents) when it is missing.
     * Failures are logged and not propagated.
     *
     * @param str flag value; a null value is logged as an error and nothing is written
     */
    public void setSourceFlag(String str) {
        if (this.client == null) {
            return;
        }
        if (str == null) {
            // Previously surfaced as a caught NullPointerException; report it explicitly instead.
            LOG.error("set source flag fail: value is null");
            return;
        }
        // Explicit charset so the stored bytes do not depend on the JVM default encoding.
        byte[] flag = str.getBytes(StandardCharsets.UTF_8);
        try {
            if (this.client.checkExists().forPath(this.zkSourceFlag) == null) {
                ((ACLBackgroundPathAndBytesable) this.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)).forPath(this.zkSourceFlag, flag);
            } else {
                this.client.setData().forPath(this.zkSourceFlag, flag);
            }
        } catch (Exception e) {
            LOG.error("set source flag fail", e);
        }
    }

    /**
     * Reads the sink flag of one registered table.
     *
     * @param str table name
     * @return the flag, or {@code null} when there is no client, the table is not registered, or the
     *         node-level read returns null
     */
    public String getSinkFlag(String str) {
        if (this.client == null) {
            return null;
        }
        final ZKNode node = this.tableNameToZKNode.get(str);
        return node == null ? null : node.getSinkFlag();
    }

    /**
     * Stores the sink flag of one registered table.
     *
     * @param str  table name
     * @param str2 flag value
     * @return the result of the node-level write; {@code false} when there is no client, the table name
     *         is null, or the table was never registered (previously a NullPointerException)
     */
    public boolean setSinkFlag(String str, String str2) {
        if (this.client == null || str == null) {
            return false;
        }
        ZKNode node = this.tableNameToZKNode.get(str);
        if (node == null) {
            LOG.error("setSinkFlag ignored: table {} has not been initialized", str);
            return false;
        }
        return node.setSinkFlag(str2);
    }

    /**
     * Reads the upgrade node ({@code <zkPath>/upgrade}).
     *
     * @return the node content decoded as UTF-8, or {@code null} when there is no client, the node has
     *         no data, or the read fails (the failure is logged at debug level only)
     */
    public String getUpgradeData() {
        if (this.client == null) {
            return null;
        }
        try {
            byte[] data = (byte[]) this.client.getData().forPath(this.zkUpgradePath);
            // Explicit charset so decoding does not depend on the JVM default encoding.
            return data == null ? null : new String(data, StandardCharsets.UTF_8);
        } catch (Throwable th) {
            // Use the (String, Throwable) overload so the stack trace is always logged.
            LOG.debug("get upgrade Data failed for path " + this.zkUpgradePath, th);
            return null;
        }
    }

    /**
     * Stores the upgrade flag, creating the node (and parents) when it is missing.
     * Failures are logged and not propagated.
     *
     * @param str flag value; a null value is logged as an error and nothing is written
     */
    public void setUpgrade(String str) {
        if (this.client == null) {
            return;
        }
        if (str == null) {
            // Previously surfaced as a caught NullPointerException; report it explicitly instead.
            LOG.error("set upgrade flag fail: value is null");
            return;
        }
        // Explicit charset so the stored bytes do not depend on the JVM default encoding.
        byte[] flag = str.getBytes(StandardCharsets.UTF_8);
        try {
            if (this.client.checkExists().forPath(this.zkUpgradePath) == null) {
                ((ACLBackgroundPathAndBytesable) this.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)).forPath(this.zkUpgradePath, flag);
            } else {
                this.client.setData().forPath(this.zkUpgradePath, flag);
            }
        } catch (Exception e) {
            LOG.error("set upgrade flag fail", e);
        }
    }

    /**
     * Reads the job type node ({@code <zkPath>/jobType}).
     *
     * @return the node content decoded as UTF-8, or {@code null} when there is no client, the node has
     *         no data, or the read fails (the failure is logged)
     */
    public String getJobType() {
        if (this.client == null) {
            return null;
        }
        try {
            byte[] data = (byte[]) this.client.getData().forPath(this.zkJobTypePath);
            // Explicit charset so decoding does not depend on the JVM default encoding.
            return data == null ? null : new String(data, StandardCharsets.UTF_8);
        } catch (Throwable th) {
            // Use the (String, Throwable) overload so the stack trace is always logged.
            LOG.error("get job type error for path " + this.zkJobTypePath, th);
            return null;
        }
    }

    /**
     * Closes the Curator client when one exists. The client reference and the singleton are left as
     * they are.
     *
     * @return always {@code true}
     */
    public boolean close() {
        if (this.client != null) {
            this.client.close();
        }
        return true;
    }
}
