java中延迟任务的处理方式

1、利用延迟队列

延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到……

应用场景比较多,比如延时1分钟发短信,延时1分钟再次执行等,下面先看看延时队列demo之后再看延时队列在项目中的使用:

简单的延时队列要有三部分:第一实现了Delayed接口的消息体、第二消费消息的消费者、第三存放消息的延时队列,那下面就来看看延时队列demo。

一、消息体

package com.delqueue;  

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;  

/**
 * 消息体定义 实现Delayed接口就是实现两个方法即compareTo 和 getDelay最重要的就是getDelay方法,这个方法用来判断是否到期…… */
public class Message implements Delayed {
    private int id;
    private String body; // 消息内容
    private long excuteTime;// 延迟时长,这个是必须的属性因为要按照这个判断延时时长。  

    public int getId() {
        return id;
    }  

    public String getBody() {
        return body;
    }  

    public long getExcuteTime() {
        return excuteTime;
    }  

    public Message(int id, String body, long delayTime) {
        this.id = id;
        this.body = body;
        this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
    }  

    // 自定义实现比较方法返回 1 0 -1三个参数
    @Override
    public int compareTo(Delayed delayed) {
        Message msg = (Message) delayed;
        return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1
                : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);
    }  

    // 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
}  

二、消息消费者

package com.delqueue;  

import java.util.concurrent.DelayQueue;  

public class Consumer implements Runnable {
    // 延时队列 ,消费者从其中获取消息进行消费
    private DelayQueue<Message> queue;  

    public Consumer(DelayQueue<Message> queue) {
        this.queue = queue;
    }  

    @Override
    public void run() {
        while (true) {
            try {
                Message take = queue.take();
                System.out.println("消费消息id:" + take.getId() + " 消息体:" + take.getBody());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}  

三、延时队列

package com.delqueue;  

import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;  

public class DelayQueueTest {
     public static void main(String[] args) {
            // 创建延时队列
            DelayQueue<Message> queue = new DelayQueue<Message>();
            // 添加延时消息,m1 延时3s
            Message m1 = new Message(1, "world", 3000);
            // 添加延时消息,m2 延时10s
            Message m2 = new Message(2, "hello", 10000);
            //将延时消息放到延时队列中
            queue.offer(m2);
            queue.offer(m1);
            // 启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间
            ExecutorService exec = Executors.newFixedThreadPool(1);
            exec.execute(new Consumer(queue));
            exec.shutdown();
        }
}  

将消息体放入延迟队列中,在启动消费者线程去消费延迟队列中的消息,如果延迟队列中的消息到了延迟时间则可以从中取出消息否则无法取出消息也就无法消费。

这就是延迟队列demo,下面我们来说说在真实环境下的使用。

使用场景描述:

在打车软件中对订单进行派单的流程,当有订单的时候给该订单筛选司机,然后给当订单绑定司机,但是有时运气没那么好,订单进来后第一次没有筛选到合适的司机,但我们也不能就此结束派单,而是将该订单的信息放到延时队列中过个2秒钟在进行一次,其实这个2秒钟就是一个延迟,所以这里我们就可以使用延时队列来实现……

下面看看简单的流程图:

下面来看看具体代码实现:

在项目中有如下几个类:第一 、任务类   第二、按照任务类组装的消息体类  第三、延迟队列管理类

任务类即执行筛选司机、绑单、push消息的任务类

package com.test.delayqueue;
/**
 * 具体执行相关业务的业务类
 * @author whd
 * @date 2017年9月25日 上午12:49:32
 */
public class DelayOrderWorker  implements Runnable {  

    @Override
    public void run() {
        // TODO Auto-generated method stub
        //相关业务逻辑处理
        System.out.println(Thread.currentThread().getName()+" do something ……");
    }
}  

消息体类,在延时队列中这个实现了Delayed接口的消息类是比不可少的,实现接口时有一个getDelay(TimeUnit unit)方法,这个方法就是判断是否到期的

这里定义的是一个泛型类,所以可以将我们上面的任务类作为其中的task,这样就将任务类分装成了一个消息体

package com.test.delayqueue;  

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;  

/**
 * 延时队列中的消息体将任务封装为消息体
 *
 * @author whd
 * @date 2017年9月25日 上午12:48:30
 * @param <T>
 */
public class DelayOrderTask<T extends Runnable> implements Delayed {
    private final long time;
    private final T task; // 任务类,也就是之前定义的任务类  

    /**
     * @param timeout
     *            超时时间(秒)
     * @param task
     *            任务
     */
    public DelayOrderTask(long timeout, T task) {
        this.time = System.nanoTime() + timeout;
        this.task = task;
    }  

    @Override
    public int compareTo(Delayed o) {
        // TODO Auto-generated method stub
        DelayOrderTask other = (DelayOrderTask) o;
        long diff = time - other.time;
        if (diff > 0) {
            return 1;
        } else if (diff < 0) {
            return -1;
        } else {
            return 0;
        }
    }  

    @Override
    public long getDelay(TimeUnit unit) {
        // TODO Auto-generated method stub
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }  

    @Override
    public int hashCode() {
        return task.hashCode();
    }  

    public T getTask() {
        return task;
    }
}  

延时队列管理类,这个类主要就是将任务类封装成消息并并添加到延时队列中,以及轮询延时队列从中取出到时的消息体,在获取任务类放到线程池中执行任务

package com.test.delayqueue;  

import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;  

/**
 * 延时队列管理类,用来添加任务、执行任务
 *
 * @author whd
 * @date 2017年9月25日 上午12:44:59
 */
public class DelayOrderQueueManager {
    private final static int DEFAULT_THREAD_NUM = 5;
    private static int thread_num = DEFAULT_THREAD_NUM;
    // 固定大小线程池
    private ExecutorService executor;
    // 守护线程
    private Thread daemonThread;
    // 延时队列
    private DelayQueue<DelayOrderTask<?>> delayQueue;
    private static final AtomicLong atomic = new AtomicLong(0);
    private final long n = 1;
    private static DelayOrderQueueManager instance = new DelayOrderQueueManager();  

    private DelayOrderQueueManager() {
        executor = Executors.newFixedThreadPool(thread_num);
        delayQueue = new DelayQueue<>();
        init();
    }  

    public static DelayOrderQueueManager getInstance() {
        return instance;
    }  

    /**
     * 初始化
     */
    public void init() {
        daemonThread = new Thread(() -> {
            execute();
        });
        daemonThread.setName("DelayQueueMonitor");
        daemonThread.start();
    }  

    private void execute() {
        while (true) {
            Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
            System.out.println("当前存活线程数量:" + map.size());
            int taskNum = delayQueue.size();
            System.out.println("当前延时任务数量:" + taskNum);
            try {
                // 从延时队列中获取任务
                DelayOrderTask<?> delayOrderTask = delayQueue.take();
                if (delayOrderTask != null) {
                    Runnable task = delayOrderTask.getTask();
                    if (null == task) {
                        continue;
                    }
                    // 提交到线程池执行task
                    executor.execute(task);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }  

    /**
     * 添加任务
     *
     * @param task
     * @param time
     *            延时时间
     * @param unit
     *            时间单位
     */
    public void put(Runnable task, long time, TimeUnit unit) {
        // 获取延时时间
        long timeout = TimeUnit.NANOSECONDS.convert(time, unit);
        // 将任务封装成实现Delayed接口的消息体
        DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task);
        // 将消息体放到延时队列中
        delayQueue.put(delayOrder);
    }  

    /**
     * 删除任务
     *
     * @param task
     * @return
     */
    public boolean removeTask(DelayOrderTask task) {  

        return delayQueue.remove(task);
    }
}  

测试类

package com.delqueue;  

import java.util.concurrent.TimeUnit;  

import com.test.delayqueue.DelayOrderQueueManager;
import com.test.delayqueue.DelayOrderWorker;  

public class Test {
    public static void main(String[] args) {
        DelayOrderWorker work1 = new DelayOrderWorker();// 任务1
        DelayOrderWorker work2 = new DelayOrderWorker();// 任务2
        DelayOrderWorker work3 = new DelayOrderWorker();// 任务3
        // 延迟队列管理类,将任务转化消息体并将消息体放入延迟对列中等待执行
        DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance();
        manager.put(work1, 3000, TimeUnit.MILLISECONDS);
        manager.put(work2, 6000, TimeUnit.MILLISECONDS);
        manager.put(work3, 9000, TimeUnit.MILLISECONDS);
    }  

}  

OK 这就是项目中的具体使用情况,当然具体内容被忽略,整体框架就是这样,还有这里使用java的延时队列但是这种方式是有问题的如果如果down机则会出现任务丢失,所以也可以考虑使用mq、redis来实现

2、mq实现延迟消息

在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。
插件源码地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

插件下载地址:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

安装:

进入插件安装目录
{rabbitmq-server}/plugins/(可以查看一下当前已存在的插件)
下载插件

rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

(如果下载的文件名称不规则就手动重命名一下如:

rabbitmq_delayed_message_exchange-0.0.1.ez)

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

关闭插件

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

插件使用

通过声明一个x-delayed-message类型的exchange来使用delayed-messaging特性
x-delayed-message是插件提供的类型,并不是rabbitmq本身的,发送消息的时候通过在header添加”x-delay”参数来控制消息的延时时间

直接在maven工程的pom.xml文件中加入

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Spring Boot的版本我使用的是 2.0.1.RELEASE .

接下来在 application.properties 文件中加入redis配置:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

定义ConnectionFactory和RabbitTemplate

也很简单,代码如下:

package com.mq.rabbitmq;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
  private String host;
  private int port;
  private String userName;
  private String password;

  @Bean
  public ConnectionFactory connectionFactory() {
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);
    cachingConnectionFactory.setUsername(userName);
    cachingConnectionFactory.setPassword(password);
    cachingConnectionFactory.setVirtualHost("/");
    cachingConnectionFactory.setPublisherConfirms(true);
    return cachingConnectionFactory;
  }

  @Bean
  public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    return rabbitTemplate;
  }

  public String getHost() {
    return host;
  }

  public void setHost(String host) {
    this.host = host;
  }

  public int getPort() {
    return port;
  }

  public void setPort(int port) {
    this.port = port;
  }

  public String getUserName() {
    return userName;
  }

  public void setUserName(String userName) {
    this.userName = userName;
  }

  public String getPassword() {
    return password;
  }

  public void setPassword(String password) {
    this.password = password;
  }
}

Exchange和Queue配置

package com.mq.rabbitmq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class QueueConfig {

  @Bean
  public CustomExchange delayExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange("test_exchange", "x-delayed-message",true, false,args);
  }

  @Bean
  public Queue queue() {
    Queue queue = new Queue("test_queue_1", true);
    return queue;
  }

  @Bean
  public Binding binding() {
    return BindingBuilder.bind(queue()).to(delayExchange()).with("test_queue_1").noargs();
  }
}

这里要特别注意的是,使用的是 CustomExchange ,不是 DirectExchange ,另外 CustomExchange 的类型必须是 x-delayed-message 。

实现消息发送

package com.mq.rabbitmq;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

@Service
public class MessageServiceImpl {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  public void sendMsg(String queueName,String msg) {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    System.out.println("消息发送时间:"+sdf.format(new Date()));
    rabbitTemplate.convertAndSend("test_exchange", queueName, msg, new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setHeader("x-delay",3000);
        return message;
      }
    });
  }
}

注意在发送的时候,必须加上一个header

x-delay

在这里我设置的延迟时间是3秒。

消息消费者

package com.mq.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class MessageReceiver {

  @RabbitListener(queues = "test_queue_1")
  public void receive(String msg) {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    System.out.println("消息接收时间:"+sdf.format(new Date()));
    System.out.println("接收到的消息:"+msg);
  }
}

运行Spring Boot程序和发送消息

直接在main方法里运行Spring Boot程序,Spring Boot会自动解析 MessageReceiver 类的。

接下来只需要用Junit运行一下发送消息的接口即可。

package com.mq.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
  @Autowired
  private MessageServiceImpl messageService;
  @Test
  public void send() {
    messageService.sendMsg("test_queue_1","hello i am delay msg");
  }
}

运行完后,可以看到如下信息:

消息发送时间:2018-05-03 12:44:53
3秒钟后,Spring Boot控制台会输出:
消息接收时间:2018-05-03 12:44:56
接收到的消息:hello i am delay msg

原文地址:https://www.cnblogs.com/nxzblogs/p/11666877.html

时间: 2024-11-04 07:17:09

java中延迟任务的处理方式的相关文章

为什么Java中实现多线程的方式有两种?

在面试的过程中,我们经常问被面试者,为什么Java中实现多线程的方式有两种(一种是直接继承Thread类,一种是实现Runnable接口)?可惜的是,很多面试者都答不出来,甚至从来没有想为什么.,那么真正的原因是什么呢?我们可以用反证法推理一下: 假设Java只提供Thread供大家继承从而实现多线程,考虑下面的一个需求,如果有一个已经继承了某个父类的类,但是这个类又想实现多线程,怎么办?很显然,如果只提供一个可以继承的类,肯定解决不了这个问题.那么,如何解决,毫无疑问,就只能使用接口了.

JS 和 Java 中URL特殊字符编码方式

前几天遇到url特殊字符编码的问题,在这里整理一下: JavaScript 1.  编码 escape(String) 其中某些字符被替换成了十六进制的转义序列. 解码 unescape(String) 该函数的工作原理是这样的:通过找到形式为 %xx 和 %uxxxx 的字符序列(x 表示十六进制的数字),用 Unicode 字符 \u00xx 和 \uxxxx 替换这样的字符序列进行解码. *注释:ECMAScript v3 反对使用该方法,应用使用 decodeURI() 和 decode

Java中数组的初始化方式

Java中数组的初始化方式    初始化方式有两种: 1.静态初始化:初始化时由程序猿显式指定每一个数组元素的初始值,由系统指定数组长度 2.动态初始化:初始化时由程序猿仅仅指定数组长度,由系统为数组元素分配初始值

java中复制文本文件的方式我总结为14种(按字符读取4中,按字节读取8种!??)

java中复制文件的方式 如果按照字符来读取的话,可以有4种,基本的2种,高效的2种,高效特殊的1种 第0种: public class CopyFileDemo { public static void main(String[] args) throws Exception{ //封裝数据源 BufferedReader reader = new BufferedReader(new FileReader("a.txt")); //封装目的地 BufferedWriter writ

Java中实现线程的方式

Java中实现线程的方式 Java中实现多线程的方式的方式中最核心的就是 run()方法,不管何种方式其最终都是通过run()来运行. Java刚发布时也就是JDK 1.0版本提供了两种实现方式,一个是继承Thread类,一个是实现Runnable接口.两种方式都是去重写run()方法,在run()方法中去实现具体的业务代码. 但这两种方式有一个共同的弊端,就是由于run()方法是没有返回值的,所以通过这两方式实现的多线程读无法获得执行的结果. 为了解决这个问题在JDK 1.5的时候引入一个Ca

Java中随机数的产生方式与原理

查阅随机数相关资料,特做整理 首先说一下java中产生随机数的几种方式 在j2se中我们可以使用Math.random()方法来产生一个随机数,这个产生的随机数是0-1之间的一个double,我们可以把他乘以100,他就是个100以内的随机数字,这个在j2me中没有. 在java.util这个包里面提供了一个Random的类,我们可以新建一个Random的对象来产生随机数,他可以生产随机整数.随机float.随机double.随机long,这个也是我们在j2me的程序里经常用的一个取随机数的方法

Java中反射的实现方式

所谓反射,是指在运行时状态中,获取类中的属性和方法,以及调用其中的方法的一种机制.这种机制的作用在于获取运行时才知道的类(Class)及其中的属性(Field).方法(Method)以及调用其中的方法,也可以设置其中的属性值. 在Java中实现反射最重要的一步,也是第一步就是获取Class对象,得到Class对象后可以通过该对象调用相应的方法来获取该类中的属性.方法以及调用该类中的方法. Java中反射有如下几种实现方式: 1.通过Class.forName()方法加载字符串,就可以得到该字符串

java中int-&gt;String 3种方式效率分析

1.0 int转String方式 java中,int转String共有如下3种方式 (1) 字符串拼接(即num+"") (2) String.valueof(num) (3) Integer.toString(num) 其中,方法(2)内部直接调用了方法(3),效率相差无几 2.0 效率测试 1 int[] intArr = new int[1000000]; 2 String[] strArr1 = new String[1000000]; 3 4 Long s0 = System

java中截取字符串的方式

1.length() 字符串的长度 例:char chars[]={'a','b'.'c'}; String s=new String(chars); int len=s.length(); 2.charAt() 截取一个字符 例:char ch; ch="abc".charAt(1); 返回'b' 3.getChars() 截取多个字符 void getChars(int sourceStart,int sourceEnd,char target[],int targetStart)