Mina、Netty、Twisted一起学(七):公布/订阅(Publish/Subscribe)

消息传递有非常多种方式。请求/响应(Request/Reply)是最经常使用的。在前面的博文的样例中。非常多都是採用请求/响应的方式。当server接收到消息后,会马上write回写一条消息到client。

HTTP协议也是基于请求/响应的方式。

可是请求/响应并不能满足全部的消息传递的需求,有些需求可能须要服务端主动推送消息到client,而不是被动的等待请求后再给出响应。

公布/订阅(Publish/Subscribe)是一种server主动发送消息到client的消息传递方式。订阅者Subscriber连接到serverclient后,相当于開始订阅公布者Publisher公布的消息,当公布者公布了一条消息后,全部订阅者都会接收到这条消息。

网络聊天室一般就是基于公布/订阅模式来实现。

比如增加一个QQ群。就相当于订阅了这个群的全部消息。当有新的消息,server会主动将消息发送给全部的client。仅仅只是聊天室里的全部人既是公布者又是订阅者。

以下分别用MINA、Netty、Twisted分别实现简单的公布/订阅模式的server程序,连接到server的全部client都是订阅者,当公布者公布一条消息后,server会将消息转发给全部client。

MINA:

在MINA中。通过IoService的getManagedSessions()方法能够获取这个IoService当前管理的全部IoSession,即全部连接到server的client集合。当server接收到公布者公布的消息后,能够通过IoService的getManagedSessions()方法获取到全部client相应的IoSession并将消息发送到这些client。

public class TcpServer {

	public static void main(String[] args) throws IOException {
		IoAcceptor acceptor = new NioSocketAcceptor();

		acceptor.getFilterChain().addLast("codec",
				new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), "\r\n", "\r\n")));

		acceptor.setHandler(new TcpServerHandle());
		acceptor.bind(new InetSocketAddress(8080));
	}

}

class TcpServerHandle extends IoHandlerAdapter {

	@Override
	public void exceptionCaught(IoSession session, Throwable cause)
			throws Exception {
		cause.printStackTrace();
	}

	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {

		// 获取全部正在连接的IoSession
		Collection<IoSession> sessions = session.getService().getManagedSessions().values();

		// 将消息写到全部IoSession
		IoUtil.broadcast(message, sessions);
	}
}

Netty:

Netty提供了ChannelGroup来用于保存Channel组,ChannelGroup是一个线程安全的Channel集合,它提供了一些列Channel批量操作。

当一个TCP连接关闭后,相应的Channel会自己主动从ChannelGroup移除。所以不用手动去移除关闭的Channel。

Netty文档关于ChannelGroup的解释:

A thread-safe Set that contains open Channels and provides various bulk operations on them. Using ChannelGroup, you can categorize Channels into a meaningful group (e.g. on a per-service or per-state basis.) A closed Channel is automatically removed from the collection, so that you don‘t need to worry about the life cycle of the added Channel. A Channel can belong to more than one ChannelGroup.

当有新的client连接到server,将相应的Channel增加到一个ChannelGroup中。当公布者公布消息时,server能够将消息通过ChannelGroup写入到全部client。

public class TcpServer {

	public static void main(String[] args) throws InterruptedException {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)
					.childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						public void initChannel(SocketChannel ch) throws Exception {
							ChannelPipeline pipeline = ch.pipeline();
							pipeline.addLast(new LineBasedFrameDecoder(80));
							pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
							pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
							pipeline.addLast(new TcpServerHandler());
						}
					});
			ChannelFuture f = b.bind(8080).sync();
			f.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}
}

class TcpServerHandler extends ChannelInboundHandlerAdapter {

	// ChannelGroup用于保存全部连接的client。注意要用static来保证仅仅有一个ChannelGroup实例。否则每new一个TcpServerHandler都会创建一个ChannelGroup
	private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

	@Override
	public void channelActive(ChannelHandlerContext ctx) {
		channels.add(ctx.channel()); // 将新的连接增加到ChannelGroup。当连接断开ChannelGroup会自己主动移除相应的Channel
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		channels.writeAndFlush(msg + "\r\n"); // 接收到消息后。将消息发送到ChannelGroup中的全部client
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		// cause.printStackTrace();  临时把异常打印凝视掉,由于PublishClient公布一条消息后会马上断开连接。而server也会向PublishClient发送消息,所以会抛出异常
		ctx.close();
	}
}

Twisted:

在Twisted中,全局的数据通常会放在Factory,而每一个连接相关的数据会放在Protocol中。所以这里能够在Factory中增加一个属性,来存放Protocol集合,表示全部连接server的client。当有新的client连接到server时。将相应的Protocol实例放入集合。当连接断开,将相应的Protocol从集合中移除。当server接收到公布者公布的消息后,遍历全部client并发送消息。

# -*- coding:utf-8 –*-

from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.protocol import Factory
from twisted.internet import reactor

class TcpServerHandle(LineOnlyReceiver): 

    def __init__(self, factory):
        self.factory = factory

    def connectionMade(self):
        self.factory.clients.add(self) # 新连接增加连接相应的Protocol实例到clients

    def connectionLost(self, reason):
        self.factory.clients.remove(self) # 连接断开移除连接相应的Protocol实例

    def lineReceived(self, line):
        # 遍历全部的连接。发送数据
        for c in self.factory.clients:
            c.sendLine(line)

class TcpServerFactory(Factory):
    def __init__(self):
        self.clients = set() # set集合用于保存全部连接到server的client

    def buildProtocol(self, addr):
        return TcpServerHandle(self)

reactor.listenTCP(8080, TcpServerFactory())
reactor.run()

以下各自是两个client程序,一个是用于公布消息的client。一个是订阅消息的client。

公布消息的client非常easy,就是向serverwrite一条消息就可以:

public class PublishClient {

	public static void main(String[] args) throws IOException {

		Socket socket = null;
		OutputStream out = null;

		try {

			socket = new Socket("localhost", 8080);
			out = socket.getOutputStream();
			out.write("Hello\r\n".getBytes()); // 公布信息到server
			out.flush();

		} finally {
			// 关闭连接
			out.close();
			socket.close();
		}
	}
}

订阅消息的client连接到server后,会堵塞等待接收server发送的公布消息:

public class SubscribeClient {

	public static void main(String[] args) throws IOException {

		Socket socket = null;
		BufferedReader in = null;

		try {

			socket = new Socket("localhost", 8080);
			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

			while (true) {
				String line = in.readLine(); // 堵塞等待server公布的消息
				System.out.println(line);
			}

		} finally {
			// 关闭连接
			in.close();
			socket.close();
		}
	}
}

分别针对MINA、Netty、Twistedserver进行測试:

1、測试时首先开启server;
2、然后再执行订阅消息的clientSubscribeClient,SubscribeClient能够开启多个;
3、最后执行公布消息的clientPublishClient。能够多次执行查看全部SubscribeClient的输出结果。

执行结果能够发现,当执行公布消息的clientPublishClient公布一条消息到server时。server会主动将这条消息转发给全部的TCP连接,全部的订阅消息的clientSubscribeClient都会接收到这条消息并打印出来。

作者:叉叉哥   转载请注明出处:http://blog.csdn.net/xiao__gui/article/details/39396789

MINA、Netty、Twisted一起学系列

MINA、Netty、Twisted一起学(一):实现简单的TCPserver

MINA、Netty、Twisted一起学(二):TCP消息边界问题及按行切割消息

MINA、Netty、Twisted一起学(三):TCP消息固定大小的前缀(Header)

MINA、Netty、Twisted一起学(四):定制自己的协议

MINA、Netty、Twisted一起学(五):整合protobuf

MINA、Netty、Twisted一起学(六):session

MINA、Netty、Twisted一起学(七):公布/订阅(Publish/Subscribe)

MINA、Netty、Twisted一起学(八):HTTPserver

MINA、Netty、Twisted一起学(九):异步IO和回调函数

MINA、Netty、Twisted一起学(十):线程模型

MINA、Netty、Twisted一起学(十一):SSL/TLS

MINA、Netty、Twisted一起学(十二):HTTPS

源代码

https://github.com/wucao/mina-netty-twisted

时间: 2025-01-10 09:10:39

Mina、Netty、Twisted一起学(七):公布/订阅(Publish/Subscribe)的相关文章

RabbitMQ入门教程(五):扇形交换机发布/订阅(Publish/Subscribe)

原文:RabbitMQ入门教程(五):扇形交换机发布/订阅(Publish/Subscribe) 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78628659 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 本节主要演示交换机的广播类型fanout,广播类型不需要routingKey,交换机会将所有

消息队列 RabbitMQ系列 第四篇:发布/订阅 Publish/Subscribe

上篇中我们实现了Work Queue的创建,在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者.本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅). 为了说明这个模式,我们将会构建一个简单的日志系统.这将会包含两部分程序,第一个是发送日志信息,第二个将会接收并打印它们. 在我们的日志系统里,每个运行的消费者程序都能接收到消息.这样我就运行一个receiver并把日志写到磁盘上,同时我们再运行另外一个消费者

Mina、Netty、Twisted一起学(七):发布/订阅(Publish/Subscribe)

消息传递有很多种方式,请求/响应(Request/Reply)是最常用的.在前面的博文的例子中,很多都是采用请求/响应的方式,当服务器接收到消息后,会立即write回写一条消息到客户端.HTTP协议也是基于请求/响应的方式. 但是请求/响应并不能满足所有的消息传递的需求,有些需求可能需要服务端主动推送消息到客户端,而不是被动的等待请求后再给出响应. 发布/订阅(Publish/Subscribe)是一种服务器主动发送消息到客户端的消息传递方式.订阅者Subscriber连接到服务器客户端后,相当

Mina、Netty、Twisted一起学(五):整合protobuf

protobuf是谷歌的Protocol Buffers的简称,用于结构化数据和字节码之间互相转换(序列化.反序列化),一般应用于网络传输,可支持多种编程语言. protobuf怎样使用这里不再介绍,本文主要介绍在MINA.Netty.Twisted中怎样使用protobuf,不了解protobuf的同学能够去參考我的还有一篇博文. 在前面的一篇博文中.有介绍到一种用一个固定为4字节的前缀Header来指定Body的字节数的一种消息切割方式.在这里相同要使用到. 仅仅是当中Body的内容不再是字

Mina、Netty、Twisted一起学(十):线程模型

要想开发一个高性能的TCPserver,熟悉所使用框架的线程模型非常重要.MINA.Netty.Twisted本身都是高性能的网络框架,假设再搭配上高效率的代码.才干实现一个高大上的server. 可是假设不了解它们的线程模型.就非常难写出高性能的代码.框架本身效率再高.程序写的太差,那么server总体的性能也不会太高.就像一个电脑,CPU再好.内存小硬盘慢散热差,总体的性能也不会太高. 玩过Android开发的同学会知道,在Android应用中有一个非常重要线程:UI线程(即主线程). UI

Mina、Netty、Twisted一起学:实现简单的TCP服务器

MINA.Netty.Twisted为什么放在一起学习?首先,不妨先看一下他们官方网站对其的介绍: MINA: Apache MINA is a network application framework which helps users develop high performance and high scalability network applications easily. It provides an abstract event-driven asynchronous API

分布式公布订阅消息系统 Kafka 架构设计

我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础. 如今它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用. 活动流数据是全部站点在对其站点使用情况做报表时要用到的数据中最常规的部分.活动数据包含页面訪问量(page view).被查看内容方面的信息以及搜索情况等内容.这样的数据通常的处理方式是先把各种活动以日志的形式写

Shuttle ESB(四)——公布订阅模式实例介绍(1)

前面,我已经集中用了三篇文章来讲Shuttle ESB的入门实例与宏观概念. Shuttle ESB一共同拥有两种发送消息的模式:请求/对应模式与Pub/Sub模式. 关于这两种模式的区分.请看以下文章的介绍:Shuttle ESB(三)--架构模型介绍(2) 在Shuttle ESB的第一篇文章中,关于入门实例的介绍,是基于Command命令的请求响应模式.这样的模式发送的消息比較简单.是同步的. 发送消息端与接收消息端的行为耦合性比較大. 请求发送后,其它进程都会处于等待状态.等待服务端返回

NATS学习 -- 概念学习之消息(Message)与发布订阅(Publish Subscribe)

1 理论篇 1.1 来自官方的介绍 NATS acts as a central nervous system for distributed systems such as mobile devices, IoT networks, enterprise microservices and cloud native infrastructure. Unlike traditional enterprise messaging systems, NATS provides an always o