rabbitMq及安装、fanout交换机-分发(发布/订阅)

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
 </dependency>

最常见的几种消息通信模式主要有发布-订阅、点对点这两种
http://blog.csdn.net/woogeyu/article/details/51119101 集群
http://blog.csdn.net/column/details/rabbitmq.html Python版
http://www.cnblogs.com/LipeiNet/category/896408.html
http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html  api
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现
其他公开标准(如 COBAR的 IIOP ,或者是 SOAP 等)

从Erlang的官网 http://www.erlang.org/download.html 下载最新的erlang安装包,Linux和MacOSX下载的版本是 http://www.erlang.org/download.html
1、安装依赖
# yum install build-essential m4  
# yum install openssl  
# yum install openssl-devel  
# yum install unixODBC  
# yum install unixODBC-devel  
# yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
# yum install perl

2、配置并安装erlang
# tar -zxvf otp_src_20.1.tar  
# cd otp_src_20.1
# ./configure --prefix=/usr/local/erlang --enable-hipe --enable-threads --enable-smp-support --enable-kernel-poll
./configure
--prefix=/usr/local/erlang --with-ssl --enable-threads
--enable-smp-support --enable-kernel-poll --enable-hipe --without-javac  
./configure --prefix=/usr/local/erlang --enable-all (选这个)
make
make install

3、设置erlang环境变量
打开/etc/profile设置环境变量 查看PATH:echo $PATH 环境变量
ERL_HOME=/usr/local/erlang  
PATH=$ERL_HOME/bin:$PATH  
export ERL_HOME PATH

4、安装mq
http://www.rabbitmq.com/install-rpm.html
http://www.rabbitmq.com/releases/rabbitmq-server/ 下载rpm包 
rabbitmq-server-3.6.14-1.el6.noarch
yum install -y socat(不需要)
ln -s /usr/local/erlang/bin/erl      /usr/bin/erl 建立软连接 (不建可能会报错)
rpm -i --nodeps rabbitmq-server-3.6.5-1.noarch
执行结果 warning:rabbitmq-server-3.6.5-1.noarch.rpm: Header V4 RSA/SHA512 Signature, key ID 6026dfca: NOKEY

5、起停mq
/sbin/service rabbitmq-server stop/start/etc.  
[[email protected] ~]#service rabbitmq-server start 启动服务  
[[email protected] ~]#service rabbitmq-server etc   查看哪些命令可以使用  
[[email protected] ~]#service rabbitmq-server stop  停止服务  
[[email protected] ~]#service rabbitmq-server status 查看服务状态
启动监控管理器:rabbitmq-plugins enable rabbitmq_management
关闭监控管理器:rabbitmq-plugins disable rabbitmq_management
查看所有的队列:rabbitmqctl list_queues
清除所有的队列:rabbitmqctl reset
关闭应用:rabbitmqctl stop_app 不同于停止服务
启动应用:rabbitmqctl start_app

6、卸载等命令
#rpm -qa|grep rabbitmq
rabbitmq-server-3.6.5-1.noarch
卸载 mq
#rpm -e --nodeps rabbitmq-server-3.6.5-1.noarch
#rpm -qa|grep erlang
esl-erlang-18.3-1.x86_64
#rpm -e --nodeps esl-erlang-18.3-1.x86_64

7、访问后台
关闭防火墙 service iptables stop
安装启动后其实还不能在其它机器访问, rabbitmq默认的 guest 账号只能在本地机器访问, 如果想在其它机器访问必须配置其它账号
配置管理员账号:
    rabbitmqctl add_user admin adminpasspord
    rabbitmqctl set_user_tags admin administrator
启动rabbitmq内置web插件, 管理rabbitmq账号等信息
    rabbitmq-plugins enable rabbitmq_management
访问 http://192.168.89.131:15672/#/users  为刚建的账号 set permission
http://127.0.0.1:15672

(1)添加用户
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
(2) 删除一个用户
rabbitmqctl  delete_user  Username
(3) 修改用户的密码
rabbitmqctl  change_password  Username  Newpassword
(4) 查看当前用户列表
rabbitmqctl  list_users

4、设置开机启动
chkconfig rabbitmq-server on

5、java客户端amqp-client版本号为3.6.5 与 rabbitmq-server-3.6.5-1.noarch服务版本号必须匹配

8、命令
http://www.linuxidc.com/Linux/2016-10/136493.htm

1、信息分发
向指定队列发送多条信息,多个消费者来获取该队列中信息
channel.basicQos(1);保证一次只分发一个,否则可能有些消费者获取较多消息有的消费者获取不到消息
可以使消费者采用手动答复,保证在一个消费者处理消息失败后(此时不答复)其他消费者还能继续获取并处理该消息
2、交换机
rabbitMQ其实真正的思想是生产者不发送任何信息到队列,甚至不知道信息将发送到哪个队列。相反生产者只能发送信息到交换机,交换机接收到生产者的信息,然后按照规则把它推送到对列中
fanout,表示分发,所有的消费者得到同样的队列信息,发布/订阅,channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
direct,发送信息到不同的路由,消费者根据不同路由获取不同信息
topics,发送信息到不同的路由,消费者根据模糊匹配获取某些路由的信息

1,单任务消息
生产者:根据一定的 QUEUE_NAME 生产单个消息
消费者:根据 QUEUE_NAME 获取消息
2,多任务分发
生产fenfa()  消费fenfa1()fenfa2()
生产者:根据一定的 QUEUE_NAME 生产多个消息
消费者:根据 QUEUE_NAME 获取消息(channel.basicQos(1)一次只分发一个,通过手动回复使多个消费者公平的获取和处理消息)
3,交换机
生产exchange()  消费exchange1()exchange2()
生产者:根据一定的 EXCHANGE_NAME,生产多个消息(fanout表示分发,所有的消费者得到同样的信息)
消费者:根据 EXCHANGE_NAME,每一个交换机都会获取一遍消息

service

package com.xmh.mq;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;

public class ServerTest {

    private static final String EXCHANGE_NAME = "logs";
    public final static String QUEUE_NAME="rabbitMQ.test1";

    public static void main(String[] args) throws Exception{

    }

    public static void exchange(){
        try{
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.89.131");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
            Connection connection=factory.newConnection();
            Channel channel=connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//fanout表示分发,所有的消费者得到同样的队列信息
            //分发信息
            for (int i=0;i<5;i++){
                String message="Hello World"+i;
                channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
                System.out.println("EmitLog Sent ‘" + message + "‘");
            }
            channel.close();
            connection.close();
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 多任务分发,两个以上客户端处理消息
     */
    public static void fenfa(){
        try{
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.89.131");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
            Connection connection=factory.newConnection();
            Channel channel=connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,true,false,true,null);
            //分发信息
            for (int i=0;i<10;i++){
                String message="Hello RabbitMQ"+i;
                channel.basicPublish("",QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
                System.out.println("NewTask send ‘"+message+"‘");
            }
            channel.close();
            connection.close();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    /**
     * 注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、
     * 第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
              注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体
             声明队列后mq后台可看到该队列及存放的消息
     */
    public static void quene(){
        try{
         //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ相关信息
        factory.setHost("192.168.89.131");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
//        factory.setVirtualHost("/");
        //创建一个新的连接
        Connection connection = factory.newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        //  声明一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        String message = "Hello RabbitMQ-1";
        //发送消息到队列中
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("Producer Sendss +‘" + message + "‘");
        //关闭通道和连接
        channel.close();
        connection.close();
        }catch(Exception e){
            e.printStackTrace();
        }
    }

}

注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体

package com.xmh.mq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;

public class ConsumerTest {

     private final static String QUEUE_NAME = "rabbitMQ.test1";
     private static final String EXCHANGE_NAME = "logs";

        public static void main(String[] args) throws IOException, TimeoutException {
            test_exchange();
            test_exchange2();
        }

        /**
         * 必须先订阅(先启动客户端,才能收到服务端消息) 发布/订阅
         * rabbitMQ其实真正的思想是生产者不发送任何信息到队列,甚至不知道信息将发送到哪个队列。相反生产者只能发送信息到交换机,交换机接收到生产者的信息,
         * 然后按照规则把它推送到对列中,交换机发布/订阅  Fanout扇形交换机 其他有(Direct直连交换机、Topic主题交换机)
         */
        public static void exchange1(){
            try{
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("192.168.89.131");
                factory.setUsername("admin");
                factory.setPassword("admin");
                factory.setPort(5672);
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();

                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

                //产生一个随机的队列名称
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定

                System.out.println("ReceiveLogs1 Waiting for messages");
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String message = new String(body, "UTF-8");
                        System.out.println("ReceiveLogs1 Received ‘" + message + "‘");
                    }
                };
                channel.basicConsume(queueName, true, consumer);//队列会自动删除
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        public static void exchange2(){
            try{
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("192.168.89.131");
                factory.setUsername("admin");
                factory.setPassword("admin");
                factory.setPort(5672);
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();

                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

                //产生一个随机的队列名称
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定

                System.out.println("ReceiveLogs2 Waiting for messages");
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String message = new String(body, "UTF-8");
                        System.out.println("ReceiveLogs2 Received ‘" + message + "‘");
                    }
                };
                channel.basicConsume(queueName, true, consumer);//队列会自动删除
            }catch(Exception e){
                e.printStackTrace();
            }
        }

        /**
         * 对于分发信息的处理
         * 注:channel.basicQos(1);保证一次只分发一个 。autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出,
         * 那么就无法获取数据,我们当然是不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知生成者。最后从队列中删除这条信息。如果消费者异常退出,
         * 如果还有其他消费者,那么就会把队列中的消息发送给其他消费者,如果没有,等消费者启动时候再次发送。
         */
        public static void fenfa1(){
            try{
                final ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("192.168.89.131");
                factory.setUsername("admin");
                factory.setPassword("admin");
                factory.setPort(5672);
                Connection connection = factory.newConnection();
                final Channel channel = connection.createChannel();

                channel.queueDeclare(QUEUE_NAME, true, false, true, null);
                System.out.println("Worker1  Waiting for messages");

                //每次从队列获取的数量
                channel.basicQos(1);

                final Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        //envelope - packaging data for the message  body - the message body (opaque, client-specific byte array)
                        String message = new String(body, "UTF-8");
                        System.out.println("Worker1  Received ‘" + message + "‘");
                        try {
                            doWork(message);
                            channel.basicAck(envelope.getDeliveryTag(),false);//Acknowledge one or several received messages.  .getDeliveryTag() 消息传递标记
                            System.out.println("Worker1 Done");
                        }catch (Exception e){
                            channel.abort();//终止渠道
                        }finally {
                        }
                    }
                };
                boolean autoAck=false;//手动回复
                //消息消费完成确认
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);

                } catch (Exception e){
                    e.printStackTrace();
                }
        }
        //对于分发信息的处理
        public static void fenfa2(){
            try{
                final ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("192.168.89.131");
                factory.setUsername("admin");
                factory.setPassword("admin");
                factory.setPort(5672);
                Connection connection = factory.newConnection();
                final Channel channel = connection.createChannel();

                channel.queueDeclare(QUEUE_NAME, true, false, true, null);
                System.out.println("Worker2  Waiting for messages");

                //每次从队列获取的数量
                channel.basicQos(1);

                final Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        //envelope - packaging data for the message  body - the message body (opaque, client-specific byte array)
                        String message = new String(body, "UTF-8");
                        System.out.println("Worker2  Received ‘" + message + "‘");
                        try {
                            doWork(message);
                            //Integer.parseInt("s");
                            channel.basicAck(envelope.getDeliveryTag(),false);//Acknowledge one or several received messages.  .getDeliveryTag() 消息传递标记
                            System.out.println("Worker2 Done");
                        }catch (Exception e){
                            channel.abort();//终止渠道,另外一个客户端会继续获取消息
                            System.out.println("Worker2 客户端处理异常");
                        }finally {
                        }
                    }
                };
                boolean autoAck=false;
                //消息消费完成确认
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
                } catch (Exception e){
                    e.printStackTrace();
                }
        }
        private static void doWork(String task) {
            try {
                Thread.sleep(1000); // 暂停1秒钟
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
        /**
         * 手动关掉客户端后,该队列会自动删除(声明队列时设置)
         */
        public static void test2(){
            try{
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置RabbitMQ地址
            factory.setHost("192.168.89.131");// 192.168.89.131 :15672
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
            //创建一个新的连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
            //声明要关注的队列
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            System.out.println("Customer Waiting Received messages");
            //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
            // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery,持续监听
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Customer Received ‘" + message + "‘");
                }
            };
            //自动回复队列应答 -- RabbitMQ中的消息确认机制
            channel.basicConsume(QUEUE_NAME, true, consumer);
            } catch (Exception e){
                e.printStackTrace();
            }
        }
}

原文地址:https://www.cnblogs.com/xingminghui/p/8650394.html

时间: 2024-11-08 03:06:46

rabbitMq及安装、fanout交换机-分发(发布/订阅)的相关文章

RabbitMQ学习第三记:发布/订阅模式(Publish/Subscribe)

工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有邮箱.手机号,那么在注册完后会向邮箱和手机号都发送注册完成信息.利用MQ实现业务异步处理,如果是用工作队列的话,就会声明一个注册信息队列.注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息.但是实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应该放在一块处

消息队列 RabbitMQ系列 第四篇:发布/订阅 Publish/Subscribe

上篇中我们实现了Work Queue的创建,在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者.本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅). 为了说明这个模式,我们将会构建一个简单的日志系统.这将会包含两部分程序,第一个是发送日志信息,第二个将会接收并打印它们. 在我们的日志系统里,每个运行的消费者程序都能接收到消息.这样我就运行一个receiver并把日志写到磁盘上,同时我们再运行另外一个消费者

rabbitMQ交换机的发布订阅模式

生产者: # !/usr/bin/env python # -*- coding: utf-8 -*- import pika # 创建连接对象 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 创建交换机 channel.exchange_declare(exchange='logs', exchange_type=

RabbitMQ入门教程(五):扇形交换机发布/订阅(Publish/Subscribe)

原文:RabbitMQ入门教程(五):扇形交换机发布/订阅(Publish/Subscribe) 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78628659 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 本节主要演示交换机的广播类型fanout,广播类型不需要routingKey,交换机会将所有

RabbitMQ系列教程之三:发布/订阅(Publish/Subscribe)

(本教程是使用Net客户端,也就是针对微软技术平台的) 在前一个教程中,我们创建了一个工作队列.工作队列背后的假设是每个任务会被交付给一个[工人].在这一部分我们将做一些完全不同的事情--我们将向多个[消费者]传递信息.这种模式被称为"发布/订阅". 为了说明这种模式,我们将构建一个简单的日志系统.它将包括两个程序,第一个将发出日志消息,第二个将接收并打印它们. 在我们的日志系统中每个接收程序的运行副本都会得到消息.这样我们就可以运行一个接收者程序,将日志记录到磁盘:同时我们可以运行另

rabbitmq系列三 之发布/订阅

1.发布/订阅 在上篇教程中,我们搭建了一个工作队列,每个任务只分发给一个工作者(worker).在本篇教程中,我们要做的跟之前完全不一样 -- 分发一个消息给多个消费者(consumers).这种模式被称为"发布/订阅". 为了描述这种模式,我们将会构建一个简单的日志系统.它包括两个程序--第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容. 在我们的这个日志系统中,所有正在运行的接收方程序都会接受消息.我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接受

RabbitMQ发布订阅模式

这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展.功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者. 思路解读(重点理解): (1)一个生产者,多个消费者(2)每一个消费者都有自己的一个队列(3)生产者没有直接发消息到队列中,而是发送到交换机(4)每个消费者的队列都绑定到交换机上(5)消息通过交换机到达每个消费者的队列该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列以用户发

RabbitMQ教程C#版 - 发布订阅

先决条件本教程假定 RabbitMQ 已经安装,并运行在localhost标准端口(5672).如果你使用不同的主机.端口或证书,则需要调整连接设置. 从哪里获得帮助如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们. 发布/订阅# (使用 .NET Client) 在 教程[2] 中,我们创建了一个工作队列,假设在工作队列中的每一个任务都只被分发给一个 Worker.那么在这一章节,我们要做与之完全不同的事,那就是我们将要把一条消息分发给多个消费者.这种模式被称为“发布/订阅”. 为了说

【译】RabbitMQ:发布-订阅(Publish/Subscribe)

在前一篇教程中,我们创建了一个工作队列,我们假设在工作队列后的每一个任务都只被调度给一个消费者.在这一部分,我们将做一些完全不一样的事情,调度同一条消息给多个消费者,也就是有名的“发布-订阅”模式.为了阐述这种模式,我们将构建一个简单的日志系统.该系统将由两部分组成:一部分发送日志消息,另一部分接收并且打印日志消息,在这个日志系统中,每一份运行着的接收程序都将会收到消息.这样我们可以运行一个接收者把日志写入到磁盘中,同时可以运行另一个接收者将日志打印到显示器上面.也就是说,发布的日志消息会被广播