rabbitmq消费端的nack和重回队列的总结

重回队列模式,是当投递消息失败时,让该消息重新回到队列的模式,该模式需要手动签收,并需要在消费者中进行判断,调用重回队列的确认模式

消费者

package com.flying.rabbitmq.api.ack;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Consumer {

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.#";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 手工签收 必须要关闭 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));

    }
}

自定义消费者:

package com.flying.rabbitmq.api.ack;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer {

    private Channel channel ;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("body: " + new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)properties.getHeaders().get("num") == 0) {
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }

    }

}

生产者:

package com.flying.rabbitmq.api.ack;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class Producer {

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_ack_exchange";
        String routingKey = "ack.save";

        for(int i =0; i<5; i ++){

            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num", i);

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            String msg = "Hello RabbitMQ ACK Message " + i;
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }

    }
}

原文地址:https://www.cnblogs.com/lflying/p/11107401.html

时间: 2024-11-09 03:56:02

rabbitmq消费端的nack和重回队列的总结的相关文章

RabbitMQ消费端限流策略(十)

消费端限流: 什么是消费端限流? 场景: 我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据.(导致服务器崩溃,线上故障) 生产端一次推送几百条数据库,客户端只接收一两条,在高并发的情况下,不能再生产端做限流,只能在消费端处理. 解决方法: RabbitMQ提供了一种qos(服务质量保证)功能,在非自动确认消息的前提下, 如果一定数据的消息(通过基于consumer或者channel

RabbitMQ消费端自定义监听(九)

场景: 我们一般在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理. 实际环境: 我们使用自定义的Consumer更加的方便,解耦性更强,也在实际工作中最常用. 操作: //生产端代码 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory

rabbitmq消费端加入精确控频。

控制频率之前用的是线程池的数量来控制,很难控制.因为做一键事情,做一万次,并不是每次消耗的时间都相同,所以很难推测出到底多少线程并发才刚好不超过指定的频率. 现在在框架中加入控频功能,即使开200线程,也能保证1秒钟只运行10次任务. 与celery相比 在推送任务方面比celery的delay要快,推送的任务小. 使用更简单,没那么花哨给函数加装饰器来注册函数路由. 可以满足生产了. 比之前的 使用redis原生list结构作为消息队列取代celery框架. 更好,主要是rabbitmq有消费

RabbitMQ消费端消息的获取方式(.Net Core)

1[短链接]:BasicGet(String queue, Boolean autoAck) 通过request的方式独自去获取消息,断开式,一次次获取,如果返回null,则说明队列中没有消息. 隐患:每次获取消息都会创建channel. 优点:最安全的获取方式且性能不算太差. 2[长链接]: 1).EventingBasicConsumer[订阅式] 使用这种方式消息会全部打入当前消费者中,不管是否启用确认机制. 隐患:①根据消息的长短多少将影响当前消费者的占用资源. ②如果当前消费者挂掉,那

RabbitMQ消费端自定义监听器DefaultConsumer

消费者 package com.flying.rabbitmq.api.consumer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 自定义消费者类型 */ public class Consumer { public static void main(String[] args) th

消费端限流策略

使用场景 首先,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现如下情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据! Rabbitmq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息 (通过基于consumer或者channel设置qos的值)未被确认前,不进行消费新的消息. 具体方法 void BasicQos(unit prefetchSize, ushort prefetchCount,

Rabbitmq消费失败死信队列

Rabbitmq 重消费处理 一 处理流程图: 业务交换机:正常接收发送者,发送过来的消息,交换机类型topic AE交换机: 当业务交换机无法根据指定的routingkey去路由到队列的时候,会全部发送到AE交换机.发送到此队列的消息属于,业务垃圾消息,或者攻击消息类型,交换机类型fanout 死信交换机:用于处理消费者,消费失败回退的消息,根据死信交换机的routingkey发送到死信队列,交换机类型 topic EXAMPLE: 业务routingkey: hello/task_queue

RabbitMQ消息丢失问题和保证消息可靠性-消费端不丢消息和HA(二)

继续上篇文章解决RabbitMQ消息丢失问题和保证消息可靠性(一) 未完成部分,我们聊聊MQ Server端的高可用和消费端如何保证消息不丢的问题? 回归上篇的内容,我们知道消息从生产端到服务端,为了保证消息不丢,我们必须做哪些事情? 发送端采用Confirm模式,注意Server端没成功通知发送端,需要重发操作需要额外处理 消息的持久化处理 上面两个操作保证消息到服务端不丢,但是非高可用状态,如果节点挂掉,服务暂时不可用,需要重启后,消息恢复,消息不会丢失,因为有磁盘存储. 本文先从消费端讲起

消费端如何保证消息队列MQ的有序消费

消息无序产生的原因 消息队列,既然是队列就能保证消息在进入队列,以及出队列的时候保证消息的有序性,显然这是在消息的生产端(Producer),但是往往在生产环境中有多个消息的消费端(Consumer),尽管消费端在拉取消息时是有序的,但各个消息由于网络等方面原因无法保证在各个消费端中处理时有序. 场景分析 先后两次修改了商品信息,消息A和消息B先后同步写入MySQL,接着异步写入消息队列中发送消息,此时消息队列生产端(Producer)按时序先后发出了A和B两条消息(消息A先发出,消息B后发出)