发送消息:
1 package org.study.workfair; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import org.junit.Test; 6 import org.study.utils.ConnectionUtils; 7 8 import java.io.IOException; 9 import java.util.concurrent.TimeoutException; 10 11 public class Sender { 12 public static final String QUEUE_NAME = "test_simple_queue"; 13 14 @Test 15 public void send() throws IOException, TimeoutException, InterruptedException { 16 // 获取连接 17 Connection conn = ConnectionUtils.getConnection(); 18 // 获取通道 19 Channel channel = conn.createChannel(); 20 //创建队列 21 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 22 //每个消费者发送确认消息前,只发送一条消息 23 channel.basicQos(1); 24 String msg = "hello rabbitmq!"; 25 26 for (int i = 0; i < 50; i++) { 27 String tempStr = i + " " + msg; 28 //发送消息 29 channel.basicPublish("", QUEUE_NAME, null, tempStr.getBytes()); 30 System.out.println("[send] msg " + i + ": " + msg); 31 Thread.sleep(100); 32 } 33 34 channel.close(); 35 conn.close(); 36 } 37 }
接受消息:
1 package org.study.workfair; 2 3 import com.rabbitmq.client.*; 4 import org.junit.Test; 5 import org.study.utils.ConnectionUtils; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class Recv { 11 public static final String QUEUE_NAME = "test_simple_queue"; 12 13 @Test 14 public void recv() throws IOException, TimeoutException, InterruptedException { 15 Connection conn = ConnectionUtils.getConnection(); 16 Channel channel = conn.createChannel(); 17 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 18 channel.basicQos(1); 19 20 //定义消费者 21 DefaultConsumer consumer = new DefaultConsumer(channel) { 22 //重写获取到达消息 23 @Override 24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 25 // super.handleDelivery(consumerTag, envelope, properties, body); 26 String msg = new String(body, "utf-8"); 27 System.out.println("[1] recv: " + msg); 28 29 try { 30 Thread.sleep(1000); 31 } catch (InterruptedException e) { 32 e.printStackTrace(); 33 }finally { 34 System.out.println("[1] done!"); 35 channel.basicAck(envelope.getDeliveryTag(),false); 36 } 37 } 38 }; 39 40 while (true) { 41 //监听队列 42 channel.basicConsume(QUEUE_NAME, false, consumer); 43 Thread.sleep(100); 44 } 45 46 47 } 48 }
原文地址:https://www.cnblogs.com/gongxr/p/9639528.html
时间: 2024-11-09 05:59:00