Springboot整合Elastic-Job(二)

上文我们讲到Springboot整合Elastic-Job整合的demo,只是简单的实现了主要功能。本文在上文基础上,进行新的调整。

事件追踪



Elastic-Job提供了事件追踪功能,可通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。Elastic-Job目前提供了基于关系型数据库两种事件订阅方式记录事件。我们只需要将添加如下配置即可

/**
     * 将作业运行的痕迹进行持久化到DB
     */
    @Bean
    public JobEventConfiguration jobEventConfiguration(){
        return new JobEventRdbConfiguration(dataSource);
    }

项目运行后,Elastic-Job会自动创建JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG两张表以及若干索引。

使用注解



上文我们添加一个任务的步骤是,定义一个任务类,再在配置类中定义任务属性,并加入到SpringJobScheduler。如果我们有几百个任务,配置类基本就无法维护了。那怎么优化呢,我们可以参考@Schedual注解,在job上定义一个注解,每次启动的时候扫描注解自动将job加入到SpringJobScheduler中。

1.抽象添加job方法

@Component
public class ElasticJobHandler {
    @Autowired
    private ZookeeperRegistryCenter regCenter;
    @Resource
    private JobEventConfiguration jobEventConfiguration;
    @Resource
    private ElasticJobListener elasticJobListener;

    /**
     * @Description 任務配置類
     */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                        .shardingItemParameters(shardingItemParameters).build()
                        , jobClass.getCanonicalName())
        ).overwrite(true).build();
    }

    public void addJob(final SimpleJob simpleJob,
                       final String cron,
                       final Integer shardingTotalCount,
                       final String shardingItemParameters)
            throws IllegalAccessException, InstantiationException {
        LiteJobConfiguration jobConfig =
                getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters);

         new SpringJobScheduler(simpleJob, regCenter, jobConfig, jobEventConfiguration, elasticJobListener).init();
    }
}

2.添加ElasticScheduler注解

@Component
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticScheduler {
    /**
     * 任务名称
     * @return
     */
    String name();

    /**
     * cron表达式,用于控制作业触发时间
     * @return
     */
    String cron() default "";

    /**
     * 分片参数
     * @return
     */
    String shardingItemParameters() default "";

    /**
     * 总分片数
     * @return
     */
    int shardingTotalCount();

    /**
     * 任务描述信息
     * @return
     */
    String description() default "";
}

3.定义扫描方法

@Component
public class ElasticSchedulerAspect implements ApplicationContextAware, InitializingBean {
    private ApplicationContext applicationContext;
    @Autowired
    private ElasticJobHandler elasticJobHandler;
    @Override
    public void afterPropertiesSet() throws Exception {
        registrJob(applicationContext);
    }

    /**
     * 解析context信息,开始注册
     * @param applicationContext
     */
    private void registrJob(ApplicationContext applicationContext) {
        String[] beanNamesForAnnotation = applicationContext.getBeanNamesForAnnotation(ElasticScheduler.class);
        for (String beanName : beanNamesForAnnotation) {
            Class<?> handlerType = applicationContext.getType(beanName);
            Object bean = applicationContext.getBean(beanName);
            ElasticScheduler annotation = AnnotationUtils.findAnnotation(handlerType, ElasticScheduler.class);
            addJobToContext(annotation,bean);
        }
    }

    /**
     * 将任务添加到容器中
     * @param elasticScheduler
     * @param bean
     */
    private void addJobToContext(ElasticScheduler elasticScheduler, Object bean) {
        String cron = elasticScheduler.cron();
        String name = elasticScheduler.name();
        String description = elasticScheduler.description();
        String shardingItemParameters = elasticScheduler.shardingItemParameters();
        Integer shardingTotalCount = elasticScheduler.shardingTotalCount();
        try {
            elasticJobHandler.addJob((SimpleJob) bean,cron,shardingTotalCount,shardingItemParameters);
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InstantiationException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext=applicationContext;
    }

}

4.使用注解

@Component
@ElasticScheduler(cron = "0/5 * * * * ?",shardingTotalCount = 4,name = "测试注解",shardingItemParameters = "0=0,1=0,2=1,3=1")
public class StockSimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(String.format("------Thread ID: %s, 任務總片數: %s, " +
                        "當前分片項: %s.當前參數: %s," +
                        "當前任務名稱: %s.當前任務參數: %s"
                ,
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()

        ));
    }
}

注意,该注解只为了不想引入太多外部依赖自己随手写的,只为给大家提供思路。git上已经有人对用注解整合Elastic-Job了,大家可自行搜索。

原文地址:https://www.cnblogs.com/xmzJava/p/9839308.html

时间: 2024-11-16 18:55:25

Springboot整合Elastic-Job(二)的相关文章

【使用篇】SpringBoot整合Filter(二)

两种方式: 通过注解扫描完成 Filter 组件的注册 通过方法完成 Filter 组件的注册 一.通过注解扫描完成 Filter 组件的注册 1. 编写Filter类 /** * SpringBoot整合Filter方式一 * * 传统方式 * <filter> * <filter-name>FirstFilter</filter-name> * <filter-class>com.linhw.demo.filter.FirstFilter</fil

七、springboot整合Spring-data-jpa(二)之通用DAO接口与添加自定义方法

@NoRepositoryBean:Spring Data Jpa在启动时就不会去实例化BaseRepository这个接口 1.通用接口: import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; import org.springframework.data.repository.N

springboot整合Servlet

一.整合servlet 1.通过注解扫描完成Servlet组件的注解 1.1 编写servlet /** * SpringBoot整合servlet方式一 * 以往实在web.xml配置 * <servlet> * <servlet-name>FirstServlet</servlet-name> * <servlet-class>com.demo.servlet.FirstServlet</servlet-class> * </servl

SpringBoot整合JavaWeb

一.SpringBoot整合Servlet的两种方式 1.通过注解扫描完成Servlet组件的注册 编写Servlet package com.example.demo.servlet; import java.io.IOException; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import

springboot整合mybatis,redis,代码(二)

一 说明: springboot整合mybatis,redis,代码(一) 这个开发代码的复制粘贴,可以让一些初学者直接拿过去使用,且没有什么bug 二 对上篇的说明 可以查看上图中文件: 整个工程包括配置,对对应上文的配置 原文地址:https://www.cnblogs.com/xiufengchen/p/10327501.html

springboot整合es客户端操作elasticsearch(二)

在上章节中整合elasticsearch客户端出现版本问题进行了处理,这章来进行springboot整合得操作 环境:elaticsearch6.2.1,springboot 2.1.8 客户端版本采用6.6.1 一 pom.xml依赖引入 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns

SpringBoot系列十二:SpringBoot整合 Shiro

1.概念:SpringBoot 整合 Shiro 2.具体内容 Shiro 是现在最为流行的权限认证开发框架,与它起名的只有最初的 SpringSecurity(这个开发框架非常不好用,但是千万不要 以为 SpringSecurity 没有用处,它在 SpringCloud 阶段将发挥重大的作用).但是现在如果要想整合 Shiro 开发框架有一点很遗憾, SpringBoot 没有直接的配置支持,它不像整合所谓的 Kafka.Redis.DataSource,也就是说如果要想整合 Shiro 开

九、springboot整合redis二之缓冲配置

1.创建Cache配置类 @Configuration @EnableCaching public class RedisCacheConfig extends CachingConfigurerSupport { @Value("${redis.cache.expiration}") private Long expiration; /** * * 管理缓存 */ @Bean public CacheManager cacheManager(RedisTemplate<Obje

SpringBoot 2.SpringBoot整合Mybatis

一.创建Springboot的配置文件:application.properties SpringApplication 会从 application.properties 文件中加载配置信息,下面是添加Spring配置信息的文件目录顺序: 当前目录下的/config子目录中 当前目录中 一个 classpath 包下的 /config 目录中 classpath 根目录中 大家根据自己习惯来即可. /application.properties 文件配置如下: spring.datasourc