RabbitMQ消费端自定义监听器DefaultConsumer

消费者

package com.flying.rabbitmq.api.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 自定义消费者类型
 */
public class Consumer {

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.#";
        String queueName = "test_consumer_queue";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        channel.basicConsume(queueName, true, new MyConsumer(channel));

    }
}

自定义消费者

package com.flying.rabbitmq.api.consumer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * 实现自己的Consumer
 */
public class MyConsumer extends DefaultConsumer {
    public MyConsumer(Channel channel){
        super(channel);
    }
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

生产者

package com.flying.rabbitmq.api.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_consumer_exchange";
        String routingKey = "consumer.save";

        String msg = "Hello RabbitMQ Consumer Message";

        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }

    }
}

原文地址:https://www.cnblogs.com/lflying/p/11107299.html

时间: 2024-08-24 23:34:38

RabbitMQ消费端自定义监听器DefaultConsumer的相关文章

RabbitMQ消费端自定义监听(九)

场景: 我们一般在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理. 实际环境: 我们使用自定义的Consumer更加的方便,解耦性更强,也在实际工作中最常用. 操作: //生产端代码 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory

RabbitMQ消费端限流策略(十)

消费端限流: 什么是消费端限流? 场景: 我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据.(导致服务器崩溃,线上故障) 生产端一次推送几百条数据库,客户端只接收一两条,在高并发的情况下,不能再生产端做限流,只能在消费端处理. 解决方法: RabbitMQ提供了一种qos(服务质量保证)功能,在非自动确认消息的前提下, 如果一定数据的消息(通过基于consumer或者channel

rabbitmq消费端的nack和重回队列的总结

重回队列模式,是当投递消息失败时,让该消息重新回到队列的模式,该模式需要手动签收,并需要在消费者中进行判断,调用重回队列的确认模式 消费者 package com.flying.rabbitmq.api.ack; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Consumer

rabbitmq消费端加入精确控频。

控制频率之前用的是线程池的数量来控制,很难控制.因为做一键事情,做一万次,并不是每次消耗的时间都相同,所以很难推测出到底多少线程并发才刚好不超过指定的频率. 现在在框架中加入控频功能,即使开200线程,也能保证1秒钟只运行10次任务. 与celery相比 在推送任务方面比celery的delay要快,推送的任务小. 使用更简单,没那么花哨给函数加装饰器来注册函数路由. 可以满足生产了. 比之前的 使用redis原生list结构作为消息队列取代celery框架. 更好,主要是rabbitmq有消费

RabbitMQ消费端消息的获取方式(.Net Core)

1[短链接]:BasicGet(String queue, Boolean autoAck) 通过request的方式独自去获取消息,断开式,一次次获取,如果返回null,则说明队列中没有消息. 隐患:每次获取消息都会创建channel. 优点:最安全的获取方式且性能不算太差. 2[长链接]: 1).EventingBasicConsumer[订阅式] 使用这种方式消息会全部打入当前消费者中,不管是否启用确认机制. 隐患:①根据消息的长短多少将影响当前消费者的占用资源. ②如果当前消费者挂掉,那

消费端限流策略

使用场景 首先,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现如下情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据! Rabbitmq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息 (通过基于consumer或者channel设置qos的值)未被确认前,不进行消费新的消息. 具体方法 void BasicQos(unit prefetchSize, ushort prefetchCount,

RabbitMQ消息丢失问题和保证消息可靠性-消费端不丢消息和HA(二)

继续上篇文章解决RabbitMQ消息丢失问题和保证消息可靠性(一) 未完成部分,我们聊聊MQ Server端的高可用和消费端如何保证消息不丢的问题? 回归上篇的内容,我们知道消息从生产端到服务端,为了保证消息不丢,我们必须做哪些事情? 发送端采用Confirm模式,注意Server端没成功通知发送端,需要重发操作需要额外处理 消息的持久化处理 上面两个操作保证消息到服务端不丢,但是非高可用状态,如果节点挂掉,服务暂时不可用,需要重启后,消息恢复,消息不会丢失,因为有磁盘存储. 本文先从消费端讲起

安卓应用-自定义监听器2

现在说一下自定义监听器的第二种方法. 1.新建应用程序 2.定义接口Spy,同上一篇. package com.test.leetlelistener2; public interface Spy { public void Listening(); } 3.定义类Enemy package com.test.leetlelistener2; public class Enemy { public Spy spy; public int time = 10; Timer timer=new Ti

Dubbo高级篇_10_Dubbo消费端直连服务提供者(开发调试)

直连提供者 (+) (#) 在开发及测试环境下,经常需要绕过注册中心,只测试指定服务提供者,这时候可能需要点对点直连, 点对点直联方式,将以服务接口为单位,忽略注册中心的提供者列表, A接口配置点对点,不影响B接口从注册中心获取列表. (1) 如果是线上需求需要点对点,可在<dubbo:reference>中配置url指向提供者,将绕过注册中心,多个地址用分号隔开,配置如下:(1.0.6及以上版本支持) <dubbo:reference id="xxxService"