Java 小记 — RabbitMQ 的实践与思考

前言

本篇随笔将汇总一些我对消息队列 RabbitMQ 的认识,顺便谈谈其在高并发和秒杀系统中的具体应用。

1. 预备示例

想了下,还是先抛出一个简单示例,随后再根据其具体应用场景进行扩展,我觉得这样表述条理更清晰些。

RabbitConfig:

@Configuration
public class RabbitConfig {

    @Bean
    public Queue callQueue() {
        return new Queue(MQConstant.CALL);
    }
}

Client:

@Component
public class Client {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCall(String content) {
        for (int i = 0; i < 10000; i++) {
            String message = i + "-" + content;
            System.out.println(String.format("Sender: %s", message));
            rabbitTemplate.convertAndSend(MQConstant.CALL, message);
        }
    }
}

Server:

@Component
public class Server {

    @RabbitHandler
    @RabbitListener(queues = MQConstant.CALL)
    public void callProcess(String message) throws InterruptedException {
        Thread.sleep(100);
        System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message));
    }

}

Result:

Sender: Hello, are you there!
Receiver: reply("Hello, are you there!") Yes, I just saw your message!

以上示例会在 rabbitmq 中创建一条队列 CALL, 消息在其中等待消费:

在此基础上的简单扩展我就不再写案例了,比如领域模块完成了其核心业务规则之后可能需要更新缓存、写个邮件、记个复杂日志、做个统计报表等等,这些不需要及时反馈或者耗时的附属业务都可以通过异步队列分发,以此来提升核心业务的响应速度,同时如此处理能让领域边界更加清晰,代码的可维护性和持续拓展的能力也会有所提升。

2. 削峰

上个示例中我提到的应用场景是解耦和通知,再接着扩展,因其具备良好的缓冲性质,所以还有一个非常适合的应用场景那就是削峰。对于突如其来的极高并发请求,我们可以先瞬速地将其加入队列并回复用户一个友好提示,然后服务器可在其能承受的范围内慢慢处理,以此来防止突发的 CPU 和内存 “爆表”。

改造之后对于发送方来说当然是比较爽的,他只是将请求加入消息队列而已,处理压力都归到了消费端。接着思考,这样处理有没有副作用?如果这个请求刚好是线程阻塞的,那还要加入队列慢慢排队处理,那不是完蛋了,用户要猴年马月才能得到反馈?所以针对此,我觉得应该将消费端的方法改为异步调用(即多线程)以提升吞吐量,在 Spring Boot 中的写法也非常简单:

@Component
public class Server {

    @Async
    @RabbitHandler
    @RabbitListener(queues = MQConstant.CALL)
    public void callProcess(String message) throws InterruptedException {
        Thread.sleep(100);
        System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message));
    }

}

参照示例一的方法,我发布了 10000 条消息加入队列,且消费端的调用每次阻塞一秒,那可有意思了,什么时候能处理完?但如果开几百个线程同时处理的话,那几十秒就够了,当然具体多少合适还应根据具体的业务场景和服务器配置酌情考虑。另外,别忘了配线程池:

@Configuration
public class AsyncConfig {

    @Bean
    public Executor asyncExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(500);
        executor.setQueueCapacity(10);

        executor.setThreadNamePrefix("MyExecutor-");

        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

3. Exchange

RabbitMQ 可能为 N 个应用同时提供服务,要是你和你的蓝颜知己突然心有灵犀,在不同的业务上使用了同一个 routingKey,想想就刺激。因此,队列多了自然要进行分组管理,限定好 Exchange 的规则,接下来就可以独自玩耍了。

MQConstant:

public class MQConstant {

    public static final String EXCHANGE = "YOUCLK-MESSAGE-EXCHANGE";

    public static final String CALL = MQConstant.EXCHANGE + ".CALL";

    public static final String ALL = MQConstant.EXCHANGE + ".#";
}

RabbitConfig:

@Configuration
public class RabbitConfig {

    @Bean
    public Queue callQueue() {
        return new Queue(MQConstant.CALL);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(MQConstant.EXCHANGE);
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with(MQConstant.ALL);
    }
}

此时我们再去查队列 CALL,可以看到已经绑定了Exchange:

当然 Exchange 的作用远不止如此,以上示例为 Topic 模式,除此之外还有 Direct、Headers 和 Fanout 模式,写法都差不多,感兴趣的童鞋可以去查看 “官方文档” 进行更深入了解。

4. 延时队列

延时任务的场景相信小伙伴们都接触过,特别是抢购的时候,在规定时间内未付款订单就被回收了。微信支付的 API 里面也有一个支付完成后的延时再确认消息推送,实现原理应该都差不多。

利用 RabbitMQ 实现该功能首先要了解他的两个特性,分别是 Time-To-Live Extensions 和 Dead Letter Exchanges,字面意思上就能理解个大概,一个是生存时间,一个是死信。整个过程也很容易理解,TTL 相当于一个缓冲队列,等待其过期之后消息会由 DLX 转发到实际消费队列,如此便实现了他的延时过程。

MQConstant:

public class MQConstant {

    public static final String PER_DELAY_EXCHANGE = "PER_DELAY_EXCHANGE";

    public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";

    public static final String DELAY_CALL_TTL = "DELAY_CALL_TTL";

    public static final String CALL = "CALL";

}

ExpirationMessagePostProcessor:

public class ExpirationMessagePostProcessor implements MessagePostProcessor {
    private final Long ttl;

    public ExpirationMessagePostProcessor(Long ttl) {
        this.ttl = ttl;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties()
                .setExpiration(ttl.toString());
        return message;
    }
}

Client:

@Component
public class Client {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCall(String content) {
        for (int i = 1; i <= 3; i++) {
            long expiration = i * 5000;
            String message = i + "-" + content;
            System.out.println(String.format("Sender: %s", message));
            rabbitTemplate.convertAndSend(MQConstant.DELAY_CALL_TTL, (Object) message, new ExpirationMessagePostProcessor(expiration));

        }
    }
}

Server:

@Component
public class Server {

    @Async
    @RabbitHandler
    @RabbitListener(queues = MQConstant.CALL)
    public void callProcess(String message) throws InterruptedException {
        String date = (new SimpleDateFormat("HH:mm:ss")).format(new Date());
        System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!- %s", message, date));
    }

}

Result:

Sender: 1-Hello, are you there!
Sender: 2-Hello, are you there!
Sender: 3-Hello, are you there!
Receiver: reply("1-Hello, are you there!") Yes, I just saw your message!- 23:04:12
Receiver: reply("2-Hello, are you there!") Yes, I just saw your message!- 23:04:17
Receiver: reply("3-Hello, are you there!") Yes, I just saw your message!- 23:04:22

结果一目了然,分别在队列中延迟了 5秒,10秒,15秒,当然,以上只是我的简单示例,童鞋们可翻阅官方文档(“ ttl ” && “ dlx ”)进一步深入学习。

结语

本篇随笔不该就这么结束,但晚上心情不好,百感交集,无法继续写作,无奈至此。近期正在寻觅新的工作机会,我的微信:youclk,无论有没有推荐的,给我点鼓励,谢谢!

原文地址:https://www.cnblogs.com/youclk/p/8650100.html

时间: 2024-10-11 09:43:20

Java 小记 — RabbitMQ 的实践与思考的相关文章

RabbitMQ使用实践

RabbitMQ使用实践 参考网站: http://blog.chinaunix.net/topic/surpershi/ http://blog.csdn.net/lwkcn/article/details/25086467 http://snoopyxdy.blog.163.com/blog/static/60117440201352615631930/ 1.简介 MQ(Message Queue)消息队列,用于应用系统解耦.消息异步分发. RabbitMQ是一个在AMQP基础上完整的,可复

Java编程最差实践常见问题详细说明(1)转

Java编程最差实践常见问题详细说明(1)转 原文地址:http://www.odi.ch/prog/design/newbies.php 每天在写Java程序, 其实里面有一些细节大家可能没怎么注意, 这不, 有人总结了一个我们编程中常见的问题. 虽然一般没有什么大问题, 但是最好别这样做. 另外这里提到的很多问题其实可以通过Findbugs(http://findbugs.sourceforge.net/ )来帮我们进行检查出来. 字符串连接误用  错误的写法: Java代码   Strin

Java编程最差实践常见问题详细说明(2)转

Java编程最差实践常见问题详细说明(2)转 2012-12-13 13:57:20|  分类: JAVA |  标签:java  |举报|字号 订阅 反射使用不当  错误的写法: Java代码   Class beanClass = ... if (beanClass.newInstance() instanceof TestBean) ... 这里的本意是检查beanClass是否是TestBean或是其子类, 但是创建一个类实例可能没那么简单, 首先实例化一个对象会带来一定的消耗, 另外有

百度SDN实践与思考

编者按:2015中国SDN/NFV大会在北京召开,本次大会围绕SDN/NFV展开讨论,来自运营商.服务提供商等业界巨头纷纷参与此次大会.百度公司系统部副总监张诚发表了题为<百度SDN实践与思考>的演讲. 谢谢大家!谢谢组委会的组织!今天因为时间比较的有限,我讲的信息稍微多一些,时间上提醒我一下.今天跟大家分享的主题是"百度SDN实践与思考",更多的是在过往七八年的时间,百度在SDN的尝试,以及在工程上的一些经验. 第一,我们为什么做SDN?我们的需求是什么?这个和传统的电信

《Java 程序设计》课堂实践项目汇总链接

1.<Java 程序设计>课堂实践项目-命令行参数 2.<Java 程序设计>课堂实践项目-mini dc 3.<Java 程序设计>课堂实践项目-Arrays和String单元测试 4.<Java 程序设计>课堂实践项目-类定义

JAVA 单实例最佳实践

1.public class Singleton { private static class SingletonHolder { public static Singleton resource = new Singleton(); public static Singleton getResource() { return SingletonHolder.resource ; private Singleton(){ 2.public class Singleton { public sta

JAVA实现RabbitMQ,附安装过程

RabbitMQ的第一个JAVA实现 RabbitMQ是基于Erlang的,所以首先必须配置Erlang环境 Erlang官网   http://www.erlang.org/ Linux 下Erlang下载选择sourcefile Wget命令下载 Ubuntu下用tar –xzvf *.tar.gz命令解压 依次执行以下命令: ./configure--prefix=/home/hadoop/mydisk/erlang (该过程可能失败,建议sudoapt-get install build

自动化接口用例从 1 到 1000 过程中的实践和思考

引言 当一个新人刚加入公司的时候,我们通常告诉新人怎么去写一个自动化用例:从工程配置到如何添加接口.如何使用断言,最后到如何将一个用例运行起来. 而在实际工作和业务场景中,我们常常面临着需要编写和组织一堆用例的情况:我们需要编写一个业务下的一系列的自动化接口用例,再把用例放到持续集成中不断运行.面临的问题比单纯让一个用例运行起来复杂的多. 本人加入公司不到一年,从写下第 1 个 case 开始,持续编写和运行了 1000 多个 case ,在这过程中有了一些思考.在本文中,和大家探论下如何编写大

RabbitMQ学习第一记:用java连接RabbitMQ

1.什么是RabbitMQ MQ(Message Queue):消息队列,是服务端设计的一个可以存储大量消息的队列,并提供客户端操作队列的方法:生产队列(向队列中添加数据).消费队列(从队列中取数据).RabbitMQ就是基于消息队列的一个典型应用.RabbitMQ除了普通的生产消费功能,还有一些高级功能:公平分发 ,轮询分发,路由模式,通配符模式,发布订阅,队列持久化. 2.java实现RabbitMQ的连接 2.1.RabbitMQ客户端jar包 <dependency><group