JMS异步消息解决分布式应用的EhCache缓存同步问题

上篇博客中讲到了怎样用拦截器给用EJB发布的WebService添加缓存,这样可以提高WebService的响应效率。可是即使是这样做,还是要经历网络的传输的。于是决定在调用WebService的程序本地也添加EJB方法缓存,如果WebService调用的结果已经存在于本地缓存中,就直接从内存中拿数据,不用再访问WebService了。

架构图如下所示

但是另一个问题又出现了,那就是WebService中的缓存和客户程序本地缓存的同步问题,这个问题可以具体描述如下:

当提供WebService的程序的数据库中的数据发生改变后(程序执行了增删改方法后),就需要将WebService的缓存清空,因为那些是脏数据。可是调用WebService的客户程序本地的缓存却没有清空。

怎样解决这个问题呢?怎样才能清空WebService缓存的同时也清空调用客户端本地的缓存呢?利用JMS的消息机制就可以解决这一问题

具体思路

在WebService服务端创建一个JMS
Topic,起名CacheTopic

当服务端执行增删改方法后,向CacheTopic中发一条消息

客户程序在自己的服务器中部署Message Driven Bean,监听CacheTopic中的消息,收到消息后清空本地缓存

架构图如下所示

项目中使用的AS都是JBoss,在JBoss中添加JMS
Topic的方法是在deploy目录下部署一个Destination描述文件,文件名符合*-service.xml。

本项目中使用的CacheTopic的部署文件内容如下

<?xml version="1.0" encoding="UTF-8"?>

<server>

   <!--使用jboss messaging定义topic-->
   <mbean code="org.jboss.jms.server.destination.TopicService"
      name="jboss.messaging.destination:service=Topic,name=CacheTopic"
      xmbean-dd="xmdesc/Topic-xmbean.xml">
      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
      <depends>jboss.messaging:service=PostOffice</depends>
	  <attribute name="JNDIName" >topic/CacheTopic</attribute>
   </mbean>     

</server>

服务端程序在执行增删改方法后,不仅要清除WebService中的缓存,还要向CacheTopic中发送消息

上篇博客中的拦截器修改如下(主要是添加了发送消息的功能):

public class CacheClearSyncInterceptor {
	@AroundInvoke
	public Object clearCache(InvocationContext context) throws Exception{

		//执行目标方法
		Object returnObj =context.proceed();

		/**************************清空本地缓存  begin**************************************/
		System.out.println("清空前的缓存数:"+CacheHandler.getInstance().getCache().getSize());
		//清空本地缓存
		CacheHandler.getInstance().clearCache();
		System.out.println("清空后的缓存数:"+CacheHandler.getInstance().getCache().getSize());
		/**************************清空本地缓存  end**************************************/

		//发送消息到CacheTopic,实现缓存同步
		StringBuilder txtMsgBuilder = new StringBuilder();
		txtMsgBuilder.append("【gxpt-jc】系统执行了【")
								.append(context.getTarget().getClass().getName())
								.append(".")
								.append(context.getMethod().getName())
								.append("】")
								.append("方法,需要同步缓存");
		MessageSender.send(txtMsgBuilder.toString(), DestinationType.TOPIC,"topic/CacheTopic","192.168.24.48:1199");

		return returnObj;
	}

}

上面用到的消息发送者类MessageSender的代码如下

public class MessageSender {

	/**
	 * @MethodName	: send
	 * @Description	: 发送消息
	 * @param msg	消息
	 * @param type	目的地类型:TOPIC或QUEUE
	 * @param destinationJndi	目的地的jndi名称
	 * @param url	目的地url
	 */
	public static void send(String msg,DestinationType type,String destinationJndi,String url) throws Exception{

		//定义连接对象和session
		TopicConnection topicConnection=null;
		TopicSession topicSession = null;
		QueueConnection queueConnection=null;
		QueueSession queueSession = null;

		try {
			//创建context
			Properties props = new Properties();
			props.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
			props.setProperty("java.naming.provider.url", url);
			Context ctx = new InitialContext(props);

			/************************************发消息给TOPIC  begin******************************************************/
			if(type==DestinationType.TOPIC){
				TopicConnectionFactory topicFactory=(TopicConnectionFactory)ctx.lookup("ConnectionFactory");

				//获取Connection
				topicConnection=topicFactory.createTopicConnection();

				//获取Session
				topicSession=topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

				//获取destination
				Topic topic=(Topic)ctx.lookup(destinationJndi);

				//创建消息发送者
				TopicPublisher publisher=topicSession.createPublisher(topic);

				//创建消息
				TextMessage txtMsg = topicSession.createTextMessage(msg);
				//发送消息
				publisher.publish(txtMsg);

			}
			/************************************发消息给TOPIC  end******************************************************/

			/************************************发消息给QUEUE  begin******************************************************/
			if(type==DestinationType.QUEUE){
				QueueConnectionFactory queueFactory=(QueueConnectionFactory)ctx.lookup("ConnectionFactory");

				//获取Connection
				queueConnection=queueFactory.createQueueConnection();

				//获取Session
				queueSession=queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

				//获取destination
				Queue queue=(Queue)ctx.lookup(destinationJndi);

				//创建消息发送者
				QueueSender sender=queueSession.createSender(queue);

				//创建消息
				TextMessage txtMsg = queueSession.createTextMessage(msg);
				//发送消息
				sender.send(txtMsg);
			}
			/************************************发消息给QUEUE  end******************************************************/

		} finally{
			//关闭对象
			if(topicConnection!=null && topicSession!=null){
				topicSession.close();
				topicConnection.close();
			}

			if(queueConnection!=null && queueSession!=null){
				queueSession.close();
				queueConnection.close();
			}
		}

	}

}

客户端接收消息的MDB的代码如下

@MessageDriven(
		activationConfig={
				@ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"),
				@ActivationConfigProperty(propertyName="destination",propertyValue="topic/CacheTopic"),
				@ActivationConfigProperty(propertyName="providerAdapterJNDI", propertyValue="java:/RemoteJMSProvider")
		}
)
public class KSCacheSyncMdb implements MessageListener{

	public void onMessage(Message msg){
		try {
			//获取消息文本
			TextMessage txtMsg = (TextMessage)msg;
			//显示文本消息
			System.out.println("由于"+txtMsg.getText());

			/**************************清空本地缓存  begin**************************************/
			System.out.println("清空前的缓存数:"+CacheHandler.getInstance().getCache().getSize());
			//清空本地缓存
			CacheHandler.getInstance().clearCache();
			System.out.println("清空后的缓存数:"+CacheHandler.getInstance().getCache().getSize());
			/**************************清空本地缓存  end**************************************/

		} catch (Exception e) {
			e.printStackTrace();
			throw new RuntimeException(e.getMessage());
		}
	}

}

因为在JBoss5.1.0中部署的MDB默认只能监听本地Destination中的消息,为了让MDB可以监听远程Destination中的消息,客户端仍需再部署一个RemoteJMSProvider描述文件,文件名同样需符合*-service.xml。文件内容如下

<?xml version="1.0" encoding="UTF-8"?>
<server>
	<mbean code="org.jboss.jms.jndi.JMSProviderLoader" name="jboss.messaging:service=JMSProviderLoader,name=RemoteJMSProvider">
		<attribute name="ProviderName">RemoteJMSProvider</attribute>
		<attribute name="ProviderAdapterClass">
		  org.jboss.jms.jndi.JNDIProviderAdapter
		</attribute>
		<!-- The combined connection factory -->
		<attribute name="FactoryRef">XAConnectionFactory</attribute>
		<!-- The queue connection factory -->
		<attribute name="QueueFactoryRef">XAConnectionFactory</attribute>
		<!-- The topic factory -->
		<attribute name="TopicFactoryRef">XAConnectionFactory</attribute>
		<!-- Uncomment to use HAJNDI to access JMS-->
		<attribute name="Properties">
		   java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
		   java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
		   java.naming.provider.url=192.168.24.48:1199
		</attribute>
	</mbean>

</server> 

这样就实现了分布式应用中的缓存同步

JMS异步消息解决分布式应用的EhCache缓存同步问题,码迷,mamicode.com

时间: 2024-11-09 09:31:11

JMS异步消息解决分布式应用的EhCache缓存同步问题的相关文章

我们究竟什么时候可以使用Ehcache缓存(转)

一.Ehcache是什么 EhCache是Hibernate的二级缓存技术之一,可以把查询出来的数据存储在内存或者磁盘,节省下次同样查询语句再次查询数据库,大幅减轻数据库压力. 二.Ehcache的使用场景是什么 1.首先最主要就是页面缓存. 网站页面的数据来源非常广泛的,大多数来自不同的对象,而且有可能来自不同的db,所以给页面做缓存是一个不错的主意. 2.常用数据的缓存一些配置信息,如后台的某些不经常改变的设置都可以缓存起来. 三.Ehcache使用的注意点 1.比较少的更新数据表的情况2.

我们究竟什么时候可以使用Ehcache缓存

文/小程故事多(简书作者) 原文链接:http://www.jianshu.com/p/2cd6ad416a5a 一.Ehcache是什么 EhCache是Hibernate的二级缓存技术之一,可以把查询出来的数据存储在内存或者磁盘,节省下次同样查询语句再次查询数据库,大幅减轻数据库压力. 二.Ehcache的使用场景是什么 1.首先最主要就是页面缓存. 网站页面的数据来源非常广泛的,大多数来自不同的对象,而且有可能来自不同的db,所以给页面做缓存是一个不错的主意. 2.常用数据的缓存 一些配置

深入探讨在集群环境中使用 EhCache 缓存系统

EhCache 缓存系统简介 EhCache 是一个纯 Java 的进程内缓存框架,具有快速.精干等特点,是 Hibernate 中默认的 CacheProvider. 下图是 EhCache 在应用程序中的位置: 图 1. EhCache 应用架构图 EhCache 的主要特性有: 快速: 简单: 多种缓存策略: 缓存数据有两级:内存和磁盘,因此无需担心容量问题: 缓存数据会在虚拟机重启的过程中写入磁盘: 可以通过 RMI.可插入 API 等方式进行分布式缓存: 具有缓存和缓存管理器的侦听接口

集群环境中使用 EhCache 缓存系统

EhCache 缓存系统 : 本章节将要介绍EhCache及EhCache实现分布式的一些解决方案.并针对于这些解决性方案做一个实现,后续将出一个提供项目模块化.服务化.插件化的VieMall快速开发平台,同时集成Dubbo服务化.Zookeeper(分布式调度/分布式配置管理服务).Redis分布式缓存技术及Memcache/Ehcache 二级缓存切换.FastDFS分布式文件系统.ActiveMQ异步消息中间件.Solr搜索.Nginx负载均衡等分布式及读写分离.如果有时间可以深入分表分库

EhCache缓存在集群环境中同步问题

由于 EhCache 是进程中的缓存系统,一旦将应用部署在集群环境中,当每一个节点维护各自的缓存数据,某个节点对缓存数据进行更新,这些更新的数据无法在其它节点中共享,这不仅会降低节点运行的效率,而且会导致数据不同步的情况发生.例如某个网站采用 A.B 两个节点作为集群部署,当 A 节点的缓存更新后,而 B 节点缓存尚未更新就可能出现用户在浏览页面的时候,一会是更新后的数据,一会是尚未更新的数据,尽管我们也可以通过 Session Sticky 技术来将用户锁定在某个节点上,但对于一些交互性比较强

ehcache缓存技术的特性

Ehcache是现在最流行的纯Java开源缓存框架,配置简单.结构清晰.功能强大,最初知道它,是从hibernate的缓存开始的.网上中文的EhCache材料以简单介绍和配置方法居多,如果你有这方面的问题,请自行google:对于API,官网上介绍已经非常清楚,请参见官网:但是很少见到特性说明和对实现原理的分析,因此在这篇文章里面,我会详细介绍和分析EhCache的特性,加上一些自己的理解和思考,希望对缓存感兴趣的朋友有所收获. 一.特性一览,来自官网,简单翻译一下: 1.快速轻量 过去几年,诸

JMS(Java消息服务)入门教程

阅读目录 什么是Java消息服务 为什么需要JMS JMS的优势 JMS消息传送模型 接收消息 JMS编程接口 JMS消息结构 JMS使用示例 译文链接(做了部分修改~~) 什么是Java消息服务 Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建.发送.读取消息等,用于支持JAVA应用程序开发.在J2EE中,当两个应用程序使用JMS进行通信时,它们之间并不是直接相连的,而是通过一个共同的消息收发服务连接起来,可以达到解耦的效果,我

微信后台异步消息队列的优化升级实践分享

1.引言 MQ 异步消息队列是微信后台自研的重要组件,广泛应用在各种业务场景中,为业务提供解耦.缓冲.异步化等能力.本文分享了该组件2.0版本的功能特点及优化实践,希望能为类似业务(比如移动端IM系统等)的消息队列设计提供一定的参考. 2.关于分享者 廖文鑫,2013年加入腾讯,从事微信后台基础功能及架构的开发和运营,先后参与了消息通知推送系统.任务队列组件.春晚摇红包活动等项目,在海量分布式高性能系统方面有丰富的经验. 3.背景介绍 微信后台给件 MQ 1.0 发布之初,基本满足了一般业务场景

Android 异步消息处理机制前篇(二):深入理解Message消息池

上一篇中共同探讨了ThreadLocal,这篇我们一起看下常提到的Message消息池到底是怎么回事,废话少说吧,进入正题. 对于稍有经验的开发人员来说我们在使用Handler发送异步消息获取Message的时候都会使用如下代码获取一个Message对象: 1 Message msg = mHandler.obtainMessage(); 而不是直接new一个: 1 Message msg = new Message(); 二者的主要区别就是上面的用到缓存池概念,如果池中有闲着的则拿来用,没有则