In order to exchange messages, producers and consumers (clients) need to connect to
the broker. This client-to-broker communication is performed through transport connectors.
ActiveMQ provides an impressive list of protocols clients can use to exchange
messages. The requirements of ActiveMQ users in terms of connectivity are diverse.
Some users focus on performance, others on security, and so on. ActiveMQ tries to
cover all these aspects and provide a connector for every use case.
In this section you’ll learn how transport connectors are configured in the
ActiveMQ configuration files and adapt the stock portfolio example to demonstrate
various connectors. In the following sections, we’ll go through protocols available for
connecting to the broker over the network, as well as introduce the concept of the
embedded broker and Virtual Machine Protocol used for communicating with brokers
inside your application (a topic that will be continued in chapter 7).
为了交换消息,消息生产者和消费者(客户端)需要连接到代理(broker).
这种客户端-代理之间的通信就是通过传输连接(transport connector)来完成.
ActiveMQ提供一系列的连接协议,客户端使用这些协议可以交换消息.
ActiveMQ用户对连接的需求是多种多样的,比如有些用户关注性能,有些关注安全等等,
ActiveMQ提供连接器以满足所有用户的需求.
本节中,你将学习如何在ActiveMQ配置文件中配置传输连接器,同时会修改 stock portfolio例子,
以便阐述各种各样的链接器.在接下来的章节中,我们将讨论连接协议,使用这些连接协议,可以通过网络连接到代理.
另外,本节中还将介绍嵌入式代理(embedded broker)和虚拟机协议,使用虚拟机协议,允许在应用程序内部与代理直接通信
(这个专题将在第七章继续讨论).
4.2.1 Configuring transport connectors
From the broker’s perspective, the transport connector is a mechanism used to accept
and listen to connections from clients. If you take a look at the ActiveMQ demo configuration
file (conf/activemq-demo.xml), you’ll see the configuration snippet for transport
connectors similar to the following example:
<transportConnectors>
<transportConnector name=”openwire” uri=”tcp://localhost:61616″ discoveryUri=”multicast://default”/>
<transportConnector name=”ssl” uri=”ssl://localhost:61617″/>
<transportConnector name=”stomp” uri=”stomp://localhost:61613″/>
<transportConnector name=”xmpp” uri=”xmpp://localhost:61222″/>
</transportConnectors>
从代理的(broker)的角度来说,传输连接(transport connector)是一种机制,用来处理和监听客户端的连接.
如果你查看ActiveMQ demo的配置文件(conf/activemq-demo.xml),你会看到,传输连接配置类似于下面的代码片段:
<transportConnectors>
<transportConnector name=”openwire” uri=”tcp://localhost:61616″ discoveryUri=”multicast://default”/>
<transportConnector name=”ssl” uri=”ssl://localhost:61617″/>
<transportConnector name=”stomp” uri=”stomp://localhost:61613″/>
<transportConnector name=”xmpp” uri=”xmpp://localhost:61222″/>
</transportConnectors>
As you can see, transport connectors are defined within the <transportConnectors>
element. You define particular connectors with the appropriate nested <transport-Connector> element.
ActiveMQ simultaneously supports many protocols listening on
different ports. The configuration for a connector must uniquely define the name
and the URI attributes. In this case, the URI defines the network protocol and optional
parameters through which ActiveMQ will be exposed for connectivity. The
discoveryUri attribute as shown on the OpenWire connector is optional and will be
discussed further in section 4.3.1.
正如你看到的,传输连接使用<transportConnectors>元素来定义.通过嵌套的<transport-Connector>元素
来定义特定的连接.ActiveMQ支持同时支持不同的连接协议监听不同的端口.连接器配置的name和URI属性必须唯一.
这样,通过URI定义网络协议以及可选的参数,ActiveMQ便产生了一个连接.在OpenWire connector配置中的
discoveryUri属相是可选的,这部分将在4.3.1节中讨论.
The preceding snippet defines four transport connectors. Upon starting up
ActiveMQ using such a configuration file, you’ll see the following log in the console as
these connectors start up:
INFO TransportServerThreadSupport – Listening for connections at:tcp://localhost:61616
INFO TransportConnector – Connector openwire Started
INFO TransportServerThreadSupport – Listening for connections at:ssl://localhost:61617
INFO TransportConnector – Connector ssl Started
INFO TransportServerThreadSupport – Listening for connections at:stomp://localhost:61613
INFO TransportConnector – Connector stomp Started
INFO TransportServerThreadSupport – Listening for connections at:xmpp://localhost:61222
INFO TransportConnector – Connector xmpp Started
前面的代码片断中,一共定义了4个传输连接.使用包含上面代码片段的配置文件启动ActiveMQ时,控制台
会显示下面的日志信息:
INFO TransportServerThreadSupport – Listening for connections at:tcp://localhost:61616
INFO TransportConnector – Connector openwire Started
INFO TransportServerThreadSupport – Listening for connections at:ssl://localhost:61617
INFO TransportConnector – Connector ssl Started
INFO TransportServerThreadSupport – Listening for connections at:stomp://localhost:61613
INFO TransportConnector – Connector stomp Started
INFO TransportServerThreadSupport – Listening for connections at:xmpp://localhost:61222
INFO TransportConnector – Connector xmpp Started
From the client’s perspective, the transport connector URI is used to create a connection
to the broker in order to send and receive messages. Sending and receiving messages
will be discussed in detail in chapter 7, but the following code snippet should be
enough to demonstrate the usage of the transport connector URIs in Java applications:
从客户端的角度来说,使用传输连接的URI可以创建一个到代理的连接,以便接收和发送消息.
接收和发送消息将在第7章详细讨论,下面的代码片段足以说明Java应用程序中传输连接URI
(transport connector URI)的用处.
ActiveMQConnectionFactory factory =
new ActiveMQConnectionFactory(“tcp://localhost:61616″);
Connection connection = factory.createConnection();
connection.start();
Session session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Note in the preceding example that the transport connector URIs defined in
ActiveMQ configuration are used by the client application to create a connection to
the broker. In this case, the URI for the TCP transport is used and is shown in bold
text.
注意,前面的例子中,客户端程序使用ActiveMQ配置文件中定义的的传输连接器的URI来创建到代理的连接.
代码中,展示了如何使用TCP transport的URI并以斜体显示.
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(“tcp://localhost:61616″);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
NOTE The important thing to know is that we can use the query part of the
URI to configure connection parameters both on the server and client sides.
Usually most of the parameters apply both for client and server sides of the
connection, but some of them are specific to one or the other, so be sure you
check the protocol reference before using the particular query parameter.
With this basic understanding of configuring transport connectors, it’s important to
become aware of and understand the available transport connectors in ActiveMQ. But
before we start explaining particular connectors, we must first adapt our stock portfolio
example so it can be used with different transport connectors.
注意:需要重点了解的是:我们可以使用URI中查询字符串部分的来配置服务端和客户端的连接参数.
通常这些参数同时适用于服务端和客户端,但是也有一些仅适用于其中之一,所以在使用查询字符串之前
需要查看连接协议的使用手册.有了这些配置传输连接的基本知识后,需要重点了解ActiveMQ提供了哪些可用传输连接了.
但在解释这些连接器之前,我们首先修改下stock portfolio的例子,这样可以先熟悉不同的传输连接.
4.2.2 Adapting the stock portfolio example
Chapter 3 introduced a stock portfolio example that uses ActiveMQ to publish and
consume stock exchange data. There, we used the fixed standard connector URI since
we wanted to make those introductory examples as simple as possible. In this chapter,
we’ll explain all protocols and demonstrate them by running the stock portfolio
example using each of them. For that reason, we need to modify the stock portfolio
example so it will work using any of the protocols.
Listing 4.1 is a modified version of the main() method from the stock portfolio
publisher.
第三章介绍了一个stock portfolio实例,在这个实例中使用ActiveMQ发送和接收处理消息,以便进行股票价格信息交换.
为了使stock portfolio这个入门性的实例看起来尽量简单,我们使用了固定的标准的URI配置连接器.
本章将解释所有连接协议,并在stock portfolio这个例子中使用各种连接协议.为此,我们需要修改
stock portfolio例子,使得这个例子可以在使用任何连接实例时能够正常运行.
清单4.1是stock portfolio例子中修改后的Publisher类的main()方法.
Listing 4.1 Modifying stock portfolio publisher to support various connector URIs
public static void main(String[] args) throws JMSException
{
if (args.length == 0)
{
System.err.println(“Please define a connection URI!”);
return;
}
Publisher publisher = new Publisher(args[0]);
String[] topics = new String[args.length – 1];
System.arraycopy(args, 1, topics, 0, args.length – 1);
while (total < 1000)
{
for (int i = 0; i < count; i++)
{
publisher.sendMessage(topics);
}
total += count;
System.out.println(“Published ‘” + count + “‘ of ‘” + total + “‘ price messages”);
try
{
Thread.sleep(1000);
}
catch (InterruptedException x)
{
}
}
publisher.close();
}
The preceding code ensures that the connector URI is passed as the first argument
and extracts topic names from the rest of the arguments passed to the application.
Now the stock portfolio publisher can be run with the following command:
前面的代码保证了程序运行时第一个命令行参数是连接器的URI,剩下的参数会被提取出来作为主题的名称.
至此可有下面的命令执行那个stock portfolio实例中的publisher.
$ mvn exec:java -Dexec.mainClass=org.apache.activemq.book.ch4.Publisher -Dexec.args=”tcp://localhost:61616 CSCO ORCL”
…
Sending: {price=65.713356601409, stock=JAVA, offer=65.779069958011,up=true}on destination: topic://STOCKS.JAVA
Sending: {price=66.071605671946, stock=JAVA, offer=66.137677277617,up=true}on destination: topic://STOCKS.JAVA
Sending: {price=65.929035001620, stock=JAVA, offer=65.994964036622,up=false}on destination: topic://STOCKS.JAVA
…
Note that one more argument has been added to the publisher: the URL to be used to
connect to the appropriate broker.
注意在执行publisher类时,新增了一个URI参数连接器使用这个参数连接到合适的代理.
The same principle can be used to modify the stock portfolio consumer. In the following
listing, you’ll find the stock portfolio consumer’s main() method modified to
accept the connection URI as a first parameter.
使用同样的方法,可以修改consumer类.在下面的代码清单中,你会看到stock portfolio实例的
consumer类的main()方法已经被修改过了以便使用第一个命令行参数作为连接器的URI.
Listing 4.2 Modifying stock portfolio consumer to support various connector URIs
public static void main(String[] args) throws JMSException
{
if (args.length == 0)
{
System.err.println(“Please define connection URI!”);
return;
}
Consumer consumer = new Consumer(args[0]);
String[] topics = new String[args.length – 1];
System.arraycopy(args, 1, topics, 0, args.length – 1);
for (String stock : topics)
{
Destination destination = consumer.getSession().createTopic(“STOCKS.” + stock);
MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);
messageConsumer.setMessageListener(new Listener());
}
}
In order to achieve the same functionality as in the chapter 3 example, you should
run the consumer with an extra URI argument.
The following example shows how to do this:
下面的命令展示了执行consumer的命令行命令,改命令使用了一个额外的URI参数,以便使得这里的consumer
类与第三章中的consumer类功能相同.
$ mvn exec:java -Dexec.mainClass=org.apache.activemq.book.ch4.Consumer -Dexec.args=”tcp://localhost:61616 CSCO ORCL”
…
ORCL 65.71 65.78 up
ORCL 66.07 66.14 up
ORCL 65.93 65.99 down
CSCO 23.30 23.33 up
…
Note that the message flow between the producer and the consumer is the same as in
the original example. With these changes, the examples are now ready to be run using
a variety of supported protocols. Let’s now dig into the particular transport connectors.
In the following section we’ll see what options you have if you want to connect to
the broker over the network.
注意,修改后例子里面producer和consumer这个之间的消息流与未修改之前是相同的.做完这些修改后,
这个例子可以支持多种连接协议了.现在让我们深入探讨各种传输连接.在接下来的章节中,可以看到使用哪些
方法可以连接到代理.