1,Tomcat配置
<Resource name="topic/connectionFactory" auth="Container" type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5" brokerName="LocalActiveMQBroker" useEmbeddedBroker="false" /> <Resource name="topic/topic0" auth="Container" type="org.apache.activemq.command.ActiveMQTopic" description="My Topic" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="TestTopic" />
2,发布/订阅
发布:http://localhost:8080/Mq/Publisher
Publisher send:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:WWH-PC-64244-1488884593844-1:3:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://TestTopic, transactionId = null, expiration = 0, timestamp = 1488884667978, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = 2017?3?7?}
订阅:http://localhost:8080/Mq/Subscriber
Subscriber:ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:WWH-PC-64244-1488884593844-1:3:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:WWH-PC-64244-1488884593844-1:3:1:1, destination = topic://TestTopic, transactionId = null, expiration = 0, timestamp = 1488884667978, arrival = 0, brokerInTime = 1488884667978, brokerOutTime = 1488884667982, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = [email protected], marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 2017?3?7?}
3,代码
【1】消息发布
package com.ma.publish; import java.io.IOException; import java.io.PrintWriter; import javax.jms.DeliveryMode; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.naming.InitialContext; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @WebServlet("/Publisher") public class Publisher extends HttpServlet{ /** * 消息-订阅/发布 */ private static final long serialVersionUID = -4470119218802259551L; public Publisher(){ super(); } @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { PrintWriter out = resp.getWriter(); try{ InitialContext context = new InitialContext(); Topic topic = (Topic) context.lookup("java:comp/env/topic/topic0"); TopicConnectionFactory tConnectionFactory = (TopicConnectionFactory) context.lookup("java:comp/env/topic/connectionFactory"); TopicConnection tConnection = tConnectionFactory.createTopicConnection(); TopicSession topicSession = tConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicPublisher tpPublisher = topicSession.createPublisher(topic); tpPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage txMessage = topicSession.createTextMessage(); txMessage.setText("2017年3月7日"); tpPublisher.publish(txMessage); out.write("Publisher send:" +txMessage); tConnection.close(); }catch(Throwable e){ e.printStackTrace(); } } @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { } }
【2】消息订阅
package com.ma.publish; import java.io.IOException; import java.io.PrintWriter; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.naming.InitialContext; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @WebServlet("/Subscriber") public class Subscriber extends HttpServlet { /** * 消息-发布/订阅 */ private static final long serialVersionUID = 6058649540492572496L; public Subscriber(){ super(); } @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { PrintWriter out = resp.getWriter(); try{ InitialContext context = new InitialContext(); Topic topic = (Topic) context.lookup("java:comp/env/topic/topic0"); TopicConnectionFactory tConnectionFactory = (TopicConnectionFactory) context.lookup("java:comp/env/topic/connectionFactory"); TopicConnection tConnection = tConnectionFactory.createTopicConnection(); TopicSession topicSession = tConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber tpSubscriber = topicSession.createSubscriber(topic); tConnection.start(); TextMessage txMessage = (TextMessage) tpSubscriber.receive(); out.write("Subscriber:" +txMessage); tConnection.close(); }catch(Throwable e){ e.printStackTrace(); } } @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { } }
时间: 2024-10-09 22:34:35