spring boot 定时任务基于zookeeper的分布式锁实现

基于ZooKeeper分布式锁的流程

  • 在zookeeper指定节点(locks)下创建临时顺序节点node_n
  • 获取locks下所有子节点children
  • 对子节点按节点自增序号从小到大排序
  • 判断本节点是不是第一个子节点,若是,则获取锁;若不是,则监听比该节点小的那个节点的删除事件
  • 若监听事件生效,则回到第二步重新进行判断,直到获取到锁

具体实现

添加Maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.falsh</groupId>
    <artifactId>mytiming</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.spring.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>2.0.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>spring-core</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.3.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

代码:

package com.falsh.tss.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class CommonEnv {

    public static String ZK_ADDRESS;
    public static String ZK_LOCK_PATH;

    @Autowired
    public void setZkAddress(@Value("${zk.address}") String zkAddress) {
        ZK_ADDRESS = zkAddress;
    }

    @Autowired
    public void setZkLockPath(@Value("${zk.lock.path}") String zkLockPath) {
        ZK_LOCK_PATH = zkLockPath;
    }
}

package com.falsh.tss.global;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.falsh.tss.mutex.MutexLock;

@Aspect
@Component
public class ControlExecJobAspect {

    private final Logger logger = LoggerFactory.getLogger(ControlExecJobAspect.class);

    @Pointcut("execution(public * com.zhifu.tss.jobs..*(..))")
    public void jobExec() {
    }

    @Around("jobExec()")
    public void doAround(ProceedingJoinPoint pjp) throws Throwable{
        String jobClass = pjp.getTarget().getClass().getSimpleName();
        String methodName = ((MethodSignature) pjp.getSignature()).getMethod().getName();

        if (!MutexLock.isInstantiated() || !MutexLock.getInstance().isAcquiredLock()) {
            logger.info("-- None lock acquired ! {}.{} --", jobClass, methodName);
            return;
        }

        logger.info("------- start {}.{} --------", jobClass, methodName);
        long start = System.currentTimeMillis();

        try {
            pjp.proceed();
        } catch (Exception e) {
            logger.error("global erorr occured while {}.{}", jobClass, methodName, e);
        }

        long end = System.currentTimeMillis();
        logger.info("------- end {}.{}({}ms) --------", jobClass, methodName, end - start);
    }

}

package com.falsh.tss.jobs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class MyJob1 {

    private final static Logger logger = LoggerFactory.getLogger(MyJob1.class);

    @Scheduled(cron="${cron.job.myJob1}")
    public void execute(){
        logger.info("我在执行定时任务1.....");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

package com.falsh.tss.mutex;

import static com.falsh.tss.config.CommonEnv.ZK_ADDRESS;
import static com.falsh.tss.config.CommonEnv.ZK_LOCK_PATH;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

public class MutexLock implements InitializingBean, DisposableBean {

    private static CuratorFramework client;
    private static InterProcessLock mutexLock;
    private static volatile boolean acquiredLock;
    private static volatile MutexLock instance;

    public MutexLock() {
        client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new RetryNTimes(10,5000));
        client.start();
        mutexLock =  new InterProcessMutex(client, ZK_LOCK_PATH);
    }

    public static MutexLock getInstance(){
        if (instance == null) {
            synchronized (MutexLock.class) {
                if (instance == null) {//二次检查
                    instance = new MutexLock();
                }
            }
        }
        return instance;
    }

    public static boolean isInstantiated() {
        if (instance == null) {
            return false;
        }
        return true;
    }

    //获取锁
    public static void acquireMutexLock() throws Exception {
        mutexLock.acquire();
        acquiredLock = true;
    }

    //释放锁
    public static void releaseMutexLock() throws Exception {
        mutexLock.release();
    }

    public static boolean isAcquiredLock() {
        return acquiredLock;
    }

    @Override
    public void destroy() throws Exception {

    }

    @Override
    public void afterPropertiesSet() throws Exception {

    }
}

package com.falsh.tss;

import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;
import com.falsh.tss.mutex.MutexLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableScheduling
@EnableDubboConfiguration
@EnableAutoConfiguration
@SpringBootApplication
@ComponentScan("com.zhifu")
public class Application {

    private static final Log logger = LogFactory.getLog(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);

        try {
            MutexLock.getInstance().acquireMutexLock();
            while (true) {
                logger.warn("锁在我这");
                Thread.sleep(60000);
            }
        } catch (Exception e) {
            logger.info("获取锁失败", e);
        } finally{
            try {
                MutexLock.getInstance().releaseMutexLock();
                logger.warn("释放了锁");
            } catch (Exception e) {
                logger.error("释放锁异常", e);
            }
        }
    }
}

配置:

application-dev.properties

spring.dubbo.application.name=myjobtss
spring.dubbo.registry.address=zookeeper://192.168.x.x:2181
spring.dubbo.protocol.name=dubbo

cron.job.myJob1=0 0/1 * * * ?

zk.address=192.168.x.x:2181
zk.lock.path=/myjob-quartz-locks

原文地址:https://www.cnblogs.com/falsh/p/9829279.html

时间: 2025-01-06 19:53:14

spring boot 定时任务基于zookeeper的分布式锁实现的相关文章

基于zookeeper实现分布式锁

一.分布式锁介绍 分布式锁主要用于在分布式环境中保护跨进程.跨主机.跨网络的共享资源实现互斥访问,以达到保证数据的一致性. 线程锁:大家都不陌生,主要用来给方法.代码块加锁.当某个方法或者代码块使用锁时,那么在同一时刻至多仅有有一个线程在执行该段代码.当有多个线程访问同一对象的加锁方法/代码块时,同一时间只有一个线程在执行,其余线程必须要等待当前线程执行完之后才能执行该代码段.但是,其余线程是可以访问该对象中的非加锁代码块的. 进程锁:也是为了控制同一操作系统中多个进程访问一个共享资源,只是因为

基于zookeeper的分布式锁实现 【转】

工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现 准备工作 有几个帮助类,先把代码放上来 ZKClient 对zk的操作做了一个简单的封装 Java代码 ZKUtil 针对zk路径的一个工具类 Java代码 NetworkUtil 获取本机IP的工具方法 Java代码 --------------------------- 正文开始  -------------------------

基于zookeeper的分布式锁实现 【转载】

工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现 准备工作 有几个帮助类,先把代码放上来 ZKClient 对zk的操作做了一个简单的封装 Java代码 ZKUtil 针对zk路径的一个工具类 Java代码 NetworkUtil 获取本机IP的工具方法 Java代码 --------------------------- 正文开始  -------------------------

基于zookeeper的分布式锁实现【转】

工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现 准备工作 有几个帮助类,先把代码放上来 ZKClient 对zk的操作做了一个简单的封装 Java代码 ZKUtil 针对zk路径的一个工具类 Java代码 NetworkUtil 获取本机IP的工具方法 Java代码 --------------------------- 正文开始  -------------------------

基于ZooKeeper的分布式锁和队列

在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper的官方recipe. 锁(Lock) 完全分布式锁是全局同步的,这意味着在任何时刻没有两个客户端会同时认为它们都拥有相同的锁,使用 Zookeeper 可以实现分布式锁,需要首先定义一个锁节点(lock root node). 需要获得锁的客户端按照以下步骤来获取锁: 保证锁节点(lock root

基于zookeeper的分布式锁实现

需要了解源码的朋友加我QQ:2137028325 框架简介: 本系统一款通用的SOA中间件平台,用来开发各类J2EE企业级应用,节省时间和人力成本.本系统采用MVC模式.AOP引擎.任务调度器.Ajax.拦截器.过滤器.缓存.日志监控.数据访问.表达式.国际化等技术. 框架/平台构成:Maven+Springmvc + Mybatis + Shiro(权限)+ Tiles(模板) +ActiveMQ(消息队列) + Rest(服务) + WebService(服务)+ EHcache(缓存) +

基于redis和zookeeper的分布式锁实现方式

先来说说什么是分布式锁,简单来说,分布式锁就是在分布式并发场景中,能够实现多节点的代码同步的一种机制.从实现角度来看,主要有两种方式:基于redis的方式和基于zookeeper的方式,下面分别简单介绍下这两种方式: 一.基于redis的分布式锁实现 1.获取锁 redis是一种key-value形式的NOSQL数据库,常用于作服务器的缓存.从redis v2.6.12开始,set命令开始变成如下格式: SET key value [EX seconds] [PX milliseconds] [

基于redis的分布式锁实现

关于分布式锁 很久之前有讲过并发编程中的锁并发编程的锁机制:synchronized和lock.在单进程的系统中,当存在多个线程可以同时改变某个变量时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量.而同步的本质是通过锁来实现的.为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么需要在某个地方做个标记,这个标记必须每个线程都能看到,当标记不存在时可以设置该标记,其余后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记.

基于ZooKeeper的分布式Session实现(转)

1.   认识ZooKeeper ZooKeeper—— “动物园管理员”.动物园里当然有好多的动物,游客可以根据动物园提供的向导图到不同的场馆观赏各种类型的动物,而不是像走在原始丛林里,心惊胆颤的被动 物所观赏.为了让各种不同的动物呆在它们应该呆的地方,而不是相互串门,或是相互厮杀,就需要动物园管理员按照动物的各种习性加以分类和管理,这样我们才 能更加放心安全的观赏动物.回到我们企业级应用系统中,随着信息化水平的不断提高,我们的企业级系统变得越来越庞大臃肿,性能急剧下降,客户抱怨频频.拆 分系