ActiveMQ的持久化
消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送。
消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动以后首先要检查制定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。
ActiveMQ持久化方式:AMQ、KahaDB、JDBC、LevelDB。
1、AMQ
ActiveMQ和Messages 通信方式
上面提到JMS通信方式分为点对点通信和发布/订阅方式 1)点对点方式(point-to-point) 点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息. 具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。 消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行 2)发布/订阅 方式(publish/subscriber Messaging) 发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。 一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。
分别举例说明:
1)点对点:和前一篇文章一样
Sender类:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { private static final int SEND_NUMBER = 2000; public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS客户端到JMS Provider的连接 Connection connection = null; // Session:一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // MessageProducer:消息发送者 MessageProducer producer; // TextMessage message; // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现 connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try{ // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); //启动 connection.start(); //获取操作连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //获取session,FirstQueue是一个服务器的queue destination = session.createQueue("FirstQueue"); // 得到消息生成者【发送者】 producer = session.createProducer(destination); //设置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //构造消息 sendMessage(session, producer); //session.commit(); connection.close(); } catch(Exception e){ e.printStackTrace(); }finally{ if(null != connection){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generatedcatch block e.printStackTrace(); } } } } public static void sendMessage(Session session, MessageProducer producer)throws Exception{ for(int i=1; i<=SEND_NUMBER; i++){ TextMessage message = session.createTextMessage("ActiveMQ发送消息"+i); System.out.println("发送消息:ActiveMQ发送的消息"+i); producer.send(message); } } }
Receiver类:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver { public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS客户端到JMS Provider的连接 Connection connection = null; // Session:一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = newActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { //得到连接对象 connection =connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建Queue destination = session.createQueue("FirstQueue"); consumer =session.createConsumer(destination); while(true){ //设置接收者接收消息的时间,为了便于测试,这里定为100s TextMessagemessage = (TextMessage)consumer.receive(100000); if(null != message){ System.out.println("收到消息" +message.getText()); }else break; } }catch(Exception e){ e.printStackTrace(); }finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }
运行进行测试:
时间: 2024-10-15 07:55:16