Spring Batch Study Notes (2): Creating and Using Jobs and Flows, Multithreaded Concurrency, Deciders, Listeners, and Parameters

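I. Creating and Using a Job
1. Introduction to job flow:
(1) State machine: for example, once step1 has completed, whether the job goes on to step2 and step3 is something we control through the job flow.
(2) Demonstration: first use next() to run step1, step2, ... in order, then use on(), to(), and from() to achieve the same result as next(), and then introduce the fail() and stopAndRestart() methods.
Example 1: create JobFlowDemOne with three Steps and use next() to run them in order.
JobFlowDemOne: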

package com.dhcc.batch.batchDemo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JobFlowDemOne {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job JobFlowDemo1() {
        return jobBuilderFactory.get("JobFlowDemo1").start(step1()).next(step2()).next(step3()).build();

    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1").tasklet(new Tasklet() {

            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("step1-->Hello Spring Batch....");
                return RepeatStatus.FINISHED;
            }
        }).build();

    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2").tasklet((contribution, context) -> {
            System.out.println("step 2-->Hello Spring Batch..");
            return RepeatStatus.FINISHED;
        }).build();

    }

    @Bean
    public Step step3() {
        return stepBuilderFactory.get("step3").tasklet(new Tasklet() {

            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("step3-->Hello Spring Batch....");
                return RepeatStatus.FINISHED;
            }
        }).build();

    }
}

step2() uses a lambda expression, a feature introduced in Java 8.
The application entry point, BatchDemoApplication:

package com.dhcc.batch.batchDemo;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class BatchDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchDemoApplication.class, args);
    }
}

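Run the application. The result:

The console output shows that step1, step2, and step3 ran in the order given by next(). Next, look in the database, mainly at the JobExecution and StepExecution tables:
JobExecution:

StepExecution:

Then look at the JobInstance table to see the job instance:

Example 2: use on(), to(), and from() to run the Steps in order, with the same effect as Example 1.
The rest of the code is unchanged; only JobFlowDemo1 is modified slightly (note: I created a new database here to make the demonstration easier):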

    @Bean
    public Job JobFlowDemo2() {
        return jobBuilderFactory.get("JobFlowDemo2")
//              .start(step1())
//              .next(step2())
//              .next(step3())
                .start(step1()).on("COMPLETED").to(step2())
                .from(step2()).on("COMPLETED").to(step3())
                .from(step3()).end()
                .build();
    }

Check the console:

The code ran successfully.
We will look at fail() and stopAndRestart() in detail later, when they are actually used.
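
As a short preview of the two methods just mentioned, here is a minimal sketch of where fail() and stopAndRestart() can appear in a flow definition. It assumes Spring Batch 4.x, the same JobBuilderFactory / StepBuilderFactory style used above. The class name JobFlowTransitionSketch and the job and step names are hypothetical and not part of the original project.

```java
package com.dhcc.batch.batchDemo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class, for illustration only.
@Configuration
public class JobFlowTransitionSketch {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // fail(): if sketchStep1 exits with FAILED, the job ends with status FAILED;
    // any other exit code continues to sketchStep2.
    @Bean
    public Job failSketchJob() {
        return jobBuilderFactory.get("failSketchJob")
                .start(sketchStep1()).on("FAILED").fail()
                .from(sketchStep1()).on("*").to(sketchStep2())
                .end()
                .build();
    }

    // stopAndRestart(): the job stops after sketchStep1 completes;
    // when the job is restarted, it resumes at sketchStep2.
    @Bean
    public Job stopSketchJob() {
        return jobBuilderFactory.get("stopSketchJob")
                .start(sketchStep1()).on("COMPLETED").stopAndRestart(sketchStep2())
                .end()
                .build();
    }

    @Bean
    public Step sketchStep1() {
        return stepBuilderFactory.get("sketchStep1").tasklet((contribution, chunkContext) -> {
            System.out.println("sketchStep1");
            return RepeatStatus.FINISHED;
        }).build();
    }

    @Bean
    public Step sketchStep2() {
        return stepBuilderFactory.get("sketchStep2").tasklet((contribution, chunkContext) -> {
            System.out.println("sketchStep2");
            return RepeatStatus.FINISHED;
        }).build();
    }
}
```

Both jobs reuse the same two steps only to keep the sketch short; in a real project each transition would be chosen to match the business rule that decides when the job should fail or pause.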

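II. Creating and Using a Flow
1. Introduction to Flow:
(1) A Flow is a collection of Steps; it defines the transitions between one Step and the next.
(2) A Flow can be reused across different Jobs.
(3) A Flow is created with FlowBuilder; as with a Job, it is assembled with start(), next(), and end().
Example:
Create JobFlowDemoTwoApplication: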

package com.dhcc.batch.batchDemo.jobFlowDemoTwo;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class JobFlowDemoTwoApplication {

    public static void main(String[] args) {
        SpringApplication.run(JobFlowDemoTwoApplication.class, args);
    }
}

Create the Steps, the Flow, and the Job in JobFlowDemoTwoConfiguration:

package com.dhcc.batch.batchDemo.jobFlowDemoTwo;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JobFlowDemoTwoConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step jobFlowDemoTwo1() {
        return stepBuilderFactory.get("jobFlowDemoTwo1").tasklet(new Tasklet() {

            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("Hello-->jobFlowDemoTwo1");
                return RepeatStatus.FINISHED;
            }
        }).build();

    }

    @Bean
    public Step jobFlowDemoTwo2() {
        return stepBuilderFactory.get("jobFlowDemoTwo2").tasklet(new Tasklet() {

            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("Hello-->jobFlowDemoTwo2");
                return RepeatStatus.FINISHED;
            }
        }).build();

    }

    @Bean
    public Step jobFlowDemoTwo3() {
        return stepBuilderFactory.get("jobFlowDemoTwo3").tasklet(new Tasklet() {

            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("Hello-->jobFlowDemoTwo3");
                return RepeatStatus.FINISHED;
            }
        }).build();

    }

    // Create a Flow, which is a collection of Steps
    @Bean
    public Flow jobFlowDemoFlow() {
        return new FlowBuilder<Flow>("jobFlowDemoFlow")
                .start(jobFlowDemoTwo1())
                .next(jobFlowDemoTwo2())
                .build();

    }

    // Create a Job that runs the Flow and then a Step
    @Bean
    public Job jobFlowDemoTwoJob() {
        return  jobBuilderFactory.get("jobFlowDemoTwoJob")
                .start(jobFlowDemoFlow())
                .next(jobFlowDemoTwo3())
                .end()
                .build();

    }

}

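Run it and check the console:

The output shows that the Job first ran the two Steps in the Flow and then ran jobFlowDemoTwo3(), which matches the order we configured.
In the database, take the StepExecution table as an example:

All three Steps ran to completion in order.
III. Defining and Using Split for Multithreaded Concurrent Tasks
1. Split runs Flows asynchronously.
2. Example outline:
(1) Define two Flows, each containing a few Steps; every Step prints its own name and the thread it is running on.
(2) Create a Job that uses split to start the two Flows asynchronously.
(3) Run the Job and check the result.
Example:
JobSpiltDemoApplication: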

package com.dhcc.batch.batchDemo.jobSpiltDemo;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class JobSpiltDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(JobSpiltDemoApplication.class, args);
    }
}

JobSpiltDemoConfiguration:

package com.dhcc.batch.batchDemo.jobSpiltDemo;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
public class JobSpiltDemoConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
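     * Creates the Job that runs the Flows. split(new SimpleAsyncTaskExecutor()).add()
     * runs the Flows asynchronously; add() accepts more than one Flow.
     *
     * @return the Job that runs both Flows in parallel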
     */
    @Bean
    public Job SpiltJob() {
        return jobBuilderFactory.get("SpiltJob").start(jobSpiltFlow1()).split(new SimpleAsyncTaskExecutor())
                .add(jobSpiltFlow2()).end().build();

    }

    // Create Flow 1
    @Bean
    public Flow jobSpiltFlow1() {
        return new FlowBuilder<Flow>("jobSpiltFlow1")
                .start(stepBuilderFactory.get("jobSpiltStep1").tasklet(tasklet()).build())
                .next(stepBuilderFactory.get("jobSpiltStep2").tasklet(tasklet()).build()).build();

    }

    // Create Flow 2
    @Bean
    public Flow jobSpiltFlow2() {
        return new FlowBuilder<Flow>("jobSpiltFlow2")
                .start(stepBuilderFactory.get("jobSpiltStep3").tasklet(tasklet()).build())
                .next(stepBuilderFactory.get("jobSpiltStep4").tasklet(tasklet()).build()).build();

    }

    private Tasklet tasklet() {
        return new PrintTasklet();
    }

    // Tasklet executed by every step (it could be a top-level class; an inner class is used here for convenience)
    private class PrintTasklet implements Tasklet {

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            System.out.println("has been execute on stepName:" + chunkContext.getStepContext().getStepName()
                    + ",has been execute on thread:" + Thread.currentThread().getName());
            return RepeatStatus.FINISHED;
        }

    }

}

Result:


The console output shows that the Flows ran on separate threads, so the Steps were executed asynchronously as intended.
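
To make the threading behaviour concrete without Spring Batch, the following plain-Java sketch models the same shape: two flows run in parallel, while the steps inside each flow still run one after another. It is only an analogy under stated assumptions. SplitModel, runFlow, and runSplit are hypothetical names, and a fixed thread pool stands in for SimpleAsyncTaskExecutor (which starts a new thread for each task).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java model of a split; it does not use Spring Batch.
public class SplitModel {

    // Runs the steps of one flow sequentially and returns one log line per step.
    static List<String> runFlow(List<String> stepNames) {
        List<String> log = new ArrayList<>();
        for (String step : stepNames) {
            log.add(step + " on " + Thread.currentThread().getName());
        }
        return log;
    }

    // Submits both flows to a thread pool and waits for both to finish.
    static List<List<String>> runSplit() {
        Callable<List<String>> flow1 = () -> runFlow(Arrays.asList("jobSpiltStep1", "jobSpiltStep2"));
        Callable<List<String>> flow2 = () -> runFlow(Arrays.asList("jobSpiltStep3", "jobSpiltStep4"));
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<List<String>> result1 = pool.submit(flow1);
            Future<List<String>> result2 = pool.submit(flow2);
            return Arrays.asList(result1.get(), result2.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (List<String> flowLog : runSplit()) {
            flowLog.forEach(System.out::println);
        }
    }
}
```

The order of steps inside each flow is fixed, but which flow's thread gets scheduled first is not, which is why the interleaving of lines in the real console output can differ from run to run.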
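Database tables:

IV. Defining and Using a Decider
1. Decision: supplies the condition that determines which Step runs next.
2. JobExecutionDecider: the interface that provides the decision logic.
Example: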

package com.dhcc.batch.batchDemo.flowDecisionDemo;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class FlowDecisionDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(FlowDecisionDemoApplication.class, args);
    }
}


FlowDecisionDemoConfiguration:

package com.dhcc.batch.batchDemo.flowDecisionDemo;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FlowDecisionDemoConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // Define the Job
    @Bean
    public Job FlowDecisionJob() {
        return jobBuilderFactory.get("FlowDecisionJob")
                .start(firstStep()).next(myDecider())
                .from(myDecider()).on("EVEN").to(evenStep())
                .from(myDecider()).on("ODD").to(oddStep())
                .from(oddStep()).on("*").to(myDecider())
                .end()
                .build();

    }
    @Bean
    public Step firstStep() {
        return stepBuilderFactory.get("firstStep").tasklet(new Tasklet() {

            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("Hello firstStep..");
                return RepeatStatus.FINISHED;
            }
        }).build();

    }

    @Bean
    public Step evenStep() {
        return stepBuilderFactory.get("evenStep").tasklet(new Tasklet() {

            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("Hello evenStep..");
                return RepeatStatus.FINISHED;
            }
        }).build();

    }

    @Bean
    public Step oddStep() {
        return stepBuilderFactory.get("oddStep").tasklet(new Tasklet() {

            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("Hello oddStep..");
                return RepeatStatus.FINISHED;
            }
        }).build();

    }

    // Define the decider
    @Bean
    public JobExecutionDecider myDecider() {
        return new myDecider();

    }

}

myDecider:

package com.dhcc.batch.batchDemo.flowDecisionDemo;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;

public class myDecider implements JobExecutionDecider {
    private int count=0;

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        count++;
        if(count%2==0) {
            return new FlowExecutionStatus("EVEN");
        }else {
            return new FlowExecutionStatus("ODD");
        }
    }

}

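Result:

Looking at the console: the job first runs firstStep() and then enters myDecider, which increments count. count is now 1, so the decider returns "ODD" and the job runs oddStep(). Whatever the exit status of oddStep(), the job enters myDecider again; count is now 2, so the decider returns "EVEN" and the next step to run is evenStep().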
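
The walk-through above can be traced with a small plain-Java model. This is a sketch that does not use Spring Batch: DeciderTrace and CountingDecider are hypothetical names, and the loop simply follows the same transitions that FlowDecisionJob declares.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the decision flow; it does not use Spring Batch.
public class DeciderTrace {

    // Mirrors myDecider: a counter that alternates between "ODD" and "EVEN".
    static class CountingDecider {
        private int count = 0;

        String decide() {
            count++;
            return (count % 2 == 0) ? "EVEN" : "ODD";
        }
    }

    // Follows the transitions of FlowDecisionJob and records the steps that run.
    static List<String> run() {
        List<String> executed = new ArrayList<>();
        CountingDecider decider = new CountingDecider();

        executed.add("firstStep");
        String status = decider.decide();
        while (status.equals("ODD")) {
            executed.add("oddStep");    // "ODD" leads to oddStep
            status = decider.decide();  // oddStep, on any status, goes back to the decider
        }
        executed.add("evenStep");       // "EVEN" leads to evenStep, after which the job ends
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [firstStep, oddStep, evenStep]
    }
}
```

Note that in the real configuration count is a field of the decider bean, so in a long-running application context its value would carry over from one job execution to the next.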
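V. Defining and Using Nested Jobs
1. Jobs can be nested: the job that is nested inside another is called the child job, and the job that contains it is called the parent job.
2. A parent job can have several child jobs.
3. A child job here is not launched on its own; it is started by its parent job.
Example:
NestedJobApplication: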

package com.dhcc.batch.batchDemo.nestedJob;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class NestedJobApplication {

    public static void main(String[] args) {
        SpringApplication.run(NestedJobApplication.class, args);
    }
}

Next, create two child jobs, each with two Steps, and one parent job.
The parent job controls the execution of the child jobs.

ChildJobOneConfiguration:

package com.dhcc.batch.batchDemo.nestedJob;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ChildJobOneConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job childJob1() {
        return jobBuilderFactory.get("childJob1")
                .start(childJob1Step1())
                .next(childJob1Step2())
                .build();

    }

    @Bean
    public Step childJob1Step1() {
        return stepBuilderFactory.get("childJob1Step1").tasklet(new Tasklet() {

            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("Hello---->childJob1Step1");
                return RepeatStatus.FINISHED;
            }
        }).build();

    }

    @Bean
    public Step childJob1Step2() {
        return stepBuilderFactory.get("childJob1Step2").tasklet(new Tasklet() {

            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("Hello---->childJob1Step2");
                return RepeatStatus.FINISHED;
            }
        }).build();

    }

}

ChildJobTwoConfiguration:

package com.dhcc.batch.batchDemo.nestedJob;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ChildJobTwoConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job childJob2() {
        return jobBuilderFactory.get("childJob2")
                .start(childJob2Step1())
                .next(childJob2Step2())
                .build();

    }

    @Bean
    public Step childJob2Step1() {
        return stepBuilderFactory.get("childJob2Step1").tasklet(new Tasklet() {

            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("Hello---->childJob2Step1");
                return RepeatStatus.FINISHED;
            }
        }).build();

    }

    @Bean
    public Step childJob2Step2() {
        return stepBuilderFactory.get("childJob2Step2").tasklet(new Tasklet() {

            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("Hello---->childJob2Step2");
                return RepeatStatus.FINISHED;
            }
        }).build();

    }

}

ParentJobConfiguration:

package com.dhcc.batch.batchDemo.nestedJob;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.builder.JobStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class ParentJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private Job childJob1;
    @Autowired
    private Job childJob2;
    @Autowired
    private JobLauncher jobLauncher;

    @Bean
    public Job parentJob(JobRepository repository,PlatformTransactionManager transactionManager) {
        return jobBuilderFactory.get("parentJob")
                .start(parentJobStep())
                .next(childJob1(repository, transactionManager))
                .next(childJob2(repository, transactionManager))
                .build();
    }

    private Step childJob1(JobRepository repository,PlatformTransactionManager transactionManager) {
        return new JobStepBuilder(new StepBuilder("childJob1"))
                .job(childJob1)
                .launcher(jobLauncher)
                .repository(repository)
                .transactionManager(transactionManager)
                .build();
    }

    private Step childJob2(JobRepository repository,PlatformTransactionManager transactionManager) {
        return new JobStepBuilder(new StepBuilder("childJob2"))
                .job(childJob2)
                .launcher(jobLauncher)
                .repository(repository)
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Step parentJobStep() {
        return stepBuilderFactory.get("parentJobStep")
                .tasklet(new Tasklet() {

                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("Helllo------>parentJobStep..");
                        return RepeatStatus.FINISHED;
                    }
                }).build();

    }

}

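Add the following to the configuration file (spring.batch.job.names restricts the jobs launched at startup to parentJob, so the child jobs run only through it):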

spring.datasource.url=jdbc:mysql://localhost:3306/springbatch?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false
spring.datasource.username=root
spring.datasource.password=qitao1996
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql

spring.batch.initialize-schema=always

spring.batch.job.names=parentJob

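Result:

The console shows a successful run; the jobs were nested as intended.
VI. Defining and Using Listeners
1. Listener: a way to hook into and control Job execution.
2. A listener can be implemented through an interface or through annotations.
3. Spring Batch provides listener interfaces at every level, from the job level down to the item level:
(1) JobExecutionListener (before.., after..);
(2) StepExecutionListener (before.., after..);
(3) ChunkListener (before.., after..);
(4) ItemReadListener, ItemWriteListener, ItemProcessListener (before.., after.., error..);
Example:
ListenerJobApplication: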

package com.dhcc.batch.batchDemo.listenerJob;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class ListenerJobApplication {

    public static void main(String[] args) {
        SpringApplication.run(ListenerJobApplication.class, args);
    }
}

Create a listener (by implementing the interface)
MyJobListener:

package com.dhcc.batch.batchDemo.listenerJob;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

/**
 * A listener built by implementing the interface
 * @author Administrator
 *
 */
public class MyJobListener implements JobExecutionListener{

    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobInstance().getJobName()+"before running......");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobInstance().getJobName()+"after running......");
    }

}

Create a listener (using annotations)
MyChunkListener:

package com.dhcc.batch.batchDemo.listenerJob;

import org.springframework.batch.core.annotation.AfterChunk;
import org.springframework.batch.core.annotation.BeforeChunk;
import org.springframework.batch.core.scope.context.ChunkContext;
/**
 * A listener built with annotations
 * @author Administrator
 *
 */
public class MyChunkListener {

    @BeforeChunk
    public void beforeChunk(ChunkContext context) {
        System.out.println(context.getStepContext().getStepName()+"chunk before running.....");
    }

    @AfterChunk
    public void afterChunk(ChunkContext context) {
        System.out.println(context.getStepContext().getStepName()+"chunk after running.....");
    }

}

MyListenerJobConfiguration:

package com.dhcc.batch.batchDemo.listenerJob;

import java.util.Arrays;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyListenerJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // Listen to Job execution
    @Bean
    public Job myListenerJob() {
        return jobBuilderFactory.get("myListenerJob")
                .start(myListenerStep())
                .listener(new MyJobListener())
                .build();

    }

    private Step myListenerStep() {
        return stepBuilderFactory.get("myListenerStep")
                .<String,String>chunk(2)
                .faultTolerant()
                .listener(new MyChunkListener())
                .reader(reader())
                .writer(writer())
                .build();
    }
    private ItemReader<? extends String> reader() {
        return new ListItemReader<>(Arrays.asList("maozedong","zhude","pendehuai","zhouenlai","liushaoqi"));
    }

    private ItemWriter<? super String> writer() {
        return new ItemWriter<String>() {

            @Override
            public void write(List<? extends String> items) throws Exception {
                for(String item:items) {
                    System.out.println("Writing item: "+item);
                }

            }
        };
    }

}

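Result:

The console shows that the listeners were invoked as expected.
VII. A First Look at Job Parameters
1. JobParameters: used to pass information into a Job while it runs.
2. Parameters are passed in as key/value pairs; in code, the value is retrieved with get("key").
Example:
JobParametersDemoApplication: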

package com.dhcc.batch.batchDemo.jobParametersDemo;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class JobParametersDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(JobParametersDemoApplication.class, args);
    }
}

Create the Job and the Step:
JobParametersConfiguretion:

package com.dhcc.batch.batchDemo.jobParametersDemo;

import java.util.Map;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JobParametersConfiguretion implements StepExecutionListener{

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    private Map<String, JobParameter> parames;

    @Bean
    public Job MyParametersJobThree() {
        return jobBuilderFactory.get("MyParametersJobThree")
                .start(MyParametersJobStep3())
                .build();
    }

    private Step MyParametersJobStep3() {

        return stepBuilderFactory.get("MyParametersJobStep3")
                .listener(this)
                .tasklet(new Tasklet() {

                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("parame is: "+parames.get("info"));
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println(stepExecution.getStepName()+" is about to run...........");
        parames = stepExecution.getJobParameters().getParameters();
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println(stepExecution.getStepName()+" has finished running...........");
        return null;
    }
}

Result:

As the console shows, no parameter was passed in for this run.
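
Since the run above passed no parameters, here is one way a value for "info" could be supplied: launching the job programmatically with JobParametersBuilder. This is a sketch under assumptions, not the original author's code. The class ParameterizedLauncher and the value "hello" are hypothetical, and it assumes spring.batch.job.enabled=false is set so that Spring Boot does not also launch the job automatically at startup.

```java
package com.dhcc.batch.batchDemo.jobParametersDemo;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Hypothetical launcher, for illustration only.
@Component
public class ParameterizedLauncher implements CommandLineRunner {
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job job; // assumes MyParametersJobThree is the only Job bean in the context

    @Override
    public void run(String... args) throws Exception {
        JobParameters parameters = new JobParametersBuilder()
                // The key read by the tasklet through parames.get("info")
                .addString("info", "hello")
                // A changing value, so that each launch creates a new JobInstance
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(job, parameters);
    }
}
```

When the job is launched automatically by Spring Boot instead, an alternative is to pass info=hello as a program argument; Boot converts key=value arguments into JobParameters.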

Original article: http://blog.51cto.com/13501268/2177746

Date: 2024-10-03 22:55:33
