/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.hbase.haclient;

import com.alibaba.hbase.client.AliHBaseConstants;
import com.alibaba.hbase.haclient.ClusterSwitchTracker;
import com.alibaba.hbase.haclient.ClusterSwitchUtil;
import com.alibaba.hbase.haclient.ConnectInfo;
import com.alibaba.hbase.haclient.ConnectInfoUtil;
import com.alibaba.hbase.haclient.StrUtil;
import com.alibaba.hbase.haclient.SwitchCommand;
import com.alibaba.hbase.haclient.Switchable;
import com.alibaba.hbase.haclient.dualservice.AutoSwitch;
import com.alibaba.hbase.haclient.dualservice.DualConfigTracker;
import com.alibaba.hbase.haclient.dualservice.DualExecutor;
import com.alibaba.hbase.haclient.dualservice.DualTrace;
import com.alibaba.hbase.haclient.dualservice.DualUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AliHBaseMultiClusterConnection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.CreateMode;

public class AliHBaseMultiClusterConnectionImpl
extends AliHBaseMultiClusterConnection
implements Switchable {
    private static final Log LOG = LogFactory.getLog(AliHBaseMultiClusterConnectionImpl.class);
    private ClusterSwitchTracker masterClusterSwitchTracker;
    private List<ClusterSwitchTracker> slaveClusterSwitchTrackers = new ArrayList<ClusterSwitchTracker>();
    private volatile SwitchCommand currentSwitchCommand = null;
    private final Configuration originalConf;
    private final String masterClusterKey;
    private volatile String currentClusterKey = null;
    private volatile ConnectInfo connectInfo = null;
    private ZooKeeperWatcher linkZooKeeper = null;
    private Lock linkZookeeperLock = new ReentrantLock();
    private DualConfigTracker dualConfigTracker;

    protected AliHBaseMultiClusterConnectionImpl(Configuration conf, ExecutorService pool, User user) throws IOException {
        this(conf, false, pool, user);
    }

    protected AliHBaseMultiClusterConnectionImpl(Configuration conf, boolean managed) throws IOException {
        this(conf, managed, null, null);
    }

    public AliHBaseMultiClusterConnectionImpl(Configuration conf, boolean managed, ExecutorService pool, User user) throws IOException {
        super(conf, managed, pool, user);
        this.originalConf = new Configuration(conf);
        String endpoint = this.originalConf.get("hbase.client.endpoint");
        if (StringUtils.isBlank((String)endpoint) && StringUtils.isBlank((String)(endpoint = this.originalConf.get("hbase.zookeeper.quorum")))) {
            throw new IOException("hbase ha address can not be blank");
        }
        if (StringUtils.isBlank((String)AliHBaseConstants.getHaClusterID(this.originalConf))) {
            throw new IOException("haclient cluster id can not be blank");
        }
        try {
            this.connectInfo = ConnectInfoUtil.getConnectInfoFromZK(endpoint, conf);
            ConnectInfoUtil.flushConnectInfo(this.connectInfo, conf);
        }
        catch (Exception e) {
            LOG.warn((Object)("Get connect info from endpoint failed, " + e));
            this.connectInfo = ConnectInfoUtil.getConnectInfoFromXML(conf);
        }
        if (null == this.connectInfo) {
            throw new IOException("Get connect info failed.");
        }
        this.masterClusterKey = this.connectInfo.getMasterWatchZK();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            Configuration masterConf = ClusterSwitchUtil.createConfWithConnectKey(this.masterClusterKey, conf);
            this.masterClusterSwitchTracker = new ClusterSwitchTracker(masterConf, this, countDownLatch);
            List<String> slaveClusterKeys = this.connectInfo.getSlaveWatchZKList();
            if (!slaveClusterKeys.isEmpty()) {
                Thread.sleep(10L);
                for (String slaveKey : slaveClusterKeys) {
                    Configuration slaveConf = ClusterSwitchUtil.createConfWithConnectKey(slaveKey, conf);
                    ClusterSwitchTracker slaveTracker = new ClusterSwitchTracker(slaveConf, this, countDownLatch);
                    this.slaveClusterSwitchTrackers.add(slaveTracker);
                }
            }
            countDownLatch.await();
            if (this.dualServiceEnable) {
                this.autoSwitch = new AutoSwitch(this.originalConf);
                this.dualExecutor = new DualExecutor(this.conf, this.autoSwitch);
                this.dualConfigTracker = new DualConfigTracker(ConnectInfoUtil.getConfFromEndpoint(endpoint, this.originalConf), this);
                if (conf.getBoolean("hbase.dualservice.trace.enable", true)) {
                    DualTrace.init(endpoint, conf);
                }
            }
        }
        catch (InterruptedException ie) {
            LOG.error((Object)"Interrupted during init", (Throwable)ie);
            Thread.currentThread().interrupt();
            throw new IOException(ie);
        }
    }

    private boolean isNeedToSwitch(SwitchCommand command) {
        if (this.currentSwitchCommand == null) {
            return true;
        }
        return command.getTs() > this.currentSwitchCommand.getTs();
    }

    private void onChangeCluster(Configuration activeConf, Configuration standbyConf, SwitchCommand command, String targetkey) throws IOException {
        this.onChangeCluster(activeConf, standbyConf);
        this.currentSwitchCommand = command;
        this.currentClusterKey = targetkey;
        LOG.info((Object)("Switch successfully to " + command));
        String targetZk = ConnectInfoUtil.getZkFromConnectInfo(this.connectInfo, targetkey);
        LOG.info((Object)("Create link node to " + targetZk));
        try {
            this.startThreadForCreateLinkNode(targetZk, this.conf.getInt("hbase.link.retry.count", 10));
        }
        catch (Exception e) {
            LOG.warn((Object)("Create link node failed : " + e));
        }
        LOG.info((Object)("Create link node to " + targetZk + " success"));
    }

    private void switchToActiveWithNoCommand() throws IOException {
        String activeKey = this.connectInfo.getActiveConnectKey();
        SwitchCommand command = SwitchCommand.NOCOMMAND;
        Configuration activeConf = ClusterSwitchUtil.createConfWithConnectKey(activeKey, this.originalConf);
        Configuration standbyConf = null;
        if (this.dualServiceEnable) {
            standbyConf = ClusterSwitchUtil.createConfWithConnectKey(this.connectInfo.getStandbyConnectKey(), this.originalConf);
        }
        this.onChangeCluster(activeConf, standbyConf, command, activeKey);
    }

    @Override
    public synchronized void onNodeChange(byte[] bytes) {
        block14: {
            try {
                SwitchCommand command = ClusterSwitchUtil.toSwitchCommand(bytes);
                LOG.info((Object)("Received " + command + ", current=" + this.currentSwitchCommand));
                if (!this.isNeedToSwitch(command)) {
                    LOG.info((Object)"Ignore switch command since current command is newer");
                    return;
                }
                if (command.isSwitchBackToMaster()) {
                    String activeKey = this.connectInfo.getActiveConnectKey();
                    if (this.currentClusterKey != null && this.currentClusterKey.equals(activeKey)) {
                        LOG.info((Object)"Current is connect to active, skip switch");
                        return;
                    }
                    LOG.info((Object)("From current " + this.currentClusterKey + ", Switch back to master cluster: " + activeKey));
                    this.switchToActiveWithNoCommand();
                    return;
                }
                if (!ClusterSwitchUtil.isValidConnectKey(command.getClusterKey())) {
                    if (this.currentSwitchCommand != null) {
                        LOG.info((Object)"Ignore switch command since invalid cluster key");
                        return;
                    }
                    String activeKey = this.connectInfo.getActiveConnectKey();
                    LOG.info((Object)("Invalid cluster key; From current " + this.currentClusterKey + ", switch back to master cluster: " + activeKey));
                    this.switchToActiveWithNoCommand();
                    return;
                }
                try {
                    String connectKey = command.getClusterKey();
                    if (!ConnectInfoUtil.isVaildTargetConnectKey(this.connectInfo, connectKey)) {
                        LOG.info((Object)("Invalid target cluster key " + connectKey + ", is not active " + this.connectInfo.getConnectConf().getActive() + " or standby " + this.connectInfo.getConnectConf().getStandby()));
                        if (this.currentSwitchCommand == null) {
                            LOG.info((Object)"Current SwitchCommand is null, switch to active");
                            this.switchToActiveWithNoCommand();
                        }
                        return;
                    }
                    if (connectKey.equals(connectKey.equals(this.currentClusterKey))) {
                        LOG.info((Object)("Target connect key " + connectKey + " is same with current cluster key " + this.currentClusterKey + ", skip switch"));
                        return;
                    }
                    String lastConnectKey = this.currentClusterKey;
                    Configuration activeConf = ClusterSwitchUtil.createConfWithConnectKey(connectKey, this.originalConf);
                    Configuration standbyConf = null;
                    if (this.dualServiceEnable) {
                        standbyConf = connectKey.equals(this.connectInfo.getActiveConnectKey()) ? ClusterSwitchUtil.createConfWithConnectKey(this.connectInfo.getStandbyConnectKey(), this.originalConf) : ClusterSwitchUtil.createConfWithConnectKey(this.connectInfo.getActiveConnectKey(), this.originalConf);
                    }
                    this.onChangeCluster(activeConf, standbyConf, command, connectKey);
                    LOG.info((Object)("From current " + lastConnectKey + ",Switch to cluster: " + command.getClusterKey()));
                }
                catch (IOException ioE) {
                    if (this.currentSwitchCommand == null) {
                        String activeKey = this.connectInfo.getActiveConnectKey();
                        LOG.error((Object)("Failed to switch to " + command + ", switch back to master cluster " + activeKey + " since no cluster is connected ever."), (Throwable)ioE);
                        this.switchToActiveWithNoCommand();
                        break block14;
                    }
                    throw ioE;
                }
            }
            catch (Throwable t) {
                LOG.error((Object)"Error happened when switching cluster, switch not finished", t);
            }
        }
    }

    private void startThreadForCreateLinkNode(String zkCluster, int retry) {
        final String zk = zkCluster;
        final int retryCount = retry;
        if (this.linkZooKeeper != null) {
            try {
                this.linkZooKeeper.close();
                this.linkZooKeeper = null;
            }
            catch (Exception e) {
                LOG.warn((Object)("old link zookeeper close failed : " + e));
            }
        }
        Thread t = new Thread(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                if (!AliHBaseMultiClusterConnectionImpl.this.linkZookeeperLock.tryLock()) {
                    return;
                }
                try {
                    for (int count = 0; count < retryCount; ++count) {
                        try {
                            if (AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper == null) {
                                Configuration linkConf = ClusterSwitchUtil.createConfWithConnectKey(zk, AliHBaseMultiClusterConnectionImpl.this.conf);
                                AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper = new ZooKeeperWatcher(linkConf, "Link", null, false);
                                String baseNode = ClusterSwitchUtil.getBaseNode(linkConf.get("haclient.base.node", "/haclient"), AliHBaseConstants.getHaClusterID(linkConf));
                                String linkNodeName = linkConf.get(ClusterSwitchUtil.ZOOKEEPER_LINK_NODE, ClusterSwitchUtil.ZOOKEEPER_LINK_NODE_DEFAULT);
                                String linkNode = ZKUtil.joinZNode((String)baseNode, (String)linkNodeName);
                                ZKUtil.createNodeIfNotExistsNoWatch((ZooKeeperWatcher)AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper, (String)baseNode, null, (CreateMode)CreateMode.PERSISTENT);
                                ZKUtil.createNodeIfNotExistsNoWatch((ZooKeeperWatcher)AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper, (String)linkNode, null, (CreateMode)CreateMode.PERSISTENT);
                                String node = ZKUtil.joinZNode((String)linkNode, (String)StrUtil.generateRandomString(5));
                                boolean result = ZKUtil.createEphemeralNodeAndWatch((ZooKeeperWatcher)AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper, (String)node, null);
                                if (!result) {
                                    String message = "create link node failed with " + count;
                                    LOG.warn((Object)message);
                                    throw new Exception(message);
                                }
                            }
                            LOG.info((Object)("Successfully set link node on zk '" + AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper.getQuorum() + "'"));
                            break;
                        }
                        catch (Throwable t) {
                            LOG.warn((Object)("Failed set link node on zk '" + AliHBaseMultiClusterConnectionImpl.this.linkZooKeeper.getQuorum() + "', will retry again..."), t);
                            Threads.sleep((long)30000L);
                            continue;
                        }
                    }
                }
                finally {
                    AliHBaseMultiClusterConnectionImpl.this.linkZookeeperLock.unlock();
                }
            }
        };
        t.setDaemon(true);
        t.setName("LinkNode-" + System.currentTimeMillis());
        t.start();
    }

    private void startThreadForUpdateDualConfig(final String endpoint, final Configuration clientConf) {
        Thread t = new Thread(){

            @Override
            public void run() {
                List<String> dualTables = null;
                try {
                    dualTables = DualUtil.getDualTablesFromZK(endpoint, clientConf);
                    DualUtil.flushDualTables(dualTables, clientConf);
                }
                catch (Exception e) {
                    LOG.warn((Object)("Get dual tables from endpoint failed, " + e));
                    dualTables = DualUtil.getDualTablesFromXML(AliHBaseMultiClusterConnectionImpl.this.conf);
                }
                AliHBaseMultiClusterConnectionImpl.this.updateDualConfig(dualTables);
            }
        };
        t.setDaemon(true);
        t.setName("UpdateDualTable-" + System.currentTimeMillis());
        t.start();
    }

    public void updateDualConfig(List<String> dualTables) {
        if (dualTables != null && !dualTables.isEmpty()) {
            for (String dualTable : dualTables) {
                if (!dualTable.contains("#")) {
                    this.enableTableDualService(dualTable);
                    continue;
                }
                int glitchtimeout = 30;
                String[] confs = dualTable.split("#");
                if (confs.length != 2) {
                    LOG.warn((Object)("Wrong dual table conf " + dualTable));
                    continue;
                }
                try {
                    glitchtimeout = Integer.valueOf(confs[1]);
                }
                catch (Exception e) {
                    LOG.warn((Object)("Wrong dual table conf " + dualTable));
                    continue;
                }
                if (dualTable.startsWith("glitchtimeout#")) {
                    this.conf.setInt("hbase.dualservice.glitchtimeout", glitchtimeout);
                    continue;
                }
                if (dualTable.startsWith("autoswitch#")) {
                    if (this.autoSwitch == null) continue;
                    if (glitchtimeout == 1) {
                        this.autoSwitch.setAutoSwitchEnable(true);
                        continue;
                    }
                    if (glitchtimeout != 0) continue;
                    this.autoSwitch.setAutoSwitchEnable(false);
                    continue;
                }
                this.setTableGlitchTimeout(confs[0], glitchtimeout);
            }
            LOG.info((Object)("Successfully update dual table conf for " + StringUtils.join(dualTables, (String)",")));
        }
    }

    private void enableTableDualService(String dualTable) {
        if (this.conf != null) {
            this.conf.setBoolean(DualExecutor.createTableConfKey(dualTable, "hbase.dualservice.enable"), true);
        }
        if (this.originalConf != null) {
            this.originalConf.setBoolean(DualExecutor.createTableConfKey(dualTable, "hbase.dualservice.enable"), true);
        }
    }

    private void setTableGlitchTimeout(String dualTable, int glitchTimeout) {
        if (this.conf != null) {
            this.conf.setInt(DualExecutor.createTableConfKey(dualTable, "hbase.dualservice.glitchtimeout"), glitchTimeout);
        }
        if (this.originalConf != null) {
            this.originalConf.setInt(DualExecutor.createTableConfKey(dualTable, "hbase.dualservice.glitchtimeout"), glitchTimeout);
        }
    }

    @Override
    public void close() {
        if (this.closed) {
            return;
        }
        super.close();
        if (this.masterClusterSwitchTracker != null) {
            this.masterClusterSwitchTracker.stop();
        }
        if (this.slaveClusterSwitchTrackers != null) {
            for (ClusterSwitchTracker slaveClusterSwitchTracker : this.slaveClusterSwitchTrackers) {
                if (slaveClusterSwitchTracker == null) continue;
                slaveClusterSwitchTracker.stop();
            }
        }
        if (this.dualConfigTracker != null) {
            this.dualConfigTracker.stop();
        }
        if (this.linkZooKeeper != null) {
            try {
                this.linkZooKeeper.close();
            }
            catch (Exception e) {
                LOG.warn((Object)("link zookeeper close failed : " + e));
            }
        }
        this.closed = true;
    }

    @Override
    public Configuration getOriginalConf() {
        return this.originalConf;
    }
}

