ActiveMQ的数据发送类
import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.pool.PooledConnection; import org.apache.activemq.pool.PooledConnectionFactory; public class ActiveMQSend { private final String ip; private final String port; private PooledConnection pooledConnection; /** * 构造方法(传入需要连接的IP和端口) * * @param ip (AvctiveMQ的服务IP) * @param port (ActiveMQ的服务端口) */ public ActiveMQSend(String ip, String port) { this.ip = ip; this.port = port; this.init(); } /** * 初始化ActiveMQ的连接池 */ private void init() { try { String[] ips = this.ip.split(","); String[] ports = this.port.split(","); StringBuilder tcpLink = new StringBuilder(); for (int i = 0; i < ips.length; i++) { tcpLink.append("tcp://").append(ips[i]).append(":").append(ports[i]).append(","); } String mqLink = tcpLink.toString(); if (tcpLink.length() > 0) { if (‘,‘ == tcpLink.charAt(tcpLink.length() - 1)) { mqLink = tcpLink.substring(0, tcpLink.length() - 1); } } String url = String.format("failover:(%s)?initialReconnectDelay=1000&timeout=3000&startupMaxReconnectAttempts=2", mqLink); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); factory.setMaxThreadPoolSize(50); PooledConnectionFactory poolFactory = new PooledConnectionFactory(factory); pooledConnection = (PooledConnection) poolFactory.createConnection(); pooledConnection.start(); } catch (Exception ex) { LogUtil.error(ex); this.destroy(); } } /** * 向ActiveMQ中发送数据 * * @param needSendMsg 需要发送的数据信息 * @param sendMQName 需要发送到的队列名称 */ public void send(String needSendMsg, String sendMQName) { if (this.pooledConnection == null) { this.init(); } if (this.pooledConnection != null) { try { Session session = this.pooledConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(sendMQName); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage message = session.createTextMessage(needSendMsg); producer.send(message); session.close(); } catch (Exception ex) { LogUtil.error(ex); this.destroy(); } } } /** * 回收连接池 */ public void destroy() { try { if (pooledConnection != null) { pooledConnection.stop(); } } catch (Exception e) { LogUtil.error(e); } try { if (pooledConnection != null) { pooledConnection.close(); } } catch (Exception e) { LogUtil.error(e); } pooledConnection = null; } }
调用发送
public static void main(String[] args) { Thread t1 = new Thread(new Runnable() { @Override public void run() { ActiveMQSend sen = new ActiveMQSend("127.0.0.1", "61616"); for (int i = 0; i < 50000; i++) { String msg = String.format("这是TestQueue 2 第 %s 条发送的数据!", i); sen.send(msg, "TestQueue2"); } } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { ActiveMQSend sen = new ActiveMQSend("127.0.0.1", "61616"); for (int i = 0; i < 50000; i++) { String msg = String.format("这是TestQueue 1 第 %s 条发送的数据!", i); sen.send(msg, "TestQueue1"); } } }); t1.start(); t2.start(); }
ActiveMQ数据接收类
import javax.jms.MessageConsumer; import javax.jms.Destination; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.pool.PooledConnection; import org.apache.activemq.pool.PooledConnectionFactory; public class Receiver { private PooledConnection pooledConnection; private void init() { try { String url = "failover:(tcp://192.168.10.219:61616)?initialReconnectDelay=1000&timeout=3000&startupMaxReconnectAttempts=2"; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); factory.setMaxThreadPoolSize(100); PooledConnectionFactory poolFactory = new PooledConnectionFactory(factory); pooledConnection = (PooledConnection) poolFactory.createConnection(); pooledConnection.start(); } catch (Exception ex) { Log.error(ex); this.destroy(); } } public void receive(String queueName) { if (this.pooledConnection == null) { this.init(); } if (this.pooledConnection != null) { try { Session session = this.pooledConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); MessageConsumer consumer = session.createConsumer(destination); while (true) { TextMessage message = (TextMessage) consumer.receive(10); if (null != message) { System.out.println("收到消息" + message.getText()); } } } catch (Exception ex) { Log.info(ex.getMessage()); this.destroy(); } } } public void destroy() { try { if (pooledConnection != null) { pooledConnection.stop(); } } catch (Exception e) { Log.error(e); } try { if (pooledConnection != null) { pooledConnection.close(); } } catch (Exception e) { Log.error(e); } pooledConnection = null; } }
数据接收类的调用
public static void main(String[] args) { Thread t1 = new Thread(new Runnable() { @Override public void run() { Receiver r1 = new Receiver(); r1.receive("TestQueue2"); } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { Receiver r2 = new Receiver(); r2.receive("TestQueue1"); } }); t1.start(); t2.start(); }
时间: 2024-10-07 18:46:43