import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import com.ailk.biapp.ci.localization.cntv.service.IUserSynchronizationService; import com.asiainfo.biframe.utils.config.Configure; import com.asiainfo.biframe.utils.spring.SystemServiceLocator; public class JmsTopicReceiver { public void topicListener() { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; String activeUrl = Configure.getInstance().getProperty("ACTIVE_URL"); String topic = Configure.getInstance().getProperty("TOPIC_NAME"); connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, activeUrl); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //test-queue跟sender的保持一致,一个创建一个来接收 destination = session.createTopic(topic); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { TextMessage txtMsg = (TextMessage)message; String msg = txtMsg.getText(); //收到topic的时候增量同步用户 IUserSynchronizationService service = (IUserSynchronizationService)SystemServiceLocator.getInstance().getService("userSynchronizationServiceImpl"); service.startIncrementalSync(msg); } catch (Exception e) { e.printStackTrace(); } } }); } catch (Exception e) { e.printStackTrace(); } } }
时间: 2024-12-09 11:58:47