Java使用RabbitMQ之公平分发

发送消息:

 1 package org.study.workfair;
 2
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import org.junit.Test;
 6 import org.study.utils.ConnectionUtils;
 7
 8 import java.io.IOException;
 9 import java.util.concurrent.TimeoutException;
10
11 public class Sender {
12     public static final String QUEUE_NAME = "test_simple_queue";
13
14     @Test
15     public void send() throws IOException, TimeoutException, InterruptedException {
16         // 获取连接
17         Connection conn = ConnectionUtils.getConnection();
18         // 获取通道
19         Channel channel = conn.createChannel();
20         //创建队列
21         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
22         //每个消费者发送确认消息前,只发送一条消息
23         channel.basicQos(1);
24         String msg = "hello rabbitmq!";
25
26         for (int i = 0; i < 50; i++) {
27             String tempStr = i + " " + msg;
28             //发送消息
29             channel.basicPublish("", QUEUE_NAME, null, tempStr.getBytes());
30             System.out.println("[send] msg " + i + ": " + msg);
31             Thread.sleep(100);
32         }
33
34         channel.close();
35         conn.close();
36     }
37 }

接受消息:

 1 package org.study.workfair;
 2
 3 import com.rabbitmq.client.*;
 4 import org.junit.Test;
 5 import org.study.utils.ConnectionUtils;
 6
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9
10 public class Recv {
11     public static final String QUEUE_NAME = "test_simple_queue";
12
13     @Test
14     public void recv() throws IOException, TimeoutException, InterruptedException {
15         Connection conn = ConnectionUtils.getConnection();
16         Channel channel = conn.createChannel();
17         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
18         channel.basicQos(1);
19
20         //定义消费者
21         DefaultConsumer consumer = new DefaultConsumer(channel) {
22             //重写获取到达消息
23             @Override
24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
25 //                super.handleDelivery(consumerTag, envelope, properties, body);
26                 String msg = new String(body, "utf-8");
27                 System.out.println("[1] recv: " + msg);
28
29                 try {
30                     Thread.sleep(1000);
31                 } catch (InterruptedException e) {
32                     e.printStackTrace();
33                 }finally {
34                     System.out.println("[1] done!");
35                     channel.basicAck(envelope.getDeliveryTag(),false);
36                 }
37             }
38         };
39
40         while (true) {
41             //监听队列
42             channel.basicConsume(QUEUE_NAME, false, consumer);
43             Thread.sleep(100);
44         }
45
46
47     }
48 }

原文地址:https://www.cnblogs.com/gongxr/p/9639528.html

时间: 2024-11-09 05:59:00

Java使用RabbitMQ之公平分发的相关文章

Java使用RabbitMQ之订阅分发(Topic)

使用RabbitMQ进行消息发布和订阅,生产者将消息发送给转发器(exchange),转发器根据路由键匹配已绑定的消息队列并转发消息,主题模式支持路由键的通配. 生产者代码: 1 package org.study.exchange3.topic3; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import org.junit.Test; 6 import org.study

rabbitmq 公平分发和消息接收确认(转载)

原文地址:http://www.jianshu.com/p/f63820fe2638 当生产者投递消息到broker,rabbitmq把消息分发到消费者. 如果设置了autoAck=true 消费者会自动确认收到信息.这时broker会立即将消息删除,这种情况下如果消费者出现异常(连接中断)该消息就会丢失.为了保证消息能够被正确的消费,rabbitmq支持消息确认. String basicConsume(String queue, boolean autoAck, Consumer callb

RabbitMQ学习第二记:工作队列的两种分发方式,轮询分发(Round-robin)和 公平分发(Fair dispatch)

1.什么是RabbitMQ工作队列 我们在应用程序使用消息系统时,一般情况下生产者往队列里插入数据时速度是比较快的,但是消费者消费数据往往涉及到一些业务逻辑处理导致速度跟不上生产者生产数据.因此如果一个生产者对应一个消费者的话,很容易导致很多消息堆积在队列里.这时,就得使用工作队列了.一个队列有多个消费者同时消费数据. 下图取自于官方网站(RabbitMQ)的工作队列的图例 P:消息的生产者 C1:消息的消费者1 C2:消息的消费者2 红色:队列 生产者将消息发送到队列,多个消费者同时从队列中获

RabbitMQ学习第一记:用java连接RabbitMQ

1.什么是RabbitMQ MQ(Message Queue):消息队列,是服务端设计的一个可以存储大量消息的队列,并提供客户端操作队列的方法:生产队列(向队列中添加数据).消费队列(从队列中取数据).RabbitMQ就是基于消息队列的一个典型应用.RabbitMQ除了普通的生产消费功能,还有一些高级功能:公平分发 ,轮询分发,路由模式,通配符模式,发布订阅,队列持久化. 2.java实现RabbitMQ的连接 2.1.RabbitMQ客户端jar包 <dependency><group

JAVA实现RabbitMQ,附安装过程

RabbitMQ的第一个JAVA实现 RabbitMQ是基于Erlang的,所以首先必须配置Erlang环境 Erlang官网   http://www.erlang.org/ Linux 下Erlang下载选择sourcefile Wget命令下载 Ubuntu下用tar –xzvf *.tar.gz命令解压 依次执行以下命令: ./configure--prefix=/home/hadoop/mydisk/erlang (该过程可能失败,建议sudoapt-get install build

java链接rabbitmq需要的jar包链接地址

准备: 1.下载rabbitmq并搭建环境(和python那篇一样:http://www.cnblogs.com/g177w/p/8176797.html) 2.下载支持的jar包(http://repo1.maven.org/maven2/com/rabbitmq/amqp-client) 生产者方(Productor.java): 1 package RabbitMQTest; 2 3 4 import java.util.HashMap; 5 import java.util.Map; 6

Java 小记 — RabbitMQ 的实践与思考

前言 本篇随笔将汇总一些我对消息队列 RabbitMQ 的认识,顺便谈谈其在高并发和秒杀系统中的具体应用. 1. 预备示例 想了下,还是先抛出一个简单示例,随后再根据其具体应用场景进行扩展,我觉得这样表述条理更清晰些. RabbitConfig: @Configuration public class RabbitConfig { @Bean public Queue callQueue() { return new Queue(MQConstant.CALL); } } Client: @Co

java调用rabbitmq

1 : 需要的jar 下载地址:http://repo1.maven.org/maven2/com/rabbitmq/amqp-client/3.0.4/ maven配置: <dependency>    <groupId>com.rabbitmq</groupId>    <artifactId>amqp-client</artifactId>    <version>3.0.4</version></depend

Java连接RabbitMQ之创建连接

依赖包: 1 <dependencies> 2 <dependency> 3 <groupId>junit</groupId> 4 <artifactId>junit</artifactId> 5 <version>4.12</version> 6 <scope>test</scope> 7 </dependency> 8 9 <!-- https://mvnrepos