rabbitmq作为成熟的企业消息中间件,实现了应用程序间接口调用的解耦,提高系统的吞吐量。
下面介绍下rabbitmq的一些基本概念:
- message acknowledgment: 消息确认,解决消息确认问题,只有收到ack之后才能从消息系统中删除。
- message durability: 消 息持久化,当rabbitmq退出或崩溃后,会把queue中的消息持久化。但注意,RabbitMQ并不能百分之百保证消息一定不会丢失,因为为了提 升性能,RabbitMQ会把消息暂存在内存缓存中,直到达到阀值才会批量持久化到磁盘,也就是说如果在持久化到磁盘之前RabbitMQ崩溃了,那么就 会丢失一小部分数据,这对于大多数场景来说并不是不可接受的,如果确实需要保证任务绝对不丢失,那么应该使用事务机制
- exchange: 映射关系,实现消息名和队列之间的映射,根据消息名将消息发送到相应的队列中。
- 常见的映射模式:
- direct:转发消息到routigKey指定的队列
- topic:按规则转发消息(最灵活)
- headers:
- fanout:转发消息到所有绑定队列
- routing:exchange和queue之间绑定的媒介,成为routing key
在elong,我们开发了一套基于rabbitmq的消息系统,可以实现消息的可靠传输,提供了简单的restful api, 减少业务使用rabbitmq的学习成本。
下面说下这套系统jmsg的主要组成部分,在说之前,需要首先连接数据库结构:
1.MessageConfig 发送端配置,消息->Queue映射关系
CREATE TABLE `MessageConfig` (
`ID` int(11) NOT NULL AUTO_INCREMENT,
`MessageName` varchar(200) NOT NULL, --消息名称
`ExchangeName` varchar(200) NOT NULL, --消息名和队列的映射关系
`Priority` varchar(50) DEFAULT NULL, -- exchange与queue之前绑定的媒介
`UseDelayRetry` bit(1) DEFAULT NULL, — 是否使用重试
`DelayTime` int(11) DEFAULT NULL, —延迟多长时间重试
`MaxRetryCount` int(11) DEFAULT ‘3’, —最大重试次数
`_timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`ID`),
UNIQUE KEY `IX_MessageName` (`MessageName`)
) ENGINE=InnoDB AUTO_INCREMENT=409 DEFAULT CHARSET=utf8;
字段解释:
MessageName: 消息名称
ExchangeName: exchange名称
Priority: 优先级,一个业务线可以根据不同优先级有多个队列
UseDelayRetry:是否使用重试
DelayTime: 延迟多长时间重试
MaxRetryCount: 最大重试次数
表数据
2.MessageConsumersConfig 表: 消费端配置,消息->接收方配置
CREATE TABLE `MessageConsumersConfig` (
`ID` int(11) NOT NULL AUTO_INCREMENT,
`MessageName` varchar(200) NOT NULL, — 消息名
`Url` varchar(400) NOT NULL, — 消息消费的url
`TimeOut` int(11) DEFAULT ‘10’,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=266 DEFAULT CHARSET=utf8;
MessageName: 消息名
Url: 消息消费url
Timeout: 消费超时时间
CREATE TABLE `QueueSetting` (
`ID` bigint(20) NOT NULL AUTO_INCREMENT,
`QueueName` varchar(50) DEFAULT NULL,
`QOS` int(11) DEFAULT NULL,
`ParallelCount` int(11) DEFAULT NULL,
`LastUpdateTime` datetime DEFAULT NULL,
`LastUpdateUserName` varchar(50) DEFAULT NULL,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=51 DEFAULT CHARSET=utf8;
rabbimq 配置
serverIP : 服务器ip
Port:服务端口号
UserName: 用户名
PassWord: 密码
MaxPoolSize: 最大连接池大小
RequestedHeartbeat: 请求心跳检查时间(s)
RequestedConnectionTimeout: 请求连接超时时间
FailedLogBaseDir: 失败日志存储目录
ConnectionTimeOut: 连接保持时间(ms)
SendTimeOut: 发送超时(s)
ReceiveTimeOut: 接收超时时间(s)
SendLogBaseDir(发送日志目录)
1 jmsg-client
消息发送客户端,提供发送消息的接口
流程图:
其中比较重要的是RabbitConnectPool(单例创建连接池),该类中比较重要的属性和方法
_max: //可以创建的最大连接数
_created: 已经使用的连接数
_used: 已经使用的连接数
_sendTimeOut: 发送连接请求超时时间
_receiveTimeOut: 接收连接成功的超时时间
_clientExpires: 连接到期时间
_connectionTimeOut: 连接超时
_qos:
重要的方法:
getSendingConnection(): 获取一个发送端的连接, 如果不是强制,就从连接池中获取连接,否则强制创建一个连接
getNextProxy:() 从连接池中过去连接(返回RabbitSendProxy), 如果超过最大连接数,则创建新连接, 否则加锁获取 proxy(pollProxy),如果返回为空,这等待,直到获取连接为止
pollProxy(): 获取连接, 从proxyqueue中poll,如果连接不可用,这_created–, 然后_used++, 如果创建条数 < 最大数, 这获取新连接(newProxy(), create++, _used++
returnToPool(): 返回到连接池,
getNewProxy(): 三次重试, getProxy(), 重试间隔0.1s
getProxy(): 通过工厂模式生成连接
RabbitProxy: 客户端连接rabbitmq 代理接口,做为连接池中的单元连接代理,可以由发送端和接收端继承
主要属性:
isAvailable: 是否可用,默认true
createTime: 创建时间,
DisposeListener: 连接池关闭需要执行的接口
connectionTimeout: 连接超时时间, 当前时间-createTime <= connectionTimeout可用
receiveTimeOut: 接收超时时间
Connection: 最主要的类,com.rabbitmq.client.Connection 连接
qos: 服务器一次可以传输的消息条数
Channel : 管道,连接创建管道,进行数据传输
ConnectionFactory: 连接工厂,创建rabbitmq连接
主要操作:
isAvailable(): 连接是否可用
dispose(): 关闭连接 需要关闭channel和connection
RabbitSendProxy 发送端代理, 默认开通channel confirsSelect,即确认机制
send(): 发送方法
流程:转换成byte数组->检查消息长度(小于64K) -> 缓存数据,等待确认->发送(basicPublish) -> 在接收到后删除缓存数据
下面说下如何保证数据一定能发送到rabbit queue中:
为了解决发送失败的问题,解决的思路无非是消息持久化,采用文件做持久化是比较好的选择。
具体的实现是消息失败后,放入blockingqueue作为数据换出的地方,定期从queue中读取数据存储文件,开启定时任务读取数据,重新send到queue中。
2 jmsg-server
作用: 从rabbitmq中读取消息,通过http接口调用消费者
数据库如图:
jmg-server的流程:
- 从数据库拿到该机器需要处理的queue,初始化rabbitmq连接池
- 遍历queuelist,注册监听器,对每个queue获取的消息处理
- 对每个queue开启MessageReceiver线程,监听该queue数据
- messageReceiver 开启线程池,qos是线程池大小
- messageReceiver是一个循环,不断获取rabbitmq server连接
- 获取到数据后,开启线程进行处理MessageProcessTask, 该任务主要是查找ImessageListener的实现类,调用receive方法
- 接收到消息处理:
消 息校验 -> 获取消息配置,找到消费者-> 判断没有正在处理 -> 消息还没有处理成功or 没有达到最大处理失败次数 – > 首次接收的消息入库- > 广播消息到接收方 -> 处理成功,记录messageLog,修改状态; 处理失败,发送到rabbitmq-server,等待下次处理.
3 rabbitmq-server
采用集群的方式搭建, 通过nginx对外提供统一的url
集群中一些重要的概念:
network partition: 网络中断,一般是子网之间的设备中断,这样在不同子网的设备通信会出现问题
搭建集群:
abbitmq的集群是依附于erlang的集群来工作的,所以必须先构建起erlang的集群景象。Erlang的集群中各节点是经由过程一个magic cookie来实现的,这个cookie存放在 $home/.erlang.cookie 中(像我的root用户安装的就是放在我的root/.erlang.cookie中),文件是400的权限。所以必须包管各节点cookie对峙一致,不然节点之间就无法通信。
方案1: 普通集群
erlang 通过cookie来决定是否能和另外一个节点通信,通常的做法是在一个机器上生成cookie文件,拷贝到集群中的其他机器。
集群可以通过单逻辑broker的方式来连接多个机器。各机器间通过Erlang消息传递来通信,因此,集群内所有节点都必须有相同的Erlang cookie。集群内机器间的网络连接必须是可信的,且所有机器必须运行相同版本的Erlang和RabbitMQ。
虚拟机、交换机、用户和权限会自动镜像到集群内所有节点。队列可能位于单节点上,或者镜像到多个节点上。客户端连接到集群内任何节点都能看到集群内所有队列。
步骤
1
rabbit1$ rabbitmq-server -detached rabbit2$ rabbitmq-server -detached rabbit3$ rabbitmq-server -detached
2 加入以rabbit3为集群,集群名为[email protected],则需要在rabbit1和rabbit2上执行下面操作,加入[email protected],
rabbit2$ rabbitmqctl stop_app Stopping node [email protected] ...done. rabbit2$ rabbitmqctl join_cluster [email protected] Clustering node [email protected] with [[email protected]] ...done. rabbit2$ rabbitmqctl start_app Starting node [email protected] ...done.
3 同样在rabbit3,上操作,加入[email protected]
rabbit3$ rabbitmqctl stop_app Stopping node [email protected] ...done. rabbit3$ rabbitmqctl join_cluster [email protected] Clustering node [email protected] with [email protected] ...done. rabbit3$ rabbitmqctl start_app Starting node [email protected] ...done.
方案2:镜像队列
上述配置的RabbitMQ默认集群模式,但并不包管队列的高可用性,尽管互换机、绑定这些可以复制到 集群里的任何一个节点,然则队列内容不会复 制,固然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能守候重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内 容到集群里的每个节点,须要创建镜像队列。
Federation允许一个broker上的交换机接收发布到另一个broker(这个broker可能是单独的机器或者集群)上的交换机的消息。为了节点间能够通过AMQP(带上SSL选项)通信,组成federation的两个交换机之间必须授予适当的用户和权限。
组成federation的交换机之间通过单向点对点连接。缺省情况下,在federation连接上,消息仅仅被转发一次,但是这样可增加更多、更复杂的路由拓扑。
在federation连接上,有些消息可能不会被转发;如果一条消息到达federated交换机后不能被路由到某个队列,则它不会被转发。
你可以在Internet上通过federation连接各个broker来pub/sub消息。
方案3: shovel
相比federation,工作在更低一层,shovel简单从一个broker的一个queue中消费消息,并传递到下一个broker的exchange上
the shovel simply consumes messages from a queue on one broker, and forwards them to an exchange on another.
参考资料:
1 http://lynnkong.iteye.com/blog/1699684
2 http://blog.chinaunix.net/topic/surpershi/
3 http://www.rabbitmq.com/documentation.html