public class Consumer { public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{ ConnectionFactory connFactory = new ConnectionFactory(); connFactory.setHost("localhost"); Connection connection = connFactory.newConnection(); Channel channel = connection.createChannel(); //连接队列 // channel.queueDeclare("queue.lubby.test1", false, false, false, null); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("queue.lubby.hello", true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("receive message is " + message); System.out.println("receive class is " + delivery.getClass()); System.out.println("receive envelope is " + delivery.getEnvelope()); System.out.println("receive properties is " + delivery.getProperties()); doWork(message); } // channel.close(); // connection.close(); } private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(10000); } } }
1)服务器启动与关闭
启动:rabbitmq-server start
关闭:rabbitmqctl stop
2)生产者代码
public class Provider { public static void main(String[] args){ ConnectionFactory connFactory = new ConnectionFactory(); connFactory.setHost("localhost"); try { Connection connection = connFactory.newConnection(); Channel channel = connection.createChannel(); // queue为队列名称 如果队列不存在则创建 如果队列存在,则不允许修改队列属性 //durable 为是否可持久化 //创建队列 channel.queueDeclare("queue.lubby.hello2", true, false, false, null); // String message = getMessage(args); String message = "3."; //如果要持久化则 BasicProperties 必须配上 channel.basicPublish("", "queue.lubby.hello2", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("sent : " + message); channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } }
3)消费者代码
时间: 2024-10-14 00:34:42