RabbitMQ五种工作模式学习总结

一、简介
最近,在看一些消息中间件的内容,之前都没有好好学习一下消息中间件。本文将对RabbitMQ中五种常用的工作模式做一个简单的介绍和总结。RabbitMQ常用的工作模式有:简单队列模式、工作队列模式、发布订阅模式、路由模式、主题模式。本文参照RabbitMQ官网示例总结,详细可以到官网查看:https://www.rabbitmq.com/getstarted.html。

二、简单队列模式(Simple Queue) 
【a】模型图:只包含一个生产者以及一个消费者,生产者Producer将消息发送到队列中,消费者Consumer从该队列接收消息。(单生产单消费)

上图中,“P”是我们的生产者,“C”是我们的消费者。

【b】获取MQ连接对象工具类

/**
* @Description: 获取RabbitMQ的连接工具类
* @Author: weixiaohuai
* @Date: 2019/6/22
* @Time: 21:29
*/
public class MQConnecitonUtils {
private static final String RABBITMQ_HOST = "127.0.0.1";
private static final Integer RABBITMQ_PORT = 5672;
private static final String RABBITMQ_VHOST = "/vhost";
private static final String RABBITMQ_USERNAME = "wsh";
private static final String RABBITMQ_PASSWORD = "wsh";

public static Connection getConnection() {
//定义MQ连接对象
Connection connection = null;
//创建MQ连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置MQ主机名称
connectionFactory.setHost(RABBITMQ_HOST);
// 设置MQ AMQP端口号
connectionFactory.setPort(RABBITMQ_PORT);
// 设置MQ 连接的virtual host
connectionFactory.setVirtualHost(RABBITMQ_VHOST);
// 设置MQ 用户名称
connectionFactory.setUsername(RABBITMQ_USERNAME);
// 设置MQ 用户密码
connectionFactory.setPassword(RABBITMQ_PASSWORD);
try {
connection = connectionFactory.newConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
//返回连接对象
return connection;
}

}
【c】生产者

/**
* @Description: 消息生产者
* @Author: weixiaohuai
* @Date: 2019/6/22
* @Time: 21:37
*/
public class CustomProducer {

private static final String SIMPLE_QUEUE_NAME = "MQ_SIMPLE_QUEUE";
private static final String SIMPLE_QUEUE_MESSAGE = "Hello World!";

public static void main(String[] args) {
//获取MQ连接
Connection connection = MQConnecitonUtils.getConnection();
//从连接中获取Channel通道对象
Channel channel = null;
try {
//创建通道
channel = connection.createChannel();
//创建Queue队列
channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);
//发送消息到队列MQ_SIMPLE_QUEUE
//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
channel.basicPublish("", SIMPLE_QUEUE_NAME, null, SIMPLE_QUEUE_MESSAGE.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
【d】消费者

/**
* @Description: 消息消费者(新API)
* @Author: weixiaohuai
* @Date: 2019/6/22
* @Time: 21:55
*/
public class NewCustomConsumer {
private static Logger logger = LoggerFactory.getLogger(NewCustomConsumer.class);
private static final String SIMPLE_QUEUE_NAME = "MQ_SIMPLE_QUEUE";

public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
Channel channel;
try {
//创建消息通道对象
channel = connection.createChannel();
//声明queue队列
channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, StandardCharsets.UTF_8);
logger.info("receive message: " + message);
}
};
//监听消息队列
channel.basicConsume(SIMPLE_QUEUE_NAME, true, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【e】运行结果

三、工作队列模式(Work Queues)
【a】模型图:多个消费者绑定到同一个队列上,一条消息只能被一个消费者进行消费。工作队列有轮训分发和公平分发两种模式。

下面先说说轮训分发(round-robin)方式:

【b】消息生产者:

/**
* @Description: 工作队列 - 消息生产者
* @Author: weixiaohuai
* @Date: 2019/6/23
* @Time: 10:25
* <p>
* 说明:
* 消费者1与消费者2处理的消息是均分的,而且消息是轮训分发的(轮训分发 round-robin)
*/
public class CustomProducer {
private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
private static final String WORK_QUEUE_MESSAGE = "hello world!! ------> ";

public static void main(String[] args) {
//获取MQ连接
Connection connection = MQConnecitonUtils.getConnection();
//从连接中获取Channel通道对象
Channel channel = null;
try {
channel = connection.createChannel();
//创建Queue队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
//发送10条消息到工作队列
for (int i = 1; i <= 10; i++) {
StringBuilder msg = new StringBuilder(WORK_QUEUE_MESSAGE).append(i);
//发送消息
channel.basicPublish("", WORK_QUEUE_NAME, null, msg.toString().getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
【c】消息消费者1:模拟延迟操作2秒

public class CustomConsumer01 {
private static Logger logger = LoggerFactory.getLogger(CustomConsumer01.class);

private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";

public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
Channel channel = null;
try {
//创建消息通道对象
channel = connection.createChannel();
//声明queue队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, StandardCharsets.UTF_8);
logger.info("【CustomConsumer01】receive message: " + message);
try {
//模拟延迟
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};

//监听消息队列
channel.basicConsume(WORK_QUEUE_NAME, true, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【d】消息消费者2:模拟延迟操作1秒

public class CustomConsumer02 {
private static Logger logger = LoggerFactory.getLogger(CustomConsumer02.class);

private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";

public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
Channel channel = null;
try {
//创建消息通道对象
channel = connection.createChannel();
//声明queue队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, StandardCharsets.UTF_8);
logger.info("【CustomConsumer02】receive message: " + message);
try {
//模拟延迟
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//监听消息队列
channel.basicConsume(WORK_QUEUE_NAME, true, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【e】运行结果

由上面图可见,消费者1和消费者2处理的消息是均分的(消费的消息条数一样),而且消息是轮训分发的,也就是说同一个消息只能被一个消费者消费。上面的消费者1和消费者2处理消息的效率不同,但是最后接收到的消息还是一样多,如果需要让工作效率高的消费者消费更多的消息,那么可以使用公平分发,下面介绍一下工作队列的公平分发模式(能者多劳)。

【a】生产者:

/**
* @Description: 工作队列 - 消息生产者 (公平分发方式Fair dispatch)
* @Author: weixiaohuai
* @Date: 2019/6/23
* @Time: 10:25
* <p>
* 说明:
* 1. 生产者、消费者指定:channel.basicQos(1);
* 2. 消费者消费完消息自动发送确认消息:channel.basicAck(envelope.getDeliveryTag(), false);
* 3. 消费者必须关闭自动应答:autoAck = false;
* 4. 一般消费者如果处理消息的时间较短(效率较高),那么它处理的消息会比较多一些;
*/
public class CustomProducer {
private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
private static final String WORK_QUEUE_MESSAGE = "hello world!! ------> ";

public static void main(String[] args) {
//获取MQ连接
Connection connection = MQConnecitonUtils.getConnection();
//从连接中获取Channel通道对象
Channel channel = null;
try {
channel = connection.createChannel();
//创建Queue队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);

//每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者(同一时刻服务器只会发送一条消息给消费者),消费者端发送了ack后才会接收下一个消息。
channel.basicQos(1);

//发送10条消息到工作队列
for (int i = 1; i <= 10; i++) {
StringBuilder msg = new StringBuilder(WORK_QUEUE_MESSAGE).append(i);
//发送消息
channel.basicPublish("", WORK_QUEUE_NAME, null, msg.toString().getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
【b】消费者1:

public class CustomConsumer01 {
private static Logger logger = LoggerFactory.getLogger(CustomConsumer01.class);

private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";

public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//声明queue队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, StandardCharsets.UTF_8);
logger.info("【CustomConsumer01】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};

//使用公平分发必须关闭自动应答(autoAck:true自动返回结果,false手动返回)
boolean autoAck = false;
//监听消息队列
channel.basicConsume(WORK_QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【c】消费者2:

public class CustomConsumer02 {
private static Logger logger = LoggerFactory.getLogger(CustomConsumer02.class);

private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";

public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//声明queue队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, StandardCharsets.UTF_8);
logger.info("【CustomConsumer02】receive message: " + message);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};

//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(WORK_QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【d】运行结果:

由此可见,消费者2的效率相对较高,所以消费者2消费消息比消费者1多一些,这样就可以充分发挥消费者处理消息的能力。

【e】注意点:

1. 生产者、消费者指定:channel.basicQos(1);
2. 消费者消费完消息自动发送确认消息:channel.basicAck(envelope.getDeliveryTag(), false);
3. 消费者必须关闭自动应答:autoAck = false;
4. 一般消费者如果处理消息的时间较短(效率较高),那么它处理的消息会比较多一些;

四、发布-订阅模式(Publish/Subscribe)
【a】模型图:生产者将消息发送到交换器,然后交换器绑定到多个队列,监听该队列的所有消费者消费消息。

【b】生产者:

/**
* @Description: 发布-订阅模式
* @Author: weixiaohuai
* @Date: 2019/6/23
* @Time: 15:20
* <p>
* 说明:可实现一条消息被多个消费者消费
* <p>
* a. 一个生产者,多个消费者;
* b. 每一个消费者都有自己的消息队列;
* c. 生产者没有把消息发送到队列,而是发送到交换器exchange上;
* d. 每个队列都需要绑定到交换机上;
* e. 生产者生产的消息先经过交换机然后到达队列,一个消息可以被多个消费者消费;
*/
public class CustomProducer {
private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";
//类型:分发
private static final String PUBLISH_SUBSCRIBE_EXCHANGE_TYPE = "fanout";

public static void main(String[] args) {
//获取MQ连接
Connection connection = MQConnecitonUtils.getConnection();
//从连接中获取Channel通道对象
Channel channel = null;
try {
channel = connection.createChannel();
//创建交换机对象publish_subscribe_exchange_fanout
channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_TYPE);
//发送消息到交换机exchange上
String msg = "hello world!!!";
channel.basicPublish(PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "", null, msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
【c】消费者1:

public class CustomConsumer01 {
private static Logger logger = LoggerFactory.getLogger(CustomConsumer.class);
private static final String PUBLIC_SUBSCRIBE_QUEUE_NAME = "public_subscribe_queue_name01";
private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";

public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上
channel.queueBind(PUBLIC_SUBSCRIBE_QUEUE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "");

channel.basicQos(1);

//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, StandardCharsets.UTF_8);
logger.info("【CustomConsumer01】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};

//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(PUBLIC_SUBSCRIBE_QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}

}
【d】消费者2:

public class CustomConsumer02 {
private static Logger logger = LoggerFactory.getLogger(CustomConsumer.class);
private static final String PUBLIC_SUBSCRIBE_QUEUE_NAME = "public_subscribe_queue_name02";
private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";

public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上
channel.queueBind(PUBLIC_SUBSCRIBE_QUEUE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "");

channel.basicQos(1);

//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, StandardCharsets.UTF_8);
logger.info("【CustomConsumer02】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};

//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(PUBLIC_SUBSCRIBE_QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}

}
【e】运行结果

由此可见,一条消息同时被两个消费者同时消费。

【f】交换机绑定信息

【g】注意点:

a. 一个生产者,多个消费者;
b. 每一个消费者都有自己的消息队列,分别绑定到不同的队列上;
c. 生产者没有把消息发送到队列,而是发送到交换器exchange上;
d. 每个队列都需要绑定到交换机上;
e. 生产者生产的消息先经过交换机然后到达队列,一个消息可以被多个消费者消费;
f. 如果消息发送到没有队列绑定的交换器时,消息将会丢失,因为交换器没有存储消息的能力,只有队列才有存储消息的能力;

五、路由模式(Routing)
【a】模型图:生产者将消息发送到direct交换器,它会把消息路由到那些binding key与routing key完全匹配的Queue中,这样就能实现消费者有选择性地去消费消息。

【b】生产者:

/**
* @Description: routing路由模式
* @Author: weixiaohuai
* @Date: 2019/6/23
* @Time: 15:20
* <p>
* 说明:生产者发送消息的时候指定routing key,然后消费者绑定队列的时候也指定一些binding key,只有binding key与routing key一致的消费者才能接收到此消息
*/
public class CustomProducer {
private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";
//交换机类型:direct
private static final String EXCHANGE_TYPE = "direct";
private static final String EXCHANGE_ROUTE_KEY = "info";

public static void main(String[] args) {
//获取MQ连接
Connection connection = MQConnecitonUtils.getConnection();
//从连接中获取Channel通道对象
Channel channel = null;
try {
channel = connection.createChannel();
//创建交换机对象
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
//发送消息到交换机exchange上
String msg = "hello world!!!";
//指定routing key为info
channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTE_KEY, null, msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
【c】消费者1:

public class CustomConsumer01 {
private static Logger logger = LoggerFactory.getLogger(CustomConsumer01.class);
private static final String QUEUE_NAME = "routing_direct_queue_name";
private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";
//binding key
private static final String EXCHANGE_ROUTE_KEY = "error";

public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上,并且指定routing_key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);

channel.basicQos(1);

//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, StandardCharsets.UTF_8);
logger.info("【CustomConsumer01】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};

//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}

}
【d】消费者2:

public class CustomConsumer02 {
private static Logger logger = LoggerFactory.getLogger(CustomConsumer02.class);
private static final String QUEUE_NAME = "routing_direct_queue_name02";
private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";
//binding key
private static final String EXCHANGE_ROUTE_KEY01 = "error";
private static final String EXCHANGE_ROUTE_KEY02 = "info";
private static final String EXCHANGE_ROUTE_KEY03 = "warning";

public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//将队列绑定到交换机上,并且指定routing_key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY01);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY02);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY03);

channel.basicQos(1);

//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, StandardCharsets.UTF_8);
logger.info("【CustomConsumer02】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};

//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}

}
【e】运行结果:

因为生产者发布消息的时候指定了routing key为info, 消费者绑定队列的时候指定的binding key 为error,显然消费者1接收不到此消息,因为消费者2绑定队列的时候指定了binding key为error、info、warning,所以消费者2能够成功接收该消息进行消费。

【f】交换机绑定信息

六、主题(Topic)模式
【a】模型图:类似于正则表达式匹配的一种模式。主要使用#、*进行匹配。

【b】生产者:

/**
* @Description: topic主题模式
* @Author: weixiaohuai
* @Date: 2019/6/23
* @Time: 15:20
* <p>
*
* 说明:
* #: 代表一个或者多个
* *: 代表一个
*
* 举例:
* 比如发送消息的时候指定了routing key为news.insert,
* 如果消费者指定binding key 为news.* 或者news.#都能接收到该消息;
*
*/
public class CustomProducer {
private static final String EXCHANGE_NAME = "exchange_topic";
//交换机类型:topic 类似正则匹配模式
private static final String EXCHANGE_TYPE = "topic";
//指定routing key
private static final String EXCHANGE_ROUTE_KEY = "news.insert";

public static void main(String[] args) {
//获取MQ连接
Connection connection = MQConnecitonUtils.getConnection();
//从连接中获取Channel通道对象
Channel channel = null;
try {
channel = connection.createChannel();
//创建交换机对象
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
//发送消息到交换机exchange上
String msg = "hello world!!!";
channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTE_KEY, null, msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
【c】消费者1:

public class CustomConsumer01 {
private static Logger logger = LoggerFactory.getLogger(CustomConsumer01.class);
private static final String QUEUE_NAME = "topic_queue_name1";
private static final String EXCHANGE_NAME = "exchange_topic";
//binding key
private static final String EXCHANGE_ROUTE_KEY = "news.insert";

public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上,并且指定routing_key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);

channel.basicQos(1);

//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, StandardCharsets.UTF_8);
logger.info("【CustomConsumer01】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};

//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}

}
【d】消费者2:

public class CustomConsumer02 {
private static Logger logger = LoggerFactory.getLogger(CustomConsumer02.class);
private static final String QUEUE_NAME = "topic_queue_name2";
private static final String EXCHANGE_NAME = "exchange_topic";
//binding key
private static final String EXCHANGE_ROUTE_KEY = "news.#";

public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上,并且指定routing_key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);

channel.basicQos(1);

//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, StandardCharsets.UTF_8);
logger.info("【CustomConsumer02】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};

//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace(http://www.my516.com);
}
}

}
【e】运行结果:

生产者发送消息绑定的routing key 为news.insert;消费者1监听的队列和交换器binding key 为news.insert;消费者2监听的队列和交换器bindingkey为news.#,很显然,两个消费者都将接收到该消息。

 

【f】交换机绑定信息
---------------------

原文地址:https://www.cnblogs.com/hyhy904/p/11089240.html

时间: 2024-12-16 15:11:00

RabbitMQ五种工作模式学习总结的相关文章

rabbitmq的五种工作模式

abbitmq的五种工作模式 原文地址:https://www.cnblogs.com/Jeely/p/10784172.html

消息队列rabbitmq的五种工作模式(go语言版本)

前言:如果你对rabbitmq基本概念都不懂,可以移步此篇博文查阅消息队列RabbitMQ 一.单发单收 二.工作队列Work Queue 三.发布/订阅 Publish/Subscribe 四.路由Routing 五.Topic类型的exchange 六.rabbitmq部分封装代码及装备工作 一.单发单收 在下图中,“ P”是我们的生产者,“ C”是我们的消费者.中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区. 单发单收模式下:一发一收 发送端只需要创建队列,然后向队列发送消

AES五种加密模式(CBC、ECB、CTR、OCF、CFB)

分组密码有五种工作体制:1.电码本模式(Electronic Codebook Book (ECB)):2.密码分组链接模式(Cipher Block Chaining (CBC)):3.计算器模式(Counter (CTR)):4.密码反馈模式(Cipher FeedBack (CFB)):5.输出反馈模式(Output FeedBack (OFB)). 以下逐一介绍一下: 1.电码本模式(Electronic Codebook Book (ECB)     这种模式是将整个明文分成若干段相同

转:Windows下的PHP开发环境搭建——PHP线程安全与非线程安全、Apache版本选择,及详解五种运行模式。

原文来自于:http://www.ituring.com.cn/article/128439 Windows下的PHP开发环境搭建——PHP线程安全与非线程安全.Apache版本选择,及详解五种运行模式. 今天为在Windows下建立PHP开发环境,在考虑下载何种PHP版本时,遭遇一些让我困惑的情况,为了解决这些困惑,不出意料地牵扯出更多让我困惑的问题. 为了将这些困惑一网打尽,我花了一下午加一晚上的时间查阅了大量资料,并做了一番实验后,终于把这些困惑全都搞得清清楚楚了. 说实话,之所以花了这么

AES的五种加密模式(CBC、ECB、CTR、OCF、CFB)

AES五种加密模式(CBC.ECB.CTR.OCF.CFB) 分组密码有五种工作体制:1.电码本模式(Electronic Codebook Book (ECB)):2.密码分组链接模式(Cipher Block Chaining (CBC)):3.计算器模式(Counter (CTR)):4.密码反馈模式(Cipher FeedBack (CFB)):5.输出反馈模式(Output FeedBack (OFB)). 以下逐一介绍一下: 1.电码本模式(Electronic Codebook B

Apache三种工作模式介绍与配置

Apache三种工作模式介绍与配置 一.Apache的三种工作模式介绍及相关查看方法 1.Apache三种工作模式简介 Apache目前一共有三种稳定的MPM(Multi-Processing Module,多进程处理模块)模式.它们分别是prefork,worker和event,它们同时也代表这Apache的演变和发展. Apache在configure配置编译参数的时候,可以使用 --with-mpm=prefork|worker|event 来指定编译为那一种MPM,当然也可以用编译为三种

LVS负载均衡(LVS简介、三种工作模式、十种调度算法)《转》

一.LVS简介 LVS(Linux Virtual Server)即Linux虚拟服务器,是由章文嵩博士主导的开源负载均衡项目,目前LVS已经被集成到Linux内核模块中.该项目在Linux内核中实现了基于IP的数据请求负载均衡调度方案,其体系结构如图1所示,终端互联网用户从外部访问公司的外部负载均衡服务器,终端用户的Web请求会发送给LVS调度器,调度器根据自己预设的算法决定将该请求发送给后端的某台Web服务器,比如,轮询算法可以将外部的请求平均分发给后端的所有服务器,终端用户访问LVS调度器

VMware虚拟机三种工作模式

工作模式 VMWare提供了三种工作模式,它们是bridged(桥接模式).NAT(网络地址转换模式)和host-only(主机模式).要想在网络管理和维护中合理应用它们,就应该先了解一下这三种工作模式. bridged(桥接模式) 在这种模式下,VMWare虚拟出来的操作系统就像是局域网中的一台独立的主机,它可以访问网内任何一台机器.在桥接模式下,你需要手工为虚拟系统配置IP地址.子网掩码,而且还要和宿主机器处于同一网段,这样虚拟系统才能和宿主机器进行通信.同时,由于这个虚拟系统是局域网中的一

一步一步搭建开发框架(五)单元工作模式

1,单元工作模式主要为了提高与数据库的交互次数,提高应用程序效率.我们知道实际的业务操作中,有时需要好几张表一快保存,一块删除之类的逻辑,比如注册用户之后,用户表要加一条数据,积分表等与用户表有外键关系的表可能也需要保存数据,这样造成多次保存,也就是多次与数据库交互. 2,前边我把SaveChange()方法都写到了BaseDal里面,今晚上就把这个SaveChange方法提取出来!我们继续封装一个DbSession类,同时将抽象工厂的代码转移到这个DbSession类中. 1 namespac