RabbitMQ系列之二:work queue

server端代码:

 1 package com.example.workqueue;
 2
 3 import java.io.IOException;
 4
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.Connection;
 7 import com.rabbitmq.client.ConnectionFactory;
 8 import com.rabbitmq.client.MessageProperties;
 9
10 public class Send {
11
12     public static void main(String[] args) throws IOException {
13
14         // 队列名称
15         String queueName = "task_queue";
16
17         ConnectionFactory factory = new ConnectionFactory();
18
19         //远程服务器ip,如果在本地测试可以改成localhost
20         factory.setHost("121.40.151.120");
21
22         //不是在本地测试,用户名和密码必填
23         factory.setUsername("rabbitmqname");
24         factory.setPassword("rabbitmqpwd");
25
26         Connection conn = factory.newConnection();
27         Channel channel = conn.createChannel();
28
29         boolean durable = true;
30
31         /**
32          * 参数说明:
33          * queue:队列名称
34          * durable:队列数据是否可以持久化,true:是,false:否。也就是服务重启后队列数据是否依然存在
35          * exclusive:是否为某一个队列的专用连接
36          * autoDelete:当队列不再被使用也就是没有消费者的时候是否自动删除
37          * arguments:其它参数,比如队列存活时间
38         */
39         channel.queueDeclare(queueName, durable, false, false, null);
40
41         String[] strs = new String[] { "First message." };
42         String message = getMessage(strs);
43
44         /**
45          * 参数说明:
46          * exchange:默认的exchange就是"",是direct类型的,
47          *             任何发往到默认exchange的消息都会被路由到routingKey的名字对应的队列上,如果没有对应的队列,则消息会被丢弃。
48          * routingKey:指定接收消息的队列
49          * props:其它属性,比如消息路由头信息,持久化信息
50          * body:消息内容
51         */
52         channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
53
54         System.out.println("[" + message + "]");
55
56         // 最后,我们关闭channel和连接,释放资源。
57         channel.close();
58         conn.close();
59     }
60
61     private static String getMessage(String[] strings) {
62         if (strings.length < 1) {
63             return "Hello World!";
64         }
65         return joinStrings(strings, " ");
66     }
67
68     private static String joinStrings(String[] strings, String delimiter) {
69         int length = strings.length;
70         if (length == 0) {
71             return "";
72         }
73         StringBuilder words = new StringBuilder(strings[0]);
74         for (int i = 1; i < length; i++) {
75             words.append(delimiter).append(strings[i]);
76         }
77         return words.toString();
78     }
79
80 }

client端代码:

 1 package com.example.workqueue;
 2
 3 import java.io.IOException;
 4
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.Connection;
 7 import com.rabbitmq.client.ConnectionFactory;
 8 import com.rabbitmq.client.ConsumerCancelledException;
 9 import com.rabbitmq.client.QueueingConsumer;
10 import com.rabbitmq.client.ShutdownSignalException;
11
12 public class Recv {
13
14     public static void main(String[] args) throws IOException, ShutdownSignalException,
15         ConsumerCancelledException, InterruptedException {
16
17         // 队列名称
18         String queueName = "task_queue";
19
20         ConnectionFactory factory = new ConnectionFactory();
21
22         factory.setHost("121.40.151.120");
23         factory.setUsername("rabbitmqname");
24         factory.setPassword("rabbitmqpwd");
25
26         Connection connection = factory.newConnection();
27         Channel channel = connection.createChannel();
28
29         // 表示在同一时间不要给一个Rev一个以上的消息(只能是一个),也就是说不要将一个新的消息分发给Rev直到它处理完了并且返回了前一个消息的通知标志(acknowledged)
30         channel.basicQos(1);
31
32         //与服务端一致
33         channel.queueDeclare(queueName, true, false, false, null);
34
35         System.out.println("CRTL+C");
36
37         // QueueingConsumer:用来缓存服务端推送给我们的消息。
38         QueueingConsumer consumer = new QueueingConsumer(channel);
39
40         boolean autoAck = false;
41         /**
42          * 参数说明:
43          * queue:队列名称
44          * autoAck:是否自动应答,true:消息一旦被消费者消费,服务端就知道该消息已经投递,从而从队列中将消息剔除;
45          *                      false:需要在消费端显示调用channel.basicAck()方法通知服务端,如果没用显示调用,消息将进入
46          *                            unacknowledged状态,当前消费者连接断开后该消息变成ready状态重新进入队列。
47          * callback:具体消费者类
48         */
49         channel.basicConsume(queueName, autoAck, consumer);
50
51         while (true) {
52             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
53             String message = new String(delivery.getBody());
54             System.out.println("[" + message + "]");
55             doWork(message);
56             System.out.println("r[done]");
57
58             /**
59              * 显示调用通知服务端该消息已经消费并返回了acknowledged
60              * true:通知所有相同tag的untracked,false:只通知当前一个
61              */
62             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
63         }
64     }
65
66     private static void doWork(String message) throws InterruptedException {
67         for (char ch : message.toCharArray()) {
68             if (ch == ‘.‘) {
69                 Thread.sleep(1000);
70             }
71         }
72     }
73
74 }
时间: 2024-10-12 08:07:01

RabbitMQ系列之二:work queue的相关文章

在Node.js中使用RabbitMQ系列二 任务队列

在上一篇文章在Node.js中使用RabbitMQ系列一 Hello world我有使用一个任务队列,不过当时的场景是将消息发送给一个消费者,本篇文章我将讨论有多个消费者的场景. 其实,任务队列最核心解决的问题是避免立即处理那些耗时的任务,也就是避免请求-响应的这种同步模式.取而代之的是我们通过调度算法,让这些耗时的任务之后再执行,也就是采用异步的模式.我们需要将一条消息封装成一个任务,并且将它添加到任务队列里面.后台会运行多个工作进程(worker process),通过调度算法,将队列里的任

RabbitMQ系列二(构建消息队列)

从AMQP协议可以看出,MessageQueue.Exchange和Binding构成了AMQP协议的核心.下面我们就围绕这三个主要组件,从应用使用的角度全面的介绍如何利用RabbitMQ构建消息队列以及使用过程中的注意事项. 声明MessageQueue: 在RabbitMQ中,无论是生产者发送消息还是消费者接收消息,都首先需要声明一个MessageQueue.这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先要明确: 1)消费者是无法订阅或者获取不存在的MessageQue

Powershell管理系列(二十六)PowerShell操作之批量导出&导入邮箱

-----提供AD\Exchange\Lync\Sharepoint\CRM\SC\O365等微软产品实施及外包,QQ:185426445.电话18666943750 项目中有时候做跨林邮箱迁移的时候,条件不成熟,比如安全考虑或者其他考虑,不能做双林信任,这样就提出了一个问题,历史邮件需要使用的话怎么办,一个简单高效的解决办法就是从源森林批量导出邮件为.pst文件,在批量导入到目的域森林,具体操作如下: 1.赋予管理账号邮件导入导出权限,命令如下: cls whoami New-Manageme

算法系列之二十三:离散傅立叶变换之音频播放与频谱显示

算法系列之二十三:离散傅立叶变换之音频播放与频谱显示 算法系列之二十三离散傅立叶变换之音频播放与频谱显示 导语 什么是频谱 1 频谱的原理 2 频谱的选择 3 频谱的计算 显示动态频谱 1 实现方法 2 杂项说明 结果展示 导语 频谱和均衡器,几乎是媒体播放程序的必备物件,没有这两个功能的媒体播放程序会被认为不够专业,现在主流的播放器都具备这两个功能,foobar 2000的十八段均衡器就曾经让很多人着迷.在上一篇对离散傅立叶变换介绍的基础上,本篇就进一步介绍一下频谱是怎么回事儿,下一篇继续介绍

hbase源码系列(二)HTable 如何访问客户端

hbase的源码终于搞一个段落了,在接下来的一个月,着重于把看过的源码提炼一下,对一些有意思的主题进行分享一下.继上一篇讲了负载均衡之后,这一篇我们从client开始讲吧,从client到master再到region server,按照这个顺序来开展,网友也可以对自己感兴趣的部分给我留言或者直接联系我的QQ. 现在我们讲一下HTable吧,为什么讲HTable,因为这是我们最常见的一个类,这是我们对hbase中数据的操作的入口. 1.Put操作 下面是一个很简单往hbase插入一条记录的例子.

[Axis2与Eclipse整合开发Web Service系列之二] Top-Down方式,通过WSDL逆向生成服务端(续)

前言 本篇是承接上一篇: [Axis2与Eclipse整合开发Web Service系列之二] Top-Down方式,通过WSDL逆向生成服务端 在上一篇粗略地介绍了如何使用Top-Down的方式创建一个web service .  但是对于如何部署及调用,以及一些细节的部分基本上没有介绍. 应某些博友的要求, 也适逢自己有空, 接下来就详细介绍一下整个部分如何进行. 环境准备 JDK 肯定要安装了, 这个就不多讲了. 1. eclipse  3.5.2 对eclipse 版本的要求其实不是很严

天津出差系列(二)----第二天

上午十一点才起床,然后和同事出去吃饭螺蛳粉,之后去做了足底按摩,下午三点打的回来后睡了一觉,起来晚上六点多了, 然后洗个澡把换洗的衣服洗了一下,和同事出去吃了份饺子.然后回来看看把电视打开 看了一圈后,没有什么好看的.把电脑打开开始修改沧州的需求. 看到了沧州代码写的有点混乱,打算把天津这边处理完毕后再处理吧,免得更新后造成系统不稳定.也可能是周日真心不想修改需求吧. -----------------------周日就这样过了,总是感觉过的很游离. 天津出差系列(二)----第二天,布布扣,b

Tokyo Tyrant(TTServer)系列(二)-启动参数和配置

启动参数介绍 ttserver命令可以启动一个数据库实例.因为数据库已经实现了Tokyo Cabinet的抽象API,所以可以在启动的时候指定数据库的配置类型. 支持的数据库类型有: 内存hash数据库 内存tree数据库 hash数据库 B+ tree数据库, 命令通过下面的格式来使用,'dbname'制定数据库名,如果省略,则被视作内存hash数据库. ttserver [-host name] [-port num] [-thnum num] [-tout num] [-dmn] [-pi

AWS系列之二 使用EC2

在本文中我们有三个任务. 第一:使用Amazon management console创建一个EC2实例. 第二:使用本地的命令行工具远程登陆到该EC2实例. 第三:在该EC2实例上创建一个web服务,并通过公共域名来访问该web服务. 如果你还没有AWS的账号的话,可以使用qwiklabs提供的免费实验来进行该练习.地址是https://run.qwiklabs.com. 登陆到亚马逊的AWS服务的management console后选择EC2服务. 然后你就可以到EC2的控制面板了. 点击