RabbitMQ简单Java示例——生产者和消费者

添加Maven依赖:

使用rabbitmq-client的最新Maven坐标:

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.3.0</version>
</dependency>

添加账户

默认情况下,访问RabbitMQ服务的用户名和密码都是“guest”,这个账号有限制,默认只能通过本地网络(如localhost)访问,远程网络访问受限,所以在实现生产和消费消息之前,需要另外添加一个用户,并设置相应的访问权限。

添加新用户,用户名为“zifeiy”,密码为“passwd”:

C:\Users\zifeiy>rabbitmqctl add_user zifeiy passwd
Adding user "zifeiy" ...

为zifeiy用户设置所有权限:

C:\Users\zifeiy>rabbitmqctl set_permissions -p / zifeiy ".*" ".*" ".*"
Setting permissions for user "zifeiy" in vhost "/" ...

设置用户zifeiy为管理员角色:

C:\Users\zifeiy>rabbitmqctl set_user_tags zifeiy administrator
Setting tags for user "zifeiy" to [administrator] ...

计算机的世界是从“Hello World!”开始的,这里我们也沿用惯例,首先生产者发送一条消息”Hello World!“至RabbitMQ中,之后由消费者消费。

下面先演示生产者客户端的代码,然后再演示消费者客户端的代码。

生产者客户端代码

package com.zifeiy.springtest.rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class RabbitProducer {
    private static final String EXCHANGE_NAME = "exchange_demo";
    private static final String ROUTING_KEY = "routingkey_demo";
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "127.0.0.1";
    private static final int PORT = 5672;   // RabbitMQ服务端默认端口号为5672

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("zifeiy");
        factory.setPassword("passwd");
        Connection connection = factory.newConnection();    // 建立连接
        Channel channel = connection.createChannel();       // 创建信道
        // 创建一个type="direct"、持久化的、非自动删除的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
        // 创建一个持久化、非排他的、非自动删除的队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 将交换器和队列通过路由绑定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        // 发送一条持久化的消息:hello world!
        String message = "hello,world!";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes());
        // 关闭资源
        channel.close();
        connection.close();
    }
}

运行。

消费者客户端代码

package com.zifeiy.springtest.rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Connection;

public class RabbitConsumer {
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "127.0.0.1";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Address[] addresses = new Address[] {
                new Address(IP_ADDRESS, PORT)
        };
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("zifeiy");
        factory.setPassword("passwd");
        // 这里的连接方式与生产者的demo略有不同,注意区分
        Connection connection = factory.newConnection(addresses);   // 创建连接
        final Channel channel = connection.createChannel(); // 创建信道
        channel.basicQos(64);   // 设置客户端最多接受未被ack的消息的个数
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                            AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recv message: " + new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, consumer);
        // 等待回调函数执行完毕后,关闭资源
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
    }
}

运行,命令行输出如下:

recv message: hello,world!

原文地址:https://www.cnblogs.com/zifeiy/p/9490660.html

时间: 2024-11-12 01:29:24

RabbitMQ简单Java示例——生产者和消费者的相关文章

Java多线程--生产者与消费者问题

说明 Java中,线程之间的通信主要是由java.lang.Object类提供的wait.notify和notifyAll这3个方法来完成: ①对象的wait方法被调用后,线程进入对象的等待队列中,并释放对象锁,其它线程可以竞争使用此对象锁:sleep方法使得一个线程进入睡眠状态,但是线程所占有的资源并没有释放. ②当对象的notify方法被调用,该方法会从对象的等待队列中随机取出一个线程来唤醒:notifyAll是唤醒等待队列中所有线程,这些线程会与其它正在执行的线程共同竞争对象锁. ③wai

浅谈Java简单实现的生产者与消费者问题

一.面对生产者和消费者的问题,首先我们得明白几点: 生产者:生产数据:消费者:消费数据.消费者在没有数据可供消费的情况下,不能消费:生产者在原数据没有被消费掉的情况下,不能生产新数据.假设,数据空间只有一个.实际上,如果实现了正确的生产和消费,则,两个线程应该是严格的交替执行. synchronized关键字若用在代码中,形成一个同步块,且,必须要执行锁:    synchronized (锁对象) {        同步块    }同步块使得锁对象称为thread monitor二.代码实现:

java线程 生产者与消费者

package org.rui.thread.block; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 生产者与消费者 * 餐馆 * * @author lenovo * */ public class Restaurant { //Restaurant r=new Restaurant

java之生产者与消费者

package com.produce; import java.util.LinkedList; import java.util.Queue; /*@author shijin * 生产者与消费者模型中,要保证以下几点: * 1 同一时间内只能有一个生产者生产 生产方法加锁sychronized * 2 同一时间内只能有一个消费者消费 消费方法加锁sychronized * 3 生产者生产的同时消费者不能消费 生产方法加锁sychronized * 4 消费者消费的同时生产者不能生产 消费方

Java实现生产者和消费者

生产者和消费者问题是操作系统的经典问题,在实际工作中也常会用到,主要的难点在于协调生产者和消费者,因为生产者的个数和消费者的个数不确定,而生产者的生成速度与消费者的消费速度也不一样,同时还要实现生产者与消费者的解耦,即生产者并不知道有哪些消费者,而消费者也不需要知道产品是哪个生产的,他们之间只与一个交易平台发生关系. 这是现实世界普遍存在的问题,比如我们去苹果专卖店买IPhone 6,我们属于消费者,而生产商把产品生产出来放在苹果专卖店,如果全世界只有一个苹果专卖店,当专卖店没有IPhone 6

java之生产者和消费者问题

package testThread; public class Test3 { public static void main(String[] args) { Clerk c = new Clerk(); //消费时不生产,生产时不消费 //生产者 new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub synchronized (c) { //无限生产 whil

Rabbitmq 消息对列 生产者与消费者的具体实现 springboot

RabbitMQ 基本介绍 RabbitMQ的设计理念是.只要有接收消息的队列. 邮件就会存放到队列里. 直到订阅人取走. . 如果没有可以接收这个消息的消息队列. 默认是抛弃这个消息的.. 我实现的功能是将远程实现两 Publisher: 是Message的生产者,Publisher这个Clients产生了一些Message. Consumer: Message的消费者,Publisher产生的Message,最终要到达Consumer这个Clients,进行消费. Exchange: 指定消

java多线程:生产者和消费者模式(wait-notify) : 单生产和单消费

单生产者 package com.example.t.pc; import java.util.List; //生产者 public class P { private List list; public P(){ } public P(List list){ this.list = list; } public void add(){ while(true){ synchronized (list){ try { System.out.println("3s----------------&q

[Dubbo开发]配置简单的生产者和消费者

配置好jdk1.7.Zookeeper和Maven环境之后,开始尝试第一次搭建简单的dubbo生产者和消费者. dubbo服务的原理,引用经典的官方图(cr.Dubbo官网): 关于Dubbo的原理和机制,在此不做赘述,具体可以查询官方文档:http://dubbo.apache.org/#!/?lang=zh-cn. 接下来开始搭建生产者和消费者. 1.生产者(Provider) 创建一个maven项目, 代码如下: (1)接口ProviderService.java package com.