Netty利用ChannelGroup广播消息

在Netty中提供了ChannelGroup接口,该接口继承Set接口,因此可以通过ChannelGroup可管理服务器端所有的连接的Channel,然后对所有的连接Channel广播消息。

Server端:

public class BroadCastServer {

    public static void run(int port) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                     .channel(NioServerSocketChannel.class)                // 设置Channel Type
                     .option(ChannelOption.SO_BACKLOG, 1024)            // 设置Channel属性
                     .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                            pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new BroadCastChannelHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            if (channelFuture.isDone()) {
                System.out.println(String.format("server bind port %s sucess", port));
            }
            Channel channel = channelFuture.channel();
            /**CloseFuture异步方式关闭*/
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    public static void main(String []args) {
        BroadCastServer.run(8080);
    }

}

public class BroadCastChannelHandler extends ChannelInboundHandlerAdapter {

    private static final Gson GSON = new GsonBuilder().create();

    private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    private static final AtomicInteger response = new AtomicInteger();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel ch = ctx.channel();
        if (ChannelGroups.size() > 0) {
            Message msg = new Message(ch.remoteAddress().toString().substring(1), SDF.format(new Date()));
            ChannelGroups.broadcast(GSON.toJson(msg), new ChannelMatchers());
        }
        ChannelGroups.add(ch);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (ChannelGroups.contains(ch) && String.valueOf(msg).equals("welcome")) {
            System.out.println(String.format("receive [%s] from [%s] at [%s]", String.valueOf(msg) ,
                                    ch.remoteAddress().toString().substring(1), SDF.format(new Date())));
            response.incrementAndGet();
        }
        synchronized (response) {
            System.out.println(response.get() + "\t" + ChannelGroups.size());
            if (response.get() == ChannelGroups.size() - 1) {
                System.out.println("server close all connected channel");
                ChannelGroups.disconnect();
                response.set(0);
            }
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ChannelGroups.discard(ctx.channel());
        response.decrementAndGet();
    }

    public static class ChannelMatchers implements ChannelMatcher {

        @Override
        public boolean matches(Channel channel) {
            return true;
        }

    }

}

服务器端收到所有连接客户端对广播消息的响应后,服务器端主动关闭已连接的Channel

客户端:

public class Client	{

	private static final String host = "127.0.0.1";

	private static final int port = 8080;

	private static final ExecutorService es = Executors.newFixedThreadPool(5);

	public static void start() {
		for (int i = 0; i < 5; i++) {
			es.execute(new Task());
		}
		es.shutdown();
	}

	public static class Task implements Runnable {

		@Override
		public void run() {
			EventLoopGroup group = new NioEventLoopGroup();
			try {
				Bootstrap bootstrap = new Bootstrap();
				bootstrap.group(group)
						 .channel(NioSocketChannel.class)
						 .option(ChannelOption.TCP_NODELAY, true)
						 .handler(new ChannelInitializer<SocketChannel>() {

							@Override
							protected void initChannel(SocketChannel ch) throws Exception {
								ChannelPipeline pipeline = ch.pipeline();
								pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
								pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
								pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
								pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
								pipeline.addLast(new SimpleClientChannelHandler());
							}

						});
				ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
				if (channelFuture.isSuccess()) {
					System.out.println(String.format("connect server(%s:%s) sucess", host, port));
				}
				channelFuture.channel().closeFuture().sync();
			} catch (InterruptedException e) {
				e.printStackTrace();
			} finally {
				group.shutdownGracefully();
			}
		}

	}

	public static void main(String []args) {
		Client.start();
	}
}

public class SimpleClientChannelHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		Channel channel = ctx.channel();
		System.out.println(String.format("client(%s) receive message [%s]", channel.localAddress().toString().substring(1),
								String.valueOf(msg)));
		System.out.println();
		ctx.writeAndFlush(String.valueOf("welcome"));
	}

	@Override
	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
		ctx.disconnect(ctx.newPromise());
		ctx.close();
		System.out.println(String.format("client(%s) close sucess", ctx.channel().localAddress().toString().substring(1)));
	}
}

  本文使用ChannelGroups辅助类管理服务器端已连接的Channel,代码实现如下:

public class ChannelGroups {

    private static final ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup("ChannelGroups", GlobalEventExecutor.INSTANCE);

    public static void add(Channel channel) {
        CHANNEL_GROUP.add(channel);
    }

    public static ChannelGroupFuture broadcast(Object msg) {
        return CHANNEL_GROUP.writeAndFlush(msg);
    }

    public static ChannelGroupFuture broadcast(Object msg, ChannelMatcher matcher) {
        return CHANNEL_GROUP.writeAndFlush(msg, matcher);
    }

    public static ChannelGroup flush() {
        return CHANNEL_GROUP.flush();
    }

    public static boolean discard(Channel channel) {
        return CHANNEL_GROUP.remove(channel);
    }

    public static ChannelGroupFuture disconnect() {
        return CHANNEL_GROUP.disconnect();
    }

    public static ChannelGroupFuture disconnect(ChannelMatcher matcher) {
        return CHANNEL_GROUP.disconnect(matcher);
    }

    public static boolean contains(Channel channel) {
        return CHANNEL_GROUP.contains(channel);
    }

    public static int size() {
        return CHANNEL_GROUP.size();
    }
}

  

时间: 2024-10-13 00:29:19

Netty利用ChannelGroup广播消息的相关文章

netty解决channel管理,可广播消息

在Netty中提供了ChannelGroup接口,该接口继承Set接口,因此可以通过ChannelGroup可管理服务器端所有的连接的Channel,然后对所有的连接Channel广播消息. Server端: public class BroadCastServer { public static void run(int port) { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioE

node的socket.io的广播消息

在多个客户端与服务器端建立连接后,socket.io()服务器具有一个sockets属性,属性值为所有与客户端建立连接的socket对象.可以利用该对象的send方法或emit方法向所有客户端广播消息. io.sockets.send("user commected); io.socket.emit("login",names); 案例 server.js代码: 1 var express=require("express"); 2 var http=re

rabbitMQ应用,laravel生产广播消息,springboot消费消息

最近做一个新需求,用户发布了动态,前台需要查询,为了用户读取信息响应速度更快(MySQL很难实现或者说实现起来很慢),所以在用户动态发布成功后,利用消息机制异步构建 redis缓存 和 elasticsearch索引 . 开发环境 rabbitMQ服务端,docker安装 拉取rabbit-mq镜像 docker pull hub.c.163.com/library/rabbitmq:3.6.10-management 运行镜像 docker run -d --name rabbitmq --p

Remoting异步回调,向在线用户广播消息

本文目的:向Remoting在线客户端广播消息. 使用的主要技术:异步,回调,广播. 实现过程: 定义远程实例 using System; using System.Collections.Generic; using System.Text; using System.Runtime.Remoting.Messaging; namespace RemoteObject { //定义委托,显示回调消息 public delegate void ShowCallBackMsg(string str

Android中利用Handler实现消息的分发机制(三)

在第二篇文章<Android中利用Handler实现消息的分发机制(一)>中,我们讲到主线程的Looper是Android系统在启动App的时候,已经帮我们创建好了,而如果在子线程中需要去使用Handler的时候,我们就需要显式地去调用Looper的 prepare方法和loop方法,从而为子线程创建其唯一的Looper. 具体代码如下: class LooperThread extends Thread { public Handler mHandler; public void run()

android菜鸟学习笔记26----Android广播消息及BroadcastReceiver

1.广播类型: Android中的广播有两种类型:标准广播和有序广播.其中,标准广播是完全异步发送的广播,发出之后,几乎所有的广播接收者都会在同一时刻收到这条广播消息,因而,这种类型的广播消息是不可拦截,不可修改的:而有序广播是一种同步发送的广播,广播发出后,只有优先级最高的广播接收者能够收到这条广播消息,它处理完自己的逻辑之后,广播才会向后继续传递给低优先级的广播接收者,因此,高优先级的广播接收者可以对广播消息进行拦截,修改操作. 2.接收系统广播: 要接收系统广播,就要有自己的广播接收者.定

Android中利用Handler实现消息的分发机制(零)

在之前一篇介绍AsyncTask的文章中,我们在最后讲到,AsyncTask是利用Handler的消息异步处理机制,将操作结果,利用Message传回主线程,从而进行UI线程的更新的. 而在我们日常的开发工作中,Handler也是我们经常使用的类之一,那么Handler的主要作用是什么? Handler 的主要作用就是对消息(消息可以是我们想做的一些UI更新,也可以是其他的一些不可见的操作,如操作数据库等)的异步处理机制,而相信大家都了解异步的概念. 简单地说一下: 1)从程序的角度来看,就是当

利用System V消息队列实现回射客户/服务器

一.介绍 在学习UNIX网络编程 卷1时,我们当时可以利用Socket套接字来实现回射客户/服务器程序,但是Socket编程是存在一些不足的,例如: 1. 服务器必须启动之时,客户端才能连上服务端,并与服务端进行通信: 2. 利用套接口描述符进行通信,必须知道对端的IP与端口. 二.相关函数介绍 下面,我们利用System V消息队列来实现进程间的通信: 首先,我们先来了解一下下面几个函数: 1. msgget: 该函数用于打开或创建消息队列,其作用相当与文件操作函数open. #include

android 如何屏蔽接收来自某些信道的小区广播消息

客户发现4370~4382的小区广播信道被打开了,想屏蔽这些信道的小区广播消息,可以参考如下设置: 1. 在SmsCbConstants.java(alps/frameworks/opt/telephony/src/java/com/android/internal/telephony/gsm) 中添加宏: public static final int MESSAGE_ID_CBDD_IDENTIFIER_MIN = 0x1112; // add by mtk for 4370 public