Netty 系列九(支持UDP协议).

一、基础知识

UDP 协议相较于 TCP 协议的特点:

1、无连接协议,没有持久化连接;
2、每个 UDP 数据报都是一个单独的传输单元;
3、一定的数据报丢失;
4、没有重传机制,也不管数据报是否可达;
5、速度比TCP快很多,可用来高效处理大量数据 —— 牺牲了握手以及消息管理机制。
6、常用于音频、视频场景,可以忍受一定的数据包丢失,追求速度上的提升。

TCP 协议采用的是一种叫做单播的传输形式,UDP 协议提供了向多个接收者发送消息的额外传输形式(多播、广播):

单播(TCP 和 UDP):发送消息给一个由唯一的地址所标识的单一的网络目的地。
多播(UDP):传输给一个预定义的主机组。
广播(UDP):传输到网络(或者子网)上的所有主机。

二、功能说明

广播方:打开一个文件,通过 UDP 使用特殊的受限广播地址或者零网络地址 255.255.255.255,把每一行作为一个消息广播到一个指定的端口。

接收方:通过 UDP 广播,只需简单地通过在指定的端口上启动一个监听程序,便可以创建一个事件监视器来接收日志消息。所有的在该 UDP 端口上监听的事件监听器都将会接收到广播信息。

三、实现

下图展示了怎么将我们的 文件数据 广播为 UDP消息:所有的将要被传输的数据都被封装在了 LogEvent 消息中。 LogEventBroadcaster 将把这些写入到 Channel 中,并通过 ChannelPipeline 发送它们,在那里它们将会被转换(编码)为 DatagramPacket 消息。最后,他们都将通过 UDP 被广播,并由远程节点(监视器)所捕获。

Netty 中支持 UDP 协议主要通过以下相关类:

DatagramPacket:使用 ByteBuf 作为数据源,是 UDP 协议传输的消息容器。

DatagramChannel:扩展了 Netty 的 Channel 抽象以支持 UDP 的多播组管理,它的实现类 NioDatagramChannnel 用来和远程节点通信。

Bootstrap:UDP 协议的引导类,使用 bind() 方法绑定 Channel。

public class LogEvent {
    public static final byte SEPARATOR = ‘:‘;
    /**
     * IP套接字地址(IP地址+端口号)
     */
    private final InetSocketAddress inetSocketAddress;
    /**
     * 文件名
     */
    private final String logfile;
    /**
     * 消息内容
     */
    private final String msg;

    private final long received;

    /**
     * 用于传入消息的构造函数
     *
     * @param inetSocketAddress
     * @param logfile
     * @param msg
     * @param received
     */
    public LogEvent(InetSocketAddress inetSocketAddress, String logfile, String msg, long received) {
        this.inetSocketAddress = inetSocketAddress;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }

    /**
     * 用于传出消息的构造函数
     *
     * @param logfile
     * @param msg
     */
    public LogEvent(String logfile, String msg) {
        this(null, logfile, msg, -1);
    }

    public InetSocketAddress getInetSocketAddress() {
        return inetSocketAddress;
    }

    public String getLogfile() {
        return logfile;
    }

    public String getMsg() {
        return msg;
    }

    public long getReceived() {
        return received;
    }
}

文件实体类 LogEvent.java

public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
    private final InetSocketAddress remoteAddress;

    public LogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, LogEvent msg, List<Object> out) throws Exception {
        byte[] file = msg.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] content = msg.getMsg().getBytes(CharsetUtil.UTF_8);
        ByteBuf byteBuf = ctx.alloc().buffer(file.length + content.length + 1);
        byteBuf.writeBytes(file);
        byteBuf.writeByte(LogEvent.SEPARATOR);
        byteBuf.writeBytes(content);
        out.add(new DatagramPacket(byteBuf, remoteAddress));
    }
}

编码器 LogEventEncoder.java

该编码器实现了将 LogEvent 实体类内容转换为 DatagramPacket UDP数据报。

public class LogEventBroadcaster {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private final File file;

    public LogEventBroadcaster(InetSocketAddress address, File file) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                //引导该 NioDatagramChannel(无连接的)
                .channel(NioDatagramChannel.class)
                // 设置 SO_BROADCAST 套接字选项
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new LogEventEncoder(address));
        this.file = file;
    }

    public void run() throws InterruptedException, IOException {
        //绑定 Channel,UDP 协议的连接用 bind() 方法
        Channel channel = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        //长轮询 监听是否有新的日志文件生成
        while (true) {
            long length = file.length();
            if (length < pointer) {
                // 如果有必要,将文件指针设置到该文件的最后一个字节
                pointer = length;
            } else {
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                // 确保当前的文件指针,以确保没有任何的旧数据被发送
                raf.seek(pointer);
                String line;
                while ((line = raf.readLine()) != null) {
                    //对于每个日志条目,写入一个 LogEvent 到 Channel 中,最后加入一个换行符号
                    channel.writeAndFlush(new LogEvent(file.getAbsolutePath(), line + System.getProperty("line.separator")));
                }
                pointer = raf.getFilePointer();
                raf.close();
            }
            try {
                // 休眠一秒,如果被中断,则退出循环,否则重新处理它
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                while (!Thread.interrupted()) {
                    break;
                }
            }
        }
    }

    public void stop() {
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        InetSocketAddress socketAddress = new InetSocketAddress("255.255.255.255", 8888);
        File file = new File("E:\\2018-09-12.log");
        LogEventBroadcaster logEventBroadcaster = new LogEventBroadcaster(socketAddress, file);
        try {
            logEventBroadcaster.run();
        } finally {
            logEventBroadcaster.stop();
        }
    }
}

现在,我们来测试一下这个 UDP 广播类,首先我们需要一个工具 nmap ,用它来监听 UDP 的 8888 端口,以接收我们广播的日志文件。下载地址: https://nmap.org/dist/nmap-7.70-win32.zip

下载完成后,命令行进入安装目录,执行命令:ncat.exe -l -u -p 8888 ,监听 UDP 端口。

当然,也可以自己写个测试类监听 UDP 端口,打印日志查看。这里我没有用 Netty 写监听类,直接用了 java 原生的 DatagramSocket 和 DatagramPacket 写的监听类,如下:

public class UDPServer {

    public static void main(String[] args) {
        DatagramSocket server = null;
        try {
            server = new DatagramSocket(8888);
            byte[] datas = new byte[1024];
            //用一个字节数组接收UDP包,字节数组在传递给构造函数时是空的
            while (true) {
                DatagramPacket datagramPacket = new DatagramPacket(datas, datas.length);
                server.receive(datagramPacket);
                System.out.println(new String(datas));
            }
        } catch (SocketException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            server.close();
        }
    }
}

UDPServer.java

基于 Netty 的监听类实现可以参考我上传 GitHub 上的源代码。

参考资料:《Netty IN ACTION》

演示源代码:https://github.com/JMCuixy/NettyDemo/tree/master/src/main/java/org/netty/demo/udp

原文地址:https://www.cnblogs.com/jmcui/p/9636505.html

时间: 2024-10-03 17:30:16

Netty 系列九(支持UDP协议).的相关文章

netty 3.9.2 UDP协议服务器和客户端DEMO

说明:基于netty 3.9.2的udp协议实现的(如果你使用的版本是4.X或5.X,请参考其他方法):程序的逻辑结构是,客户端发送给服务端一串数据,服务器端返回给客户端“A”.在进行游戏开发时需要对udp的丢包进行处理,可以利用服务器端的返回值进行相关处理,以确定是否重发,这方面具体没有实现. 文章结构: 一.服务器端 1.UDPServer 2.UdpChannelPipelineFactory 3.UDPServerHandler 二.客户端 1.UDPClient 2.UDPClient

协议系列之UDP协议

上节说的TCP协议虽然提供了可靠的传输,但是也有一个缺点,发送速度慢.那么有没有一种协议能快速发送的呢?这节要讨论的就是UDP协议,它提供了更加快的发送速度,但也牺牲了可靠性,它是一种无连接的传输协议.比起TCP,UDP更像是我们用手机发送短信,只管发送出去,但不能保证对方收到,不会建立连接,也没有确认环节. 图2-2-4-1为UDP协议报文结构.比起TCP,UDP报文的结构相对简单,只有源端口.目的端口.报文长度.校验和四个字段.其中源端口跟校验和是可选的,由于UDP不用接收端回复确认信息,所

Netty4.x中文教程系列(七)UDP协议

将近快一年时间没有更新Netty的博客.一方面原因是因为项目进度的问题.另外一方面是博主有一段时间去熟悉Unity3D引擎. 本章节主要记录博主自己Netty的UDP协议使用. 1. 构建UDP服务端 首先我们应该清楚UDP协议是一种无连接状态的协议.所以Netty框架区别于一般的有链接协议服务端启动程序(ServerBootstrap). Netty开发基于UDP协议的服务端需要使用Bootstrap 1 package dev.tinyz.game; 2 3 import io.netty.

Linux内核--网络栈实现分析(九)--传输层之UDP协议(下)

本文分析基于Linux Kernel 1.2.13 原创作品,转载请标明http://blog.csdn.net/yming0221/article/details/7549340 更多请查看专栏,地址http://blog.csdn.net/column/details/linux-kernel-net.html 作者:闫明 注:标题中的”(上)“,”(下)“表示分析过程基于数据包的传递方向:”(上)“表示分析是从底层向上分析.”(下)“表示分析是从上向下分析. 上篇分析了应用层经过BSD s

Netty系列之Netty高性能之道(转载InfoQ)

1. 背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用.相比于传统基于Java序列化+BIO(同步阻塞IO)的通信框架,性能提升了8倍多. 事实上,我对这个数据并不感到惊讶,根据我5年多的NIO编程经验,通过选择合适的NIO框架,加上高性能的压缩二进制编解码技术,精心的设计Reactor线程模型,达到上述性能指标是完全有可能的. 下面我们就一起来看下N

【读后感】Netty 系列之 Netty 高性能之道 - 相比 Mina 怎样 ?

太阳火神的漂亮人生 (http://blog.csdn.net/opengl_es) 本文遵循"署名-非商业用途-保持一致"创作公用协议 转载请保留此句:太阳火神的漂亮人生 -  本博客专注于 敏捷开发及移动和物联设备研究:iOS.Android.Html5.Arduino.pcDuino,否则,出自本博客的文章拒绝转载或再转载,谢谢合作. [读后感] 不知道这是什么节奏,或许人家早就春意盎然了.仅仅是我方才感觉到而已! 研究 Mina 的过程中,偶然发现了 Netty,有人说 Min

【读后感】Netty系列之Netty高性能之道 - 相比 Mina 如何 ?

[读后感] 不知道这是什么节奏,也许人家早就春意盎然了,只是我方才感觉到而已! 研究 Mina 的过程中,偶然发现了 Netty,有人说 Mina 好久不更新了,而 Netty 一直很活跃, 这只能说, Netty 在高速的完善当中, 至于 Mina,是没有后劲儿了呢,还是已经很完善了,不需要再继续更新,这个到是不得而知, 至少目前看,还不错, 相比 Netty 的大数据.大并发.高效率...还有什么,请移步下文,自品其详吧! Netty系列之Netty高性能之道 1. 背景 1.1. 惊人的性

小编带你了解Netty 系列七(那些开箱即用的 ChannelHandler).

一.前言Netty 为许多通用协议提供了编×××和处理器,几乎可以开箱即用, 这减少了你在那些相当繁琐的事务上本来会花费的时间与精力.另外,这篇文章中,就不涉及 Netty 对 WebSocket协议 的支持了,因为涉及的篇幅有点大,会在下一篇文章做一个具体的介绍. 回到顶部二.SSL 协议SSL 协议是安全协议,层叠在其他协议之上.为了支持 SSL/TLS, Java 提供了 javax.net.ssl 包,它的 SSLContext 和 SSLEngine 类使得实现解密和加密相当简单直接.

5. 彤哥说netty系列之Java NIO核心组件之Channel

你好,我是彤哥,本篇是netty系列的第五篇. 简介 上一章我们一起学习了如何使用Java原生NIO实现群聊系统,这章我们一起来看看Java NIO的核心组件之一--Channel. 思维转变 首先,我想说的最重要的一个点是,学习NIO思维一定要从BIO那种一个连接一个线程的模式转变成多个连接(Channel)共用一个线程来处理的这种思维. 1个Connection = 1个Socket = 1个Channel,这几个概念可以看作是等价的,都表示一个连接,只不过是用在不同的场景中. 如果单从阻塞