Netty实战十三之使用UDP广播事件

1、UDP的基础知识

我们将会把重点放在一个无连接协议即用户数据报协议(UDP)上,它通常用在性能至关重要并且能够容忍一定的数据报丢失的情况下。

面向连接的传输(如TCP)管理了两个网络端点之间的连接的建立,在连接的生命周期内的有序和可靠的消息传输,以及最后,连接的有序终止。相比之下,在类似于UDP这样的无连接协议中,并没有持久化连接这样的概念,并且每个消息(一个UDP数据报)都是一个单独的传输单元。

此外,UDP也没有TCP的纠错机制,其中每个节点都将确认它们所接收到的包,而没有被确认的包将会被发送方重新传输。

通过类比,TCP连接就像打电话,其中一系列的有序消息将会在两个方法上流动,相反,UDP则类似于往邮箱中投入一叠明信片。你无法知道它们将以何种顺序到达它们的目的地,或者它们是否所有的都能够到达它们的目的地。

UDP的这些方面可能会让你感觉到严重的局限性,但是它们也解释了为何它会比TCP快那么多:所有的握手以及消息管理机制的开销已经被消除了。显然,UDP很适合那些能够处理或者容忍消息丢失的应用程序,但可能不适合那些处理金融交易的应用程序。

2、UDP广播

到目前为止,我们所有的例子采用的都是一种叫做单播的传输模式,定义为发送消息给一个由唯一的地址所标识的单一的网络目的地。面向连接的协议和无连接协议都支持这种模式。

UDP提供了向多个接收者发送消息的额外传输模式:

多播——传输到一个预定义的主机组

广播——传输到网络(或者子网)上的所有主机

示例应用程序将通过发送能够被同一个网络中的所有主机所接收的消息来演示UDP广播的使用。为此,我们将使用特殊的受限广播地址或者零网络地址255.255.255.255.发送到这个地址的消息都将会被定向给本地网络(0.0.0.0)上的所有主机,而不会被路由器转发给其他的网络。

3、UDP示例应用程序

我们的示例程序将打开一个文件,随后将会通过UDP把每一行都作为一个消息广播到一个指定的端口。如果你熟悉类UNIX操作系统,你可能会认识到这是标准的syslog实用程序的一个非常简化的版本。UDP非常适合于这样的应用程序,因为考虑到日志文件本身已经被存储在了文件系统中,因此,偶尔丢失日志文件中的一两行是可以容忍的。此外,该应用程序还提供了极具有价值的高效处理大量数据的能力。

接收方是怎么样呢?通过UDP广播,只需简单地通过在指定的端口上启动一个监听程序,便可以创建一个事件监视器来接收日志消息。需要注意的是,这样的轻松访问性也带来了潜在的安全隐患,这也就是为何在不安全的环境中并不倾向于使用UDP广播的原因之一。出于同样的原因,路由器通常也会阻止广播消息,并将它们限制在它们的来源网络上。

发布/订阅模式 : 类似于syslog这样的应用程序通常会被归类为发布/订阅模式:一个生产者或者服务发布事件,而多个客户端进行订阅以接收它们。

下图展示了整个系统的一个高级别试图,其由一个广播者以及一个或者多个事件监听器所组成。广播者将监听新内容的出现,当它出现时,则通过UDP将它作为一个广播消息进行传输。 所有的该UDP端口上监听的事件监听器都将会接收到广播消息。

为了简单起见,我们将不会为我们的示例程序添加身份认证、验证或者加密。但是,要加入这些功能并使得其成为一个健壮的、可用的实用程序应该也不难。

4、消息POJO:LogEvent

在消息处理应用程序中,数据通常由POJO表示,除了实际上的消息内容,其还包含配置或处理消息。在这个应用程序中,我们将会把消息作为事件处理,并且由于该数据来自于日志文件,所以我们将它称为LogEvent。以下代码展示了这个简单的POJO的详细信息。

public final class LogEvent {
    public static final byte SEPARATOR = (byte) ‘:‘;
    private final InetSocketAddress source;
    private final String logfile;
    private final String msg;
    private final long received;

    //用于传出消息的构造函数
    public LogEvent(String logfile, String msg) {
        this(null,-1,logfile,msg);
    }

    //用于传入消息的构造函数
    public LogEvent(InetSocketAddress source,long received, String logfile, String msg) {
        this.source = source;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }

    public InetSocketAddress getSource() {
        return source;
    }

    public String getLogfile() {
        return logfile;
    }

    public String getMsg() {
        return msg;
    }

    public long getReceived() {
        return received;
    }
}

5、编写广播者

Netty提供了大量的类来支持UDP应用程序的编写。Netty的DatagramPacket是一个简单的消息容器,DatagramChannel实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。

要将LogEvent消息转换为DatagramPacket,我们将需要一个编码器。但是没有必要从头开始编写我们自己的。我们将扩展Netty的MessageToMessageEncoder。

下图展示了正在广播的3个日志条目,每一个都将通过一个专门的DatagramPacket进行广播。 下图呈现了该LogEventBroadcaster的ChannelPipeline的一个高级别试图,展示了LogEvent消息是如何流经它的。 正如你所看到的,所有的将要被传输的数据都被封装在了LogEvent消息中。LogEventBroadcaster将把这些写入到Channel中,并通过ChannelPipeline发送它们,在那里它们将会被转码为DatagramPacket消息。最后,它们都将通过UDP被广播,并由远程节点所捕获。

以下代码展示了我们自定义版本的MessageToMessageEncoder,其将执行刚才所描述的转换。

public class LogEventEncoder extends MessageToMessageEncoder<LogEvent>{
    private final InetSocketAddress remoteAddress;

    //LogEventEncoder创建了即将被发送到指定的InetSocketAddress的DatagramPacket消息
    public LogEventEncoder(InetSocketAddress remoteAddress){
        this.remoteAddress = remoteAddress;
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,
                          LogEvent logEvent, List<Object> out) throws Exception {
        byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
        ByteBuf buf = channelHandlerContext.alloc().buffer(file.length + msg.length + 1);
        //将文件名写入到ByteBuf中
        buf.writeBytes(file);
        //添加一个SEPARATOR
        buf.writeByte(LogEvent.SEPARATOR);
        //将日志消息写入ByteBuf中
        buf.writeBytes(msg);
        //将一个拥有数据和目的地地址的新DatagramPacket添加到出站的消息列表中
        out.add(new io.netty.channel.socket.DatagramPacket(buf,remoteAddress));
    }
}

在LogEventEncoder被实现之后,我们已经准备好了引导该服务器,其包括设置各种各样的ChannelOption,以及在ChannelPipeline中安装所需要的ChannelHandler。这将通过主类LogEventBroadcaster完成。如下代码所示。

public class LogEventBroadcaster {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private final File file;

    public LogEventBroadcaster(InetSocketAddress address, File file){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        //引导该NioDatagramChannel(无连接)
        bootstrap.group(group).channel(NioDatagramChannel.class)
                //设置SO_BROADCAST套接字选项
                .option(ChannelOption.SO_BROADCAST,true)
                .handler(new LogEventEncoder(address));
        this.file = file;
    }

    public void run() throws Exception{
        //绑定Channel
        Channel ch = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        //启动主处理循环
        for (;;){
            long len = file.length();
            if (len < pointer){
                //file was reset
                //如果有必要,将文件指针设置到该文件的最后一个字符
                pointer = len;
            } else if (len > pointer){
                //Content was added
                RandomAccessFile raf = new RandomAccessFile(file,"r");
                //设置当前的文件指针,以确保没有任何的旧日志被发送
                raf.seek(pointer);
                String line;
                while((line = raf.readLine()) != null){
                    //对于每条日志条目。,写入一个LogEvent到Channel中
                    ch.writeAndFlush(new LogEvent(null,-1,file.getAbsolutePath(),line));
                }
                //存储其在文件中的当前位置
                pointer = raf.getFilePointer();
                raf.close();
            }
            try {
                //休眠1秒,如果被中断,则退出循环,否则重新处理它
                Thread.sleep(1000);
            }catch (InterruptedException e){
                Thread.interrupted();
                break;
            }
        }
    }

    public void stop(){
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception{
        if (args.length != 2){
            throw new IllegalArgumentException();
        }

        LogEventBroadcaster broadcaster = new LogEventBroadcaster(
                new InetSocketAddress("255.255.255.255",Integer.parseInt(args[0])),new File(args[1]));
        try {
            broadcaster.run();
        }finally {
            broadcaster.stop();
        }
    }
}

6、编写监视器

目标是将netcat替换为一个更加完整的事件消费者,我们称之为LogEventMonitor。这个程序将:

(1)接收有LogEventBroadcaster广播的UDP DatagramPacket

(2)将它们解码为LogEvent消息

(3)将LogEvent消息写到System.out

和之前一样,该逻辑由一组自定义的ChannelHandler实现——对于我们的解码器来说,我们将扩展MessageToMessageDecoder。下图描绘LogEventMonitor的ChannelPipeline,并且展示了LogEvnet是如何流经它的。 ChannelPipeline中的第一个解码器LogEventDecoder负责传入的DatagramPacket解码为LogEvent消息(一个用于转换入站数据的任何Netty应用程序的典型设置)

public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket>{

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext,
                          DatagramPacket datagramPacket, List<Object> out) throws Exception {
        //获取对DatagramPacket中的数据的引用
        ByteBuf data = datagramPacket.content();
        //获取该SEPARATOR的索引
        int idx = data.indexOf(0,data.readableBytes(),LogEvent.SEPARATOR);
        //提取文件名
        String fileName = data.slice(0,idx).toString(CharsetUtil.UTF_8);
        //提取日志消息
        String logMsg = data.slice(idx + 1,data.readableBytes()).toString(CharsetUtil.UTF_8);
        //构建一个新的LogEvent对象,并且将它添加到列表中
        LogEvent event = new LogEvent(datagramPacket.sender(),System.currentTimeMillis(),fileName,logMsg);
        out.add(event);
    }
}

第二个ChannelHandler的工作是对第一个ChannelHandler所创建的LogEvent消息执行一些处理。在这个场景下,它只是简单地将它们写到System.out。在真实世界的应用程序中,你可能需要聚合来源于不同日志文件的事件,或者将它们发布到数据库中。

public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent>{

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //当异常发生时,打印栈跟踪信息,并关闭对应的Channel
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
                                LogEvent event) throws Exception {
        //创建StringBuilder,并且构建输出的字符串
        StringBuilder builder = new StringBuilder();
        builder.append(event.getReceived());
        builder.append(" [");
        builder.append(event.getSource().toString());
        builder.append("] [");
        builder.append(event.getLogfile());
        builder.append("] : ");
        builder.append(event.getMsg());
        //打印LogEvent的数据
        System.out.println(builder.toString());
    }
}

LogEventHandler将以一种简单易读的格式打印LogEvent消息,现在我们需要将我们的LogEventDecoder和LogEventHandler安装到ChannelPipeline中。

public class LogEventMonitor {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;

    public LogEventMonitor(InetSocketAddress address){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST,true)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new LogEventDecoder());
                        pipeline.addLast(new LogEventHandler());
                    }
                }).localAddress(address);
    }

    public Channel bind(){
        return bootstrap.bind().syncUninterruptibly().channel();
    }

    public void stop(){
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception{
        if (args.length != 1){
            throw new IllegalArgumentException("Usage:LoEventMonitor <port>");
        }
        LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0])));
        try {
            Channel channel = monitor.bind();
            System.out.println("LogEventMonitor running");
            channel.closeFuture().sync();
        }finally {
            monitor.stop();
        }
    }

}



原文地址:https://www.cnblogs.com/UncleCatMySelf/p/9190638.html

时间: 2024-10-09 19:22:04

Netty实战十三之使用UDP广播事件的相关文章

【Netty】UDP广播事件

一.前言 前面学习了WebSocket协议,并且通过示例讲解了WebSocket的具体使用,接着学习如何使用无连接的UDP来广播事件. 二.UDP广播事件 2.1 UDP基础 面向连接的TCP协议管理端到端的连接,在连接生命周期中,发送的消息会有序并且可靠地进行传输,最后连接有序地终止.然而,在无连接协议(如UDP)中,没有持久连接的概念,每个消息(UDP数据报)都是独立的传输,此外,UDP没有TCP的纠错机制(即每个对等体会确认其接收到的分组,并且发送者会重传未确认的分组). UDP的限制比T

Netty in Action (二十四) 第十三章节 UDP的广播事件

本章内容包括: 1)UDP的总览 2)广播应用的一个简单示例 到目前为止,我们使用的所有例子都是基于连接形式的协议,例如TCP,在这个章节中,我们将会聚焦于无连接x形式的协议(User Datagram Protocol UDP),这个协议常常使用于对性能要求极其高但又可以允许少量的丢包的情况存在 我们先讲解一下UDP的概念,讲解一下它的特性和限制,接下来我们会描述一下这个章节示例应用的业务背景,这个示例将会很好的说明如何使用UDP协议的广播特性,我们也会利用解码器和编码器来处理一个POJO,在

Netty 系列九(支持UDP协议).

一.基础知识 UDP 协议相较于 TCP 协议的特点: 1.无连接协议,没有持久化连接:2.每个 UDP 数据报都是一个单独的传输单元:3.一定的数据报丢失:4.没有重传机制,也不管数据报是否可达:5.速度比TCP快很多,可用来高效处理大量数据 —— 牺牲了握手以及消息管理机制.6.常用于音频.视频场景,可以忍受一定的数据包丢失,追求速度上的提升. TCP 协议采用的是一种叫做单播的传输形式,UDP 协议提供了向多个接收者发送消息的额外传输形式(多播.广播): 单播(TCP 和 UDP):发送消

Netty实战(上)视频教程

Netty是一个异步事件驱动的网络应用程序框架, 用于快速开发可维护的高性能协议服务器和客户端. 在分享今天的视频教程之前,我们先介绍一些Netty. Netty是一个NIO客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序.它极大地简化并简化了TCP和UDP套接字服务器等网络编程. “快速简便”并不意味着最终的应用程序会受到可维护性或性能问题的影响.Netty经过精心设计,具有丰富的协议,如FTP,SMTP,HTTP以及各种二进制和基于文本的传统协议.因此,Netty成功地找

[蓝牙] 2、蓝牙BLE协议及架构浅析&amp;&amp;基于广播超时待机说广播事件

第一章 BLE基本概念了解 一.蓝牙4.0和BLE区别 蓝牙4.0是一种应用非常广泛.基于2.4G射频的低功耗无线通讯技术.蓝牙低功耗(Bluetooth Low Energy ),人们又常称之为BlueTooth Smart,是由SIG( the Bluetooth Special Interest Group) 在2010年6月起草,在原有标准的蓝牙4.0核心协议上添加的一种低功耗技术. 蓝牙低功耗不等同于蓝牙4.0,只是蓝牙4.0的一个分支.蓝牙4.0是蓝牙3.0+ HS(高速蓝牙)规范的

下载-深入浅出Netty源码剖析、Netty实战高性能分布式RPC、NIO+Netty5各种RPC架构实战演练三部曲视频教程

下载-深入浅出Netty源码剖析.Netty实战高性能分布式RPC.NIO+Netty5各种RPC架构实战演练三部曲视频教程 第一部分:入浅出Netty源码剖析 第二部分:Netty实战高性能分布式RPC 第三部分:NIO+Netty5各种RPC架构实战演练

Netty实战高性能分布式RPC

Netty实战高性能分布式RPC 课程观看地址:http://www.xuetuwuyou.com/course/171 课程出自学途无忧网:http://www.xuetuwuyou.com 一.课程用到的软件 netty4.1.6.Final Spring Tool Suite 3.8.2.RELEASE Maven3.1 Spring4 Zookeeper3.4.6 JDK1.8.0_111 二.课程目标 1.快速学习netty的使用 2.自己学会构建高性能服务器 3.熟练使用多线程之间交

Udp广播的发送和接收(ios+AsyncUdpSocket)下篇

接上篇C#的Udp广播的发送和接收 http://www.cnblogs.com/JimmyBright/p/4637090.html ios中使用AsyncUdpSocket处理Udp的消息非常方便 准备工作: 在github上找到cocoaAsyncSocket,下载下来,把其中AsyncUdpSocket.h和AsyncUdpSocket.m文件copy到你的项目中,其他文件都不需要.copy到swift环境下会自动提示创建桥接文件,点击确认就可以了. 发送Udp: 上面代码对局域网广播消

Android 通过局域网udp广播自动建立socket连接

Android开发中经常会用到socket通讯.由于项目需要,最近研究了一下这方面的知识. 需求是想通过wifi实现android移动设备和android平台的电视之间的文件传输与控制. 毫无疑问这中间一定需要用到socket来进行通信.今天就两台设备的握手连接方式分享一下吧,该方法只是本人个人想法的实现,仅供参考,如有雷同,不胜荣幸. 要想使用socket进行通讯,就必须知道服务端的IP地址,我使用的是通过udp局网广播来实现局网内服务端的搜寻建立连接.以下是代码实现. 首先是客户端: pub