server端代码:
1 package com.example.workqueue; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.MessageProperties; 9 10 public class Send { 11 12 public static void main(String[] args) throws IOException { 13 14 // 队列名称 15 String queueName = "task_queue"; 16 17 ConnectionFactory factory = new ConnectionFactory(); 18 19 //远程服务器ip,如果在本地测试可以改成localhost 20 factory.setHost("121.40.151.120"); 21 22 //不是在本地测试,用户名和密码必填 23 factory.setUsername("rabbitmqname"); 24 factory.setPassword("rabbitmqpwd"); 25 26 Connection conn = factory.newConnection(); 27 Channel channel = conn.createChannel(); 28 29 boolean durable = true; 30 31 /** 32 * 参数说明: 33 * queue:队列名称 34 * durable:队列数据是否可以持久化,true:是,false:否。也就是服务重启后队列数据是否依然存在 35 * exclusive:是否为某一个队列的专用连接 36 * autoDelete:当队列不再被使用也就是没有消费者的时候是否自动删除 37 * arguments:其它参数,比如队列存活时间 38 */ 39 channel.queueDeclare(queueName, durable, false, false, null); 40 41 String[] strs = new String[] { "First message." }; 42 String message = getMessage(strs); 43 44 /** 45 * 参数说明: 46 * exchange:默认的exchange就是"",是direct类型的, 47 * 任何发往到默认exchange的消息都会被路由到routingKey的名字对应的队列上,如果没有对应的队列,则消息会被丢弃。 48 * routingKey:指定接收消息的队列 49 * props:其它属性,比如消息路由头信息,持久化信息 50 * body:消息内容 51 */ 52 channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); 53 54 System.out.println("[" + message + "]"); 55 56 // 最后,我们关闭channel和连接,释放资源。 57 channel.close(); 58 conn.close(); 59 } 60 61 private static String getMessage(String[] strings) { 62 if (strings.length < 1) { 63 return "Hello World!"; 64 } 65 return joinStrings(strings, " "); 66 } 67 68 private static String joinStrings(String[] strings, String delimiter) { 69 int length = strings.length; 70 if (length == 0) { 71 return ""; 72 } 73 StringBuilder words = new StringBuilder(strings[0]); 74 for (int i = 1; i < length; i++) { 75 words.append(delimiter).append(strings[i]); 76 } 77 return words.toString(); 78 } 79 80 }
client端代码:
1 package com.example.workqueue; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.ConsumerCancelledException; 9 import com.rabbitmq.client.QueueingConsumer; 10 import com.rabbitmq.client.ShutdownSignalException; 11 12 public class Recv { 13 14 public static void main(String[] args) throws IOException, ShutdownSignalException, 15 ConsumerCancelledException, InterruptedException { 16 17 // 队列名称 18 String queueName = "task_queue"; 19 20 ConnectionFactory factory = new ConnectionFactory(); 21 22 factory.setHost("121.40.151.120"); 23 factory.setUsername("rabbitmqname"); 24 factory.setPassword("rabbitmqpwd"); 25 26 Connection connection = factory.newConnection(); 27 Channel channel = connection.createChannel(); 28 29 // 表示在同一时间不要给一个Rev一个以上的消息(只能是一个),也就是说不要将一个新的消息分发给Rev直到它处理完了并且返回了前一个消息的通知标志(acknowledged) 30 channel.basicQos(1); 31 32 //与服务端一致 33 channel.queueDeclare(queueName, true, false, false, null); 34 35 System.out.println("CRTL+C"); 36 37 // QueueingConsumer:用来缓存服务端推送给我们的消息。 38 QueueingConsumer consumer = new QueueingConsumer(channel); 39 40 boolean autoAck = false; 41 /** 42 * 参数说明: 43 * queue:队列名称 44 * autoAck:是否自动应答,true:消息一旦被消费者消费,服务端就知道该消息已经投递,从而从队列中将消息剔除; 45 * false:需要在消费端显示调用channel.basicAck()方法通知服务端,如果没用显示调用,消息将进入 46 * unacknowledged状态,当前消费者连接断开后该消息变成ready状态重新进入队列。 47 * callback:具体消费者类 48 */ 49 channel.basicConsume(queueName, autoAck, consumer); 50 51 while (true) { 52 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 53 String message = new String(delivery.getBody()); 54 System.out.println("[" + message + "]"); 55 doWork(message); 56 System.out.println("r[done]"); 57 58 /** 59 * 显示调用通知服务端该消息已经消费并返回了acknowledged 60 * true:通知所有相同tag的untracked,false:只通知当前一个 61 */ 62 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 63 } 64 } 65 66 private static void doWork(String message) throws InterruptedException { 67 for (char ch : message.toCharArray()) { 68 if (ch == ‘.‘) { 69 Thread.sleep(1000); 70 } 71 } 72 } 73 74 }
时间: 2024-10-12 08:07:01