RabbitMQ生产者消费者

package com.ra.car.rabbitMQ;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * 生产者,用来发送拍照指令
 *
 *
 */
public class RabbitMQProducer {
    protected static final Logger logger = LoggerFactory.getLogger(RabbitMQProducer.class);

    private final static String QUEUE_NAME = "44b2fe8a-4d70-4a18-b75f-91a6f170bd16"; // 上送队列

    private String message;

    public RabbitMQProducer() {

    }

    public RabbitMQProducer(String message) {
        this.message = message;
    }

    public void sendMessage(){
        String replyQueueName = null; // 返回队列名

        ConnectionFactory connFactory = null;
        Connection conn = null;
        Channel channel = null;
        try {
            connFactory = new ConnectionFactory();
            connFactory.setHost("58.211.54.147");
            connFactory.setUsername("customer");
            connFactory.setPassword("123456");
            connFactory.setPort(5672);
            conn = connFactory.newConnection();
            channel = conn.createChannel();
            QueueingConsumer consumer = new QueueingConsumer(channel);

            Map<String, Object> param = new HashMap<String, Object>();
            param.put("x-message-ttl", 600000);
            param.put("x-expires", 86400000);

            // 返回队列
            replyQueueName = channel.queueDeclare().getQueue();
            channel.basicConsume(replyQueueName, true, consumer);
            String corrId = UUID.randomUUID().toString(); // 用来表示返回队列结果的id,唯一
            BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();

            channel.queueDeclare(QUEUE_NAME, true, false, false, param);
            channel.basicPublish("", QUEUE_NAME, props, message.getBytes());
            logger.info("producer has published: \"" + message + "\"");
        } catch (IOException ioe) {
            ioe.printStackTrace();
        } catch (TimeoutException toe) {
            toe.printStackTrace();
        } catch (ShutdownSignalException e) {
            e.printStackTrace();
        } catch (ConsumerCancelledException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null)
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            if (conn != null)
                try {
                    conn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
        }
    }

    public static void main(String[] args) {
        JSONObject json = new JSONObject();
        json.put("msgId", "8801");
        json.put("gpsNo", "001709270202");
        json.put("channelId", "1");// 1,2 前置 后置
        json.put("serialNo", "1133");
        //RabbitMQProducer rb = new RabbitMQProducer(json.toString());
        //Thread t = new Thread(rb);
        //t.start();
        //String str = "7e0805000901170427881200090003000001b6b5833313";
        //System.out.println(str.substring(26, 36)+"**"+str.substring(36,str.length()-2));
    }
}
package com.ra.car.rabbitMQ;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.ra.truck.model.MessagePackage;
import com.ra.truck.service.MessagePackegerService;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.ContextLoader;

import com.alibaba.fastjson.JSON;
import com.ra.car.utils.StringToT;
import com.ra.common.util.UuidUtil;
import com.ra.truck.service.DataCallBackService;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
 * 消费者
 *
 *
 */
public class RabbitMQCustomer implements Runnable {
    protected static final Logger logger = LoggerFactory
            .getLogger(RabbitMQCustomer.class);
    private final static String QUEUE_NAME = "adb65b08-a27d-42b0-b4ac-ff10422ac213";

    private ConnectionFactory connFactory;
    private Connection conn;
    private Channel channel;
    private Delivery delivery;
    private QueueingConsumer consumer;
    private SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    private MessagePackegerService messagepackegerService = (MessagePackegerService)ContextLoader
            .getCurrentWebApplicationContext().getBean("MessagePackegerService");
    private DataCallBackService dataCallBackService = (DataCallBackService) ContextLoader
            .getCurrentWebApplicationContext().getBean("DataCallBackService");

    /**
     * 分开try...catch...,1.出现连接异常时,中断连接;2.出现消费数据异常时,继续去消费,不中断连接
     */
    @Override
    public void run() {
        try {
            connFactory = new ConnectionFactory();
            connFactory.setHost("58.211.54.147");
            connFactory.setUsername("customer");
            connFactory.setPassword("123456");
            connFactory.setPort(5672);
            conn = connFactory.newConnection();
            channel = conn.createChannel();

            Map<String, Object> param = new HashMap<String, Object>();
            param.put("x-message-ttl", 600000);
            param.put("x-expires", 86400000);

            channel.queueDeclare(QUEUE_NAME, true, false, false, param);

            logger.info("listening for event message...");

            consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, true, consumer);

            while (true) {
                try {
                    Thread.sleep(100);
                    delivery = consumer.nextDelivery();
                    // BasicProperties props = delivery.getProperties();
                    /*
                     * BasicProperties reply_props = new
                     * BasicProperties.Builder()
                     * .correlationId(props.getCorrelationId()).build();
                     */
                    // String msg = new String(delivery.getBody());
                    logger.info("*****当前时间:" + df.format(new Date()) + "*****");
                    logger.info("*****原始数据为:" + JSON.toJSONString(delivery.getBody()) + "*****");
                    String msg = MQUtils.bytes2Hex(delivery.getBody());
                    logger.info("receive msg:" + msg);

                    //RdDeviceCallBackDataDomain backDataDomain = new RdDeviceCallBackDataDomain();
                    //backDataDomain.setId(String.valueOf(System.currentTimeMillis()));
                    if (StringUtils.isNotBlank(msg)) {
                        MessagePackage msgPackeger = new MessagePackage();
                        if (msg.length() > 28) {
                            msg = msg.substring(0, msg.length()-2).replaceAll("7d02", "7e").replaceAll("7d01", "7d");//去掉标识位,转义还原
                            msgPackeger.setId(UuidUtil.create());
                            String messageId = msg.substring(2, 6);
                            msgPackeger.setMessageId(messageId);//消息Id
                            msgPackeger.setMessageProperty(msg.substring(6, 10));//消息体属性
                            msgPackeger.setImei(msg.substring(10, 22));//imi 终端手机号
                            msgPackeger.setSerialNumber(msg.substring(22, 26));//流水号
                            msgPackeger.setCheckCode(msg.substring(msg.length()-2, msg.length()));//校验码
                            if (StringUtils.isNotBlank(messageId) && messageId.equals("0801")) {
                                String msgBodyProperties = msg.substring(6, 10);// 消息体属性
                                String msgBodyProperties2 = StringToT.hexString2binaryString(msgBodyProperties);//消息属性二进制格式
                                String isSubpackage = msgBodyProperties2.substring(2, 3);   //是否分
                                if ("1".equals(isSubpackage)) {

                                    String isSplitNumber = msg.substring(26, 30); // 分报数
                                    String isSplit = msg.substring(30, 34);//消息流水号
                                    msgPackeger.setMessageSplit(isSplitNumber + isSplit);
                                    if("0001".equals(isSplit)) {
                                        String mediaId = msg.substring(34, 42);

                                        msgPackeger.setMeidiaId(mediaId);
                                        msgPackeger.setMessageBody(msg.substring(42, msg.length() - 2));
                                    }else{
                                        msgPackeger.setMessageSplit("0001");
                                        List<MessagePackage> messagepackegerList=messagepackegerService.selectMessagepackeger(msgPackeger);
                                        String meidiaId=messagepackegerList.get(0).getMeidiaId();
                                        String mediabody = messagepackegerList.get(0).getMessageBody().substring(0,64);
                                        msgPackeger.setMessageSplit(isSplitNumber + isSplit);
                                        msgPackeger.setMeidiaId(meidiaId);
                                        msgPackeger.setMessageBody(mediabody+msg.substring(34, msg.length()- 2));
                                    }
                                } else {
                                    String mediaId = msg.substring(26, 34);
                                    msgPackeger.setMeidiaId(mediaId);
                                    msgPackeger.setMessageBody(msg.substring(34, msg.length() - 2));
                                }
                            }else if(StringUtils.isNotBlank(messageId) && messageId.equals("0805")){
                                 msgPackeger.setMessageBody(msg.substring(26, 36));
                                 msgPackeger.setMeidiaId(msg.substring(36, msg.length()-2));
                            }else{
                               msgPackeger.setMessageBody(msg.substring(26, msg.length() - 2));//消息体
                            }
                              //保存校验码
                              msgPackeger.setCheckCode((msg.substring(msg.length()- 2,msg.length())));
                                dataCallBackService.insertDeviceRawDataOfOne(msgPackeger);
                            }
                        }
                    // 不用设置返回
                    /*
                     * String retMsg = "ok, give you reply:" + new
                     * String(msg.getBytes(), "utf-8");
                     * logger.info("Consumer中的返回队列名" + props.getReplyTo());
                     * channel.basicPublish("", QUEUE_NAME, reply_props,
                     * retMsg.getBytes());
                     */
                } catch (Exception e) {
                    logger.error("循环消费数据异常.....", e);
                }
            }
        } catch (Exception e) {
            logger.error("MQ connection error.....", e);
        } finally {
            try {
                if (channel != null) {
                    logger.info("channel.close");
                    channel.close();
                }
                if (conn != null) {
                    logger.info("conn.close");
                    conn.close();
                }
            } catch (IOException e) {
                logger.info("IOException");
            } catch (TimeoutException e) {
                logger.info("TimeoutException");
            }
        }

    }
}
时间: 2024-11-09 01:50:57

RabbitMQ生产者消费者的相关文章

rabbitmq 生产者 消费者(多个线程消费同一个队列里面的任务。)

rabbitmq作为消息队列可以有消息消费确认机制,redis的list结构可以简单充当消息队列,但不具备消费确认机制,随意关停程序,会丢失一部分正在程序中处理但还没执行完的消息. 使用rabbitmq的最常用库pika # coding=utf-8 """ 一个通用的rabbitmq生产者和消费者.使用多个线程消费同一个消息队列. """ import abc import functools import time from threadin

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

4.利用python生成器实现简单的“生产者消费者”模型

假如说,没有生成器这种对象,那么如何实现这种简单的"生产者消费者"模型呢? import time def producer(): pro_list = [] for i in range(10000): print "包子%s制作ing" %(i) time.sleep(0.5) pro_list.append("包子%s" %i) return pro_list def consumer(pro_list): for index,stuffe

生产者消费者模型实现多线程异步交互

[Python之旅]第六篇(五):生产者消费者模型实现多线程异步交互 消息队列 生产者消费者模型 多线程异步交互 摘要:  虽然标题是"生产者消费者模型实现多线程异步交互",但这里要说的应该还包括Python的消息队列,因为这里多线程异步交互是通过Python的消息队列来实现的,因此主要内容如下: 1 2 3 4 1.生产者消费者模型:厨师做包子与顾客吃包子 2.Python的消息队列 3.利用... 虽然标题是"生产者消费者模型实现多线程异步交互",但这里要说的应

并发编程基础之生产者消费者模式

一:概念 生产者消费者模式是java并发编程中很经典的并发情况,首先有一个大的容器,生产者put元素到 容器中,消费者take元素出来,如果元素的数量超过容器的容量时,生产者不能再往容器中put元素 ,处于阻塞状态,如果元素的数量等于0,则消费者不能在从容器中take数据,处于阻塞状态. 二:示例 /** * */ package com.hlcui.main; import java.util.LinkedList; import java.util.concurrent.ExecutorSe

经典进程同步问题一:生产者-消费者问题(The producer-consumer problem)

(注:参考教材:计算机操作系统第四版 西安电子科技大学出版社) 问题描述:一群生产者进程在生产产品,并将这些产品提供给消费者去消费.为了使生产者进程与消费者进程能够并发进行,在两者之间设置一个具有n个缓冲区的缓冲池,生产者进程将产品放入一个缓冲区中:消费者可以从一个缓冲区取走产品去消费.尽管所有的生产者进程和消费者进程是以异方式运行,但它们必须保持同步:当一个缓冲区为空时不允许消费者去取走产品,当一个缓冲区满时也不允许生产者去存入产品. 解决方案:我们这里利用一个一个数组buffer来表示这个n

生产者消费者模式

什么是生产者消费者模式   在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产生数据的模块,就形象地称为生产者:而处理数据的模块,就称为消费者.在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式.结构图如下: 生产者消费者模式的优点 1.解耦 假设生产者和消费者分别是两个类.如果让生产者直接调用消费者的某个方法,那

生产者消费者问题

以生产者/消费者模型为依据,在linux环境下创建一个控制台进程,在该进程中创建n个线程模拟生产者和消费者,实现进程(线程)的同步与互斥. 模拟实现的情景 *M生产者,N消费者, K缓冲区 *解决生产者消费者的同步问题,访问缓冲区的互斥问题 *生产者放产品位置递增;消费者要寻找有产品的位置,不采用位置自增,解决速度不一致的问题. *缓冲区在某一时刻只有一个线程访问 #include <stdio.h> #include <stdlib.h> #include <unistd.

‘生产者-消费者’模型与‘读-写者’模型

★生产者-消费者模型 首先,我们先分析一下生产者与消费者模型:生产者与消费者是模型中不可缺少的2种角色,当然模型中肯定需要一个保存数据的场所,能够将生产者生产的数据进行存储.同时,模型必须要满足生产者产生出数据后,消费者才能够进行使用,即就是消费者必须位于生产者之后,当然生产者生产的数据最多将场所放置满就不能继续生产,下面有简单的图示: 当然,如果有多个消费者和多个生产者,生产者与消费者之间的关系是同步的,生产者与生产者之间是互斥的,因为一块空间不能让多个生产者同时进行生产.消费者和消费者之间也