[项目构建 八]babasport ActiveMQ的介绍及使用实例.

今天就来说下 这个项目中使用ActiveMQ的情况, MQ: message queue, 顾名思义就是消息队列的意思.

一: 使用场景: 

消息队列在大型电子商务类网站,如京东、淘宝、去哪儿等网站有这深入的应用,队列的主要作用是消除高并发访问高峰,加快网站的响应速度。在不使用消息队列的情况下,用户的请求数据直接写入数据库,在高并发的情况下,会对数据库造成巨大的压力,同时也使得系统响应延迟加剧。在使用队列后,用户的请求发给队列后立即返回(当然不能直接给用户提示订单提交成功,京东上提示:您“您提交了订单,请等待系统确认”),再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列的服务处理速度远快于数据库,因此用户的响应延迟可得到有效改善。

那么在babasport这个项目中, 我们可以在上架的时候使用消息队列的模式:
我们之前在点击一款商品上架的时候, 我们需要分成2步, 第一: 更新商品表中该商品的上架状态. 第二: 将该商品信息保存到Solr服务器中.  那么如果我们使用了消息队列后, 第二步就可以使用发送message来异步完成.

消息队列可以接收消息和 发送消息

消息队列类型:

队列:一对一聊天  私聊  QQ

主题(订阅模式):一对多聊天  群聊  QQ

名词解释: 

 二, 代码原型
ActiveMQ需要部署到Linux系统下, 这里就不再做概述.
这里也是tar包, 导入到linux下直接解压启动即可, 前面已经有过很多博文讲Linux下一些常用软件的安装步骤.

上架代码原型:
项目构件图:

未使用ActiveMQ前ProductServiceImpl.cs:

 1 //上架
 2     public void isShow(Long[] ids){
 3         Product product = new Product();
 4         product.setIsShow(true);
 5         for (final Long id : ids) {
 6             //上下架状态
 7             product.setId(id);
 8             productDao.updateByPrimaryKeySelective(product);
 9
10             //这个地方的代码应该在babasport-solr中写, 现在使用ActiveMQ进行迁移.
11             //TODO 保存商品信息到Solr服务器
12             SolrInputDocument doc = new SolrInputDocument();
13             //ID
14             doc.setField("id", id);
15             //名称
16             Product p = productDao.selectByPrimaryKey(id);
17             doc.setField("name_ik", p.getName());
18             //图片URL
19             doc.setField("url", p.getImgUrls()[0]);
20             //品牌 ID
21             doc.setField("brandId", p.getBrandId());
22             //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 1
23             SkuQuery skuQuery = new SkuQuery();
24             skuQuery.createCriteria().andProductIdEqualTo(id);
25             skuQuery.setOrderByClause("price asc");
26             skuQuery.setPageNo(1);
27             skuQuery.setPageSize(1);
28             List<Sku> skus = skuDao.selectByExample(skuQuery);
29             doc.setField("price", skus.get(0).getPrice());
30             //...时间等 剩下的省略
31
32             try {
33                 solrServer.add(doc);
34                 solrServer.commit();
35             } catch (Exception e) {
36                 // TODO Auto-generated catch block
37                 e.printStackTrace();
38             }
39
40
41
42
43             //TODO 静态化
44         }
45     }

上面的代码 除了更改本来就该更改的商品状态信息外, 还去见商品信息保存到了Solr服务器中了. 这里我们使用ActiveMQ进行改造: 
使用ActiveMQ后的ProductServiceImpl.cs:

 1 //上架
 2     public void isShow(Long[] ids){
 3         Product product = new Product();
 4         product.setIsShow(true);
 5         for (final Long id : ids) {
 6             //上下架状态
 7             product.setId(id);
 8             productDao.updateByPrimaryKeySelective(product);
 9
10             //发送商品ID到ActiveMQ即可.
11             jmsTemplate.send(new MessageCreator() {
12
13                 @Override
14                 public Message createMessage(Session session) throws JMSException {
15
16                     return session.createTextMessage(String.valueOf(id));
17                 }
18             });
19
20             //TODO 静态化
21         }
22     }

接着就是配置消息发送方(JMS生产者) mq.xml:

 1 <beans xmlns="http://www.springframework.org/schema/beans"
 2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
 3     xmlns:context="http://www.springframework.org/schema/context"
 4     xmlns:aop="http://www.springframework.org/schema/aop"
 5     xmlns:tx="http://www.springframework.org/schema/tx"
 6     xmlns:task="http://www.springframework.org/schema/task"
 7     xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
 8     xsi:schemaLocation="http://www.springframework.org/schema/beans
 9         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
10         http://www.springframework.org/schema/mvc
11         http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
12         http://www.springframework.org/schema/context
13         http://www.springframework.org/schema/context/spring-context-4.0.xsd
14         http://www.springframework.org/schema/aop
15         http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
16         http://www.springframework.org/schema/tx
17         http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18         http://www.springframework.org/schema/task
19            http://www.springframework.org/schema/task/spring-task-4.0.xsd
20         http://code.alibabatech.com/schema/dubbo
21         http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22
23
24     <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->
25     <!-- 连接工厂, 此工厂由Apache提供 -->
26     <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27         <!-- 连接地址
28             在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616
29         -->
30         <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31         <!-- 设置用户名及密码 -->
32         <property name="userName" value="admin"></property>
33         <property name="password" value="admin"></property>
34     </bean>
35
36     <!-- 配置连接池管理工厂 -->
37     <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38         <!-- 注入工厂 -->
39         <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40         <!-- 设置最大连接数 -->
41         <property name="maxConnections" value="5"></property>
42     </bean>
43
44     <!-- 把上面的工厂交给Spring管理  -->
45     <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46         <!-- 注入上面的工厂 -->
47         <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48     </bean>
49
50     <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->
51     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52         <!-- 注入Spring单例工厂 -->
53         <property name="connectionFactory" ref="singleConnectionFactory"></property>
54         <!-- 设置默认的目标管道 -->
55         <property name="defaultDestinationName" value="pId"/>
56     </bean>
57 </beans>

配置说明: 这里是首先构建一个MQ的连接工厂, 只要ActiveMQ启动后 就可以这样构建连接了. 配置登录的用户名和和密码.
接着就是配置连接池, 把连接工厂交给连接池去管理. 这些都是Apache厂商提供的. 
接着就是再将连接池交由Spring管理. 
最后我们再来配置一个jmsTemplate模板来操作ActiveMQ, 这个类似于jdbcTemplate模板. 而且我们这个里面注入了一个默认的管道, 也就是productId, 因为我们现在是 传递消息一一去对应, 关于怎么对应  就是依赖于这个管道.

接下来我们就看下消息的接收方(JMS消费者)的一些东西:
消费者的目录结构:(Solr)

Solr项目中的ActiveMQ配置文件mq.xml:

 1 <beans xmlns="http://www.springframework.org/schema/beans"
 2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
 3     xmlns:context="http://www.springframework.org/schema/context"
 4     xmlns:aop="http://www.springframework.org/schema/aop"
 5     xmlns:tx="http://www.springframework.org/schema/tx"
 6     xmlns:task="http://www.springframework.org/schema/task"
 7     xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
 8     xsi:schemaLocation="http://www.springframework.org/schema/beans
 9         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
10         http://www.springframework.org/schema/mvc
11         http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
12         http://www.springframework.org/schema/context
13         http://www.springframework.org/schema/context/spring-context-4.0.xsd
14         http://www.springframework.org/schema/aop
15         http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
16         http://www.springframework.org/schema/tx
17         http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
18         http://www.springframework.org/schema/task
19            http://www.springframework.org/schema/task/spring-task-4.0.xsd
20         http://code.alibabatech.com/schema/dubbo
21         http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
22
23
24     <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->
25     <!-- 连接工厂, 此工厂由Apache提供 -->
26     <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
27         <!-- 连接地址
28             在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616
29         -->
30         <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
31         <!-- 设置用户名及密码 -->
32         <property name="userName" value="admin"></property>
33         <property name="password" value="admin"></property>
34     </bean>
35
36     <!-- 配置连接池管理工厂 ,由Apache提供.-->
37     <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
38         <!-- 注入工厂 -->
39         <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
40         <!-- 设置最大连接数 -->
41         <property name="maxConnections" value="5"></property>
42     </bean>
43
44     <!-- 把上面的工厂交给Spring管理  -->
45     <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
46         <!-- 注入上面的工厂 -->
47         <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
48     </bean>
49
50     <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->
51     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
52         <!-- 注入Spring单例工厂 -->
53         <property name="connectionFactory" ref="singleConnectionFactory"></property>
54         <!-- 设置默认的目标管道 -->
55         <property name="defaultDestinationName" value="pId"/>
56     </bean>
57
58     <!-- 实例化一个监听到消息后 处理此消息的类 -->
59     <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>
60
61     <!-- 配置实时监听器 -->
62     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
63         <!-- 配置工厂, 需要配置spirng的工厂 -->
64         <property name="connectionFactory" ref="singleConnectionFactory"/>
65         <!-- 设置监听的目标 -->
66         <property name="destinationName" value="pId"/>
67         <!-- 监听后获取消息的类, 接收监听到的消息 -->
68         <property name="messageListener" ref="customMessageListener"></property>
69     </bean>
70 </beans>

我们来说下 和上面配置不同的地方, 我们在这里配置了一个监听器, 因为接收到 JMS 生产者发过来的消息后我们需要有个监听器去监听且 将监听到的消息拿过来处理.
接下来看看监听器的处理方法做了些什么事情: 
CustomMessageListener.java:

 1 /*
 2  * 接收MQ中的消息
 3  */
 4 public class CustomMessageListener implements MessageListener{
 5     @Autowired
 6     private SearchService searchService;
 7
 8     @Override
 9     public void onMessage(Message message) {
10         //先将接收到的消息强转为ActiveMQ类型的消息
11         //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage
12         ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;
13         try {
14             String id = amtm.getText();
15             System.out.println("接收到的ID:"+id);
16             searchService.insertProductToSolr(Long.parseLong(id));
17         } catch (JMSException e) {
18             // TODO Auto-generated catch block
19             e.printStackTrace();
20         }
21     }

因为我们接收到的是string类型的文本, 所以这里我们直接将接收到的消息转换为ActiveMQText类型, 然后通过getText去得到传递过来的id, 然后我们就可以通过这个productId去做相应的操作了.

接下来就看保存商品信息到Solr服务器的逻辑:
SearchServiceImpl.java:

 1 //保存商品信息到Solr服务器中, 通过ActiveMQ
 2     public void insertProductToSolr(Long productId){
 3         //TODO 保存商品信息到Solr服务器
 4         SolrInputDocument doc = new SolrInputDocument();
 5         //ID
 6         doc.setField("id", productId);
 7         //名称
 8         Product p = productDao.selectByPrimaryKey(productId);
 9         doc.setField("name_ik", p.getName());
10         //图片URL
11         doc.setField("url", p.getImgUrls()[0]);
12         //品牌 ID
13         doc.setField("brandId", p.getBrandId());
14         //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 1
15         SkuQuery skuQuery = new SkuQuery();
16         skuQuery.createCriteria().andProductIdEqualTo(productId);
17         skuQuery.setOrderByClause("price asc");
18         skuQuery.setPageNo(1);
19         skuQuery.setPageSize(1);
20         List<Sku> skus = skuDao.selectByExample(skuQuery);
21         doc.setField("price", skus.get(0).getPrice());
22         //...时间等 剩下的省略
23
24         try {
25             solrServer.add(doc);
26             solrServer.commit();
27         } catch (Exception e) {
28             // TODO Auto-generated catch block
29             e.printStackTrace();
30         }
31     }

这样就比较明朗了, ActiveMQ 队列就是这样来实现的.

====================接下来还会有 ActiveMQ 订阅者模式的示例, 这里只是生产者发送消息给单个消费者, 下次还会更新生产者发送消息给多个消费者.

 

 

时间: 2024-12-18 02:36:20

[项目构建 八]babasport ActiveMQ的介绍及使用实例.的相关文章

[项目构建 七]babasport 商品上架及Solr使用实例.

前面已经讲过 如果安装及配置Solr服务器了, 那么现在我们就来正式在代码中使用Solr.1,这里Solr主要是怎么使用的呢?  当我们在前台页面搜索商品名称关键词时, 我们这时是在Solr库中去查找相应的商品信息, 然后将搜索关键词高亮. 2,那么Solr库中的商品信息又是如何添加的呢?  当我们在给商品上架的时候, 将商品信息update 到mysql数据库中的bbs_product表中, 然后同样的将相应的信息 添加到Solr库中. 接下来就看代码的具体实现吧: 一, 商品上架我们在这里点

[项目构建 九]babasport 页面静态化技术Freemarker技术的介绍及使用实例.

一.FreeMarker简介 1.动态网页和静态网页差异 在进入主题之前我先介绍一下什么是动态网页,动态网页是指跟静态网页相对应的一种网页编程技术.静态网页,随着HTML代码的生成,页面的内容和显示效 果就不会再发生变化(除非你修改页面代码).而动态网页则不然,页面代码虽然没有发生变化,但是显示的内容却是可以随着时间.环境或者数据库操作的结果而 发生相应的变化.简而言之,动态网页是基本的HTML语法规范与java.VB.VC等高级程序设计语言.数据库编程等多种技术的融合,以实现对网站内容 和风格

[项目构建 十]babasport 集群下session共享问题的解决方案.

这一篇博客来讲解下babasport这个项目中使用的Login功能, 当然这里说的只是其中的一些简单的部分, 记录在此 方便以后查阅. 一: 去登录页面首先我们登录需要注意的事项是, 当用户点击登录按钮时,转入登录页面时也要记住之前用户是从哪个页面发送请求过来的, 这样登录成功后还能继续跳回到用户之前浏览的那个页面.我们页面展示显示的登录按钮都是集成在一个common的jsp中, 前台每个页面都是引用的这个jsp, 所以需要在这个common的jsp中直接添加点击登录按钮跳转的页面.这里点击登录

[项目构建 六]babasport Mybatis逆向工程构建项目实例.

mybaits需要程序员自己编写sql语句,mybatis官方提供逆向工程 可以针对单表自动生成mybatis执行所需要的代码(mapper.java,mapper.xml.pojo等)有了sql表的结构后, 我们就可以利用逆向工程直接生成相应的Dao和JavaBean代码, 这样能够大大减少我们平时开发的工作量. 但是我还是觉得使用逆向工程局限性很大, 例如我们的逆向工程main方法只能执行一次, 如果再次执行就会继续生成相应的Dao和JavaBean, 除非我们把之前生成的全都删除. 这样对

[项目构建 五]babasport ajax图片上传及FastDFS入门案例.

今天来开始写图片上传的功能, 现在的图片上传都讲求 上传完成后立刻回显且页面不刷新, 这里到底是怎么做的呢? 当然是借助于ajax了, 但是ajax又不能提交表单, 这里我们还要借助一个插件: jquery.form.js剩下的一个是FastDFS, 那么什么是FastDFS呢? FastDFS是一个开源的轻量级分布式文件系统,由跟踪服务器(tracker server).存储服务器(storage server)和客户端(client)三个部分组成,主要解决了海量数据存储问题,特别适合以中小文

[项目构建 十三]babasport Nginx负载均衡的详细配置及使用案例详解.

在这里再次说明下, 这个项目是从网上 找到的一套学习资料, 自己在 空闲时间学习了这些东西. 这里面的code当然会有很多不完善的地方, 但是确实也能学到很多新东西.感谢看过这一些列博文和评论的小伙伴, 我把自己所看到的学到的拿到这里来分享是想和大家一起学习进步, 想听听园友给出的意见, 也是对自己学习过程的一个总结. 最后我会将这套资料的所有内容共享出来, 如果有愿意学习的同学可以下载下来使用.PS: 我自认为 这些内容对于刚工作1-2年的同学来说真的很适用. 技术无止境, 我们仍需努力! 1

[项目构建 十一]babasport 购物车的原理及实现.

今天来开始写一下关于购物车的东西, 这里首先抛出四个问题: 1)用户没登陆用户名和密码,添加商品, 关闭浏览器再打开后 不登录用户名和密码 问:购物车商品还在吗? 2)用户登陆了用户名密码,添加商品,关闭浏览器再打开后 不登录用户名和密码 问:购物车商品还在吗? 3)用户登陆了用户名密码,添加商品, 关闭浏览器,然后再打开,登陆用户名和密码  问:购物车商品还在吗? 4)用户登陆了用户名密码,添加商品, 关闭浏览器 外地老家打开浏览器  登陆用户名和密码 问:购物车商品还在吗? 上面四个问题都是

[项目构建 六]babasport Redis使用实例.

在项目中使用了Redis, 因为在项目中暂时只涉及到使用Redis生成主键, 那么本文就来讲一下Redis安装的方法和Redis生成主键的优点以及和其他几种方式生成主键的对比. 1,Redis安装首先将Redis的tar包拷贝到Linux下的根目录 然后解压到redis文件夹下:(先使用mkdir创建redis文件夹) 接下来就是解压tar包到redis目录下: 解压后的目录结构: 编译: 使用Make命令 安装: 安装好之后的目录:  6379 下的目录结构:(这个rdb文件时: redis

Rust 中项目构建管理工具 Cargo简单介绍

cargo是Rust内置的项目管理工具.用于Rust 项目的创建.编译.执行,同一时候对项目的依赖进行管理,自己主动推断使用的第三方依赖库,进行下载和版本号升级. 一.查看 cargo 版本号 安装Rust之后,能够使用 cargo --version 查看cargo的版本号信息. $ cargo --version cargo 0.8.0-nightly (28a0cbb 2016-01-17) 二.创建新项目 1.使用 " new 项目名称" 创建新项目 cargo new hel