Java 笔记系列——08- 分布式协调(Zookeeper)

1、Zookeeper

  • zookeeper 是一个分布式的协调中间件,能解决分布式下的组件协调问题,所以才有许多应用使用 zookeeper
    • kafka 集成 Zookeeper,实现集群选举(leader 选举)、配置管理
    • hbase 集成 zookeeper,实现集群管理
    • sharding jdbc 集成 zookeeper,实现注册中心、配置中心

配置安装

  • 单机模式。
    1. 下载 zookeeper 安装包,完成后解压
    2. 初次使用 zookeeper,需要将 conf 目录下的 zoo_sample.cfg 文件 copy 一份重命名为 zoo.cfg
    3. 修改 zoo.cfg 文件中的 dataDir 目录,dataDir 表示日志文件存放的路径
    4. 启动服务:bin/zkServer.sh start
    5. 查看服务状态:bin/zkServer.sh status
    6. 停止服务:bin/zkServer.sh stop
    7. 重启服务:bin/zkServer.sh restart
    8. 连接服务器:zkCli.sh -timeout 0 -r -server ip:port
  • 集群模式
    1. 多台服务器按照单机模式准备好配置
    2. 在数据文件目录下创建 myid 文件内容为数字编号。
    3. zoo.cfg 文件中增加内容
      # 编号与myid文件内容一致
      server.1=192.168.221.128:2888:3888
      server.2=192.168.221.129.2888:3888
      server.3=192.168.221.130.2888:2888
      
    4. 如果是 observer 节点是如下配置
      #只有observer节点有这个
      peerType=observer
      server.1=192.168.221.128:2888:3888
      server.2=192.168.221.129:2888:3888
      server.3=192.168.221.0:2888:3888
      #所有机器都需要配置这个
      server.4=192.168.221.1:2888:3888:observer
      
    5. 启动 zookeeper

数据结构

  • zookeeper 的视图结构和标准的文件系统非常类似,每一个节点称之为 ZNode, 是 zookeeper 的最小单元。每个 znode 上都可以保存数据以及挂载子节点,构成一个层次化的树形结构image-20220619123335089
  • 操作命令
    • 创建节点 :create [-s] [-e] [-c] [-t ttl] path [data] [acl]
      • [-s]:sequential 序列化的,即可以重复创建,在路径后面加上序列号
      • [-e]:ephemeral 临时的,断开连接后自动失效
      • [-c] :表示 container node(容器节点),
      • [-t ttl]:表示 TTL Nodes(带超时时间的节点)
      • [acl]:是针对这个节点创建一个权限的,如果创建权限了,则拥有权限的才可以访问
    • 删除节点:delete [-v version] path 删除节点,-v 表示版本号,实现乐观锁机制
    • 更新节点:set [-s] [-v version] path data 给节点赋值 -s 返回节点状态
    • 查询节点信息:get [-s] [-w] path 获取指定节点的值
  • Znode 分为多种类型
    • 持久化节点,PERSISTENT。不会随客户端的断开而自动删除,是创建节点的默认类型。image-20220619123613172
    • 带序号的持久化节点,PERSISTENT_SEQUENTIAL。持久化节点增加了序号,znode 的名字将被附加一个单调递增的数字image-20220619123701310
    • 临时节点,EPHEMERAL。当客户端断开时自动删除。示例:如果该 Client 创建了/Server1 和/Server2 这两个节点,当 Client 的 session 断开后,这两个节点会被 Zookeeper 自动删除。临时节点不能存在子节点image-20220619123803988
    • 带序号的临时节点,EPHEMERAL_SEQUENTIAL。临时节点增加了序号,znode 的名字将被附加一个单调递增的数字。临时节点不能存在子节点image-20220619123922878
    • 容器节点,CONTAINER。container 节点是一个特殊用途的节点,它是为 Leader、Lock 等操作而设计的节点类型,它的作用是: 当容器节点的最后一个子节点被删除后,容器节点将会被标注并且在一段时间后删除。由于容器节点存在这个特性,所以当我们在容器节点下创建一个子节点时,需要捕获 KeeperException.NoNodeException 异常,如果捕获到这个异常,就需要重新创建容器节点。
    • 超时节点,TTL。如果某个节点设置为 TTL 节点类型,那么这个节点在指定 TTL 时间(单位为毫秒)段内没有修改并且没有子节点时,该节点会在一段时间后被删除。属于 zookeeper 的扩展类型,默认不开启,如果要想使用该类型,必须在 zookeeper 的 bin/zkService.sh 中修改 zookeeper 的 java 环境变量 zookeeper.extendedTypesEnabled=true
    • 带序号的超时节点,PERSISTENT_SEQUENTIAL_WITH_TTL。超时节点加入了序号。
  • 每一个 Znode 除了存储数据内容以外,还存储了数据节点本身的一些状态信息,通过 get 命令可以获得状态信息的详细内容image-20220619122502119
  • zookeeper 为数据节点引入了版本的概念,每个数据节点都有三类版本信息,对数据节点任何更新操作都会引起版本号的变化 。version 属性就是用来实现乐观锁机制的“写入校验” image-20220620120942406

功能特性

  • watcher 机制

    • zookeeper 提供了分布式数据的发布/订阅功能,zookeeper 允许客户端向服务端注册一个 watcher 监 听,当服务端的一些指定事件触发了 watcher,那么服务端就会向客户端发送一个事件通知。
    • zookeeper 提供以下几种命令来对指定节点设置监听
      • get [-s] [-w] path:监听指定 path 节点的修改和删除事件。同样该事件也是一次性触发。
        get -w /node
        # 在其他窗口执行下面命令,会触发相关事件
        set /node 123
        delete /node
        
      • ls [-s] [-w] [-R] path : 监控指定 path 的子节点的添加和删除事件。
        ls -w /node
        # 在其他窗口执行下面命令,会触发相关事件
        create /node/node1
        delete /node/node1
        
      • stat [-w] path:作用和 get 完全相同
      • addWatch [-m mode] path:作用是针对指定节点添加事件监听,支持两种模式
        • PERSISTENT,持久化订阅,针对当前节点的修改和删除事件,以及当前节点的子节点的删除和新增事件。
        • PERSISTENT_RECURSIVE,持久化递归订阅,在 PERSISTENT 的基础上,增加了子节点修改 **的事件触发,以及子节点的子节点的数据变化都会触发相关事件(满足递归订阅特性) **
  • Session 会话机制

    • 状态流转
      1. 首先,客户端向 Zookeeper Server 发起连接请求,此时状态为 CONNECTING
      2. 当连接建立好之后,Session 状态转化为 CONNECTED,此时可以进行数据的 IO 操作。
      3. 如果 Client 和 Server 的连接出现丢失,则 Client 又会变成 CONNECTING 状态
      4. 如果会话过期或者主动关闭连接时,此时连接状态为 CLOSE
      5. 如果是身份验证失败,直接结束 image-20220620121424247
  • 权限控制

    • Zookeeper 作为一个分布式协调框架,内部存储了一些分布式系统运行时的状态的数据,比如 master 选举、比如分布式锁。对这些数据的操作会直接影响到分布式系统的运行状态。因此,为了保证 zookeeper 中的数据的安全性,避免误操作带来的影响。Zookeeper 提供了一套 ACL 权限控制机制来保证数据的安全
    • ACL 权限控制,使用: schemei:d:perm 来标识。
      • Scheme(权限模式),标识授权策略

        Zookeeper 提供以下权限模式,所谓权限模式,就是使用什么样的方式来进行授权

        • world:默认方式,相当于全部都能访问。
        • auth:代表已经认证通过的用户(cli 中可以通过 addauth digest user:pwd 来添加当前上下文中的授权用户)
        • digest:即 用户名:密码 这种方式认证,这也是业务系统中最常用的。用字符串 sername:password 来产生一个 MD5 串,然后该串被用来作为 ACL ID。认证是通过明文发送 username:password 来进行的,当用在 ACL 时,表达式为 username:base64,base64 是 password 的 SHA1 摘要的编码。
        • **ip:通过 ip 地址来做权限控制,比如 ip:192.168.1.1 表示权限控制都是针对这个 ip 地址的。也可以针对网段 **ip:192.168.1.1/24,此时 addr 中的有效位与客户端 addr 中的有效位进行比对
      • ID(授权对象):指权限赋予的用户或一个指定的实体,不同的权限模式下,授权对象不同 image-20220620122234312

      • Permission:指通过权限检查后可以被允许的操作,create /delete /read/write/admin

        • create: 允许对子节点 Create 操作
        • read :允许对本节点 GetChildren 和 GetData 操作
        • write :允许对本节点 SetData 操作
        • delete :允许对子节点 Delete 操作
        • admin :允许对本节点 setAcl 操作
    • ZooKeeper 的权限控制是基于每个 znode 节点的,需要对每个节点设置权限,每个 znode 支持设置多种 权限控制方案和多个权限,子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它 的子节点。

使用实例

  • 在 java 中使用 Apache Curator 完成 zookeeper 的操作 ,Curator 是一个 zookeeper 的 Java 客户端
  • 每一个 Java 客户端其实都是提供了三个功能
    • 建立连接
    • 基于连接进行增删改查
    • 提供了一些解决方案的封装
  • 导入依赖
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>5.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>5.2.0</version>
    </dependency>
    
  • Example
    public class CuratorExample {
        final CuratorFramework curatorFramework;
    
        {
            curatorFramework = CuratorFrameworkFactory
                .builder()
                .connectionTimeoutMs(20000)
                .connectString("192.168.221.128:2181")
                // 指定重试策略
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .sessionTimeoutMs(15000)
                .build();
            curatorFramework.start(); //启动
        }
    
    
        /**
         * watch演示
         *
         * @param node 监听的节点
         */
        public void witchTest(String node) {
            CuratorCache curatorCache = CuratorCache.
                build(curatorFramework, node, CuratorCache.Options.SINGLE_NODE_CACHE);
            CuratorCacheListener listener = CuratorCacheListener.builder()
                .forAll((type, oldData, data) -> System.out.println("事件类型:" + type + ":childData:" + oldData + ":data" + data)).build();
            curatorCache.listenable().addListener(listener);
            curatorCache.start();
        }
    
        /**
         * 同步CURD
         */
        public void curdTest() throws Exception {
            // 创建持久化节点,如果父节点不存在一并创建,并为节点赋值
            String node = curatorFramework.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).forPath("/node", "Hello World".getBytes());
    
            //获取节点的value,以及节点的状态信息,并将状态存储到stat
            Stat stat = new Stat();
            byte[] data = curatorFramework.getData().storingStatIn(stat).forPath(node);
    
            // 修改节点的值,
            curatorFramework.setData().withVersion(stat.getVersion()).forPath(node, "Update Date Result".getBytes());
    
            // 删除节点
            curatorFramework.delete().forPath(node);
            Stat existStat = curatorFramework.checkExists().forPath(node);
            if (existStat == null) {
                System.out.println("节点删除成功");
            }
        }
    
        /***
         * 异步CURD
         */
        public void asyncCRUD() throws Exception {
            String node = curatorFramework.create().withMode(CreateMode.PERSISTENT)
                .inBackground((session, event) -> {
                    System.out.println(Thread.currentThread().getName() + ":执行创建节点:" + event.getPath());
                }).forPath("/async-node");
            System.out.println("异步执行创建节点:" + node);
        }
    }
    

应用场景

  • 注册中心
    • 使用 Watcher 机制,配合 SpringBoot 的 Environment,实现 SpringBoot 启动时的配置参数注入
  • 分布式锁
    • 使用唯一节点特性实现分布式锁。如果并发量大的情况下,线程锁释放之后,大量等待线程同时竞争,引发惊群效应,影响服务器性能。不推荐使用
    • 使用有序节点实现分布式锁
      1. 每个客户端都往指定的节点下注册一个临时有序节点,越早创建的节点,节点的顺序编号就越小,那么我们可以判断子节点中最小的节点设置为获得锁。
      2. 如果自己的节点不是所有子节点中最小的,意味着还没有获得锁。然后监听编号比自己小的节点删除事件。
      3. 当比自己小的节点删除以后,客户端会收到 watcher 事件,此时再次判断自己的节点是不是所有子节点中最小的,如果是则获得锁,否则就不断重复这个过程,这样就不会导致羊群效应,因为每个客户端只需要监控一个节点 image-20220620161739566
    • 可以使用 curatorcurator 提供的 InterProcessMutex 这样一个 api。除了分布式锁之外,还提供了 leader 选举、分布式队列等常用的功能。
      • InterProcessMutex:分布式可重入排它锁
      • InterProcessSemaphoreMutex:分布式排它锁
      • InterProcessReadWriteLock:分布式读写锁
  • leader 选举
    • 在分布式计算中,leader election 是很重要的一个功能,这个选举过程是这样子的:指派一个进程作为组织者,将任务分发给各节点。在任务开始前,哪个节点都不知道谁是 leader 或者 coordinator。当选举算法开始执行后,每个节点最终会得到一个唯一的节点作为任务 leader。除此之外,选举还经常会发生在 leader 意外宕机的情况下,新的 leader 要被选举出来。image-20220620161911730
    • 可以通过 Curator 基于 Zookeeper 封装的 Leader 选举工具类 LeaderLatch 与 LeaderSelector 来实现

集群

  • Zookeeper 中,集群角色分为三种,分别是 Leader、Follower、Observe
    • Leader 服务器是整个 zookeeper 集群的核心,主要的工作任务有两项
      • 事物请求的唯一调度和处理者,保证集群事物处理的顺序性
      • 集群内部各服务器的调度者
    • Follower 角色,Follower 角色的主要职责是
      • 处理客户端非事物请求、转发事物请求给 leader 服务器
      • 参与事物请求 Proposal 的投票(需要半数以上服务器通过才能通知 leader commit 数据; Leader 发 起的提案,要求 Follower 投票)
      • 参与 Leader 选举的投票
    • Observer 角色,Observer 是 zookeeper3.3 开始引入的一个全新的服务器角色,从字面来理解,该角色充当了观察者的角色。
      • 观察 zookeeper 集群中的最新状态变化并将这些状态变化同步到 observer 服务器上。
      • Observer 的工作原理与 follower 角色基本一致,而它和 follower 角色唯一的不同在于 observer 不参与任何形式的投票,包括事物请求 Proposal 的投票和 leader 选举的投票。
      • **简单来说,observer 服务器只提供非事物请求服务,通常在于不影响集群事物处理能力的前提下提升集群非事物处理的能力 **
  • 通常 zookeeper 是由 2n+1 台 server 组成,每个 server 都知道彼此的存在。每个 server 都维护的内存状态镜像以及持久化存储的事务日志和快照。

    **之所以要满足 2n+1 台 server,是因为一个节点要成为集群中的 leader,需要有超过及群众过半数的节点支持,这个涉及到 leader 选举算法。同时也涉及到事务请求的提交投票 **

  • zookeeper 并不是强一致性服务,它是一个最终一致性模型:顺序一致性模型 。

    zookeeper 不保证在每个实例中,两个不同的客户端具有相同的 zookeeper 数据视图,由于网络延迟等 **因素,一个客户端可能会在另外一个客户端收到更改通知之前执行更新,考虑到 2 个客户端 A 和 B 的场景,如果 A 把 znode /a 的值从 0 设置为 1,然后告诉客户端 B 读取 /a, 则客户端 B 可能会读取到旧的值 0,具体取决于他连接到那个服务器,如果客户端 A 和 B 要读取必须要读取到相同的值,那么 client B 在读取操作之前执行 sync 方法。 zooKeeper.sync(); **

    参考官网描述ZooKeeper: Because Coordinating Distributed Systems is a Zoo (apache.org)

  • zookeeper 持久化
    • Zookeeper 的数据是持久化在磁盘上的,默认的目录是在 /tmp/zookeeper 下,这个目录中会存放事务日志和快照日志。该路径可以通过 zoo.cfg 文件来修改。
    • **在 Zab 协议中每当有接收到客户端的事务请求后 Leader 与 Follower 都会将把该事务日志存入磁盘日志文件中,该日志文件名称格式为 **log.zxid, 其中 zxid 表示当前日志文件中开始记录的第一条数据的 zxid
    • zxid,是 Zookeeper 中的数据对应的事务 ID。为了保证事务的顺序一致性,zookeeper 采用了递增的事务 id 号(zxid)来标识事务。所有的提议(proposal)都在被提出的时候加上了 zxid。可以 stat /node 命令查看节点的信息
    • zxid 是一个 64 位的数字,它高 32 位是 epoch,用来标识 leader 关系是否改变。低 32 位用于递增计数 。

      每次一个 leader 被选出来,它都会计算一个新的 epoch(原来的 epoch+1),标识当前属于新 leader 的统治时期。

    • epoch 标识了当前 Leader 周期,会存储到 currentEpoch 中。集群机器相互通信时,会带上这个 epoch 以确保彼此在同一个 Leader 周期中。
  • zookeeper 使用了 zab 协议,用来进行原子广播以及崩溃恢复。
  • zookeeper 数据同步(zab 协议原子广播)
    • 数据同步原理
      • 使用到了 zab 协议的原子广播,这个原子广播其实就是相当于简化版本的 2pc,半数提交

        **和完整的 2pc 事务不一样的地方在于,zab 协议不能终止事务,follower 节点要么 ACK 给 leader,要么抛弃 leader,只需要保证过半数的节点响应这个消息并提交了即可。虽然在某一个时刻 follower 节点和 leader 节点的状态会不一致,但是也是这个特性提升了集群的整体性能。 **

    • 在 zookeeper 中,客户端会随机连接到 zookeeper 集群中的一个节点。所有事务请求必须由一个全局唯一的服务器来协调处理,这个服务器就是 Leader 服务器,其他的服务器就是 follower。
    • 数据同步流程
      1. 如果是读请求,就直接从当前节点中读取数据,返回给客户端
      2. 如果是写请求,那么请求会被转发给 leader
      3. Leader 服务器把客户端的请求转化成一个事务 Proposal(提议),并把这个 Proposal 分发给集群中的所有 Follower 服务器。
      4. Follower 收到 Proposal ,本地操作,记录日志,还没有进行最终提交,然后返回消息给 Leader。
      5. Leader 服务器需要等待所有 Follower 服务器的反馈,一旦超过半数的 Follower 服务器进行了正确的反馈,那么 Leader 就会再次向所有的 Follower 服务器发送 Commit 消息,要求各个 follower 节点对前面的一个 Proposal 进行提交 。
    • 由于是半数提交,如果 Leader 服务器崩溃会带来的数据不一致问题。崩溃之后会进入到崩溃恢复模式,先选举出新的 Leader,然后重新同步数据
  • zookeeper 崩溃恢复
    • 由于使用的 zab 协议“半数提交”,当 Leader 挂掉,然后选举的过程中要考虑数据的一致性问题
      • 如果 Leader 在收到合法数量 follower 的 ACK 后,就向各个 follower 广播 COMMIT 命令,同时也会在本地执行 COMMIT 并向连接的客户端返回「成功」。但是如果在各个 follower 在收到 COMMIT 命令前 Leader 就挂了,导致剩下的服务器并没有执行都这条消息 。那么要保证最后的这个事务最终能够在所有的服务器上都能被提交成功,否则将会出现不一致
      • 当 leader 接收到消息请求生成 proposal 后就挂了,其他 follower 并没有收到此 proposal,因此经过恢复模式重新选了 leader 后,这条消息是被跳过的。此时,之前挂了的 leader 重新启动并注册成了 follower,他保留了被跳过消息的 proposal 状态,与整个系统的状态是不一致的,需要将其删除
    • Leader 选举算法需要能够确保已经被 leader 提交的事务 Proposal 能够提交、同时丢弃已经被跳过的事务 Proposal。选举算法的规则是比较 myid和zxid、epoch
    • 若进行 Leader 选举,则至少需要两台机器。
      • 集群初始化阶段,当有一台服务器 Server1 启动时,其单独无法进行和完成 Leader 选举,当第二台服务器 Server2 启动时,此时两台机器可以相互通信,每台机器都试图找到 Leader,于是进入 Leader 选举阶段,选举步骤如下
        1. 每个 Server 发出一个投票。由于是初始情况,Server1 和 Server2 都会将自己作为 Leader 服务器来进行投票,每次投票会包含所推举的服务器的 myid 和 zxid、epoch,使用(myid, zxid,epoch)来表示
        2. 此时 Server1 的投票为(1,0,0),Server2 的投票为(2,0,0),然后各自将这个投票发给集群中其他机器。
        3. 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票(epoch)、是否来自 LOOKING 状态的服务器。
        4. 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行比较。首先比较 epoch,虽然存储在 zxid 中的高 32 位,但是,选择 epoch 大的服务器作为 Leader。
        5. 如果 epoch 一样大,检查 zxid。zxid 比较大的服务器优先作为 Leader
        6. 如果 zxid 相同,那么就比较 myid。myid 较大的服务器作为 Leader 服务器。
        7. 对于 Server1 而言,它的投票是(1,0,0),接收 Server2 的投票为(2,0,0),首先会比较两者的 zxid,均为 0,再比较 myid,此时 Server2 的 myid 最大,于是更新自己的投票为(2,0,0),然后重新投票,对于 Server2 而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。
        8. 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于 Server1、Server2 而言,都统计出集群中已经有两台机器接受了(2,0,)的投票信息,此时便认为已经选出了 Leader。
        9. 改变服务器状态。一旦确定了 Leader,每个服务器就会更新自己的状态,如果是 Follower,那么就变更为 FOLLOWING,如果是 Leader,就变更为 LEADING
      • 运行过程中的 leader 选举 。当集群中的 leader 服务器出现宕机或者不可用的情况时,那么整个集群将无法对外提供服务,而是进入新一轮的 Leader 选举,服务器运行期间的 Leader 选举和启动时期的 Leader 选举基本过程是一致的。
        1. 变更状态。Leader 挂后,余下的非 Observer 服务器都会将自己的服务器状态变更为 LOOKING,然后开始进入 Leader 选举过程。
        2. 每个 Server 会发出一个投票。在运行期间,每个服务器上的 ZXID 可能不同,此时假定 Server1 的 ZXID 为 123,Server3 的 ZXID 为 122;在第一轮投票中,Server1 和 Server3 都会投自己,产生投票(1,123,1),(3,122,1),然后各自将投票发送给集群中所有机器。接收来自各个服务器的投票。与启动时过程相同。
        3. 处理投票。与启动时过程相同,此时,Server1 将会成为 Leader。
        4. 统计投票。与启动时过程相同。
        5. 改变服务器的状态。与启动时过程相同
  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    59 引用 • 29 回帖 • 19 关注
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3169 引用 • 8208 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...