rabbitmq在艺龙业务系统中的实践

rabbitmq作为成熟的企业消息中间件,实现了应用程序间接口调用的解耦,提高系统的吞吐量。

下面介绍下rabbitmq的一些基本概念:

  • message acknowledgment: 消息确认,解决消息确认问题,只有收到ack之后才能从消息系统中删除。
  • message durability: 消 息持久化,当rabbitmq退出或崩溃后,会把queue中的消息持久化。但注意,RabbitMQ并不能百分之百保证消息一定不会丢失,因为为了提 升性能,RabbitMQ会把消息暂存在内存缓存中,直到达到阀值才会批量持久化到磁盘,也就是说如果在持久化到磁盘之前RabbitMQ崩溃了,那么就 会丢失一小部分数据,这对于大多数场景来说并不是不可接受的,如果确实需要保证任务绝对不丢失,那么应该使用事务机制
  • exchange: 映射关系,实现消息名和队列之间的映射,根据消息名将消息发送到相应的队列中。
  • 常见的映射模式:
  • direct:转发消息到routigKey指定的队列
  • topic:按规则转发消息(最灵活)
  • headers:
  • fanout:转发消息到所有绑定队列
  • routing:exchange和queue之间绑定的媒介,成为routing key

在elong,我们开发了一套基于rabbitmq的消息系统,可以实现消息的可靠传输,提供了简单的restful api, 减少业务使用rabbitmq的学习成本。

下面说下这套系统jmsg的主要组成部分,在说之前,需要首先连接数据库结构:

1.MessageConfig 发送端配置,消息->Queue映射关系

CREATE TABLE `MessageConfig` (

`ID` int(11) NOT NULL AUTO_INCREMENT,

`MessageName` varchar(200) NOT NULL,  --消息名称

`ExchangeName` varchar(200) NOT NULL, --消息名和队列的映射关系

`Priority` varchar(50) DEFAULT NULL,  -- exchange与queue之前绑定的媒介

`UseDelayRetry` bit(1) DEFAULT NULL, — 是否使用重试

`DelayTime` int(11) DEFAULT NULL,   —延迟多长时间重试

`MaxRetryCount` int(11) DEFAULT ‘3’,  —最大重试次数

`_timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

PRIMARY KEY (`ID`),

UNIQUE KEY `IX_MessageName` (`MessageName`)

) ENGINE=InnoDB AUTO_INCREMENT=409 DEFAULT CHARSET=utf8;

字段解释:

MessageName: 消息名称

ExchangeName: exchange名称

Priority: 优先级,一个业务线可以根据不同优先级有多个队列

UseDelayRetry:是否使用重试

DelayTime: 延迟多长时间重试

MaxRetryCount: 最大重试次数

表数据

2.MessageConsumersConfig 表: 消费端配置,消息->接收方配置

CREATE TABLE `MessageConsumersConfig` (

`ID` int(11) NOT NULL AUTO_INCREMENT,

`MessageName` varchar(200) NOT NULL, — 消息名

`Url` varchar(400) NOT NULL,  — 消息消费的url

`TimeOut` int(11) DEFAULT ‘10’,

PRIMARY KEY (`ID`)

) ENGINE=InnoDB AUTO_INCREMENT=266 DEFAULT CHARSET=utf8;

MessageName: 消息名

Url: 消息消费url

Timeout: 消费超时时间

CREATE TABLE `QueueSetting` (

`ID` bigint(20) NOT NULL AUTO_INCREMENT,

`QueueName` varchar(50) DEFAULT NULL,

`QOS` int(11) DEFAULT NULL,

`ParallelCount` int(11) DEFAULT NULL,

`LastUpdateTime` datetime DEFAULT NULL,

`LastUpdateUserName` varchar(50) DEFAULT NULL,

PRIMARY KEY (`ID`)

) ENGINE=InnoDB AUTO_INCREMENT=51 DEFAULT CHARSET=utf8;

rabbimq 配置

serverIP : 服务器ip

Port:服务端口号

UserName: 用户名

PassWord: 密码

MaxPoolSize: 最大连接池大小

RequestedHeartbeat: 请求心跳检查时间(s)

RequestedConnectionTimeout: 请求连接超时时间

FailedLogBaseDir: 失败日志存储目录

ConnectionTimeOut: 连接保持时间(ms)

SendTimeOut: 发送超时(s)

ReceiveTimeOut: 接收超时时间(s)

SendLogBaseDir(发送日志目录)

1  jmsg-client

消息发送客户端,提供发送消息的接口

流程图:

其中比较重要的是RabbitConnectPool(单例创建连接池),该类中比较重要的属性和方法

_max: //可以创建的最大连接数

_created: 已经使用的连接数

_used: 已经使用的连接数

_sendTimeOut: 发送连接请求超时时间

_receiveTimeOut: 接收连接成功的超时时间

_clientExpires: 连接到期时间

_connectionTimeOut: 连接超时

_qos:

重要的方法:

getSendingConnection(): 获取一个发送端的连接, 如果不是强制,就从连接池中获取连接,否则强制创建一个连接

getNextProxy:() 从连接池中过去连接(返回RabbitSendProxy), 如果超过最大连接数,则创建新连接, 否则加锁获取 proxy(pollProxy),如果返回为空,这等待,直到获取连接为止

pollProxy(): 获取连接, 从proxyqueue中poll,如果连接不可用,这_created–, 然后_used++, 如果创建条数 < 最大数, 这获取新连接(newProxy(), create++, _used++

returnToPool(): 返回到连接池,

getNewProxy(): 三次重试, getProxy(), 重试间隔0.1s

getProxy(): 通过工厂模式生成连接

RabbitProxy: 客户端连接rabbitmq 代理接口,做为连接池中的单元连接代理,可以由发送端和接收端继承

主要属性:

isAvailable: 是否可用,默认true

createTime: 创建时间,

DisposeListener: 连接池关闭需要执行的接口

connectionTimeout: 连接超时时间, 当前时间-createTime <= connectionTimeout可用

receiveTimeOut: 接收超时时间

Connection: 最主要的类,com.rabbitmq.client.Connection 连接

qos: 服务器一次可以传输的消息条数

Channel : 管道,连接创建管道,进行数据传输

ConnectionFactory: 连接工厂,创建rabbitmq连接

主要操作:

isAvailable(): 连接是否可用

dispose(): 关闭连接 需要关闭channel和connection

 

RabbitSendProxy 发送端代理, 默认开通channel confirsSelect,即确认机制

send(): 发送方法

流程:转换成byte数组->检查消息长度(小于64K) -> 缓存数据,等待确认->发送(basicPublish) -> 在接收到后删除缓存数据

下面说下如何保证数据一定能发送到rabbit queue中:

为了解决发送失败的问题,解决的思路无非是消息持久化,采用文件做持久化是比较好的选择。

具体的实现是消息失败后,放入blockingqueue作为数据换出的地方,定期从queue中读取数据存储文件,开启定时任务读取数据,重新send到queue中。

2 jmsg-server

作用: 从rabbitmq中读取消息,通过http接口调用消费者

数据库如图:

jmg-server的流程:

  • 从数据库拿到该机器需要处理的queue,初始化rabbitmq连接池
  • 遍历queuelist,注册监听器,对每个queue获取的消息处理
  • 对每个queue开启MessageReceiver线程,监听该queue数据
  • messageReceiver 开启线程池,qos是线程池大小
  • messageReceiver是一个循环,不断获取rabbitmq server连接
  • 获取到数据后,开启线程进行处理MessageProcessTask, 该任务主要是查找ImessageListener的实现类,调用receive方法
  • 接收到消息处理:

消 息校验 -> 获取消息配置,找到消费者-> 判断没有正在处理 -> 消息还没有处理成功or 没有达到最大处理失败次数 – > 首次接收的消息入库- > 广播消息到接收方 -> 处理成功,记录messageLog,修改状态; 处理失败,发送到rabbitmq-server,等待下次处理.

3  rabbitmq-server

采用集群的方式搭建, 通过nginx对外提供统一的url

集群中一些重要的概念:

network partition: 网络中断,一般是子网之间的设备中断,这样在不同子网的设备通信会出现问题

搭建集群:

abbitmq的集群是依附于erlang的集群来工作的,所以必须先构建起erlang的集群景象。Erlang的集群中各节点是经由过程一个magic cookie来实现的,这个cookie存放在 $home/.erlang.cookie 中(像我的root用户安装的就是放在我的root/.erlang.cookie中),文件是400的权限。所以必须包管各节点cookie对峙一致,不然节点之间就无法通信。

方案1: 普通集群

erlang 通过cookie来决定是否能和另外一个节点通信,通常的做法是在一个机器上生成cookie文件,拷贝到集群中的其他机器。

集群可以通过单逻辑broker的方式来连接多个机器。各机器间通过Erlang消息传递来通信,因此,集群内所有节点都必须有相同的Erlang cookie。集群内机器间的网络连接必须是可信的,且所有机器必须运行相同版本的Erlang和RabbitMQ。

虚拟机、交换机、用户和权限会自动镜像到集群内所有节点。队列可能位于单节点上,或者镜像到多个节点上。客户端连接到集群内任何节点都能看到集群内所有队列。

步骤

1

rabbit1$ rabbitmq-server -detached
rabbit2$ rabbitmq-server -detached
rabbit3$ rabbitmq-server -detached

2 加入以rabbit3为集群,集群名为[email protected],则需要在rabbit1和rabbit2上执行下面操作,加入[email protected],

rabbit2$ rabbitmqctl stop_app
Stopping node [email protected] ...done.
rabbit2$ rabbitmqctl join_cluster [email protected]
Clustering node [email protected] with [[email protected]] ...done.
rabbit2$ rabbitmqctl start_app
Starting node [email protected] ...done.

3 同样在rabbit3,上操作,加入[email protected]

rabbit3$ rabbitmqctl stop_app
Stopping node [email protected] ...done.
rabbit3$ rabbitmqctl join_cluster [email protected]
Clustering node [email protected] with [email protected] ...done.
rabbit3$ rabbitmqctl start_app
Starting node [email protected] ...done.

方案2:镜像队列

上述配置的RabbitMQ默认集群模式,但并不包管队列的高可用性,尽管互换机、绑定这些可以复制到 集群里的任何一个节点,然则队列内容不会复 制,固然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能守候重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内 容到集群里的每个节点,须要创建镜像队列

Federation允许一个broker上的交换机接收发布到另一个broker(这个broker可能是单独的机器或者集群)上的交换机的消息。为了节点间能够通过AMQP(带上SSL选项)通信,组成federation的两个交换机之间必须授予适当的用户和权限。

组成federation的交换机之间通过单向点对点连接。缺省情况下,在federation连接上,消息仅仅被转发一次,但是这样可增加更多、更复杂的路由拓扑。

在federation连接上,有些消息可能不会被转发;如果一条消息到达federated交换机后不能被路由到某个队列,则它不会被转发。

你可以在Internet上通过federation连接各个broker来pub/sub消息。

方案3: shovel

相比federation,工作在更低一层,shovel简单从一个broker的一个queue中消费消息,并传递到下一个broker的exchange上

the shovel simply consumes messages from a queue on one broker, and forwards them to an exchange on another.

参考资料:

http://lynnkong.iteye.com/blog/1699684

http://blog.chinaunix.net/topic/surpershi/

http://www.rabbitmq.com/documentation.html

时间: 2024-10-17 08:04:53

rabbitmq在艺龙业务系统中的实践的相关文章

实现业务系统中的用户权限管理--实现篇

在设计篇中,我们已经为大家阐述了有关权限管理系统的数据库设计,在本篇中,我们将重点放在其实现代码部分.为了让你能够更直接更有效的看到全部动作的代码,我们使用"动作分解列表"的方式来陈述每个动作以及相关资源. 实现权限管理功能的动作 动作分解 动作名 相关表名 操作集类型 (S,U,I,D,SQL) 表单 模组 字符资源 是否分页? 返回提示? 权限检测 权限初始化安装 setup 无 无 无 setup setupok 否 否 否 显示添加管理组界面 addnewgroup 无 无 a

实现业务系统中的用户权限管理--设计篇

B/S系统中的权限比C/S中的更显的重要,C/S系统由于具有特殊的client,所以訪问用户的权限检測能够通过client实现或通过client+server检測实现,而B/S中,浏览器是每一台计算机都已具备的,假设不建立一个完整的权限检測,那么一个"非法用户"非常可能就能通过浏览器轻易訪问到B/S系统中的全部功能.因此B/S业务系统都须要有一个或多个权限系统来实现訪问权限检測,让经过授权的用户能够正常合法的使用已授权功能,而对那些未经授权的"非法用户"将会将他们彻

[转]实现业务系统中的用户权限管理--设计篇

  实现业务系统中的用户权限管理--设计篇 B/S系统中的权限比C/S中的更显的重要,C/S系统因为具有特殊的客户端,所以访问用户的权限检测可以通过客户端实现或通过客户端+服务器检测实现,而B/S中,浏览器是每一台计算机都已具备的,如果不建立一个完整的权限检测,那么一个“非法用户”很可能就能通过浏览器轻易访问到B/S系统中的所有功能.因此B/S业务系统都需要有一个或多个权限系统来实现访问权限检测,让经过授权的用户可以正常合法的使用已授权功能,而对那些未经授权的“非法用户”将会将他们彻底的“拒之门

聊聊业务系统中投递消息到mq的几种方式

背景 电商中有这样的一个场景: 下单成功之后送积分的操作,我们使用mq来实现 下单成功之后,投递一条消息到mq,积分系统消费消息,给用户增加积分 我们主要讨论一下,下单及投递消息到mq的操作,如何实现?每种方式优缺点? 方式一 step1:start transaction step2:生成订单 step3:投递消息到mq step4:commit transaction 这种方式是将发送消息放在了事务提交之前,可能存在的问题: step3发生异常 导致step4失败,下单失败,直接影响到下单业

业务系统中的开与闭——分发模式

"对新增开放,对修改关闭."--开闭原则. 这里分享一个我在业务系统设计过程中常用的一个"复合模式",用作一个在业务系统设计中运用"开闭原则"的例子. 背景 这是一个账务系统,负责处理各类业务流程中发生的若干个账户之间的转账相关逻辑,包括账户余额的变更.以及各账户的流水记录.这个系统的复杂度在于:不同的业务流程,所需要操作的账户.金额的计算公式.以及流水的类型,都有很大的差异:即使是同一个业务,里面也会细分为多个子业务,账户.金额.流水类型又各不

BPM平台在企业业务系统中使用的价值讨论

1. 企业应用系统的分类以及BPM这种平台分别在这两种不同类型系统中的作用 因为在系统建设之前,我们要分清楚,准备建设的系统属于那一类 ,目前企业应用系统大体我觉得可以分以下几类 A)  垂直的业务系统   其特点,是业务一旦固化,其功能需求或流程需求变化较少,其功能大多为特定的部门或组织使用,如未来建设的PMT系统就属于此一类, 流程平台只能在技术架构层面提供扩展和底层服务,系统需要先从业务视角创建业务模型对象,活动对象,和其他模型对象,在最后的技术架构层才能体现其价值.诸如HR,CRM,ER

以Drools5.5为例说明“规则引擎在业务系统中应用”---规则引擎与业务系统交互

一.重要概念 Fact:是指在Drools规则应用当中,将一个普通的JavaBean插入到规则的WorkingMemory当中后的对象. 规则可以对Fact对象进行任意的读写操作,当一个JavaBean插入到WorkingMemory当中变成Fact之后,Fact 对象不是对原来的JavaBean对象进行Clon,而是原来JavaBean对象的引用.规则在进行计算的时候需要用到应用系统当中的数据,这些数 据设置在Fact对象当中,然后将其插入到规则的WorkingMemory当中,这样在规则当中

艺龙私有化,携程与腾讯眉来眼去

艺龙宣布,董事会收到腾讯控股的私有化要约,将以18美元的价格以收购艺龙发行的除了携程.铂涛和腾讯等艺龙股东外的全部流通股.如果这个交易能够完成,艺龙将完成私有化从美国纳斯达克股票市场退市,然后寻求在国内资本市场再度上市. 至于艺龙再次卖身给腾讯的原因是,艺龙表示因为竞争对手"看到中国移动住宿市场的巨大潜力,不惜以每年亏损十多亿元人民币的烧钱速度在这个市场里疯狂扩张,艺龙因此也面临着巨大的竞争压力."看上去是在控诉对手的以钱压人. 其实不说在线旅游行业,整个互联网都是在花钱买时间抢占市场

业务系统数据库设计常见的隔离和共享模式

多年开发和维护某些业务系统的经验,让人真正理解了什么叫“数据库设计良好,系统就成功了一半”,尤其是那些面向多商户的基础服务平台.公共服务平台.开放服务平台.或者由它们组合而成的综合服务平台.数据库设计之初,必须对业务系统DB的隔离和共享模式的优缺有充分的调研,平衡好业务系统的边界,合理设计使用必要的冗余,以适应系统后续的不断变化,否则后期开发人员将陷入无尽的烦恼和痛苦之中,这绝不是危言耸听,只有开发和维护过平台类产品的人才能深刻体会.下面就介绍三种业务系统中最常见的数据库设计的隔离和共享模式: