Mina、Netty、Twisted一起学(七):发布/订阅(Publish/Subscribe)

消息传递有很多种方式,请求/响应(Request/Reply)是最常用的。在前面的博文的例子中,很多都是采用请求/响应的方式,当服务器接收到消息后,会立即write回写一条消息到客户端。HTTP协议也是基于请求/响应的方式。

但是请求/响应并不能满足所有的消息传递的需求,有些需求可能需要服务端主动推送消息到客户端,而不是被动的等待请求后再给出响应。

发布/订阅(Publish/Subscribe)是一种服务器主动发送消息到客户端的消息传递方式。订阅者Subscriber连接到服务器客户端后,相当于开始订阅发布者Publisher发布的消息,当发布者发布了一条消息后,所有订阅者都会接收到这条消息。

网络聊天室一般就是基于发布/订阅模式来实现。例如加入一个QQ群,就相当于订阅了这个群的所有消息,当有新的消息,服务器会主动将消息发送给所有的客户端。只不过聊天室里的所有人既是发布者又是订阅者。

下面分别用MINA、Netty、Twisted分别实现简单的发布/订阅模式的服务器程序,连接到服务器的所有客户端都是订阅者,当发布者发布一条消息后,服务器会将消息转发给所有客户端。

MINA:

在MINA中,通过IoService的getManagedSessions()方法可以获取这个IoService当前管理的所有IoSession,即所有连接到服务器的客户端集合。当服务器接收到发布者发布的消息后,可以通过IoService的getManagedSessions()方法获取到所有客户端对应的IoSession并将消息发送到这些客户端。

public class TcpServer {

    public static void main(String[] args) throws IOException {
        IoAcceptor acceptor = new NioSocketAcceptor();

        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), "\r\n", "\r\n")));

        acceptor.setHandler(new TcpServerHandle());
        acceptor.bind(new InetSocketAddress(8080));
    }

}

class TcpServerHandle extends IoHandlerAdapter {

    @Override
    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {
        cause.printStackTrace();
    }

    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {

        // 获取所有正在连接的IoSession
        Collection<IoSession> sessions = session.getService().getManagedSessions().values();

        // 将消息写到所有IoSession
        IoUtil.broadcast(message, sessions);
    }
}

Netty:

Netty提供了ChannelGroup来用于保存Channel组,ChannelGroup是一个线程安全的Channel集合,它提供了一些列Channel批量操作。当一个TCP连接关闭后,对应的Channel会自动从ChannelGroup移除,所以不用手动去移除关闭的Channel。

Netty文档关于ChannelGroup的解释:

A thread-safe Set that contains open Channels and provides various bulk operations on them. Using ChannelGroup, you can categorize Channels into a meaningful group (e.g. on a per-service or per-state basis.) A closed Channel is automatically removed from the collection, so that you don‘t need to worry about the life cycle of the added Channel. A Channel can belong to more than one ChannelGroup.

当有新的客户端连接到服务器,将对应的Channel加入到一个ChannelGroup中,当发布者发布消息时,服务器可以将消息通过ChannelGroup写入到所有客户端。

public class TcpServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new LineBasedFrameDecoder(80));
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new TcpServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

class TcpServerHandler extends ChannelInboundHandlerAdapter {

    // ChannelGroup用于保存所有连接的客户端,注意要用static来保证只有一个ChannelGroup实例,否则每new一个TcpServerHandler都会创建一个ChannelGroup
    private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        channels.add(ctx.channel()); // 将新的连接加入到ChannelGroup,当连接断开ChannelGroup会自动移除对应的Channel
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        channels.writeAndFlush(msg + "\r\n"); // 接收到消息后,将消息发送到ChannelGroup中的所有客户端
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // cause.printStackTrace();  暂时把异常打印注释掉,因为PublishClient发布一条消息后会立即断开连接,而服务器也会向PublishClient发送消息,所以会抛出异常
        ctx.close();
    }
}

Twisted:

在Twisted中,全局的数据一般会放在Factory,而每个连接相关的数据会放在Protocol中。所以这里可以在Factory中加入一个属性,来存放Protocol集合,表示所有连接服务器的客户端。当有新的客户端连接到服务器时,将对应的Protocol实例放入集合,当连接断开,将对应的Protocol从集合中移除。当服务器接收到发布者发布的消息后,遍历所有客户端并发送消息。

# -*- coding:utf-8 –*-

from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.protocol import Factory
from twisted.internet import reactor

class TcpServerHandle(LineOnlyReceiver): 

    def __init__(self, factory):
        self.factory = factory

    def connectionMade(self):
        self.factory.clients.add(self) # 新连接添加连接对应的Protocol实例到clients

    def connectionLost(self, reason):
        self.factory.clients.remove(self) # 连接断开移除连接对应的Protocol实例

    def lineReceived(self, line):
        # 遍历所有的连接,发送数据
        for c in self.factory.clients:
            c.sendLine(line)

class TcpServerFactory(Factory):
    def __init__(self):
        self.clients = set() # set集合用于保存所有连接到服务器的客户端

    def buildProtocol(self, addr):
        return TcpServerHandle(self)

reactor.listenTCP(8080, TcpServerFactory())
reactor.run()

下面分别是两个客户端程序,一个是用于发布消息的客户端,一个是订阅消息的客户端。

发布消息的客户端很简单,就是向服务器write一条消息即可:

public class PublishClient {

    public static void main(String[] args) throws IOException {

        Socket socket = null;
        OutputStream out = null;

        try {

            socket = new Socket("localhost", 8080);
            out = socket.getOutputStream();
            out.write("Hello\r\n".getBytes()); // 发布信息到服务器
            out.flush();

        } finally {
            // 关闭连接
            out.close();
            socket.close();
        }
    }
}

订阅消息的客户端连接到服务器后,会阻塞等待接收服务器发送的发布消息:

public class SubscribeClient {

    public static void main(String[] args) throws IOException {

        Socket socket = null;
        BufferedReader in = null;

        try {

            socket = new Socket("localhost", 8080);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            while (true) {
                String line = in.readLine(); // 阻塞等待服务器发布的消息
                System.out.println(line);
            }

        } finally {
            // 关闭连接
            in.close();
            socket.close();
        }
    }
}

分别针对MINA、Netty、Twisted服务器进行测试:

1、测试时首先开启服务器;
2、然后再运行订阅消息的客户端SubscribeClient,SubscribeClient可以开启多个;
3、最后运行发布消息的客户端PublishClient,可以多次运行查看所有SubscribeClient的输出结果。

运行结果可以发现,当运行发布消息的客户端PublishClient发布一条消息到服务器时,服务器会主动将这条消息转发给所有的TCP连接,所有的订阅消息的客户端SubscribeClient都会接收到这条消息并打印出来。

时间: 2024-10-22 23:53:15

Mina、Netty、Twisted一起学(七):发布/订阅(Publish/Subscribe)的相关文章

RabbitMQ入门教程(五):扇形交换机发布/订阅(Publish/Subscribe)

原文:RabbitMQ入门教程(五):扇形交换机发布/订阅(Publish/Subscribe) 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78628659 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 本节主要演示交换机的广播类型fanout,广播类型不需要routingKey,交换机会将所有

消息队列 RabbitMQ系列 第四篇:发布/订阅 Publish/Subscribe

上篇中我们实现了Work Queue的创建,在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者.本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅). 为了说明这个模式,我们将会构建一个简单的日志系统.这将会包含两部分程序,第一个是发送日志信息,第二个将会接收并打印它们. 在我们的日志系统里,每个运行的消费者程序都能接收到消息.这样我就运行一个receiver并把日志写到磁盘上,同时我们再运行另外一个消费者

Redis 命令参考——PubSub(发布订阅)

PubSub(发布订阅)PUBLISH PUBLISH channel message 将信息 message 发送到指定的频道 channel . 可用版本: >=2.0.0 时间复杂度: O(N+M),其中 N 是频道 channel 的订阅者数量,而 M 则是使用模式订阅(subscribed patterns)的客户端的数量. 返回值: 接收到信息 message 的订阅者数量. # 对没有订阅者的频道发送信息 redis>publish bad_channel "can a

【Redis发布订阅】

Redis通过PUBLISH.SUBSCRIBE等命令实现发布与订阅模式. 举例:QQ群的公告,单个发布者,多个收听着. *** 发布/订阅 PUBLISH 频道 消息 将消息发布到指定的频道. . SUBSCRIBE 频道1 [频道2] [...] 可同时订阅多个频道. . UNSUBSCRIBE 频道1 [频道2] 取消订阅指定的频道,如果不指定频道则会取消订阅所有频道. . PSUBSCRIBE [匹配模式1] [匹配模式2] 订阅一个或多个符合给定模式的频道,每个模式以"*"作

【Redis发布订阅】 -- 2019-08-11 19:47:44

原文: http://106.13.73.98/__/98/ Redis通过PUBLISH.SUBSCRIBE等命令实现发布与订阅模式. 举例:QQ群的公告,单个发布者,多个收听着. *** 发布/订阅 PUBLISH 频道 消息 将消息发布到指定的频道. . SUBSCRIBE 频道1 [频道2] [...] 可同时订阅多个频道. . UNSUBSCRIBE 频道1 [频道2] 取消订阅指定的频道,如果不指定频道则会取消订阅所有频道. . PSUBSCRIBE [匹配模式1] [匹配模式2]

redis事务以及发布订阅

1. 什么是redis的事务? ???????? redis事务就是一个命令执行的队列,将一系列预定义命令包装成一个整体,就是一个队列.当执行的时候,一次性按照添加顺序依次执行,中间不会被打断或者干扰. 2. 能干嘛? ???????? 一个队列中,一次性,顺序性,排他性的执行一系列命令 3. redis事务基本操作 开启事务:multi 设置事务的开始位置,这个指令开启后,后面所有的指令都会加入事务中 执行事务: exec 设置事务的结束位置,同时执行事务,与multi成对出现,成对使用 取消

angular的发布订阅

import subject from "nxjs/subject"; let sub = new subject(); //发布订阅 sub.subscribe(data=>{ console.log(data> }); //执行订阅的事件 sub.next(data); 原文地址:https://www.cnblogs.com/llcMite/p/12495867.html

Mina、Netty、Twisted一起学(七):公布/订阅(Publish/Subscribe)

消息传递有非常多种方式.请求/响应(Request/Reply)是最经常使用的.在前面的博文的样例中.非常多都是採用请求/响应的方式.当server接收到消息后,会马上write回写一条消息到client. HTTP协议也是基于请求/响应的方式. 可是请求/响应并不能满足全部的消息传递的需求,有些需求可能须要服务端主动推送消息到client,而不是被动的等待请求后再给出响应. 公布/订阅(Publish/Subscribe)是一种server主动发送消息到client的消息传递方式.订阅者Sub

Mina、Netty、Twisted一起学(五):整合protobuf

protobuf是谷歌的Protocol Buffers的简称,用于结构化数据和字节码之间互相转换(序列化.反序列化),一般应用于网络传输,可支持多种编程语言. protobuf怎样使用这里不再介绍,本文主要介绍在MINA.Netty.Twisted中怎样使用protobuf,不了解protobuf的同学能够去參考我的还有一篇博文. 在前面的一篇博文中.有介绍到一种用一个固定为4字节的前缀Header来指定Body的字节数的一种消息切割方式.在这里相同要使用到. 仅仅是当中Body的内容不再是字