zookeeper项目使用几点小结

背景

前段时间学习了zookeeper后,在新的项目中刚好派上了用场,我在项目中主要负责分布式任务调度模块的开发,对我自己来说是个不小的挑战。

分布式的任务调度,技术上我们选择了zookeeper,具体的整个分布式任务调度的架构选择会另起一篇文章进行介绍。

本文主要是介绍自己在项目中zookeeper的一些扩展使用,希望可以对大家有所帮助。

项目中使用的zookeeper版本3.3.3,对应的文档地址: http://zookeeper.apache.org/doc/trunk/

扩展一:优先集群

先来点背景知识:

1.zookeeper中的server机器之间会组成leader/follower集群,1:n的关系。采用了paxos一致性算法保证了数据的一致性,就是leader/follower会采用通讯的方式进行投票来实现paxns。

2.zookeeper还支持一种observer模式,提供只读服务不参与投票,提升系统,对应文档: http://zookeeper.apache.org/doc/trunk/zookeeperObservers.html

我们项目特性的决定了我们需要进行跨机房操作,比如杭州,美国,香港,青岛等多个机房之间进行数据交互。

跨机房之间对应的网络延迟都比较大,比如中美机房走海底光缆有ping操作200ms的延迟,杭州和青岛机房有70ms的延迟。

为了提升系统的网络性能,我们在部署zookeeper网络时会在每个机房部署节点,多个机房之间再组成一个大的网络保证数据一致性。(zookeeper千万别再搞多个集群)

最后的部署结构就会是:

  • 杭州机房  >=3台 (构建leader/follower的zk集群)
  • 青岛机房  >=1台 (构建observer的zk集群)
  • 美国机房  >=1台 (构建observer的zk集群)
  • 香港机房  >=1台 (构建observer的zk集群)

一句话概括就是: 在单个机房内组成一个投票集群,外围的机房都会是一个observer集群和投票集群进行数据交互。 这样部署的一些好处,大家可以细细体会一下

针对这样的部署结构,我们会引入一个优先集群问题: 比如在美国机房的机器需要优先去访问本机房的zk集群,访问不到后才去访问杭州机房。

默认在zookeeper3.3.3的实现中,认为所有的节点都是对等的。并没有对应的优先集群的概念,单个机器也没有对应的优先级的概念。

扩展代码:(比较暴力,采用反射的方式改变了zk client的集群列表)

  • 先使用美国机房的集群ip初始化一次zk client
  • 通过反射方式,强制在初始化后的zk client中的server列表中又加入杭州机房的机器列表
1.ZooKeeper zk = null;
2.        try {
3.            zk = new ZooKeeper(cluster1, sessionTimeout, new AsyncWatcher() {
4.
5.                public void asyncProcess(WatchedEvent event) {
6.                    //do nothing
7.                }
8.
9.            });
10.            if (serveraddrs.size() > 1) {
11.                // 强制的声明accessible
12.                ReflectionUtils.makeAccessible(clientCnxnField);
13.                ReflectionUtils.makeAccessible(serverAddrsField);
14.                // 添加第二组集群列表
15.                for (int i = 1; i < serveraddrs.size(); i++) {
16.                    String cluster = serveraddrs.get(i);
17.                    // 强制获取zk中的地址信息
18.                    ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk);
19.                    List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils
20.                            .getField(serverAddrsField, cnxn);
21.                    // 添加第二组集群列表
22.                    serverAddrs.addAll(buildServerAddrs(cluster));
23.                }
24.            }
25.        }

扩展二:异步Watcher处理

最早在看zookeeper的代码时,一直对它的watcher处理比较满意,使用watcher推送数据可以很方便的实现分布式锁的功能。

zookeeper的watcher实现原理也挺简单的,就是在zookeeper client和zookeeper server上都保存一份对应的watcher对象。每个zookeeper机器都会有一份完整的node tree数据和watcher数据,每次leader通知follower/observer数据发生变更后,每个zookeeper server会根据自己节点中的watcher事件推送给响应的zookeeper client,每个zk client收到后再根据内存中的watcher引用,进行回调。

这里会有个问题,就是zk client在处理watcher时,回凋的过程是一个串行的执行过程,所以单个watcher的处理慢会影响整个列表的响应。

可以看一下ClientCnxn类中的EventThread处理,该线程会定时消费一个queue的数据,挨个调用processEvent(Object event) 进行回调处理。

扩展代码:

1.public abstract class AsyncWatcher implements Watcher {
2.
3.    private static final int       DEFAULT_POOL_SIZE    = 30;
4.    private static final int       DEFAULT_ACCEPT_COUNT = 60;
5.
6.    private static ExecutorService executor             = new ThreadPoolExecutor(
7.                                                                1,
8.                                                                DEFAULT_POOL_SIZE,
9.                                                                0L,
10.                                                                TimeUnit.MILLISECONDS,
11.                                                                new ArrayBlockingQueue(
12.                                                                        DEFAULT_ACCEPT_COUNT),
13.                                                                new NamedThreadFactory(
14.                                                                        "Arbitrate-Async-Watcher"),
15.                                                                new ThreadPoolExecutor.CallerRunsPolicy());
16.
17.    public void process(final WatchedEvent event) {
18.        executor.execute(new Runnable() {//提交异步处理
19.
20.                    @Override
21.                    public void run() {
22.                        asyncProcess(event);
23.                    }
24.                });
25.
26.    }
27.
28.    public abstract void asyncProcess(WatchedEvent event);
29.
30.}

说明:

  • zookeeper针对watcher的调用是以单线程串行的方式进行处理,容易造成堵塞影响,monitor的数据同步及时性
  • AsyncWatcher为采取的一种策略为当不超过acceptCount=60的任务时,会采用异步线程的方式处理。如果超过60任务,会变为原先的单线程串行的模式

扩展三:重试处理

这个也不多说啥,看一下相关文档就清楚了

需要特殊处理下ConnectionLoss的异常,一种可恢复的异常。

重试处理:

1.public interface ZooKeeperOperation<T> {
2.
3.    public T execute() throws KeeperException, InterruptedException;
4.}
5.
6.
7./**
8.     * 包装重试策略
9.     */
10.    public <T> T retryOperation(ZooKeeperOperation<T> operation) throws KeeperException,
11.            InterruptedException {
12.        KeeperException exception = null;
13.        for (int i = 0; i < maxRetry; i++) {
14.            try {
15.                return (T) operation.execute();
16.            } catch (KeeperException.SessionExpiredException e) {
17.                logger.warn("Session expired for: " + this + " so reconnecting due to: " + e, e);
18.                throw e;
19.            } catch (KeeperException.ConnectionLossException e) { //特殊处理Connection Loss
20.                if (exception == null) {
21.                    exception = e;
22.                }
23.                logger.warn("Attempt " + i + " failed with connection loss so "
24.                        + "attempting to reconnect: " + e, e);
25.
26.                retryDelay(i);
27.            }
28.        }
29.
30.        throw exception;
31.    }

注意点:Watcher原子性

在使用zookeeper的过程中,需要特别注意一点就是注册对应watcher事件时,如果当前的节点已经满足了条件,比如exist的watcher,它不会触发你的watcher,而会等待下一次watcher条件的满足。

它的watcher是一个一次性的监听,而不是一个永久的订阅过程。所以在watcher响应和再次注册watcher过程并不是一个原子操作,编写多线程代码和锁时需要特别注意

总结

zookeepr是一个挺不错的产品,源代码写的也非常不错,大量使用了queue和异步Thread的处理模式,真是一个伟大的产品。

云栖社区站内文章,如需转载,请保留作者和出处(云栖社区),并邮件通知云栖社区([email protected])。

时间: 2024-12-30 18:47:41

zookeeper项目使用几点小结的相关文章

第一次搭建springboot+dubbo+zookeeper项目小结

第一次用IDEA搭建springboot+dubbo+zookeeper项目中碰到了一些问题,现在记录下来, 第一步:项目准备工作 工具IDEA,先下载安装zookeeper和dubbo   Zookeeper下载地址:https://zookeeper.apache.org/releases.html,我下载最新的release 3.5.7 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件.它是

使用javac编译zookeeper项目

这里记录zookeeper编译源代码上的一些细节的问题. 网上不少关于如何使用ant eclipse来构建zookeeper对应的eclipse工程的记录.这里就不再过多赘述.只做简单阐述. 这里主要阐述一下如何不使用任何工具直接使用javac的方式来编译zookeeper项目,并且直接使用java来运行. 一.使用ant构建eclispe工程编译法非常简单.就是在github将zookeeper项目代码下载下来.里面已经有写好的build.xml和ivy.xml及设置文档.会将所有相关依赖下载

springboot初步整合dubbo+zookeeper项目

provider 服务端 application.properties 配置 server.port=8070 spring.datasource.url=jdbc:mysql://127.0.0.1:3306/product?characterEncoding=UTF-8 spring.datasource.username=root spring.datasource.password=654321 # 驼峰标示 mybatis.configuration.map-underscore-to

git文件夹下项目更改ip地址小结

在我们开发的过程中,经常切换项目IP地址是很正常的,之前弄过一次,没有记住,现在简单的总结下: 找到要切换IP地址的项目,点击鼠标右键,弹出下图: 打开该项目的路径后,双击打开该项目,具体参考自己项目的实际,我的结果如下: 到这步的时候,可能有的人会找不到.git这个文件,解决方法如下: 接着双击打开.git文件,找到config文件,你可以随意选择你喜欢的打开方式,本人用的是用记事本打开,如下图: 这个适合像我这种前端小白,欢迎大家点评! 作者:Plbowter 出处:http://www.c

Unreal Engine4 C++基础代码项目编译失败原因小结

在Unreal Engine4中创建一个基础代码的项目总是失败 在官网论坛里找了一天的帖子 终于找到原因 是因为目录路径的问题 错误输出: 先清空几个目录路径 可执行文件目录  包含目录 库目录 排除目录 然后全部选择 <从父级或项目默认设置继承> 应用  然后rebuild 搜索 复制

eclipse maven jdk1.8 还原站点项目红感叹号总是小结

问题背景有三 maven 默认是jdk1.5jdk1.8 目录文件夹不全操作: 在项目上右击-> build path-->config build path-->libraries->jre system libraries->t选择1.8 更新即可出现目录 解决感叹号方案 在项目上右键Properties->Project Facets,在打开的Project Facets页面中的Java下拉列表中,选择相应版本. 有可能是java1.8 widows->pr

idea+dubbo+zookeeper项目访问html页面的方法

将js,html文件放入consumer的resource的static和template文件中 1 consumer的pom需要引入模板的jar包 <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-thymeleaf</artifactId>  </dependency

Hive项目开发环境搭建(Eclipse\MyEclipse + Maven)

写在前面的话 可详细参考,一定得去看 HBase 开发环境搭建(Eclipse\MyEclipse + Maven) Zookeeper项目开发环境搭建(Eclipse\MyEclipse + Maven) 我这里,相信,能看此博客的朋友,想必是有一定基础的了.我前期写了大量的基础性博文.可以去补下基础. 步骤一:File  ->  New  -> Project   ->  Maven Project 步骤二:自行设置,待会创建的myHBase工程,放在哪个目录下. 步骤三: 步骤四:

ZooKeeper应用案例

我们通过学习借鉴,哪些项目或应用都使用了ZooKeeper,可以了解我们的应用使用ZooKeeper是否能真正地带来价值,当然,有些项目可能也未必非常适合使用ZooKeeper,我们要批判地学习.借鉴和吸收. 下面是一些使用了ZooKeeper实现的案例: HDFS HA(QJM) Hadoop 2.x之前的版本,HDFS集群中Namenode是整个集群的中央元数据存储和服务节点,它存在SPOF的问题.在2.x版本中,提出了各种HA方案,避免Namenode的SPOF问题,其中基于QJM(Quo