在默认情况下,一旦RabbitMQ重启的话,我们定义的exchange和queue都会消失,在queue里面的消息也自然消失的无影无踪。这样肯定是不合理的,毕竟谁都无法保证RabbitMQ服务器永远不用重启,也永远不会出故障。那么怎么保证在RabbitMQ服务器重启后消息不会丢失呢?这里我们就要用到RabbitMQ的持久化。
要完成消息的持久化需要三个步骤:
- 把exchange的durable属性设成true
- 把queue的durable属性设成true
- 把消息的deliveryMode设成2
第一和第二步是把exchange和queue设置为持久化,第三部是持久化消息。三步缺一不可,如果不做第三步则消息会在重启后消失,但exchange和queue不会。下面我们来看看具体代码:
package com.jaeger.persistence; import java.io.IOException; import java.util.concurrent.TimeoutException; import org.junit.Test; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.AMQP.BasicProperties.Builder; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private static final String MY_EXCHANGE_NAME = "MyExchange"; private static final String MY_ROUTING_KEY = "MyRoutingKey"; private static final String MY_QUEUE_NAME = "MyQueue"; private static final String DIRECT = "direct"; private static final String HOST = "172.19.64.28"; private static final String USER = "jaeger"; private static final String PASSWORD = "root"; private static final int PORT = 5672; @Test public void createExchangeAndQueue() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setUsername(USER); connectionFactory.setPassword(PASSWORD); connectionFactory.setPort(PORT); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //把exchange的durable属性设成true channel.exchangeDeclare(MY_EXCHANGE_NAME, DIRECT, true); //把queue的durable属性设成true channel.queueDeclare(MY_QUEUE_NAME, true, false, false, null); channel.queueBind(MY_QUEUE_NAME, MY_EXCHANGE_NAME, MY_ROUTING_KEY); channel.close(); connection.close(); } @Test public void produce() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setUsername(USER); connectionFactory.setPassword(PASSWORD); connectionFactory.setPort(PORT); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String message = "Hello 世界!"; //把消息的deliveryMode设成2 BasicProperties props = new BasicProperties.Builder().deliveryMode(2).build(); channel.basicPublish(MY_EXCHANGE_NAME, MY_ROUTING_KEY, props, message.getBytes("utf-8")); System.out.println("Sent ‘" + message + "‘"); channel.close(); connection.close(); } }
时间: 2024-10-31 04:01:53