2.RABBITMQ 入门 - WINDOWS - 生产和消费消息 一个完整案例

关于安装和配置,见上一篇 1.RABBITMQ 入门 - WINDOWS - 获取,安装,配置

公司有需求,要求使用winform开发这个东西(消息中间件),另外还要求开发一个日志中间件,但是也是要求做成win form的,这明显不合理,因为之前,服务器上我已经放置了一个  短信的winform的服务。那么到后期的话,登录服务器之后,全是

一个个的窗体挂在那儿,这明显合不合常理,但是领导要求这么玩,也没办法, 因为卧虎要负责的是消费 消息,所以重点说明 消费端

该案例的接收端,源自网上的代码片段 片内容,做了部分修改之后使用

日志中心的 功能要求使用注入解耦,所以,这里我也解耦了,如果日至那边使用的是 autofac,我只里使用的MEF实现注入 所以定义了相关的接口对象

IMQContextFactory:

using Ecostar.MQLogger.Core.Infrastructure;
using System;

namespace Ecostar.MQConsumer.Core.Infrastructure
{
    /// <summary>
    ///     仅仅只有sender使用到
    /// </summary>
    public interface IMQContextFactory
    {
        MQContext CreateContext(string mqUri, Action<string, LogLevel> toLog);
    }
}

对应的实现类:MQContextFactory

using Ecostar.MQLogger.Core.Infrastructure;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Security.Cryptography;

namespace Ecostar.MQConsumer.Core.Infrastructure
{
    /// <summary>
    ///     仅仅只有sender使用到
    /// </summary>
    [Export(typeof(IMQContextFactory))]
    public class MQContextFactory : IMQContextFactory
    {
        /// <summary>
        /// 上下文字典
        /// </summary>
        private static readonly Dictionary<string, MQContext> Contexts = new Dictionary<string, MQContext>();

        /// <summary>
        /// 上下文操作锁字典,只创建一次
        /// </summary>
        public static readonly Dictionary<string, object> contextLockers = new Dictionary<string, object>();

        /// <summary>
        /// 更新上下文操作锁字典时的锁,只创建一次
        /// </summary>
        private static readonly object contextLockersLocker = new object();

        /// <summary>
        /// 获取指定的上下文
        /// </summary>
        /// <param name="mqUri">mq地址</param>
        /// <param name="toLog">日志记录</param>
        /// <returns>上下文对象</returns>
        public MQContext CreateContext(string mqUri, Action<string, LogLevel> toLog)
        {
            var key = MD5Encrypt(mqUri);
            var locker = GetFactoryLocker(key);

            lock (locker)
            {
                MQContext context;
                if (!Contexts.TryGetValue(key, out context))
                {
                    Guid contextId = Guid.NewGuid();
                    string logHeader = string.Format("[{0}]", contextId.ToString());

                    context = new MQContext()
                    {
                        ReceiveQueueName = "Logs",
                        Id = contextId
                    };
                    Console.WriteLine(logHeader + "   初始化发送上下文完毕");

                    // 获取连接
                    context.SendConnection = CreateConnection(mqUri);
                    context.SendConnection.AutoClose = false;
                    context.SendConnection.ConnectionShutdown += (o, e) => Console.WriteLine("   RabbitMQ错误,连接被关闭了:" + e.ReplyText);
                    Console.WriteLine(logHeader + "   创建连接完毕", LogLevel.Trace);

                    // 获取通道
                    context.SendChannel = CreateChannel(context.SendConnection);
                    Console.WriteLine(logHeader + "   创建通道完毕", LogLevel.Trace);

                    Contexts.Add(key, context);

                }

                return context;
            }
        }

        #region 私有方法
        /// 创建连接
        /// </summary>
        /// <param name="mqUrl"></param>
        /// <returns></returns>
        private static IConnection CreateConnection(string mqUrl)
        {
            const ushort heartbeta = 120;

            var factory = new ConnectionFactory()
            {
                Uri = mqUrl,
                RequestedHeartbeat = heartbeta,
                AutomaticRecoveryEnabled = true
            };

            return factory.CreateConnection();
        }

        /// <summary>
        /// 创建通道
        /// </summary>
        /// <param name="connection"></param>
        /// <returns></returns>
        private static IModel CreateChannel(IConnection connection)
        {
            if (connection != null)
                return connection.CreateModel();
            return null;
        }

        /// <summary>
        /// 获取上下文操作锁
        /// </summary>
        /// <param name="contextKey">上下文工厂key</param>
        /// <returns></returns>
        private static object GetFactoryLocker(string contextKey)
        {
            lock (contextLockersLocker)
            {
                object locker;
                if (!contextLockers.TryGetValue(contextKey, out locker))
                {
                    locker = new object();
                    contextLockers.Add(contextKey, locker);
                }

                return locker;
            }
        }

        /// <summary>
        /// 获取字符的MD5值
        /// </summary>
        /// <param name="str"></param>
        /// <returns></returns>
        private static string MD5Encrypt(string str)
        {
            MD5 md5 = new MD5CryptoServiceProvider();
            byte[] result = md5.ComputeHash(System.Text.Encoding.Default.GetBytes(str));
            return System.Text.Encoding.Default.GetString(result);
        }
        #endregion

    }
}

注视我写的很明白,这部分的使用 是  生产者使用的类,也就是  发送消息



下面是消费者:

IReceiver.cs:

namespace Ecostar.MQConsumer.Core
{
    public interface IReceiver
    {
        /// <summary>
        ///     初始化接收程序
        /// </summary>
        /// <param name="mqUrls"></param>
        void InitialReceive(MQReceiverParam receiverParams);

    }
}

对应的实现类:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Threading;

namespace Ecostar.MQConsumer.Core
{
    [Export(typeof(IReceiver))]
    public class Receiver : IReceiver
    {
        private MQContext _context;
        private const ushort Heartbeta = 60;
        private string _queueName;
        private bool _isAutoAck;
        private List<string> _mqUrls;
        private Func<byte[], bool> _processFunction;
        private Action<string> _mqActionLogFunc;
        private MQConnectionFactory _ConnectionFactoryParams;

        public void InitialReceive(MQReceiverParam receiverParams)
        {
            _queueName       = receiverParams._queueName;
            _isAutoAck       = receiverParams._isAutoAck;
            _mqUrls          = receiverParams._mqUrls;
            _processFunction = receiverParams._processFunction;
            _mqActionLogFunc = receiverParams._mqActionLogFunc;
            _ConnectionFactoryParams = receiverParams.ConnectionFactoryParam;
            receiverParams._mqUrls.ForEach(url => InitReceive(_queueName, _isAutoAck, url));

        }

        /// <summary>
        /// 初始化某个节点的接收
        /// </summary>
        private void InitReceive(string queueName, bool isAutoAck, string mqUrl)
        {
            Guid contextId = Guid.NewGuid();
            string logHeader = string.Format("[{0}, {1}]", queueName, contextId.ToString());
            try
            {
                _context = new MQContext()
                {
                    Id = contextId,
                    ReceiveQueueName = queueName,
                    IsAutoAck = isAutoAck,
                    ReceiveConnection = new ConnectionFactory()
                    {
                        HostName = _ConnectionFactoryParams.HostName,
                        UserName = _ConnectionFactoryParams.UserName,
                        Password = _ConnectionFactoryParams.Password,
                        VirtualHost = _ConnectionFactoryParams.VirtualHost
                    }.CreateConnection()
                };

                // 监听Shutdown事件,记录下LOG便于排查和监管服务的稳定性
                _context.ReceiveConnection.ConnectionShutdown += (o, e) =>
                {
                    _mqActionLogFunc("   RabbitMQ错误,连接被关闭了:" + e.ReplyText);
                };
                // 获取通道
                _context.ReceiveChannel = _context.ReceiveConnection?.CreateModel();

                // 创建事件驱动的消费者
                var consumer = new EventingBasicConsumer(_context.ReceiveChannel);
                consumer.Received += (o, e) =>
                {
                    try
                    {
                        // 接受数据处理逻辑
                        // e.Body
                        var result = _processFunction(e.Body);

                        if (!isAutoAck)
                        {
                            if (!result)
                            {
                                Thread.Sleep(300);

                                // 未能处理完成的话,将消息重新放入队列头
                                _context.ReceiveChannel.BasicReject(e.DeliveryTag, true);
                                _mqActionLogFunc("   消息未处理成功,将消息重新放入队列头");
                            }
                            else if (!_context.ReceiveChannel.IsClosed)
                            {
                                // 处理成功并且通道未关闭时ack回去,删除队列中的消息
                                _context.ReceiveChannel.BasicAck(e.DeliveryTag, false);
                                _mqActionLogFunc("   消息处理成功,发送Ack完毕");
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        Thread.Sleep(300);
                        if (!isAutoAck)
                        {
                            // 将消息重新放入队列头
                            _context.ReceiveChannel.BasicReject(e.DeliveryTag, true);
                        }
                        _mqActionLogFunc("   处理数据发生异常:" + ex.Message + ex.StackTrace);
                    }
                };

                // 一次只获取一条消息
                _context.ReceiveChannel.BasicQos(0, 1, false);
                _context.ReceiveChannel.BasicConsume(_context.ReceiveQueueName, _context.IsAutoAck, consumer);

                _mqActionLogFunc("   初始化队列完毕");
            }
            catch (Exception ex)
            {
                _mqActionLogFunc("   初始化RabbitMQ出错:" + ex.Message + ex.StackTrace);
            }
        }

    }
}

使用到的参数 MQReceiverParam:

using System;
using System.Collections.Generic;

namespace Ecostar.MQConsumer.Core
{
    /// <summary>
    ///     消费者入参
    /// </summary>
    public class MQReceiverParam
    {
        public  string _queueName { get; set; }
        public  bool _isAutoAck { get; set; }
        public  List<string> _mqUrls { get; set; }
        public  Func<byte[], bool> _processFunction { get; set; }
        public  Action<string> _mqActionLogFunc { get; set; }
        public MQConnectionFactory ConnectionFactoryParam { get; set; }

    }

    /// <summary>
    ///     服务配置
    /// </summary>
    public class MQConnectionFactory
    {
        public string HostName     {get;set;}
        public string UserName     {get;set;}
        public string Password     {get;set;}
        public string VirtualHost  {get;set;}
    }

}

重力要说明一下:

Func<byte[], bool> _processFunction { get; set; }
Action<string> _mqActionLogFunc { get; set; }

参数的对象中,有这么两个委托,原因是,如果你在学习 rabbitmq得这块内容的时候,你会发现,网上很多案例,以及官方提供的案例,写法都比较简单,而且,都是讲业务逻辑和   rabbitmq的消费的这跨功能 耦合到了一起

如果其他地方使用的时候,还是重复,创建  connection   创建queue,绑定,,,,,等相关动作,代码不仅不美观,而且显得繁琐,啰嗦,所以,这两个委托类型的参数,起到了接偶的作用,似的 具体的业务逻辑和 rabbitmq的消费逻辑 分离

使用如下:

(我是在窗体上直接放置了一个  richTextBox的控件,讲接收的信息打印出来,)

using Ecostar.MQConsumer.Core;
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.ComponentModel.Composition.Hosting;
using System.IO;
using System.Reflection;
using System.Text;
using System.Windows.Forms;

namespace Ecostar.MQConsumer.UI
{
    [Export]
    public partial class MQMainForm : Form
    {

        #region Fields
        private static CompositionContainer _container;//MEF 部件组合 管理
        [Import]
        public IReceiver Receiver { get; set; }

        #endregion

        public MQMainForm()
        {
            InitializeComponent();
        }

        private void MQMainForm_Load(object sender, EventArgs e)
        {
            InitForm();
            InitialListener();
        }

        public void InitForm()
        {
            AggregateCatalog catalog = new AggregateCatalog();
            catalog.Catalogs.Add(new DirectoryCatalog(Directory.GetCurrentDirectory()));
            catalog.Catalogs.Add(new AssemblyCatalog(Assembly.GetExecutingAssembly()));
            _container = new CompositionContainer(catalog);
        }

        /// <summary>
        ///     初始化监听程序
        /// </summary>
        void InitialListener()
        {
            MQMainForm form;
            try
            {
                form = _container.GetExportedValue<MQMainForm>();
            }
            catch (Exception ex)
            {

                throw;
            }
            form.Receiver.InitialReceive(new MQReceiverParam()
            {
                _queueName = "testQueueName",
                _isAutoAck = false,
                _mqUrls = new List<string>() { "amqp://127.0.0.1:5672/" },
                _processFunction = (buffer) =>
                {
                    string receiveMsg = Encoding.UTF8.GetString(buffer);
                    this.rtb_receive.Invoke(new Action(() => { { this.rtb_receive.Text += receiveMsg + "\r\n"; } }));
                    return true;

                },
                _mqActionLogFunc = (msg) =>
                {
                    this.rtb_receive.Invoke(new Action(() =>
                    {
                        this.rtb_receive.Text += "====MQ Action====" + msg + "\r\n";
                    }));
                },
                ConnectionFactoryParam = new MQConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "CC",
                    Password = "123qwe",
                    VirtualHost = "/"
                }
            });
        }

    }
}

其中的 testQueueName,是客户端发送的 消息列队名称,也就是queue的名称,你也可以(如果是测试),在mq服务器上 人为的添加这个queue名称之后再测试。

这样一来,_processFunction 这个用于消费的方法,可以,写任意的处理方式,比如打印到控制台,输出到床体 控件显示,写入到日志,写入到数据库等等。

而且中的 _mqActionLogFunc,适用于记录mq的消费过程的日志,比如 mq消费操作执行过程中发生异常 ,那么直接找mq的问题即可。



截图中还一个:MQContext类,这是一个部分类,为了方便区分,我把消费者,生产者  公共部分分别放置到了三个部分类中:

MQContext.Consumer.cs

using RabbitMQ.Client;

namespace Ecostar.MQConsumer.Core
{
    /// <summary>
    ///        MQ 消费者
    /// </summary>
    public partial class MQContext
    {
        // <summary>
        /// 用户监听的Connection
        /// </summary>
        public IConnection ReceiveConnection { get; set; }

        /// <summary>
        /// 用于监听的Channel
        /// </summary>
        public IModel ReceiveChannel { get; set; }

        /// <summary>
        /// 监听队列名
        /// </summary>
        public string ReceiveQueueName { get; set; }
    }
}

MQContext.cs

namespace Ecostar.MQConsumer.Core
{
    /// <summary>
    ///     MQ 生产者消费者公共部分
    /// </summary>
    public partial class MQContext
    {
        /// <summary>
        /// mq地址
        /// </summary>
        public string MQUrl { get; set; }

    }
}

MQContext.Producer.cs

using RabbitMQ.Client;
using System;

namespace Ecostar.MQConsumer.Core
{
    /// <summary>
    ///        MQ 生产者
    /// </summary>
    public partial class MQContext
    {
        /// <summary>
        /// 用于发送消息的Connection
        /// </summary>
        public IConnection SendConnection { get; set; }

        /// <summary>
        /// 用于发送消息到Channel
        /// </summary>
        public IModel SendChannel { get; set; }

        /// <summary>
        /// 发送的Exchange
        /// </summary>
        public string Exchange { get; set; }

        /// <summary>
        /// 是否启用自动删除
        /// </summary>
        public bool IsAutoAck { get; set; }
        /// <summary>
        /// 上下文ID
        /// </summary>
        public Guid Id { get; set; }

        /// <summary>
        /// 路由
        /// </summary>
        public string RouteKey { get; set; }
        /// <summary>
        /// 是否正在运行,默认false
        /// </summary>
        public bool IsRunning { get; set; }

        /// <summary>
        /// 回收此上下文
        /// </summary>
        public void Recovery()
        {
            IsRunning = false;
        }
    }
}



到此,这个简单的消费案例就完成了。



下面的是 生产者,(发送消息的案例),为了造数据,所以写的随意些:(一个控制台程序),nuget引入  rabbitmq.client ,指令: install-package rabbitmq.Client

using RabbitMQ.Client;
using System;
using System.Text;

namespace RubbitMQClient
{
    /// <summary>
    ///     1.Routing (按路线发送接收)
    /// </summary>
    public class RoutingType
    {
        public static void RoutingProducer(string[] arguments)
        {
            arguments = new string[] { "0","" };

            string serverAddress = "127.0.0.1";
            string account = "CC";
            string password = "123qwe";

            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = serverAddress,
                UserName = account,
                Password = password,
                VirtualHost = "/"
            };
            IConnection conn = factory.CreateConnection();
            for (int i = 0; i < 1000; i++)
            {
                arguments[1] = i.ToString();
                string queueName = "testQueueName";

                using (var channel = conn.CreateModel())
                {
                    //---1.声明durable Exchange 和 Queue--------------------------------------------------------------------------------------------------------------
                    channel.ExchangeDeclare(Consts.EXCHANGE_NAME_DIRECT, "direct", durable: true, autoDelete: false, arguments: null);
                    //arguments = new[] { "12321", "32432" };
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueBind(queueName, Consts.EXCHANGE_NAME_DIRECT, routingKey: "");//queueName

                    //----------------------------------------------------------------------------------------------------------------------

                    //---2.发布持久化消息到队列 ---------------------------------------------------------------------------------------------------
                    var props = channel.CreateBasicProperties();
                    //props.Priority = 3;//控制优先级
                    props.DeliveryMode = 2;//将信息也持久化
                    props.Persistent = true;///SetPersistent方式提示已经过时,建议使用当前方式
                    string severity = getSeverity(arguments);
                    string message = getMessage(arguments);
                    byte[] buffer = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish(Consts.EXCHANGE_NAME_DIRECT, routingKey: "", basicProperties: props, body: buffer);

                    ////---消费消息
                    //BasicGetResult msgResponse = channel.BasicGet(queueName, noAck: true);

                    //var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                    //Console.WriteLine(msgBody);

                    //3.1(发布方式还有一种 基于推送的事件订阅 )第二种方式(使用内置的 QueueingBasicConsumer 提供简化的编程模型,通过允许您在共享队列上阻塞,直到收到一条消息)
                    //var consumer = new QueueingBasicConsumer(channel);
                    //channel.BasicConsume(queueName, noAck: true, consumer: consumer);
                    //var msgResponse = consumer.Queue.Dequeue(); //blocking
                    //var msgBody = Encoding.UTF8.GetString(msgResponse.Body);

                }

            }
            conn.Close();
            Console.ReadKey();
        }

        private static String getSeverity(String[] strings)
        {
            if (strings.Length < 1)
                return "routing(direct) type info";
            return strings[0];
        }

        private static String getMessage(String[] strings)
        {
            if (strings.Length < 2)
                return "routing(direct) --> Hello World!";
            return joinStrings(strings, " ", 1);
        }

        private static String joinStrings(String[] strings, String delimiter, int startIndex)
        {
            return strings[1].ToString();
        }

    }
}

抽时间将上面涉及到的   mq一些相关  属性(常用的API的),在总结下,主要是零散,其实东西很简单,如何更好的,更灵活的组合到一起,是这个插件使用的 最主要一点。

时间: 2024-10-26 23:44:03

2.RABBITMQ 入门 - WINDOWS - 生产和消费消息 一个完整案例的相关文章

1.RABBITMQ 入门 - WINDOWS - 获取,安装,配置

一. 背景: 公司项目有所改动,要求微信(移动端调用的接口),日志接口换位log4net,全部改成以rabbitMQ作为服务支持, 二.本地环境: windows 10 enterprise ,vs2015 三 1).获取RabbitMQ http://www.rabbitmq.com/install-windows.html 选择相应机器版本(32bit ro 64bit),这里,我下载的是64位的, 双击安装之后,这里要说明一点,因为安装完成之后,直接运行会出现以下错误: 提示很明显,类似J

rabbitMQ应用,laravel生产广播消息,springboot消费消息

最近做一个新需求,用户发布了动态,前台需要查询,为了用户读取信息响应速度更快(MySQL很难实现或者说实现起来很慢),所以在用户动态发布成功后,利用消息机制异步构建 redis缓存 和 elasticsearch索引 . 开发环境 rabbitMQ服务端,docker安装 拉取rabbit-mq镜像 docker pull hub.c.163.com/library/rabbitmq:3.6.10-management 运行镜像 docker run -d --name rabbitmq --p

消息队列 RabbitMQ 入门介绍

来源:http://ityen.com/archives/578 一.什么是RabbitMQ? RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面表现不俗.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然:   例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在

RabbitMQ入门教程(十一):消息属性Properties

原文:RabbitMQ入门教程(十一):消息属性Properties 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78698364 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 发送消息可以为消息指定一些参数 Delivery mode: 是否持久化,1 - Non-persistent,2 -

RabbitMQ入门(一)

RabbitMQ 一. Windows下安装RabbitMQ及常用命令 RabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统.它遵循Mozilla Public License开源协议,采用Erlang实现的工业级消息队列服务器,RabbitMQ是建立在Erlang OTP平台上. 安装Erlang在安装RabbitMQ之前,需要先安装Erlang ,安装地址http://www.erlang.org/download.html.安装成功后配置环境变量ERLANG_HOME

RabbitMQ入门教程(十四):RabbitMQ单机集群搭建

原文:RabbitMQ入门教程(十四):RabbitMQ单机集群搭建 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78723467 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 集群简介 理解集群先理解一下元数据 队列元数据:队列的名称和声明队列时设置的属性(是否持久化.是否自动删除.队列所属的节点)

RabbitMQ入门与使用篇

介绍 RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue)协议的开源实现.用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面都非常的优秀.是当前最主流的消息中间件之一. RabbitMQ的官方 概念: Brocker:消息队列服务器实体. Exchange:消息交换机,指定消息按什么规则,路由到哪个队列. Queue:消息队列,每个消息都会被投入到一个或者多个队列里. Binding:绑定,它的作用是把exchange和queue按

rabbitmq 入门基础(一)

第一章:Rabbitmq简单介绍 简单介绍: Rabbitmq是一个消息中间件.主要用于消息的转发和接收.假设把rabbitmq比作邮局:仅仅要你将信件投递到邮箱,你就能够确信邮递员将能够把你的信件递送到目的地. Rabbitmq的功能就相当于邮箱.邮局.邮递员的所代表的功能. 邮局递送的是纸质信件,Rabbitmq所不同之处就是其功能是接收.存储和转发的是二进制大对象数据.即Messages. Rabbitmq相关术语: (1)      Producing(生产):生产相当于消息发送(sen

RabbitMQ下的生产消费者模式与订阅发布模式

??所谓模式,就是在某种场景下,一类问题及其解决方案的总结归纳.生产消费者模式与订阅发布模式是使用消息中间件时常用的两种模式,用于功能解耦和分布式系统间的消息通信,以下面两种场景为例: 数据接入 ??假设有一个用户行为采集系统,负责从App端采集用户点击行为数据.通常会将数据上报和数据处理分离开,即App端通过REST API上报数据,后端拿到数据后放入队列中就立刻返回,而数据处理则另外使用Worker从队列中取出数据来做,如下图所示. ??这样做的好处有:第一,功能分离,上报的API接口不关心