Java异步消息平台

JAVA平台异步消息模块

JAVA平台异步消息模块,是一个针对RabbitMQ的消息发送及处理封装,包含消息的配置、发送、接收、失败重试、日志记录等,总共分为4个部分:

1)RabbitMQ访问封装:JAMQP(Jar包)

2)消息模块公共对象、配置读取及接口定义:JMSG(Jar包)

3)消息发送端:JMSG—Client(Jar包)

4)消息接收端:JMSG—Server(War包)

 

RabbitMQ简介

MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断的写入消息,而另一端则可以读取或者订阅队列中的消息。RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是 AMQP的标准实现。

RabbitMQ的结构图如下:

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Alternate exchange:发送给某Exchange的消息路由失败时,发送至该exchange。

Dead letter exchange:死信Exchange,将超过一定时间的Queue中的消息发送至该exchange。

Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

Binding:绑定,它的作用就是把Exchange和Queue按照路由规则绑定起来。

Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。

channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

RabbitMQ封装

1) RabbitConfig :所有RabbitMQ有关配置

2) RabbitConnectPool :RabbitMQ连接池,管理所有对RabbitMQ的连接

3) RabbitProxy :RabbitMQ连接的封装,包含

4) RabbitSendProxy : 专门用于发送消息的连接,继承自RabbitProxy

5) RabbitReceiveProxy :专门用于接收消息的连接,继承自RabbitProxy

6) RabbitReceiverDispatcher : 消息分发器管理类,用于消息的接

7) 消息发送:从连接池RabbitConnectPool中Get出RabbitSendProxy,调用send方法发送消息,return获得的RabbitSendProxy。发送失败的消息记录本地文件,由轮询线程获取消息后重试3次

8) 消息接收:服务启动时向RabbitReceiverDispatcher注册要监听的队列,每一个监听对应一个线程以及一个处理消息的线程池,从RabbitMQ获取到推送来的消息后,通过线程池并行处理

 

内部流程

消息发送逻辑

消息接收逻辑

配置说明

RabbitMQ配置表

消息配置表

1) MessageName : 消息名称,MessageConfig表中唯一

2) Url : 消息的业务处理Http API地址

3) Priority : 消息优先级,必须写成P1,P2,P3....P8,P9,用于RabbitMQ的RouteKey,发送消息时的最终RouteKey = MessageName.Priority

 

使用方法

发送消息

消息发送方法定义:

/**

* 发送消息

* @param transferObject

*  消息主体对象

* @param customTag

*  自定义消息标签

* @param messageName

*  消息名称

*/

void send(Object transferObject, String customTag, String messageName);

在消息配置表中配置好消息的信息,引用JMSG和JMSG-Client 两个Jar包,调用方法如下:

public void SendMsg() {

RabbitMQSender sender = new RabbitMQSender();

sender.send(new Object(), "tag", "messageName");

}

注意:传输的对象(transferObject)进行JSON序列化后,大小不能超过64K,否则会抛出异常

接收消息

1) 实现一个能够接收Post请求的Http API,Post请求参数形式如下:

NameValuePair[] data = {

new NameValuePair("message", message.getTransferObjectJSON()),

new NameValuePair("messageName", message.getMessageName()),

new NameValuePair("tag", message.getCustomTag()),

};

2) 该API的返回值要求为JSON串,内容要求如下(responseCode:0表示处理成功,小于0表示系统异常,大于0表示业务异常):

"{exceptionMessgage:null, responseCode:0}"

3) 将该API的Url配置到MessageConsumersConfig表中

下面是一个简单的实例:

@Controller

public class ConsumerController {

private static final Logger logger = Logger.getLogger(ConsumerController.class);

@RequestMapping(value = "testmsgconsumer", method=RequestMethod.POST)

@ResponseBody

public String LogMessage(HttpServletRequest request) {

logger.info(request.getParameter("messageName") + "(" + request.getParameter("tag") + "): " + request.getParameter("message"));

return "{exceptionMessgage:null, responseCode:0}";

}

}

 

时间: 2024-10-09 06:02:56

Java异步消息平台的相关文章

java模拟异步消息的发送与回调

http://kt8668.iteye.com/blog/205739 本文的目的并不是介绍使用的什么技术,而是重点阐述其实现原理. 一. 异步和同步 讲通俗点,异步就是不需要等当前执行的动作完成,就可以继续执行后面的动作. 通常一个程序执行的顺序是:从上到下,依次执行.后面的动作必须等前面动作执行完成以后方可执行.这就是和异步相对的一个概念——同步. 案例: A.张三打电话给李四,让李四帮忙写份材料. B.李四接到电话的时候,手上有自己的工作要处理,但他答应张三,忙完手上的工作后马上帮张三写好

Java微信公众平台开发(三)--接收消息的分类及实体的创建

转自:http://www.cuiyongzhi.com/post/41.html 前面一篇有说道应用服务器和腾讯服务器是通过消息进行通讯的,并简单介绍了微信端post的消息类型,这里我们将建立消息实体以方便我们后面的使用! (一)消息实体基础类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45

Java异步NIO框架Netty实现高性能高并发

1. 背景 1.1. 惊人的性能数据 近期一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用.相比于传统基于Java序列化+BIO(同步堵塞IO)的通信框架.性能提升了8倍多. 其实,我对这个数据并不感到吃惊,依据我5年多的NIO编程经验.通过选择合适的NIO框架,加上高性能的压缩二进制编解码技术,精心的设计Reactor线程模型,达到上述性能指标是全然有可能的. 以下我们就一起来看下Ne

白龙卫士+异步消息的理解

白龙卫士+异步消息的理解 手机卫士 MobileSafe MobileSafe 1.0 Splash界面的(设置一个渐变动画) 只加载计入界面,–>主页 根据功能模块划分 Activity com.itheima.mobilesafe.activty 后台服务 com.itheima.mobilesafe.service 广播接受者 com.itheima.mobilesafe.receiver 数据库 com.itheima.mobilesafe.db.dao 对象(java bean) co

拨云见日---android异步消息机制源码分析

做过windows GUI的同学应该清楚,一般的GUI操作都是基于消息机制的,应用程序维护一个消息队列,开发人员编写对应事件的回调函数就能实现我们想要的操作 其实android系统也和windows GUI一样,也是基于消息机制,今天让我们通过源码来揭开android消息机制的神秘面纱 谈起异步消息,就不能不提及Handler,在安卓中,由于主线程中不能做耗时操作,所以耗时操作必须让子线程执行,而且只能在主线程(即UI线程)中执行UI更新操作,通过Handler发送异步消息,我们就能更新UI,一

C# socket 实现消息中心向消息平台 转发消息 (修改)

using System; using System.Collections.Generic; using System.Configuration; using System.Linq; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; using Jinher.AMP.SNS.Chat.Client; usin

C# socket 实现消息中心向消息平台 转发消息

公司用到,直接粘代码了 using System; using System.Collections.Generic; using System.Configuration; using System.Linq; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; using Jinher.AMP.SNS.Chat.

JMS异步消息解决分布式应用的EhCache缓存同步问题

上篇博客中讲到了怎样用拦截器给用EJB发布的WebService添加缓存,这样可以提高WebService的响应效率.可是即使是这样做,还是要经历网络的传输的.于是决定在调用WebService的程序本地也添加EJB方法缓存,如果WebService调用的结果已经存在于本地缓存中,就直接从内存中拿数据,不用再访问WebService了. 架构图如下所示 但是另一个问题又出现了,那就是WebService中的缓存和客户程序本地缓存的同步问题,这个问题可以具体描述如下: 当提供WebService的

Java微信公众平台开发_02_启用服务器配置

一.准备阶段 需要准备事项: 1.一个能在公网上访问的项目: 见:[  Java微信公众平台开发_01_本地服务器映射外网  ] 2.一个微信公众平台账号: 去注册:(https://mp.weixin.qq.com/) 3.策略文件 见:[ Java企业微信开发_Exception_02_java.security.InvalidKeyException: Illegal key size ] 4.微信官方消息加解密工具包 需要下载微信官方的消息加解密的工具包,主要是AES加密工具 下载地址: