/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.quorum;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.ZooKeeperThread;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.quorum.Election;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.quorum.LeaderElectionBean;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.quorum.QuorumCnxManager;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.quorum.Vote;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.server.util.ZxidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FastLeaderElection
implements Election {
    private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class);
    static final int finalizeWait = 200;
    static final int maxNotificationInterval = 60000;
    QuorumCnxManager manager;
    LinkedBlockingQueue<ToSend> sendqueue;
    LinkedBlockingQueue<Notification> recvqueue;
    QuorumPeer self;
    Messenger messenger;
    volatile long logicalclock;
    long proposedLeader;
    long proposedZxid;
    long proposedEpoch;
    volatile boolean stop = false;

    /**
     * Serializes one election notification into a fixed 40-byte buffer.
     *
     * <p>Layout, in write order: {@code state} (int), {@code leader} (long),
     * {@code zxid} (long), {@code electionEpoch} (long), {@code epoch} (long) and a
     * trailing int with the value 1, which the receiving side reads as the message
     * format version.
     *
     * @param state ordinal of the sender's server state
     * @param leader id of the proposed leader
     * @param zxid zxid of the proposed leader
     * @param electionEpoch election round (logical clock) of the sender
     * @param epoch peer epoch of the proposed leader
     * @return an array-backed buffer of capacity 40 whose position is at the end
     *         of the written data (the buffer is not flipped)
     */
    static ByteBuffer buildMsg(int state, long leader, long zxid, long electionEpoch, long epoch) {
        // 4 (state) + 4 * 8 (leader, zxid, electionEpoch, epoch) + 4 (version) = 40 bytes.
        ByteBuffer message = ByteBuffer.allocate(40);
        message.putInt(state)
                .putLong(leader)
                .putLong(zxid)
                .putLong(electionEpoch)
                .putLong(epoch)
                .putInt(1);
        return message;
    }

    /**
     * Returns the current value of this instance's logical clock, which
     * {@code lookForLeader()} increments at the start of every election round.
     *
     * @return the current logical clock
     */
    public long getLogicalClock() {
        return logicalclock;
    }

    /**
     * Creates an election instance bound to the given peer and connection manager.
     * Construction also starts the messenger's worker threads (via {@code starter}).
     *
     * @param self the quorum peer this election runs on behalf of
     * @param manager the connection manager used to exchange notifications
     */
    public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
        this.manager = manager;
        starter(self, manager);
    }

    /**
     * Initializes the election state: records the local peer, resets the proposed
     * leader and zxid to -1, creates the send/receive queues and starts the messenger.
     *
     * @param self the quorum peer this election runs on behalf of
     * @param manager the connection manager handed to the messenger's workers
     */
    private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        this.proposedLeader = -1L;
        this.proposedZxid = -1L;
        // Explicit type arguments instead of raw types, so the assignments are checked.
        this.sendqueue = new LinkedBlockingQueue<ToSend>();
        this.recvqueue = new LinkedBlockingQueue<Notification>();
        // Must stay last: the Messenger constructor starts worker threads that read
        // this.self and poll the queues created above.
        this.messenger = new Messenger(manager);
    }

    /**
     * Finishes the current election instance: logs the final vote at debug level
     * and discards any notifications still waiting in the receive queue.
     *
     * @param v the vote the election ended with (used for logging only)
     */
    private void leaveInstance(Vote v) {
        if (LOG.isDebugEnabled()) {
            String summary = "About to leave FLE instance: leader=" + v.getId() + ", zxid=0x" + Long.toHexString(v.getZxid()) + ", my id=" + self.getId() + ", my state=" + self.getPeerState();
            LOG.debug(summary);
        }
        recvqueue.clear();
    }

    /**
     * Returns the connection manager this election instance was created with.
     *
     * @return the quorum connection manager
     */
    public QuorumCnxManager getCnxManager() {
        return manager;
    }

    /**
     * Stops this election instance: raises the stop flag checked by
     * {@code lookForLeader()}, halts the connection manager and then asks the
     * messenger's worker threads to stop.
     */
    @Override
    public void shutdown() {
        stop = true;

        LOG.debug("Shutting down connection manager");
        manager.halt();

        LOG.debug("Shutting down messenger");
        messenger.halt();

        LOG.debug("FLE is down");
    }

    /**
     * Queues one LOOKING notification carrying the current proposal (leader, zxid,
     * logical clock, peer epoch) for every server in the voting view.
     */
    private void sendNotifications() {
        for (QuorumPeer.QuorumServer peer : self.getVotingView().values()) {
            final long recipient = peer.id;
            final ToSend outgoing = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING, recipient, proposedEpoch);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock) + " (n.round), " + recipient + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
            }
            sendqueue.offer(outgoing);
        }
    }

    /**
     * Logs a received notification at info level, followed by this peer's state.
     *
     * @param n the notification to log
     */
    private void printNotification(Notification n) {
        String line = "Notification: " + n.toString() + self.getPeerState() + " (my state)";
        LOG.info(line);
    }

    /**
     * Decides whether the vote (newId, newZxid, newEpoch) beats the vote
     * (curId, curZxid, curEpoch).
     *
     * <p>A candidate whose weight, as reported by the quorum verifier, is zero never
     * wins. Otherwise the votes are compared lexicographically by epoch, then zxid,
     * then server id; the new vote wins only if it is strictly greater.
     *
     * @param newId server id of the candidate vote
     * @param newZxid zxid of the candidate vote
     * @param newEpoch epoch of the candidate vote
     * @param curId server id of the vote currently held
     * @param curZxid zxid of the vote currently held
     * @param curEpoch epoch of the vote currently held
     * @return {@code true} if the candidate vote should replace the current one
     */
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if (self.getQuorumVerifier().getWeight(newId) == 0L) {
            return false;
        }
        // Lexicographic comparison: the first differing component decides.
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;
        }
        return newId > curId;
    }

    /**
     * Checks whether the servers that cast a vote equal to {@code vote} form a
     * quorum according to the quorum verifier.
     *
     * @param votes votes received so far, keyed by the id of the sending server
     * @param vote the vote to look for support of
     * @return {@code true} if the supporters of {@code vote} contain a quorum
     */
    protected boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
        HashSet<Long> supporters = new HashSet<Long>();
        for (Map.Entry<Long, Vote> received : votes.entrySet()) {
            if (vote.equals(received.getValue())) {
                supporters.add(received.getKey());
            }
        }
        return self.getQuorumVerifier().containsQuorum(supporters);
    }

    /**
     * Sanity-checks a claimed leader.
     *
     * <p>If the leader is this peer, the check passes only when
     * {@code electionEpoch} equals the local logical clock. Otherwise it passes only
     * when {@code votes} holds an entry for the leader whose state is LEADING.
     *
     * @param votes votes keyed by the id of the sending server
     * @param leader id of the claimed leader
     * @param electionEpoch election epoch attached to the claim
     * @return {@code true} if the claim passes the check
     */
    protected boolean checkLeader(HashMap<Long, Vote> votes, long leader, long electionEpoch) {
        if (leader == self.getId()) {
            return logicalclock == electionEpoch;
        }
        Vote leaderVote = votes.get(leader);
        return leaderVote != null && leaderVote.getState() == QuorumPeer.ServerState.LEADING;
    }

    /**
     * Combines the quorum check and the leader check for a notification from a
     * peer that is already FOLLOWING or LEADING.
     *
     * @param recv votes in which a quorum supporting the notification's vote is sought
     * @param ooe votes consulted when checking the claimed leader
     * @param n the notification being evaluated
     * @return {@code true} if the vote in {@code n} has a quorum in {@code recv}
     *         and its leader passes {@code checkLeader} against {@code ooe}
     */
    protected boolean ooePredicate(HashMap<Long, Vote> recv, HashMap<Long, Vote> ooe, Notification n) {
        Vote claimed = new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state);
        if (!termPredicate(recv, claimed)) {
            return false;
        }
        return checkLeader(ooe, n.leader, n.electionEpoch);
    }

    /**
     * Replaces the current proposal with the given leader, zxid and epoch.
     * Synchronized on this instance, like {@code getVote()}.
     *
     * @param leader id of the newly proposed leader
     * @param zxid zxid of the newly proposed leader
     * @param epoch peer epoch of the newly proposed leader
     */
    synchronized void updateProposal(long leader, long zxid, long epoch) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Updating proposal: " + leader + " (newleader), 0x" + Long.toHexString(zxid) + " (newzxid), " + proposedLeader + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
        }
        proposedLeader = leader;
        proposedZxid = zxid;
        proposedEpoch = epoch;
    }

    /**
     * Returns a new {@code Vote} built from the current proposal (leader, zxid,
     * peer epoch). Synchronized on this instance, like {@code updateProposal}.
     *
     * @return the current proposal as a vote
     */
    synchronized Vote getVote() {
        Vote currentProposal = new Vote(proposedLeader, proposedZxid, proposedEpoch);
        return currentProposal;
    }

    /**
     * Returns the state this peer takes when it does not become leader:
     * FOLLOWING for a participant, OBSERVING otherwise.
     *
     * @return the non-leader state matching this peer's learner type
     */
    private QuorumPeer.ServerState learningState() {
        boolean participant = self.getLearnerType() == QuorumPeer.LearnerType.PARTICIPANT;
        if (!participant) {
            LOG.debug("I'm an observer: " + self.getId());
            return QuorumPeer.ServerState.OBSERVING;
        }
        LOG.debug("I'm a participant: " + self.getId());
        return QuorumPeer.ServerState.FOLLOWING;
    }

    /**
     * Returns the server id this peer initially proposes: its own id when it is a
     * participant, {@code Long.MIN_VALUE} otherwise.
     *
     * @return the initial proposed leader id
     */
    private long getInitId() {
        return self.getLearnerType() == QuorumPeer.LearnerType.PARTICIPANT
                ? self.getId()
                : Long.MIN_VALUE;
    }

    /**
     * Returns the zxid this peer initially proposes: its last logged zxid when it
     * is a participant, {@code Long.MIN_VALUE} otherwise.
     *
     * @return the initial proposed zxid
     */
    private long getInitLastLoggedZxid() {
        return self.getLearnerType() == QuorumPeer.LearnerType.PARTICIPANT
                ? self.getLastLoggedZxid()
                : Long.MIN_VALUE;
    }

    /**
     * Returns the peer epoch this peer initially proposes: its current epoch when
     * it is a participant, {@code Long.MIN_VALUE} otherwise.
     *
     * @return the initial proposed peer epoch
     * @throws RuntimeException if reading the current epoch fails with an
     *         {@code IOException}; the original exception is attached as the cause
     */
    private long getPeerEpoch() {
        if (this.self.getLearnerType() != QuorumPeer.LearnerType.PARTICIPANT) {
            return Long.MIN_VALUE;
        }
        try {
            return this.self.getCurrentEpoch();
        }
        catch (IOException e) {
            // Chain the IOException as the cause rather than copying only its message
            // and stack trace, so the original exception type and its own cause chain
            // remain available to whoever handles or logs the failure.
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Runs one round of leader election and blocks until it is decided.
     *
     * <p>The method registers a leader-election JMX bean (best effort), increments
     * the logical clock, proposes this peer's initial vote and broadcasts it. It then
     * processes incoming notifications while this peer is LOOKING and the instance
     * has not been stopped:
     * <ul>
     *   <li>when no notification arrives within the current timeout, notifications
     *       are re-sent (or connections are re-established) and the timeout is
     *       doubled, capped at {@code maxNotificationInterval};</li>
     *   <li>notifications from LOOKING peers may update the local proposal; once the
     *       proposal has a quorum and no better vote arrives within
     *       {@code finalizeWait} ms, the election ends;</li>
     *   <li>notifications from FOLLOWING/LEADING peers end the election when a
     *       quorum agrees on a leader that passes {@code checkLeader};</li>
     *   <li>notifications from observers and from servers outside the voting view
     *       are ignored.</li>
     * </ul>
     * The JMX bean is always unregistered before the method returns.
     *
     * @return the final vote, or {@code null} if the loop ended because this peer
     *         left the LOOKING state or the instance was stopped
     * @throws InterruptedException if interrupted while waiting for notifications
     */
    @Override
    public Vote lookForLeader() throws InterruptedException {
        try {
            this.self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(this.self.jmxLeaderElectionBean, this.self.jmxLocalPeerBean);
        }
        catch (Exception e) {
            // JMX registration is best effort; the election proceeds without the bean.
            LOG.warn("Failed to register with JMX", e);
            this.self.jmxLeaderElectionBean = null;
        }
        if (this.self.start_fle == 0L) {
            this.self.start_fle = System.currentTimeMillis();
        }
        try {
            // Votes from peers in the current election round, keyed by sender id.
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
            // Votes from peers that report FOLLOWING or LEADING, keyed by sender id.
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
            int notTimeout = finalizeWait;
            synchronized (this) {
                ++this.logicalclock;
                this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
            }
            LOG.info("New election. My id =  " + this.self.getId() + ", proposed zxid=0x" + Long.toHexString(this.proposedZxid));
            this.sendNotifications();
            while (this.self.getPeerState() == QuorumPeer.ServerState.LOOKING && !this.stop) {
                Notification n = this.recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
                if (n == null) {
                    // Nothing received: re-send if earlier messages were delivered,
                    // otherwise try to (re)connect to the other servers.
                    if (this.manager.haveDelivered()) {
                        this.sendNotifications();
                    } else {
                        this.manager.connectAll();
                    }
                    // Exponential back-off, capped at maxNotificationInterval.
                    notTimeout = Math.min(notTimeout * 2, maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                    continue;
                }
                if (!this.self.getVotingView().containsKey(n.sid)) {
                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
                    continue;
                }
                switch (n.state) {
                    case LOOKING: {
                        if (n.electionEpoch > this.logicalclock) {
                            // The sender is in a newer round: adopt it and start over
                            // from this peer's initial vote or the sender's, whichever wins.
                            this.logicalclock = n.electionEpoch;
                            recvset.clear();
                            if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch())) {
                                this.updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch());
                            }
                            this.sendNotifications();
                        } else if (n.electionEpoch < this.logicalclock) {
                            // Notification from an older round: ignore it.
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(this.logicalclock));
                            }
                            break;
                        } else if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) {
                            // Same round and a better vote: adopt and re-broadcast it.
                            this.updateProposal(n.leader, n.zxid, n.peerEpoch);
                            this.sendNotifications();
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader + ", proposed zxid=0x" + Long.toHexString(n.zxid) + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        if (this.termPredicate(recvset, new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock, this.proposedEpoch))) {
                            // A quorum backs the proposal. Wait finalizeWait ms for a
                            // better vote; if one shows up, put it back and keep going.
                            while ((n = this.recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) {
                                    this.recvqueue.put(n);
                                    break;
                                }
                            }
                            if (n == null) {
                                this.self.setPeerState(this.proposedLeader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState());
                                Vote endVote = new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock, this.proposedEpoch);
                                this.leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    }
                    case OBSERVING: {
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    }
                    case FOLLOWING:
                    case LEADING: {
                        if (n.electionEpoch == this.logicalclock) {
                            // Same round: count the vote together with the LOOKING votes.
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            if (this.ooePredicate(recvset, outofelection, n)) {
                                this.self.setPeerState(n.leader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState());
                                Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                this.leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        // Otherwise require a quorum among the peers that already
                        // report FOLLOWING or LEADING before joining their leader.
                        outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (this.ooePredicate(outofelection, outofelection, n)) {
                            synchronized (this) {
                                this.logicalclock = n.electionEpoch;
                                this.self.setPeerState(n.leader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState());
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            this.leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    }
                    default: {
                        LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid);
                        break;
                    }
                }
            }
            return null;
        }
        finally {
            try {
                if (this.self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(this.self.jmxLeaderElectionBean);
                }
            }
            catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            this.self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", this.manager.getConnectionThreadCount());
        }
    }

    protected class Messenger {
        WorkerSender ws;
        WorkerReceiver wr;

        /**
         * Reports whether at least one of the two election queues is empty.
         *
         * @return {@code true} if the send queue is empty or the receive queue is
         *         empty (note: "or", not "and")
         */
        public boolean queueEmpty() {
            if (FastLeaderElection.this.sendqueue.isEmpty()) {
                return true;
            }
            return FastLeaderElection.this.recvqueue.isEmpty();
        }

        /**
         * Creates the sender and receiver workers and starts each of them on its own
         * daemon thread, named after this peer's id.
         *
         * @param manager the connection manager both workers use
         */
        Messenger(QuorumCnxManager manager) {
            this.ws = new WorkerSender(manager);
            Thread senderThread = new Thread(this.ws, "WorkerSender[myid=" + FastLeaderElection.this.self.getId() + "]");
            senderThread.setDaemon(true);
            senderThread.start();

            this.wr = new WorkerReceiver(manager);
            Thread receiverThread = new Thread(this.wr, "WorkerReceiver[myid=" + FastLeaderElection.this.self.getId() + "]");
            receiverThread.setDaemon(true);
            receiverThread.start();
        }

        /**
         * Asks both workers to stop by raising their {@code stop} flags. Each worker
         * checks its flag at the top of its loop, i.e. after its current poll
         * (up to 3000 ms) returns.
         */
        void halt() {
            ws.stop = true;
            wr.stop = true;
        }

        /**
         * Worker that takes {@code ToSend} messages off the send queue, serializes
         * them and hands them to the connection manager.
         */
        class WorkerSender
        extends ZooKeeperThread {
            /** Set to {@code true} to make {@link #run()} exit after its current poll. */
            volatile boolean stop;
            QuorumCnxManager manager;

            WorkerSender(QuorumCnxManager manager) {
                super("WorkerSender");
                this.stop = false;
                this.manager = manager;
            }

            /**
             * Polls the send queue (3000 ms per attempt) and processes each message
             * until {@code stop} is set or the thread is interrupted.
             */
            @Override
            public void run() {
                try {
                    while (!stop) {
                        ToSend pending = FastLeaderElection.this.sendqueue.poll(3000L, TimeUnit.MILLISECONDS);
                        if (pending != null) {
                            process(pending);
                        }
                    }
                }
                catch (InterruptedException ignored) {
                    // An interrupt simply ends the send loop.
                }
                LOG.info("WorkerSender is down");
            }

            /**
             * Serializes {@code m} with {@code buildMsg} and passes it to the
             * connection manager for delivery to {@code m.sid}.
             *
             * @param m the message to send
             */
            void process(ToSend m) {
                ByteBuffer payload = FastLeaderElection.buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch);
                manager.toSend(m.sid, payload);
            }
        }

        /**
         * Worker that takes raw messages from the connection manager, decodes them
         * into {@code Notification}s and either queues them for
         * {@code lookForLeader()} or answers them directly.
         */
        class WorkerReceiver
        extends ZooKeeperThread {
            /** Set to {@code true} to make {@link #run()} exit after its current poll. */
            volatile boolean stop;
            QuorumCnxManager manager;

            WorkerReceiver(QuorumCnxManager manager) {
                super("WorkerReceiver");
                this.stop = false;
                this.manager = manager;
            }

            /**
             * Receive loop. For every message polled from the connection manager:
             * <ul>
             *   <li>a sender outside the voting view is answered with this peer's
             *       current vote and otherwise ignored;</li>
             *   <li>a message that is too short to decode, or carries an unknown
             *       state code, is dropped;</li>
             *   <li>if this peer is LOOKING, the notification is queued for the
             *       election loop, and a LOOKING sender in an older round is sent this
             *       peer's current proposal;</li>
             *   <li>if this peer is not LOOKING, a LOOKING sender is answered with the
             *       vote this peer currently holds.</li>
             * </ul>
             */
            @Override
            public void run() {
                while (!this.stop) {
                    try {
                        ToSend notmsg;
                        QuorumCnxManager.Message response = this.manager.pollRecvQueue(3000L, TimeUnit.MILLISECONDS);
                        if (response == null) {
                            continue;
                        }
                        if (!FastLeaderElection.this.self.getVotingView().containsKey(response.sid)) {
                            // Sender is not in the voting view: reply with this peer's
                            // current vote and do not process the message any further.
                            Vote current = FastLeaderElection.this.self.getCurrentVote();
                            notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), FastLeaderElection.this.logicalclock, FastLeaderElection.this.self.getPeerState(), response.sid, current.getPeerEpoch());
                            FastLeaderElection.this.sendqueue.offer(notmsg);
                            continue;
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Receive new notification message. My id = " + FastLeaderElection.this.self.getId());
                        }
                        // state (4) + leader, zxid, electionEpoch (3 * 8) = 28 bytes minimum.
                        if (response.buffer.capacity() < 28) {
                            LOG.error("Got a short response: " + response.buffer.capacity());
                            continue;
                        }
                        // Exactly 28 bytes: legacy format that carries no peer epoch.
                        boolean backCompatibility = response.buffer.capacity() == 28;
                        response.buffer.clear();
                        Notification n = new Notification();
                        QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                        switch (response.buffer.getInt()) {
                            case 0: {
                                ackstate = QuorumPeer.ServerState.LOOKING;
                                break;
                            }
                            case 1: {
                                ackstate = QuorumPeer.ServerState.FOLLOWING;
                                break;
                            }
                            case 2: {
                                ackstate = QuorumPeer.ServerState.LEADING;
                                break;
                            }
                            case 3: {
                                ackstate = QuorumPeer.ServerState.OBSERVING;
                                break;
                            }
                            default: {
                                // Unknown state code: drop the message.
                                continue;
                            }
                        }
                        n.leader = response.buffer.getLong();
                        n.zxid = response.buffer.getLong();
                        n.electionEpoch = response.buffer.getLong();
                        n.state = ackstate;
                        n.sid = response.sid;
                        if (backCompatibility) {
                            if (LOG.isInfoEnabled()) {
                                LOG.info("Backward compatibility mode, server id=" + n.sid);
                            }
                            n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
                        } else {
                            // A message of 29..35 bytes cannot hold the 8-byte peer epoch.
                            // Drop it here; an unchecked getLong() would throw
                            // BufferUnderflowException and terminate this worker.
                            if (response.buffer.remaining() < 8) {
                                LOG.error("Got a short response: " + response.buffer.capacity());
                                continue;
                            }
                            n.peerEpoch = response.buffer.getLong();
                        }
                        // The trailing int, when present, is the message format version.
                        n.version = response.buffer.remaining() >= 4 ? response.buffer.getInt() : 0;
                        if (LOG.isInfoEnabled()) {
                            FastLeaderElection.this.printNotification(n);
                        }
                        if (FastLeaderElection.this.self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                            FastLeaderElection.this.recvqueue.offer(n);
                            // A LOOKING sender in an older round is sent the current proposal.
                            if (ackstate != QuorumPeer.ServerState.LOOKING || n.electionEpoch >= FastLeaderElection.this.logicalclock) {
                                continue;
                            }
                            Vote v = FastLeaderElection.this.getVote();
                            notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), FastLeaderElection.this.logicalclock, FastLeaderElection.this.self.getPeerState(), response.sid, v.getPeerEpoch());
                            FastLeaderElection.this.sendqueue.offer(notmsg);
                            continue;
                        }
                        // This peer is not LOOKING: only LOOKING senders get a reply,
                        // carrying the vote this peer currently holds.
                        Vote current = FastLeaderElection.this.self.getCurrentVote();
                        if (ackstate != QuorumPeer.ServerState.LOOKING) {
                            continue;
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Sending new notification. My id =  " + FastLeaderElection.this.self.getId() + " recipient=" + response.sid + " zxid=0x" + Long.toHexString(current.getZxid()) + " leader=" + current.getId());
                        }
                        if (n.version > 0) {
                            notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), FastLeaderElection.this.self.getPeerState(), response.sid, current.getPeerEpoch());
                        } else {
                            // Sender did not include a version: answer with the
                            // backward-compatible vote.
                            Vote bcVote = FastLeaderElection.this.self.getBCVote();
                            notmsg = new ToSend(ToSend.mType.notification, bcVote.getId(), bcVote.getZxid(), bcVote.getElectionEpoch(), FastLeaderElection.this.self.getPeerState(), response.sid, bcVote.getPeerEpoch());
                        }
                        FastLeaderElection.this.sendqueue.offer(notmsg);
                    }
                    catch (InterruptedException e) {
                        // Report through the logger instead of stdout. The loop
                        // deliberately keeps running until stop is set; the interrupt
                        // flag is not restored, because that would make the next poll
                        // throw immediately and spin.
                        LOG.warn("Interrupted Exception while waiting for new message", e);
                    }
                }
                LOG.info("WorkerReceiver is down");
            }
        }
    }

    /**
     * An outgoing election message: the vote to advertise plus the id of the
     * server it is addressed to.
     */
    public static class ToSend {
        /** Id of the proposed leader. */
        long leader;
        /** Zxid of the proposed leader. */
        long zxid;
        /** Election epoch (logical clock) of the sender. */
        long electionEpoch;
        /** Server state of the sender. */
        QuorumPeer.ServerState state;
        /** Id of the recipient server. */
        long sid;
        /** Peer epoch of the proposed leader. */
        long peerEpoch;

        /**
         * Creates a message addressed to {@code sid}.
         *
         * @param type message type; accepted but not stored
         * @param leader id of the proposed leader
         * @param zxid zxid of the proposed leader
         * @param electionEpoch election epoch of the sender
         * @param state server state of the sender
         * @param sid id of the recipient server
         * @param peerEpoch peer epoch of the proposed leader
         */
        ToSend(mType type, long leader, long zxid, long electionEpoch, QuorumPeer.ServerState state, long sid, long peerEpoch) {
            // Addressing first, then the vote payload.
            this.sid = sid;
            this.state = state;
            this.leader = leader;
            this.zxid = zxid;
            this.peerEpoch = peerEpoch;
            this.electionEpoch = electionEpoch;
        }

        /** Message types accepted by the {@code ToSend} constructor. */
        static enum mType {
            crequest,
            challenge,
            notification,
            ack
        }
    }

    /**
     * A decoded election message received from another server.
     */
    public static class Notification {
        /** Message format version constant. */
        public static final int CURRENTVERSION = 1;
        /** Format version read from the message; 0 when the message carried none. */
        int version;
        /** Id of the proposed leader. */
        long leader;
        /** Zxid of the proposed leader. */
        long zxid;
        /** Election epoch (round) of the sender. */
        long electionEpoch;
        /** Server state of the sender. */
        QuorumPeer.ServerState state;
        /** Id of the sending server. */
        long sid;
        /** Peer epoch of the proposed leader. */
        long peerEpoch;

        /**
         * Renders every field with a short label; the version, zxid, round and peer
         * epoch are printed in hexadecimal.
         */
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(Long.toHexString(this.version)).append(" (message format version), ");
            text.append(this.leader).append(" (n.leader), 0x");
            text.append(Long.toHexString(this.zxid)).append(" (n.zxid), 0x");
            text.append(Long.toHexString(this.electionEpoch)).append(" (n.round), ");
            text.append(this.state).append(" (n.state), ");
            text.append(this.sid).append(" (n.sid), 0x");
            text.append(Long.toHexString(this.peerEpoch)).append(" (n.peerEpoch) ");
            return text.toString();
        }
    }
}

