架构设计:系统间通信(22)——提高ActiveMQ工作性能(上)

接上文《架构设计:系统间通信(21)——ActiveMQ的安装与使用

3、ActiveMQ性能优化思路

上篇文章中的两节内容,主要介绍消息中间件ActiveMQ的安装和基本使用。从上篇文章给出的安装配置和示例代码来看,我们既没有修改ActivieMQ服务节点的任何配置,也没有采用任何的集群方案。这种情况只适合各位读者熟悉ActiveMQ的工作原理和基本操作,但是如果要将ActivieMQ应用在生产环境下,上文中介绍的运行方式远远没有挖掘出它的潜在性能。

根据这个系列文章所陈述的中心思想,系统的性能层次包括:代码级性能、规则性能、存储性能、网络性能,以及多节点协同方法(集群方案),所以我们优化ActiveMQ的中心思路也是这样的:首先优化ActiveMQ单个节点的性能,然后在配置ActiveMQ的集群。下面我们就按照这个思路,一步步介绍和ActiveMQ性能有关的那些事。

在默认情况下ActiveMQ的网络信息传递方式基于网络IO模型中的BIO方式。那么为了提高ActiveMQ单节点的工作性能,我们首先应该为每一个独立的MQ服务节点配置更高效的网络IO模型(关于网络IO模型的详细讲解,请参考本系列的另外几篇文章:《架构设计:系统间通信(3)——IO通信模型和JAVA实践 上篇》、

架构设计:系统间通信(4)——IO通信模型和JAVA实践 中篇》、《架构设计:系统间通信(5)——IO通信模型和JAVA实践 下篇》,这个思想也是整个系列文章的核心思想之一)。

另外我们还需要为ActiveMQ考虑一种存储方案,让它能高效的完成“持久化”消息的存储操作(也包括对“非持久化”消息的临时存储)**。虽然关于存储部分的知识和示例 在我另外一个系列的专题文章——系统存储(很快就会出品)才会讲到,但为了讲清楚消息中间件软件的性能优化,所以还是会讲到存储方案的选择。

另外,我们还知道了,使用集群方案能够增加整个软件的性能和稳定性,所以在完成单节点优化以后,我们还需要提供某种集群方案将多个ActiveMQ组合起来,让它们协同工作(一定是协同工作,单纯的安装多个ActiveMQ节点而不进行协同,是没法提高性能和稳定性的)。

Apache ActiveMQ是开源软件,您可以在ActiveMQ的官网下载到它的源代码,并进行代码级别性能的改造,但很明显这并不是一个能获得高效费比的方式。所以笔者不建议您和您的团队更改ActiveMQ代码实现(学习一下别人的代码实现还是可以的)。

4、ActiveMQ中的网络IO

4-1、基本连接配置

在上文中,我们提到了ActiveMQ支持多种消息协议,包括AMQP协议、MQTT协议、Openwire协议、Stomp协议等。在ActiveMQ的官方网站上,列出了目前ActiveMQ中支持的所有消息协议,它们是:AMQP、MQTT、OpenWire、REST、Stomp、XMPP;

不同的协议需要设置不同的网络监听端口,这个相关设置在ActiveMQ安装目录的./conf/conf/activemq.xml主配置文件中。主配置文件采用XML格式进行描述,其中的“transportConnectors”标记描述了各种协议的网络监听端口,示例如下:

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

以上配置了openwire协议的接入端口号为本机所有IP设备的61616(0.0.0.0代表本机所有IP设备);配置amqp协议的接入端口号为本机所有IP设备的5672;配置stomp协议的接入端口号为本机所有IP设备的61613,等等。这里注意以下几个事实:

  • 每一个“transportConnector”标记的name属性和uri属性都必须填写,name属性的值可以随便填写,它将作为一个Connector元素,显示在ActiveMQ管理界面的Connections栏目中;
  • 每一个“transportConnector”标记的uri元素,都有固定写法:uri头是指定的协议名称,例如amqp、mqtt、stomp等。然后是HOST/IP域名,指定端口监听所在的路由信息;请不要使用localhost或者127.0.0.1这样的回环地址,否则无法通过网络连接到ActiveMQ;接下来是端口信息,指定的端口不能重复,否则会产生冲突。
  • URI参数部分,每一种协议都有一些特定的参数,读者可参考ActiveMQ官网中,关于“协议”部分的介绍:http://activemq.apache.org/protocols.html。但是有些参数却是各种协议都可共用的,例如以上实例中使用的“maximumConnections”属性,代表这个端口支持的最大连接数量;”wireFormat.maxFrameSize”属相代表支持协议的“一个完整消息”的最大数据量(单位为byte);您可以在ActiveMQ官网中对wire formats的参数描述中,找到这些默认属性:http://activemq.apache.org/configuring-wire-formats.html

Any transport which involves marshalling messages onto some kind of network transport like TCP or UDP will typically use the OpenWire format. This is configurable to customize how things appear on the wire.

4-2、特别说明

  • 在上文给出的配置信息中,您可以发现我们在描述各种消息协议时,URI描述信息的头部都的是采用协议名称:例如,描述amqp协议的监听端口时,采用的URI描述格式为“amqp://……”;描述Stomp协议的监听端口时,采用的URI描述格式为“stomp://……”。唯独在进行openwire协议描述时,URI头却采用的“tcp://…..”。这是因为ActiveMQ中默认的消息协议就是openwire

OpenWire is binary protocol designed for working with Message Oriented Middleware. It is the native wire format of ActiveMQ.

OpenWire is our cross language Wire Protocol to allow native access to ActiveMQ from a number of different languages and platforms. The Java OpenWire transport is the default transport in ActiveMQ 4.x or later.

  • 上文中我们还讲到,ActiveMQ完整支持AMQP协议。但是读者会发现ActiveMQ中并没有存在Exchange这样的结构。这是怎么回事呢?实际上在国际标准组织 (ISO) 和国际电工委员会 (IEC) 制定的AMQP Version 1.0 规范文档中(《OASIS Advanced Message Queueing Protocol

    (AMQP) Version 1.0》),并没有说AMQP 消息必须经过Exchange规则才能够到达队列,也没有规定Exchange 必须要实现某种规则的路由。所以在支持AMQP协议时,是否需要有Exchange这样的路由处理规则,完全取决于AMQP的消息中间件软件厂商自己的决定。下面一段代码是使用JMS API连接ActiveMQ的AMQP端口,发送AMQP消息的示例:

package mq.test.amqp;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;

public class JMSProducer {
    public static void main(String[] args) throws Throwable {
        // 注意,JMS-AMQP使用的是Apache QPID的实现。如果您需要运行这段代码,请导入QPID的客户端
        /*
         * <dependency>
         *  <groupId>org.apache.qpid</groupId>
         *  <artifactId>qpid-amqp-1-0-client-jms</artifactId>
         *  <version>0.32</version>
         * </dependency>
         * */
        ConnectionFactoryImpl factory = ConnectionFactoryImpl.createFromURL("amqp://192.168.61.138:5672");
        Connection connection = factory.createQueueConnection();
        connection.start();

        // 建立会话,连接到叫做/test的Queue上
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination queue = session.createQueue("/test");
        MessageProducer messageProducer = session.createProducer(queue);

        // 开始发送消息
        TextMessage outMessage = session.createTextMessage();
        outMessage.setText("23456656457567456");
        messageProducer.send(outMessage);

        //关闭
        messageProducer.close();
        connection.close();
    }
}

4-3、网络配置优化

那么各位读者您是否觉得上一小节那样的连接端口配置太过冗长,不好进行管理?确实是这样,并且实际工作中我们也只会使用几种固定的协议。所以ActiveMQ在Version 5.13.0+ 版本后,将OpenWire, STOMP, AMQP, MQTT这四种主要协议的端口监听进行了合并,并使用auto关键字进行表示。也就是说,ActiveMQ将监听这一个端口的消息状态,并自动匹配合适的协议格式。配置如下:

<transportConnectors>
    <transportConnector name="auto" uri="auto://0.0.0.0:61617?maximumConnections=1000" />
</transportConnectors>

以上的URI配置信息中,可以使用所有通用的Connection Configuration、Wire Formats Configuring、Server side options和TCP Transport Configuration配置项。但是这种优化只是让ActiveMQ的连接管理变得简洁了,并没有提升单个节点的处理性能。

如果您不特别指定ActiveMQ的网络监听端口,那么这些端口都将使用BIO网络IO模型。所以为了首先提高单节点的网络吞吐性能,我们需要明确指定Active的网络IO模型,如下所示:

<transportConnectors>
    <transportConnector name="nio" uri="nio://0.0.0.0:61618?maximumConnections=1000"/>
</transportConnectors> 

请注意,URI格式头以”nio”开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。但是这样的设置方式,只能使这个端口支持Openwire协议。那么我们怎么既让这个端口支持NIO网络IO模型,又让它支持多个协议呢?ActiveMQ的服务端设置,允许开发人员使用“+”符号来为端口设置多种特性,如下:

<transportConnector name="stomp+nio" uri="stomp+nio://0.0.0.0:61613?transport.transformer=jms"/>
// 表示这个端口使用NIO模型支持Stomp协议

<transportConnector name="amqp+ssl" uri="amqp+ssl://localhost:5671"/>
// 表示这个端口支持amqp和ssl密文传输

所以如果我们既需要某一个端口支持NIO网络IO模型,又需要它支持多个协议,那么可以进行如下的配置:

<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000" />

另外,如果是为了生产环境进行的配置,那么您至少应该还要配置这个端口支持的最大连接数量、设置每一条消息的最大传输值、设置NIO使用的线程池最大工作线程数量(当然您已经知道了这些设置的文档所在位置,所以您可以根据自己的情况进行设置属性的增减):

<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&amp;org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50" />

以下附图是改变网络连接设置后,ActiveMQ管理控制台中Connections页面显示的内容。注意ws协议的端口是额外保留的配置——因为auto模式中的协议不支持ws:

5、JMS规范中的几个基本概念

由于ActiveMQ是JMS规范的完整实现,所以为了讲清楚ActiveMQ是如何进行存储和调度的,就需要首先说明JMS中和存储、调度有关的几个概念。它们是:消息收发模式(订阅-发布和负载均衡模式)、消息存储模式(持久化消息和非持久化消息)和订阅模型(持续订阅和非持续订阅)。

5-1、订阅发布模式和负载均衡模式

在上篇文章中我们已经详细过订阅-发布模式和负载均衡模式(http://blog.csdn.net/yinwenjie/article/details/50916518#t5)。在ActiveMQ的官方文档中的描述,他们的英文名称分别是Topics和Queue。这两种消息“发送-接受”模式,都是JMS规范中的标准模式。为了使各位读者在后续的阅读中,不会和JMS中的其他概念不造成混淆,我在后文章中都将使用规范的名称进行称呼。

  • 订阅-发布模式:

在“订阅-发布”模式下,消息会被复制多份,分别发送给所有“订阅”者。实际上在后文的描述中您将看到,这个复制的过程实际上没有您想象的简单。

  • 负载均衡模式:

在“负载均衡”模式下,一条消息将会发送给一个消息消费者,如果当前Queue没有消息消费者,消息将进行存储。同样通过后文的介绍,您会发现这其中的过程也并不简单。

5-2、持久化消息和非持久化消息

JMS中对非持久化消息和非持久化消息的称呼分别是:NON_PERSISTENT Message和PERSISTENT Meaage。它们指的是消息在任何一种“发送-接受”模式下(“订阅-发布”模式和“负载均衡模式”),是否进行持久化存储。

NON_PERSISTENT Message只存储在JMS服务节点的内存区域,不会存储在某种持久化介质上(后面我们要介绍到AcitveMQ可支持的持久化介质有:KahaBD、AMQ和关系型数据)。在极限情况下,JMS服务节点的内存区域不够使用了,也只会采用某种辅助方案进行转存(例如ActiveMQ会使用磁盘上的一个“临时存储区域”进行暂存)。一旦JMS服务节点宕机了,这些NON_PERSISTENT Message就会丢失

JMS中对PERSISTENT Meaage的定义是:这些消息不受JMS服务端异常状态的影响,JMS服务端会使用某种持久化存储方案保存这些消息,直到JMS服务端认为这些PERSISTENT Meaage被消费端成功处理。例如ActiveMQ中可以选择的持久化存储方案就包括:KahaDB、AMQ和关系型数据库。

在JMS标准API中,使用setDeliveryMode标记消息发送者是发送的PERSISTENT Meaage还是NON_PERSISTENT Message。示例如下:

......
for(int index = 0 ; index < 10 ; index++) {
    TextMessage outMessage = session.createTextMessage();
    outMessage.setText("这是发送的消息内容:" + index);
    if(index % 2 == 0) {
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    } else {
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
    }
    sender.send(outMessage);
}
......

那么当JMS服务节点重启后(注意不是producer重启),以上代码中发送的10条消息只有其中5条消息能够保存下来

5-3、持续订阅和非持续订阅

持续订阅和非持续订阅,是针对“订阅-发布”模式的细分处理策略,在JMS规范中的标准称呼是:Durable-Subscribers和Non-Durable Subscribers。

Durable-Subscribers是指在“订阅-发布”模式下,即使标记为Durable-Subscribers的订阅者下线了(可能是因为订阅者宕机,也可能是因为这个订阅者故意下线),“订阅-发布”模式的Topic队列也要保存这些消息(视消息不同的持久化策略影响,保存机制不一样),直到下次这个被标记为Durable-Subscribers的订阅者重新上线,并正确处理这条消息为止。换句话说,标记为Durable-Subscribers的订阅者是否能获得某条消息,和它是否曾经下线没有任何关系

Non-Durable Subscribers是指在“订阅-发布”模式下,“订阅-发布”模式的Topic队列不用为这些已经下线的订阅者保留消息。当后者将消息按照既定的广播规则发送给当前在线的订阅者后,消息就可以被标记为“处理完成”。

==================接下文

时间: 2024-11-15 14:08:23

架构设计:系统间通信(22)——提高ActiveMQ工作性能(上)的相关文章

架构设计:系统间通信(23)——提高ActiveMQ工作性能(中)

(接上文<架构设计:系统间通信(22)--提高ActiveMQ工作性能(上)>) 6.ActiveMQ处理规则和优化 在ActiveMQ单个服务节点的优化中,除了对ActiveMQ单个服务节点的网络IO模型进行优化外,生产者发送消息的策略和消费者处理消息的策略也关乎整个消息队列系统是否能够高效工作.请看下图所示的消息生产者和消息消费者的简要工作原理图: Producer既是消息生产者,作为一个发送消息的客户端它既可以使用同步消息发送模式,也可以使用异步的消息发送模式.另外,消息生产者在Acti

架构设计:系统间通信(24)——提高ActiveMQ工作性能(下)

(接上文<架构设计:系统间通信(23)--提高ActiveMQ工作性能(中)>) 7.ActiveMQ的持久消息存储方案 前文已经讲过,当ActiveMQ接收到PERSISTENT Message消息后就需要借助持久化方案来完成PERSISTENT Message的存储.这个介质可以是磁盘文件系统.可以是ActiveMQ的内置数据库,还可以是某种外部提供的关系型数据库.本节笔者将向读者讲解三种ActiveMQ推荐的存储方案的配置使用. 如上图2.1的步骤所示,所有PERSISTENT Mess

架构设计:系统间通信(26)——ActiveMQ集群方案(下)

(接上文<架构设计:系统间通信(26)--ActiveMQ集群方案(上)>) 3.ActiveMQ热备方案 ActiveMQ热备方案,主要保证ActiveMQ的高可用性.这种方案并不像上节中我们主要讨论的ActiveMQ高性能方案那样,同时有多个节点都处于工作状态,也就是说这种方案并不提高ActiveMQ集群的性能:而是从集群中的多个节点选择一个,让其处于工作状态,集群中其它节点则处于待命状态.当主要的工作节点由于各种异常情况停止服务时,保证处于待命的节点能够无缝接替其工作. 3-1.Acti

架构设计:系统间通信(27)——其他消息中间件及场景应用(上)

1.概述 目前业界有很多消息中间件可供大家选择,主要分为两类:需要付费的商业软件和开源共享的非商业软件.对于商业软件您和您的团队可以选择IBM WebSphere集成的MQ功能,也可以选择Oracle WebLogic集成的MQ功能.本文首先介绍除Apache ActiveMQ以外的两款开源共享的消息中间件产品,然后列举三个实际的业务常见,为读者介绍如何在这些实际业务中使用消息中间件解决问题. 2.RabbitMQ及特性 RabbitMQ基于Erlang语言开发和运行.它与Apache Acti

架构设计:系统间通信(29)——Kafka及场景应用(中2)

接上文:<架构设计:系统间通信(28)--Kafka及场景应用(中1)> 4-3.复制功能 我们在上文中已经讨论了Kafka使用分区的概念存储消息,一个topic可以有多个分区它们分布在整个Kafka集群的多个Broker服务节点中,并且一条消息只会按照消息生产者的要求进入topic的某一个分区.那么问题来了:如果某个分区中的消息在被消费端Pull之前,承载该分区的Broker服务节点就因为各种异常原因崩溃了,那么在这个Broker重新启动前,消费者就无法收到消息了. 为了解决这个问题,Apa

架构设计:系统间通信(36)——Apache Camel快速入门(上)

1.本专题主旨 1-1.关于技术组件 在这个专题中,我们介绍了相当数量技术组件:Flume.Kafka.ActiveMQ.Rabbitmq.Zookeeper.Thrift .Netty.DUBBO等等,还包括本文要进行介绍的Apache Camel.有的技术组件讲得比较深入,有的技术组件则是点到为止.于是一些读者朋友发来信息向我提到,这个专题的文章感觉就像一个技术名词的大杂烩,并不清楚作者的想要通过这个专题表达什么思想. 提出这个质疑的朋友不在少数,所以我觉得有必要进行一个统一的说明.这个专题

架构设计:系统间通信(3)——IO通信模型和JAVA实践 上篇

1.全文提要 系统间通信本来是一个很大的概念,我们首先重通信模型开始讲解.在理解了四种通信模型的工作特点和区别后,对于我们后文介绍搭建在其上的各种通信框架,集成思想都是有益的. 目前常用的IO通信模型包括四种(这里说的都是网络IO):阻塞式同步IO.非阻塞式同步IO.多路复用IO.和真正的异步IO.这些IO模式都是要靠操作系统进行支持,应用程序只是提供相应的实现,对操作系统进行调用. 上篇中,首先介绍传统的阻塞式同步IO和非阻塞式同步IO两种IO工作模式,然后使用JAVA进行实现:下篇,对多路复

架构设计:系统间通信(28)——Kafka及场景应用(中1)

(接上文<架构设计:系统间通信(27)--其他消息中间件及场景应用(上)>) 在本月初的写作计划中,我本来只打算粗略介绍一下Kafka(同样是因为进度原因).但是,最近有很多朋友要求我详细讲讲Kafka的设计和使用,另外两年前我在研究Kafka准备将其应用到生产环境时,由于没有仔细理解Kafka的设计结构所导致的问题最后也还没有进行交代.所以我决定即使耽误一些时间,也要将Kafka的原理和使用场景给读者详细讨论讨论.这样,也算是对两年来自己学习和使用Kafka的一个总结. 4.Kafka及特性

架构设计:系统间通信(20)——MQ:消息协议(下)

(接上文<架构设计:系统间通信(19)--MQ:消息协议(上)>) 上篇文章中我们重点讨论了"协议"的重要性,并为各位读者介绍了Stomp协议和XMPP协议.这两种协议是消息队列中两种不同使用场景下的典型代表.本文主要接续上文的篇幅,继续讨论消息队列中另一种典型协议:AMQP协议. 3-3.AMQP协议 AMQP协议的全称是:Advanced Message Queuing Protocol(高级消息队列协议).目前AMQP协议的版本为 Version 1.0,这个协议标准