package com.yunda.inter.preload.contextinit; import net.sf.json.JSONObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.yunda.inter.shipmentAcceptor.bean.ShipmentData; import com.yunda.inter.shipmentCheck.service.ShipmentCheckService; import com.yunda.inter.sign.service.SignService; import com.yunda.inter.util.CommUtil; import com.yunda.inter.util.QueueUtil; import com.yunda.inter.util.StringUtil; /** * 启动预加载信息类 *@author Administrator */ public class ContextLoaderSpringListener implements ApplicationListener<ContextRefreshedEvent>{ private static Log logger = LogFactory.getLog(ContextLoaderSpringListener.class); @Autowired private ShipmentCheckService shipmentCheckService; //当spring容器初始化完成后就会执行该方法。 public void onApplicationEvent(ContextRefreshedEvent event) { logger.debug("ConfigLoadListener init......"); try { //创建一个频道 Channel channel = QueueUtil.getConnection().createChannel(); boolean durable = true; //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。 channel.queueDeclare(QueueUtil.getQueueName(), durable, false, false, null); //创建队列消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //指定消费队列 //TODO:并发测试MQ,ack? 
channel.basicConsume(QueueUtil.getQueueName(), false/*打开应答机制*/, consumer); while (true) { //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); byte[] body = delivery.getBody(); try { String str=new String(body,"UTF-8"); JSONObject j = JSONObject.fromObject(str); String shipmentId = j.getString("shipmentId"); String vehicleId = j.getString("vehicleId"); int planLineType = j.getInt("planLineType"); shipmentCheckService.check(shipmentId,vehicleId,planLineType); } catch (RuntimeException e) { logger.error("货运单数据校验出现异常:", e); logger.error("Source package:"+ CommUtil.getEncodeData(body)); } channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } catch (Exception e) { logger.error("货运单存储器出现异常:", e); } } }
/**
 * Publishes one message body to the queue named by
 * {@code QueueUtil.getQueueName()} via the default exchange, using a
 * short-lived channel.
 *
 * <p>The queue is declared durable so that the declaration matches the one made
 * by the consumer in {@code onApplicationEvent}; RabbitMQ rejects a
 * redeclaration of an existing queue with different attributes.
 *
 * <p>NOTE(review): the message is published with {@code null} properties, so it
 * is not explicitly marked persistent — confirm whether that is intended.
 *
 * @param dst raw message bytes to publish
 * @throws IOException      if declaring the queue, publishing or closing fails
 * @throws TimeoutException if a connection/channel operation times out
 */
private void storeInQueue(byte[] dst) throws IOException, TimeoutException {
    Channel channel = QueueUtil.getConnection().createChannel();
    try {
        channel.queueDeclare(QueueUtil.getQueueName(), /* durable */ true, false, false, null);
        channel.basicPublish("", QueueUtil.getQueueName(), null, dst);
    } finally {
        // Always release the channel, including when declare/publish throws.
        // Skip close() if the broker already closed it, so the original
        // failure is not masked by an already-closed error.
        if (channel.isOpen()) {
            channel.close();
        }
    }
}
// Timestamp: 2024-10-30 07:37:03