柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)

一、回顾

让我们回顾一下,在上几章里都讲了什么?总结如下:

二、起航

本章节,柯南君将从几个层面,用官网例子讲解一下RabbitMQ的实操经典程序案例,让大家重新回到经典“Hello world!”(The simplest thing that does something )时代,RabbitMQ 支持N多种客户端(client),这里无法一一讲解,暂定java client,有时间的情况下,在弥补一下。

事先,先普及一下图标(我们会在下面的事例中,会大量用到,所以先普及一下,便于识别,最终更好理解事例的含义)

1、图标概念

① producting(生产者):在程序中 发送消息的一端,我们暂且称之为 生产者,在这里用“p”表示

② queue(队列):队列是一个邮箱的名字。它住在RabbitMQ。尽管消息流经RabbitMQ和您的应用程序,他们只可以存储在一个队列中。队列是不受任何限制,它可以储存尽可能多的信息(按你兴趣来了),它本质上是一个无限缓冲区。许多生产商可以发送消息到一个队列,许多消费者可以尝试接收数据从一个队列。

③ consuming(消费者):消费者和生产者是对应的,较为相似的意思;在这里,我用“C”表示

2、The Java client library

RabbitMQ 中AMQP这是一个开放的、通用的协议消息。有许多客户AMQP在许多不同的语言。我们将使用提供的Java客户机RabbitMQ。

我们需要下载(Download) client library package,并要核实每个jar包,解压到相应位置,如下图所示:

第一步:点击 http://www.rabbitmq.com/java-client.html,然后找到相应的lib下载位置

第二步:选择合适的下载,比如我下载了zip包,如图所示:

第三步:Unzip it(解压它) 到你的working directory(工作目录)中 and grab (并且获得)你的jar包文件

  • $ unzip rabbitmq-java-client-bin-*.zip
  • $ cp rabbitmq-java-client-bin-*/*.jar ./

3、程序案例

1) "Hello World"

在这部分教程中我们将用Java写两个程序,发送一个消息的生产者,消费者接收信息并打印出来。我们会掩盖一些细节的Java API,集中在这个非常简单的东西开始。这是一个“Hello World”的消息。在下面的图中,“P”是我们的生产和“C”是我们的消费者。中间的框是一个队列,消息缓冲区RabbitMQ保持代表消费者。

 ① sending (发送)

首先 让The sender(消息发送者)发送消息并且让我们的receiver (消息接收者)接收消息,The sender(消息发送者)将会connect to(连接)RabbitMQ,发送一个single message(单一的信息),然后exit(退出)。

  • 在send.java 中,我们需要import一些class ,如下所示:
	import com.rabbitmq.client.ConnectionFactory;
	import com.rabbitmq.client.Connection;
	import com.rabbitmq.client.Channel;
  • set up(设置)类和queue的name
	public class Send {

 		 private final static String QUEUE_NAME = "hello";

  		public static void main(String[] argv)
    	 	 throws java.io.IOException {
     		 ...
 		 }
       }
  • then 我们create 一个connection (连接)到server(服务端)
    onnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

备注

  • 这个connection 是抽象的socket connection链接;
  • 负责协议版本(protocol version negotigation)和身份认证(authentication );
  • 我们在本地机器上连接到一个代理即 localhost ,如果我们想要连接到代理不同机器上我们简单的指定其名称或者IP地址即可;

接下来,我们创建一个channel(通道),这个通道汇集了大多数的API服务!

为了发送,我们必须先声明一个为我们发送queue(队列),然后,往queue里发送一个message

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent ‘" + message + "‘");

消息内容是一个字节数组,所以你可以编码任何你喜欢的。最后,我们关闭通道和连接;

    channel.close();
    connection.close();

问题:

如果 sending doesn‘t work! 我们将怎么办?why?

如果这是你第一次使用RabbitMQ并且你看不到“发送的”消息,那么你可能抓耳挠腮没有足够的空闲磁盘空间(默认情况下它需要至少1
gb免费),因此拒绝接受消息。检查代理日志文件确认,如果有必要减少限制。配置文件的文档将向您展示如何设置disk_free_limit。

接下来的是send.java所有源代码:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Send {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
}

 ② Receiving (接收)

这就是我们的发送者。我们的接收器是将消息从RabbitMQ,所以不像发送方发布一个消息,我们将保持运行监听消息并打印出来

  • The code (in Recv.java)
    has almost the same imports as Send:
	import com.rabbitmq.client.ConnectionFactory;
	import com.rabbitmq.client.Connection;
	import com.rabbitmq.client.Channel;
	import com.rabbitmq.client.QueueingConsumer;

额外的QueueingConsumer是一个类,我们将使用缓冲消息推到我们的服务器。设置发送者一样,我们打开一个连接和一个通道,并宣布我们将使用的队列。注意这与队列,发送发布。

public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    }
}

注意,我们在这里声明队列。因为我们可能会在发送方之前开始启动接收方,我们要确保队列存在之前我们尝试使用消息。我们要告诉服务器提供我们从队列的消息。因为它将异步消息,我们提供一个回调对象的形式,将缓冲的消息,直到我们准备使用它们。

QueueingConsumer要做什么呢?

  QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, true, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(" [x] Received ‘" + message + "‘");
    }

QueueingConsumer.nextDelivery()块,直到另一个来自服务器的消息交付。

下面是Recv.java 源代码:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, true, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(" [x] Received '" + message + "'");
    }
  }
}

待续....

时间: 2024-10-07 07:57:18

柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)的相关文章

柯南君:看大数据时代下的IT架构(5)消息队列之RabbitMQ--案例(Work Queues起航)

一.回顾 让我们回顾一下,在上几章里都讲了什么?总结如下: <柯南君:看大数据时代下的IT架构(1)业界消息队列对比> <柯南君:看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍> <柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装.配置与监控> <柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)> 二.Work Queues(using the Java Cl

柯南君:看大数据时代下的IT架构(6)消息队列之RabbitMQ--案例(Publish/Subscribe起航)

一.回顾 让我们回顾一下,在上几章里都讲了什么?总结如下: <柯南君:看大数据时代下的IT架构(1)业界消息队列对比> <柯南君:看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍> <柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装.配置与监控> <柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)> <柯南君:看大数据时代下的IT架构(5)消息队列之Rab

柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装、配置与监控

柯南君上一章<看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍>中,粗略的讲了一下,目前消息队列的几种常见产品的优劣对比,接下来的几章节会分别详细阐述,本章介绍RabbitMQ,好吧,废话少说,正式开始: 一.安装 1.安装Erlang 1)系统编译环境(这里采用linux/unix 环境) ① 安装环境 虚拟机:VMware? Workstation 10.0.1 build Linux系统:CentOS6.5 rabbitMQ官网下载:http://www.rab

柯南君:看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍

柯南君上一章<柯南君:看大数据时代下的IT架构(1)业界消息队列对比 >中,粗略的讲了一下,目前消息队列的几种常见产品的优劣对比,接下来的几章节会分别详细阐述,本章介绍RabbitMQ,好吧,废话少说,正式开始: 一.基础概念详细介绍 1.引言 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用.通讯的问题而苦恼.挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题. 消息服务擅长于解决多系统.异构系统间的数据交换(消息通知/通讯)问

柯南君:看大数据时代下的IT架构(1)业界消息队列对比

一.MQ(Message Queue) 即消息队列,一般用于应用系统解耦.消息异步分发,能够提高系统吞吐量.MQ的产品有很多,有开源的,也有闭源,比如ZeroMQ.RabbitMQ.ActiveMQ.Kafka/Jafka.Kestrel.Beanstalkd.HornetQ.Apache Qpid.Sparrow.Starling.Amazon SQS.MSMQ等,甚至Redis也可以用来构造消息队列.至于如何取舍,取决于你的需求. 由于工作需要和兴趣爱好,曾经写过关于RabbitMQ的系列博

看大数据时代下的IT架构(1)业界消息队列对比

一.MQ(Message Queue) 即消息队列,一般用于应用系统解耦.消息异步分发,能够提高系统吞吐量.MQ的产品有很多,有开源的,也有闭源,比如ZeroMQ.RabbitMQ.ActiveMQ.Kafka/Jafka.Kestrel.Beanstalkd.HornetQ.Apache Qpid.Sparrow.Starling.Amazon SQS.MSMQ等,甚至Redis也可以用来构造消息队列.至于如何取舍,取决于你的需求. 由于工作需要和兴趣爱好,曾经写过关于RabbitMQ的系列博

看大数据时代下的IT架构(1)图片服务器之演进史

        柯南君的公司最近产品即将上线,由于产品业务对图片的需求与日俱增,花样百出,与此同时,在大数据时代,大流量的冲击下,对图片服务器的压力可想而知,那么今天,柯南君结合互联网的相关热文,加上自己的一点实践经验,与君探讨,与君共勉! 一.图片服务器的重要性 当前,不管哪一家网站(包括 电商行业.O2O行业.互联网行业等),不管哪一种渠道 (包括 web端,APP端甚至一些SNS应用),在大数据时代下,在内容为王的前提下,对图片的需求量越来越大,柯南君的公司是一家O2O公司,也不例外,图片

柯南君:看大数据时代下的IT架构(9)消息队列之RabbitMQ--案例(RPC起航)

二.Remote procedure call (RPC)(using the Java client) 三.Client interface(客户端接口) 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞,直到收到RPC运算的结果.代码如下: fibonacci_rpc = FibonacciRpcClient() result = fibonacci_rpc.call(4) print "f

柯南君:看大数据时代下的IT架构(7)消息队列之RabbitMQ--案例(routing 起航)

二.Routing(路由) (using the Java client) 在前面的学习中,构建了一个简单的日志记录系统,能够广播所有的日志给多个接收者,在该部分学习中,将添加一个新的特点,就是可以只订阅一个特定的消息源,也就是说能够直接把关键的错误日志消息发送到日志文件保存起来,不重要的日志信息文件不保存在磁盘中,但是仍然能够在控制台输出,那么这便是我们这部分要学习的消息的路由分发机制. 三.Bindings(绑定) 在前面的学习中已经创建了绑定(bindings),代码如下: channel