package com.jiangchong.job; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; public class JMS { public static void main(String[] args) throws Exception { consume(); product(); } public static void product() throws JMSException { // "tcp://localhost:61616" ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "vm:localhost:10000"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("testTopic"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); while (true) { TextMessage message = session.createTextMessage(); message.setText("message_" + new Date().toLocaleString()); producer.send(message); try { Thread.sleep(1000 * 5); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void consume() throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "vm:localhost"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("testTopic"); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message: " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); 
} public static void p2p() throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory( "vm://localhost"); Connection connection = factory.createConnection(); connection.start(); Queue queue = new ActiveMQQueue("testQueue"); final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Message message = session.createTextMessage(new Date() .toLocaleString() + " Hello JMS! come from producer"); final MessageProducer producer = session.createProducer(queue); new Thread(new Runnable() { public void run() { while (true) { try { producer.send(message); } catch (JMSException e) { e.printStackTrace(); } try { Thread.sleep(1000 * 3); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }).start(); final MessageConsumer comsumer = session.createConsumer(queue); new Thread(new Runnable() { public void run() { while (true) { Message recvMessage; try { recvMessage = comsumer.receive(); System.out.println(((TextMessage) recvMessage) .getText() + " rec"); } catch (JMSException e) { e.printStackTrace(); } try { Thread.sleep(4 * 1000L); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
// Time: 2024-10-06 21:02:05