今天就来说下 这个项目中使用ActiveMQ的情况, MQ: message queue, 顾名思义就是消息队列的意思.
一: 使用场景:
消息队列在大型电子商务类网站,如京东、淘宝、去哪儿等网站有这深入的应用,队列的主要作用是消除高并发访问高峰,加快网站的响应速度。在不使用消息队列的情况下,用户的请求数据直接写入数据库,在高并发的情况下,会对数据库造成巨大的压力,同时也使得系统响应延迟加剧。在使用队列后,用户的请求发给队列后立即返回(当然不能直接给用户提示订单提交成功,京东上提示:您“您提交了订单,请等待系统确认”),再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列的服务处理速度远快于数据库,因此用户的响应延迟可得到有效改善。
那么在babasport这个项目中, 我们可以在上架的时候使用消息队列的模式:
我们之前在点击一款商品上架的时候, 我们需要分成2步, 第一: 更新商品表中该商品的上架状态. 第二: 将该商品信息保存到Solr服务器中. 那么如果我们使用了消息队列后, 第二步就可以使用发送message来异步完成.
消息队列可以接收消息和 发送消息
消息队列类型:
队列:一对一聊天 私聊 QQ
主题(订阅模式):一对多聊天 群聊 QQ
名词解释:
二, 代码原型
ActiveMQ需要部署到Linux系统下, 这里就不再做概述.
这里也是tar包, 导入到linux下直接解压启动即可, 前面已经有过很多博文讲Linux下一些常用软件的安装步骤.
上架代码原型:
项目构件图:
未使用ActiveMQ前ProductServiceImpl.cs:
1 //上架 2 public void isShow(Long[] ids){ 3 Product product = new Product(); 4 product.setIsShow(true); 5 for (final Long id : ids) { 6 //上下架状态 7 product.setId(id); 8 productDao.updateByPrimaryKeySelective(product); 9 10 //这个地方的代码应该在babasport-solr中写, 现在使用ActiveMQ进行迁移. 11 //TODO 保存商品信息到Solr服务器 12 SolrInputDocument doc = new SolrInputDocument(); 13 //ID 14 doc.setField("id", id); 15 //名称 16 Product p = productDao.selectByPrimaryKey(id); 17 doc.setField("name_ik", p.getName()); 18 //图片URL 19 doc.setField("url", p.getImgUrls()[0]); 20 //品牌 ID 21 doc.setField("brandId", p.getBrandId()); 22 //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 1 23 SkuQuery skuQuery = new SkuQuery(); 24 skuQuery.createCriteria().andProductIdEqualTo(id); 25 skuQuery.setOrderByClause("price asc"); 26 skuQuery.setPageNo(1); 27 skuQuery.setPageSize(1); 28 List<Sku> skus = skuDao.selectByExample(skuQuery); 29 doc.setField("price", skus.get(0).getPrice()); 30 //...时间等 剩下的省略 31 32 try { 33 solrServer.add(doc); 34 solrServer.commit(); 35 } catch (Exception e) { 36 // TODO Auto-generated catch block 37 e.printStackTrace(); 38 } 39 40 41 42 43 //TODO 静态化 44 } 45 }
上面的代码 除了更改本来就该更改的商品状态信息外, 还去见商品信息保存到了Solr服务器中了. 这里我们使用ActiveMQ进行改造:
使用ActiveMQ后的ProductServiceImpl.cs:
1 //上架 2 public void isShow(Long[] ids){ 3 Product product = new Product(); 4 product.setIsShow(true); 5 for (final Long id : ids) { 6 //上下架状态 7 product.setId(id); 8 productDao.updateByPrimaryKeySelective(product); 9 10 //发送商品ID到ActiveMQ即可. 11 jmsTemplate.send(new MessageCreator() { 12 13 @Override 14 public Message createMessage(Session session) throws JMSException { 15 16 return session.createTextMessage(String.valueOf(id)); 17 } 18 }); 19 20 //TODO 静态化 21 } 22 }
接着就是配置消息发送方(JMS生产者) mq.xml:
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:aop="http://www.springframework.org/schema/aop" 5 xmlns:tx="http://www.springframework.org/schema/tx" 6 xmlns:task="http://www.springframework.org/schema/task" 7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" 8 xsi:schemaLocation="http://www.springframework.org/schema/beans 9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 10 http://www.springframework.org/schema/mvc 11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 12 http://www.springframework.org/schema/context 13 http://www.springframework.org/schema/context/spring-context-4.0.xsd 14 http://www.springframework.org/schema/aop 15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 16 http://www.springframework.org/schema/tx 17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 18 http://www.springframework.org/schema/task 19 http://www.springframework.org/schema/task/spring-task-4.0.xsd 20 http://code.alibabatech.com/schema/dubbo 21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> 22 23 24 <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ--> 25 <!-- 连接工厂, 此工厂由Apache提供 --> 26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 27 <!-- 连接地址 28 在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616 29 --> 30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/> 31 <!-- 设置用户名及密码 --> 32 <property name="userName" value="admin"></property> 33 <property name="password" value="admin"></property> 34 </bean> 35 36 <!-- 配置连接池管理工厂 --> 37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean"> 38 <!-- 注入工厂 --> 39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property> 40 <!-- 设置最大连接数 --> 41 <property name="maxConnections" value="5"></property> 42 </bean> 43 44 <!-- 把上面的工厂交给Spring管理 --> 45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 46 <!-- 注入上面的工厂 --> 47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property> 48 </bean> 49 50 <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ --> 51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 52 <!-- 注入Spring单例工厂 --> 53 <property name="connectionFactory" ref="singleConnectionFactory"></property> 54 <!-- 设置默认的目标管道 --> 55 <property name="defaultDestinationName" value="pId"/> 56 </bean> 57 </beans>
配置说明: 这里是首先构建一个MQ的连接工厂, 只要ActiveMQ启动后 就可以这样构建连接了. 配置登录的用户名和和密码.
接着就是配置连接池, 把连接工厂交给连接池去管理. 这些都是Apache厂商提供的.
接着就是再将连接池交由Spring管理.
最后我们再来配置一个jmsTemplate模板来操作ActiveMQ, 这个类似于jdbcTemplate模板. 而且我们这个里面注入了一个默认的管道, 也就是productId, 因为我们现在是 传递消息一一去对应, 关于怎么对应 就是依赖于这个管道.
接下来我们就看下消息的接收方(JMS消费者)的一些东西:
消费者的目录结构:(Solr)
Solr项目中的ActiveMQ配置文件mq.xml:
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:aop="http://www.springframework.org/schema/aop" 5 xmlns:tx="http://www.springframework.org/schema/tx" 6 xmlns:task="http://www.springframework.org/schema/task" 7 xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" 8 xsi:schemaLocation="http://www.springframework.org/schema/beans 9 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 10 http://www.springframework.org/schema/mvc 11 http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 12 http://www.springframework.org/schema/context 13 http://www.springframework.org/schema/context/spring-context-4.0.xsd 14 http://www.springframework.org/schema/aop 15 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 16 http://www.springframework.org/schema/tx 17 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 18 http://www.springframework.org/schema/task 19 http://www.springframework.org/schema/task/spring-task-4.0.xsd 20 http://code.alibabatech.com/schema/dubbo 21 http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> 22 23 24 <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ--> 25 <!-- 连接工厂, 此工厂由Apache提供 --> 26 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 27 <!-- 连接地址 28 在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616 29 --> 30 <property name="brokerURL" value="tcp://192.168.200.128:61616"/> 31 <!-- 设置用户名及密码 --> 32 <property name="userName" value="admin"></property> 33 <property name="password" value="admin"></property> 34 </bean> 35 36 <!-- 配置连接池管理工厂 ,由Apache提供.--> 37 <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean"> 38 <!-- 注入工厂 --> 39 <property name="connectionFactory" ref="activeMQConnectionFactory"></property> 40 <!-- 设置最大连接数 --> 41 <property name="maxConnections" value="5"></property> 42 </bean> 43 44 <!-- 把上面的工厂交给Spring管理 --> 45 <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 46 <!-- 注入上面的工厂 --> 47 <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property> 48 </bean> 49 50 <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ --> 51 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 52 <!-- 注入Spring单例工厂 --> 53 <property name="connectionFactory" ref="singleConnectionFactory"></property> 54 <!-- 设置默认的目标管道 --> 55 <property name="defaultDestinationName" value="pId"/> 56 </bean> 57 58 <!-- 实例化一个监听到消息后 处理此消息的类 --> 59 <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/> 60 61 <!-- 配置实时监听器 --> 62 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 63 <!-- 配置工厂, 需要配置spirng的工厂 --> 64 <property name="connectionFactory" ref="singleConnectionFactory"/> 65 <!-- 设置监听的目标 --> 66 <property name="destinationName" value="pId"/> 67 <!-- 监听后获取消息的类, 接收监听到的消息 --> 68 <property name="messageListener" ref="customMessageListener"></property> 69 </bean> 70 </beans>
我们来说下 和上面配置不同的地方, 我们在这里配置了一个监听器, 因为接收到 JMS 生产者发过来的消息后我们需要有个监听器去监听且 将监听到的消息拿过来处理.
接下来看看监听器的处理方法做了些什么事情:
CustomMessageListener.java:
1 /* 2 * 接收MQ中的消息 3 */ 4 public class CustomMessageListener implements MessageListener{ 5 @Autowired 6 private SearchService searchService; 7 8 @Override 9 public void onMessage(Message message) { 10 //先将接收到的消息强转为ActiveMQ类型的消息 11 //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage 12 ActiveMQTextMessage amtm = (ActiveMQTextMessage)message; 13 try { 14 String id = amtm.getText(); 15 System.out.println("接收到的ID:"+id); 16 searchService.insertProductToSolr(Long.parseLong(id)); 17 } catch (JMSException e) { 18 // TODO Auto-generated catch block 19 e.printStackTrace(); 20 } 21 }
因为我们接收到的是string类型的文本, 所以这里我们直接将接收到的消息转换为ActiveMQText类型, 然后通过getText去得到传递过来的id, 然后我们就可以通过这个productId去做相应的操作了.
接下来就看保存商品信息到Solr服务器的逻辑:
SearchServiceImpl.java:
1 //保存商品信息到Solr服务器中, 通过ActiveMQ 2 public void insertProductToSolr(Long productId){ 3 //TODO 保存商品信息到Solr服务器 4 SolrInputDocument doc = new SolrInputDocument(); 5 //ID 6 doc.setField("id", productId); 7 //名称 8 Product p = productDao.selectByPrimaryKey(productId); 9 doc.setField("name_ik", p.getName()); 10 //图片URL 11 doc.setField("url", p.getImgUrls()[0]); 12 //品牌 ID 13 doc.setField("brandId", p.getBrandId()); 14 //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 1 15 SkuQuery skuQuery = new SkuQuery(); 16 skuQuery.createCriteria().andProductIdEqualTo(productId); 17 skuQuery.setOrderByClause("price asc"); 18 skuQuery.setPageNo(1); 19 skuQuery.setPageSize(1); 20 List<Sku> skus = skuDao.selectByExample(skuQuery); 21 doc.setField("price", skus.get(0).getPrice()); 22 //...时间等 剩下的省略 23 24 try { 25 solrServer.add(doc); 26 solrServer.commit(); 27 } catch (Exception e) { 28 // TODO Auto-generated catch block 29 e.printStackTrace(); 30 } 31 }
这样就比较明朗了, ActiveMQ 队列就是这样来实现的.
====================接下来还会有 ActiveMQ 订阅者模式的示例, 这里只是生产者发送消息给单个消费者, 下次还会更新生产者发送消息给多个消费者.