Zookeeper-选举流程

技术文档网 2021-04-25

QuorumCnxManager

每台服务器启动的时候都会启动一个QuorumCnxManager,负责各台服务器之间的底层Leader选举过程的通信。

QuorumCnxManager维护的消息队列

// 发送器集合,按照SID分组,每个 SendWorker 消息发送器,都对应一台远程ZK服务器,负责消息发送。
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;

// 消息发送队列,用于保存待发送的消息,按照SID进行分组,分别为集群中的每台机器分配一个单独的队列
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;

// 为每个SID保存最近发送的消息
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;

// 消息接收队列,用于存放其他服务器接收到的信息。
public final ArrayBlockingQueue<Message> recvQueue;

服务器之间建立连接

QuorumCnxManager在启动的时候,会创建一个ServerSocker来监听Leader通信端口,开启端口监听后,ZK能够不断的接受来自其他服务器的“创建连接”请求

public QuorumCnxManager(final long mySid,
                        Map<Long,QuorumPeer.QuorumServer> view,
                        QuorumAuthServer authServer,
                        QuorumAuthLearner authLearner,
                        int socketTimeout,
                        boolean listenOnAllIPs,
                        int quorumCnxnThreadsSize,
                        boolean quorumSaslAuthEnabled,
                        ConcurrentHashMap<Long, SendWorker> senderWorkerMap) {
    this.senderWorkerMap = senderWorkerMap;
    // 创建一个ServerSocker来监听Leader通信端口
    listener = new Listener();
}

受来自其他服务器的“创建连接”请求

public class Listener extends ZooKeeperThread {

    public void run() {
        int numRetries = 0;
        InetSocketAddress addr;
        while((!shutdown) && (numRetries < 3)){
            try {
                ss = new ServerSocket();
                ss.setReuseAddress(true);
                if (listenOnAllIPs) {
                    int port = view.get(QuorumCnxManager.this.mySid)
                        .electionAddr.getPort();
                    addr = new InetSocketAddress(port);
                } else {
                    addr = view.get(QuorumCnxManager.this.mySid)
                        .electionAddr;
                }
                LOG.info("My election bind port: " + addr.toString());
                setName(view.get(QuorumCnxManager.this.mySid)
                        .electionAddr.toString());
                // 绑定端口
                ss.bind(addr);
                while (!shutdown) {
                    Socket client = ss.accept();
                    setSockOpts(client);
                    LOG.info("Received connection request "
                            + client.getRemoteSocketAddress());
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
                        // 处理其他服务器的连接请求
                        receiveConnection(client);
                    }

                    numRetries = 0;
                }
            } catch (IOException e) {
                // 省略其他部分
            }
        }
        LOG.info("Leaving listener");
        if (!shutdown) {
            LOG.error("As I'm leaving the listener thread, "
                    + "I won't be able to participate in leader "
                    + "election any longer: "
                    + view.get(QuorumCnxManager.this.mySid).electionAddr);
        }
    }
    // 省略其他部分
}

处理其他服务器的连接请求: 根据接收到的请求,提取SID,通过和自己的SID进行比较,如果更大,则建立连接,创建消息发送、接受器线程等,否则销毁连接,保证不会存在重复连接问题

public void receiveConnection(final Socket sock) {
    DataInputStream din = null;
    try {
        // 获取输入数据
        din = new DataInputStream(
                new BufferedInputStream(sock.getInputStream()));

        handleConnection(sock, din);
    } catch (IOException e) {
        LOG.error("Exception handling connection, addr: {}, closing server connection",
                 sock.getRemoteSocketAddress());
        closeSocket(sock);
    }
}

private void handleConnection(Socket sock, DataInputStream din)
        throws IOException {
    Long sid = null;
    // 获取sid
    try {
        // Read server id
        sid = din.readLong();
        if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
            sid = din.readLong();

            // next comes the #bytes in the remainder of the message
            // note that 0 bytes is fine (old servers)
            int num_remaining_bytes = din.readInt();
            if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
                LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
                closeSocket(sock);
                return;
            }
            byte[] b = new byte[num_remaining_bytes];

            // remove the remainder of the message from din
            int num_read = din.read(b);
            if (num_read != num_remaining_bytes) {
                LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
            }
        }
        if (sid == QuorumPeer.OBSERVER_ID) {
            /*
             * Choose identifier at random. We need a value to identify
             * the connection.
             */
            sid = observerCounter.getAndDecrement();
            LOG.info("Setting arbitrary identifier to observer: " + sid);
        }
    } catch (IOException e) {
        closeSocket(sock);
        LOG.warn("Exception reading or writing challenge: " + e.toString());
        return;
    }

    // do authenticating learner
    LOG.debug("Authenticating learner server.id: {}", sid);
    authServer.authenticate(sock, din);

    // 如果接受到的SID比自己的sid小,则获取消息发送器、断开连接.
    if (sid < this.mySid) {
        /*
         * This replica might still believe that the connection to sid is
         * up, so we have to shut down the workers before trying to open a
         * new connection.
         */
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }

        /*
         * Now we start a new connection
         */
        LOG.debug("Create new connection to server: " + sid);
        closeSocket(sock);
        connectOne(sid);

        // Otherwise start worker threads to receive data.
    } else {
        // 创建消息发送器和消息接收器
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);

        if(vsw != null)
            vsw.finish();
        // 将SID加入发送器集合
        senderWorkerMap.put(sid, sw);
        // 将SID加入消息发送队列集合中
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
        // 启动消息发送和接收线程
        sw.start();
        rw.start();

        return;
    }
}

FastLeaderElection中消息处理队列

// 选票发送队列,保存待发送的选票
LinkedBlockingQueue<ToSend> sendqueue;

// 选票接受队列,保存接受到的外部投票
LinkedBlockingQueue<Notification> recvqueue;

FastLeaderElection与QuorumCnxManager关系图 FastLeaderElection与QuorumCnxManager关系图

zookeeper 集群启动时的选举

  1. 每个服务器发出一个投票,初始情况下都会以自己为leader发送投票,投票形式(myid, zxid)
  2. 接收其他服务器的投票,并判断其有效性,是否是同一轮投票,是否是looking状态下的投票
  3. 处理投票,根据规则进行投票PK,对于PK失败的选票需要变更选票,再重新发送选票,而成功的选票直接发送原来选票即可 3.1 先比较zxid,zxid比较大的优先成为leader 3.2 如果zxid一样,再比较myid
  4. 统计投票,每次投票后,服务器都会统计本轮次接收的票数信息,如果有过半的机器投了相同的票,则过半机器为leader,停止投票
  5. 改变服务器状态,服务器自己根据角色进行变更,follower变成following,leader变成leading

运行过程中leader挂了的选举

  1. 变更服务器状态,所有非observe的服务器变更为looking状态
  2. 每个服务器根据当前zxid构造投票,第一次是myid都是自己,并将投票发送给所有服务器
  3. 接收其他服务器投票
  4. 投票PK
  5. 统计投票,判断过半
  6. 变更服务器状态

选举算法流程

选举算法流程

选票结构

public class Vote {
    // 当前服务器自身的id
    final private long id;
    // 当前服务器最新的zxid值
    final private long zxid;
     // 当前服务器的选举轮次
    final private long electionEpoch;
     // 被推举服务器的轮次
    final private long peerEpoch;
     // 当前服务器状态
    final private ServerState state;
}

轮询判断机器状态

// --QuorumPeer
public void run() {
    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                LOG.info("LOOKING");

                if (Boolean.getBoolean("readonlymode.enabled")) {
                    // 省略其他代码
                } else {
                    try {
                        setBCVote(null);
                        // 进行投票选举
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            // 省略其他代码
            }
        }
    } finally {
        // 省略其他代码
    }
}

选举主流程

public Vote lookForLeader() throws InterruptedException {
    // 省略其他代码
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized(this) {
            // 1. 自增选举轮次,所有有效投票必须在同一个轮次中进行
            logicalclock++;
            // 2. 初始化投票
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        // 3. 发送初始化选票
        sendNotifications();

        // 循环,直到选出leader
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            // 4. 从队列 recvqueue 中接收外部选票信息
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if(n == null){
                if(manager.haveDelivered()){ // 判断是否已经连接
                    sendNotifications(); // 发送自己选票
                } else {
                    // 4.2 没有建立连接需要执行重新连接
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            else if(self.getVotingView().containsKey(n.sid)) {
                /*
                 * Only proceed if the vote comes from a replica in the
                 * voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    // 如果外部的选举轮次大于内部轮次
                    if (n.electionEpoch > logicalclock) {
                        // 将外部选举轮次赋值给内部轮次
                        logicalclock = n.electionEpoch;
                        // 清空自己收到的投票
                        recvset.clear();
                        // 使用初始化的投票与外部投票进行PK
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            // PK失败使用外部投票更新自己的投票信息
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            // PK成功使用初始化的自己的投票更新投票信息
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        // 再次发送更新后的投票信息
                        sendNotifications();

                    // 如果外部投票的批次比内部批次小直接忽略
                    } else if (n.electionEpoch < logicalclock) {
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock));
                        }
                        break;

                    // 轮次相同,进行PK,PK失败进行投票更新及发送投票, PK成功不需要发送投票????
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if(LOG.isDebugEnabled()){
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // 投票归档,记录当前服务器本轮次收到所有外部服务器的投票
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    // 投票统计,判断是否有半数投给了内部选票
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock, proposedEpoch))) {

                        // 等待200毫秒确定是否还有更优投票
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)){
                                recvqueue.put(n);
                                break;
                            }
                        }

                        // 200毫秒后如果没有更优投票则进行服务器状态更新
                        if (n == null) {
                            // 变更服务器状态
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                            // 使用leader投票构造对象
                            Vote endVote = new Vote(proposedLeader,
                                                    proposedZxid,
                                                    logicalclock,
                                                    proposedEpoch);
                            // 清空投票接收队列
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                // 省略其他代码
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        // 省略其他代码
    }
}

发送选票

private void sendNotifications() {
    for (QuorumServer server : self.getVotingView().values()) {
        long sid = server.id;

        // 构造选票信息
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock,
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch);
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                  Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock)  +
                  " (n.round), " + sid + " (recipient), " + self.getId() +
                  " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        // 将待发送投票放到 sendqueue(LinkedBlockingQueue<ToSend>) 队列中,待 WorkerSender 线程发送
        sendqueue.offer(notmsg);
    }
}

投票PK

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
            Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    if(self.getQuorumVerifier().getWeight(newId) == 0){
        return false;
    }

    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */

    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

投票统计,判断是否已经超过半数投票认可当前内部投票

protected boolean termPredicate(
        HashMap<Long, Vote> votes, // 收到的外部投票集合
        Vote vote) { // 当前认可的内部投票

    HashSet<Long> set = new HashSet<Long>();

    for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())){
            set.add(entry.getKey());
        }
    }

    // 判断是否已经超过半数投票认可当前内部投票
    return self.getQuorumVerifier().containsQuorum(set);
}

相关文章

  1. 硅谷互联网公司的开发流程

    开发流程包括这么几个阶段: OKR 的设立; 主项目及其子项目的确立; 每个子项目的生命周期; 主项目的生命周期; 收尾、维护、复盘。 第一点,OKR 的设立 所有项目的起始,都应该从 Ro

  2. RESTful-表述性状态转移风格

    REST英文全拼:Representational State Transfer 面向资源编程 资源指的就是一类数据 产品表-&gt;就是产品资源 最重要的是如何表示一个资源 地址即

  3. 稳定性思考

    产品功能线 0-1: 当系统从无到有的时候,首要考虑的是研发效率,功能快速迭代,满足快速增长的业务需求 1-10 系统已经搭建起来,此时考虑的是系统的稳定性。 可用性:1.隔离:区分出核心和非核心功能

  4. Supervisor守护队列发邮件

    安装 CentOS: yum -y install supervisor Debien/Ubuntu适用:apt-get install supervisor 配置 修改主配置文件:vim /et

  5. 安装libsodium,让服务器支持chacha20等加密方式

    用chacha20加密方式需要安装libsodium 注意:libsodium从1.0.15开始就废弃了aes-128-ctr yum install wget m2crypto git libsod

随机推荐

  1. 硅谷互联网公司的开发流程

    开发流程包括这么几个阶段: OKR 的设立; 主项目及其子项目的确立; 每个子项目的生命周期; 主项目的生命周期; 收尾、维护、复盘。 第一点,OKR 的设立 所有项目的起始,都应该从 Ro

  2. RESTful-表述性状态转移风格

    REST英文全拼:Representational State Transfer 面向资源编程 资源指的就是一类数据 产品表-&gt;就是产品资源 最重要的是如何表示一个资源 地址即

  3. 稳定性思考

    产品功能线 0-1: 当系统从无到有的时候,首要考虑的是研发效率,功能快速迭代,满足快速增长的业务需求 1-10 系统已经搭建起来,此时考虑的是系统的稳定性。 可用性:1.隔离:区分出核心和非核心功能

  4. Supervisor守护队列发邮件

    安装 CentOS: yum -y install supervisor Debien/Ubuntu适用:apt-get install supervisor 配置 修改主配置文件:vim /et

  5. 安装libsodium,让服务器支持chacha20等加密方式

    用chacha20加密方式需要安装libsodium 注意:libsodium从1.0.15开始就废弃了aes-128-ctr yum install wget m2crypto git libsod