RabbitMQ消费端自定义监听(九)

  场景

    我们一般在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理。

  实际环境

    我们使用自定义的Consumer更加的方便,解耦性更强,也在实际工作中最常用。

  操作:  

        //生产端代码
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_consumer_exchange";
        String routingKey = "consumer.save";

        String msg = "Hello RabbitMQ Consumer Message";

        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
        //消费端代码
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.#";
        String queueName = "test_consumer_queue";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        //使用自定义consumer
        channel.basicConsume(queueName, true, new MyConsumer(channel));    
       //自定义消费端
        //继承DefaultConsumer类
        public class MyConsumer extends DefaultConsumer {

               public MyConsumer(Channel channel) {
                       super(channel);
               }

               //重写handleDelivery()
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.err.println("-----------consume message----------");
                      System.err.println("consumerTag: " + consumerTag);
                      System.err.println("envelope: " + envelope);
                      System.err.println("properties: " + properties);
                      System.err.println("body: " + new String(body));
               }

          }    

    运行结果:

    

原文地址:https://www.cnblogs.com/luhan777/p/11193004.html

时间: 2024-10-31 03:04:01

RabbitMQ消费端自定义监听(九)的相关文章

RabbitMQ消费端自定义监听器DefaultConsumer

消费者 package com.flying.rabbitmq.api.consumer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 自定义消费者类型 */ public class Consumer { public static void main(String[] args) th

linux epoll机制对TCP 客户端和服务端的监听C代码通用框架实现

1 TCP简介 tcp是一种基于流的应用层协议,其"可靠的数据传输"实现的原理就是,"拥塞控制"的滑动窗口机制,该机制包含的算法主要有"慢启动","拥塞避免","快速重传". 2 TCP socket建立和epoll监听实现 数据结构设计 linux环境下,应用层TCP消息体定义如下: typedef struct TcpMsg_s { TcpMsgHeader head; void* msg; }TcpM

RabbitMQ confirm的确认监听模式

添加确认监听需要开启确认监听模式 实现 addConfirmListener方法confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息: 消费者: package com.flying.rabbitmq.api.confirm; i

JS移动端如何监听软键盘回车事件

移动端经常项目中会有搜索之类的功能,一般实现的是按搜索按钮进行搜索,如果要像PC端一样实现按回车键进行搜索该怎么实现呢? 方法很简单,就是在搜索框的input外面套一个form标签  注意点:form标签一定得添加 action属性(可设置为空) <form action=""><input type="text" name="search" /></form> 移动端软键盘的回车会触发form的submit事

RecycleView的使用+自定义监听事件

最近使用了RecycleView,发下这个控件十分好用,替代了listView和GridView,包括适配器都很方便. 效果如下: 具体使用如下所示: 1 compile 'com.android.support:recyclerview-v7:25.3.1' activity_main.xml <?xml version="1.0" encoding="utf-8"?> <LinearLayout xmlns:android="http

cocos2d-js 自定义监听 EventCustom

cf.TestScene = cc.Scene.extend({ _listener1:null }); cf.TestScene.create = function () { var res = new cf.TestScene(); if(res && res.init()) { return res; } res = null; return null; } cf.TestScene.prototype.init = function () { if(cc.Scene.prototy

RabbitMQ消费端限流策略(十)

消费端限流: 什么是消费端限流? 场景: 我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据.(导致服务器崩溃,线上故障) 生产端一次推送几百条数据库,客户端只接收一两条,在高并发的情况下,不能再生产端做限流,只能在消费端处理. 解决方法: RabbitMQ提供了一种qos(服务质量保证)功能,在非自动确认消息的前提下, 如果一定数据的消息(通过基于consumer或者channel

cocos2d-js中的自定义监听方法(一个有趣的示例)

app.js中的实现: 1 var HelloWorldLayer = cc.Layer.extend({ 2 3 defualt:null, 4 5 ctor:function () { 6 7 this._super(); 8 mainscene = ccs.load(res.MainScene_json).node; 9 this.addChild(mainscene); 10 11 var sprite1 = ccui.helper.seekWidgetByName(mainscene,

rabbitmq消费端加入精确控频。

控制频率之前用的是线程池的数量来控制,很难控制.因为做一键事情,做一万次,并不是每次消耗的时间都相同,所以很难推测出到底多少线程并发才刚好不超过指定的频率. 现在在框架中加入控频功能,即使开200线程,也能保证1秒钟只运行10次任务. 与celery相比 在推送任务方面比celery的delay要快,推送的任务小. 使用更简单,没那么花哨给函数加装饰器来注册函数路由. 可以满足生产了. 比之前的 使用redis原生list结构作为消息队列取代celery框架. 更好,主要是rabbitmq有消费