背景:
在了解一个分布式框架的时候,偶然接触到activimq消息队列,于是就决定写一个小demo
首先是linux配置activimq
去官网下载一个activimq的linux安装包,直接解压,到bin目录下执行:
activemq start
这样就启动了,很简单
然后可以查看是否有新的端口61616被监听
接着查看是否有相关进程
同时activimq还有一个后台监控页面, 但是这个默认端口是8161,访问地址为:
http://10.10.10.30:8161/admin/
默认用户名、密码均为admin
接下来是测试代码,maven库地址:
1 <dependency> 2 <groupId>org.apache.activemq</groupId> 3 <artifactId>activemq-core</artifactId> 4 <version>5.7.0</version> 5 </dependency>
1 package com.asen.activimq; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.DeliveryMode; 6 import javax.jms.Destination; 7 import javax.jms.MessageProducer; 8 import javax.jms.Session; 9 import javax.jms.TextMessage; 10 11 import org.apache.activemq.ActiveMQConnection; 12 import org.apache.activemq.ActiveMQConnectionFactory; 13 14 public class Sender { 15 private static final int SEND_NUMBER = 5; 16 17 public static void main(String[] args) { 18 // ConnectionFactory :连接工厂,JMS 用它创建连接 19 ConnectionFactory connectionFactory; 20 // Connection :JMS 客户端到JMS 21 // Provider 的连接 22 Connection connection = null; 23 // Session: 一个发送或接收消息的线程 24 Session session; 25 // Destination :消息的目的地;消息发送给谁. 26 Destination destination; 27 // MessageProducer:消息发送者 28 MessageProducer producer; 29 // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar 30 connectionFactory = new ActiveMQConnectionFactory( 31 ActiveMQConnection.DEFAULT_USER, 32 ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.10.10.30:61616"); 33 try { 34 // 从工厂获取连接对象 35 connection = connectionFactory.createConnection(); 36 // 启动 37 connection.start(); 38 // 获取操作连接 39 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 40 // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 41 // 获取消息目的地 42 destination = session.createQueue("FirstQueue"); 43 // 获取发送者 44 producer = session.createProducer(destination); 45 // 设置不持久化,此处学习,实际根据项目决定 46 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 47 // 构造消息,此处写死,项目就是参数,或者方法获取 48 sendMessage(session, producer); 49 session.commit(); 50 } catch (Exception e) { 51 e.printStackTrace(); 52 } finally { 53 try { 54 if (null != connection) 55 connection.close(); 56 } catch (Exception e) { 57 e.printStackTrace(); 58 } 59 } 60 } 61 62 public static void sendMessage(Session session, MessageProducer producer) 63 throws Exception { 64 65 for (int i = 1; i <= SEND_NUMBER; i++) { 66 TextMessage message = session.createTextMessage("ActiveMq 发送消息" + i); 67 // 发送消息到目的地方 68 System.out.println("发送消息:" + "ActiveMq 发送消息" + i); 69 producer.send(message); 70 } 71 } 72 }
1 package com.asen.activimq; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.Destination; 6 import javax.jms.MessageConsumer; 7 import javax.jms.Session; 8 import javax.jms.TextMessage; 9 10 import org.apache.activemq.ActiveMQConnection; 11 import org.apache.activemq.ActiveMQConnectionFactory; 12 13 public class Receiver { 14 public static void main(String[] args) { 15 // ConnectionFactory :连接工厂,JMS 用它创建连接 16 ConnectionFactory connectionFactory; 17 // Connection :JMS 客户端到JMS Provider 的连接 18 Connection connection = null; 19 // Session: 一个发送或接收消息的线程 20 Session session ; 21 // Destination :消息的目的地;消息发送给谁. 22 Destination destination; 23 // 消费者,消息接收者 24 MessageConsumer consumer; 25 // 初始化工厂 26 connectionFactory = new ActiveMQConnectionFactory( 27 ActiveMQConnection.DEFAULT_USER, 28 ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.10.10.30:61616"); 29 try { 30 // 从工厂获取连接对象 31 connection = connectionFactory.createConnection(); 32 // 启动 33 connection.start(); 34 // 获取操作连接 35 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 36 // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在ActiveMq的console配置 37 // 获取消息目的地:队列 38 destination = session.createQueue("FirstQueue"); 39 // 接收消息 40 consumer = session.createConsumer(destination); 41 while(true){ 42 // 设置接收者接收消息的时间,为了便于测试,这里设定为100s 43 TextMessage message = (TextMessage)consumer.receive(100000); 44 System.out.println("message:" + message); 45 if(null != message){ 46 System.out.println("接收到消息:" + message.getText()); 47 }else{ 48 break; 49 } 50 } 51 } catch (Exception e) { 52 e.printStackTrace(); 53 } finally { 54 try { 55 if (null != connection) { 56 connection.close(); 57 } 58 } catch (Exception e) { 59 e.printStackTrace(); 60 } 61 } 62 63 } 64 }
运行之后,后台监控页面可以看见:
activimq持久化常用的有三种方式:1、文件持久化 2、mysql持久化 3、oracle持久化
在activimq的配置文件中默认开启了文件持久化
同时我们需要修改一行代码:
这样在activimq重启之后就不会有消息丢失了
时间: 2024-10-11 21:32:57