springboot2整合zookeeper集成curator

步骤:

1- pom.xml

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

2- yml配置:

zk:
  url: 127.0.0.1:2181
  localPath: /newlock
  timeout: 3000

3- 配置类

package com.test.domi.config;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZookeeperConf {

    @Value("${zk.url}")
    private String zkUrl;

    @Bean
    public CuratorFramework getCuratorFramework(){
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(zkUrl,retryPolicy);
        client.start();
        return client;
    }
}

4- 使用

package com.test.domi.common.utils.lock;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@Component("zklock")
public class ZKlock implements Lock {

    @Autowired
    private CuratorFramework zkClient;
    @Value("${zk.localPath}")
    private String lockPath;
    private String currentPath;
    private String beforePath;

    @Override
    public boolean tryLock() {
        try {
            //根节点的初始化放在构造函数里面不生效
            if (zkClient.checkExists().forPath(lockPath) == null) {
                System.out.println("初始化根节点==========>" + lockPath);
                zkClient.create().creatingParentsIfNeeded().forPath(lockPath);
            }
            System.out.println("当前线程" + Thread.currentThread().getName() + "初始化根节点" + lockPath);
        } catch (Exception e) {
        }

        if (currentPath == null) {
            try {
                currentPath = this.zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                        .forPath(lockPath + "/");
            } catch (Exception e) {
                return false;
            }
        }
        try {
            //此处该如何获取所有的临时节点呢?如locks00004.而不是获取/locks/order中的order作为子节点??
            List<String> childrens = this.zkClient.getChildren().forPath(lockPath);
            Collections.sort(childrens);
            if (currentPath.equals(lockPath + "/" + childrens.get(0))) {
                System.out.println("当前线程获得锁" + currentPath);
                return true;
            }else{
               //取前一个节点
                int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));
                //如果是-1表示children里面没有该节点
                beforePath = lockPath + "/" + childrens.get(curIndex - 1);
            }
        } catch (Exception e) {
            return false;
        }
        return false;
    }

    @Override
    public void lock() {
        if (!tryLock()) {
            waiForLock();
            lock();
        }
    }

    @Override
    public void unlock() {
        try {
            zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(currentPath);
        } catch (Exception e) {
            //guaranteed()保障机制,若未删除成功,只要会话有效会在后台一直尝试删除
        }
    }

    private void waiForLock(){
        CountDownLatch cdl = new CountDownLatch(1);
        //创建监听器watch
          NodeCache nodeCache = new NodeCache(zkClient,beforePath);
        try {
            nodeCache.start(true);
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    cdl.countDown();
                    System.out.println(beforePath + "节点监听事件触发,重新获得节点内容为:" + new String(nodeCache.getCurrentData().getData()));
                }
            });
        } catch (Exception e) {
        }
        //如果前一个节点还存在,则阻塞自己
        try {
            if (zkClient.checkExists().forPath(beforePath) == null) {
                cdl.await();
            }
        } catch (Exception e) {
        }finally {
            //阻塞结束,说明自己是最小的节点,则取消watch,开始获取锁
            try {
                nodeCache.close();
            } catch (IOException e) {
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }

}

5- 调用demo

package com.test.domi.controller;

import com.test.domi.common.utils.ZkUtil;
import com.test.domi.common.utils.lock.ZKlock;
import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.framework.CuratorFramework;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/zk")
public class ZKController {

    @Autowired
    private CuratorFramework zkClient;
//    @Autowired
//    private ZkClient zkClient;

    private String url = "127.0.0.1:2181";
    private int timeout = 3000;
    private String lockPath = "/testl";
    @Autowired
    private ZKlock zklock;
    private int k = 1;

    @GetMapping("/lock")
    public Boolean getLock() throws Exception{

        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                  zklock.lock();

          zklock.unlock();
} }).start(); } return true; } }

原文地址:https://www.cnblogs.com/domi22/p/9748083.html

时间: 2024-11-13 14:59:13

springboot2整合zookeeper集成curator的相关文章

SpringBoot2 整合 Zookeeper组件,管理架构中服务协调

本文源码:GitHub·点这里 || GitEE·点这里 一.Zookeeper基础简介 1.概念简介 Zookeeper是一个Apache开源的分布式的应用,为系统架构提供协调服务.从设计模式角度来审视:该组件是一个基于观察者模式设计的框架,负责存储和管理数据,接受观察者的注册,一旦数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的观察者做出相应的反应,从而实现集群中类似Master/Slave管理模式.ZooKeeper的目标就是封装好复杂易出错的关键服务,将

SpringBoot2 整合Kafka组件,应用案例和流程详解

本文源码:GitHub·点这里 || GitEE·点这里 一.搭建Kafka环境 1.下载解压 -- 下载 wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz -- 解压 tar -zxvf kafka_2.11-2.2.0.tgz -- 重命名 mv kafka_2.11-2.2.0 kafka2.11 2.启动Kafka服务 kafka依赖ZooKeeper服务,需要本地安装并启动ZooKeeper. 参

SpringBoot整合Shiro 集成Redis缓存(六)

简介:由于考虑到项目后期分布式部署,所以缓存由ehcache改为redis,而redis既有单机版部署,也有分布式部署,所以二者需要兼容. 1. maven依赖 <dependency> <groupId>org.crazycake</groupId> <artifactId>shiro-redis</artifactId> <version>3.1.0</version> </dependency> 2. 设

SpringBoot2 整合 Swagger2

SpringBoot2 整合 Swagger2 SpringBoot整合三板斧 第一步.引入pom <dependency> <groupId>com.spring4all</groupId> <artifactId>swagger-spring-boot-starter</artifactId> <version>1.9.0.RELEASE</version> </dependency> <depend

springboot整合logback集成elk实现日志的汇总、分析、统计和检索功能

在Spring Boot当中,默认使用logback进行log操作.logback支持将日志数据通过提供IP地址.端口号,以Socket的方式远程发送.在Spring Boot中,通常使用logback-spring.xml来进行logback配置. 首先.创建一个elk的springboot项目,然后先对logback进行配置,配置各项的详细说明可以去看http://aub.iteye.com/blog/1101222,说的很详细.也多参考一下别人关于日志的描述https://www.cnbl

Spring Cloud ZooKeeper集成Feign的坑2,服务调用了一次后第二次调用就变成了500,错误:Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is com.n

错误如下: 2017-09-19 15:05:24.659 INFO 9986 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.spring[email protected]56528192: startup date [Tue Sep 19 15:05:24 CST 2017]; root of context hierarchy 2017-09-19 15:05:24.858 INFO 9986 --

ZooKeeper和Curator相关经验总结

一.关于ZooKeeper的watch用法,需要注意 详细说明如下: ZooKeeper Watches All of the read operations in ZooKeeper - getData(), getChildren(), and exists() - have the option of setting a watch as a side effect. Here is ZooKeeper's definition of a watch: a watch event is o

zookeeper使用curator进行选主(Leader)

在分布式系统设计中,选主是一个常见的场景.选主是一个这样的过程,通过选主,主节点被选择出来控制其他节点或者是分配任务. 选主算法要满足的几个特征: 1)各个节点均衡的获得成为主节点的权利,一旦主节点被选出,其他的节点可以感知到谁是主节点,被服从分配. 2)主节点是唯一存在的 3)一旦主节点失效,宕机或者断开连接,其他的节点能够感知,并且重新进行选主算法. zookeeper实现了安全可靠的选主机制. 作为zookeeper的高级api封装库curator选主算法主要有以下两个:Leader La

ZooKeeper与Curator注册和监控

Curator提供了对zookeeper客户端的封装,并监控连接状态和会话session,特别是会话session过期后,curator能够重新连接zookeeper,并且创建一个新的session. 对于zk的使用者来说,session的概念至关重要,如果想了解更多session的说明,请访问:http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html zk客户端和zk服务器间主要可能存在下面几种异常情况: 1.     短暂