生产者
public class ProducerTest { public static void main(String[] args) throws Exception { // Create a ConnectionFactory ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // Create a Connection Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); // Create a Session Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // Create the destination (Topic or Queue) Queue queue = session.createQueue("yyc-test"); // Create a MessageProducer from the Session to the Topic or Queue MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.PERSISTENT); producer.send(session.createObjectMessage("你好测试")); // Clean up producer.close(); session.close(); connection.close(); } }
消费者
public class ConsumerTest { public static void main(String[] args) throws Exception { // Create a ConnectionFactory ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // Create a Connection Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); connection.setExceptionListener(new MyExceptionListener()); // Create a Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) Queue queue = session.createQueue("yyc-test"); // Create a MessageConsumer from the Session to the Topic or Queue MessageConsumer consumer = session.createConsumer(queue); // 方式一:同步操作 Message mesg = consumer.receive(1000); if (mesg instanceof TextMessage) { TextMessage textMessage = (TextMessage) mesg; String text = textMessage.getText(); System.out.println("Received: " + text); } else { System.out.println("Received: " + mesg); } consumer.close(); session.close(); connection.close(); // 方式二:异步监听操作 //consumer.setMessageListener(new MyMessageListener()); } public static class MyMessageListener implements MessageListener { @Override public void onMessage(Message msg) { System.out.println("Received: " + msg); } } public static class MyExceptionListener implements ExceptionListener { @Override public void onException(JMSException arg0) { System.out.println("JMS Exception occured. Shutting down client."); } } }
时间: 2024-11-07 12:26:47