Netty 5 自定义协议 教程

网上好多连接好多demo都不是netty5的,都是以前的版本,况且还有好多运行时老报错。入门级demo就不写了,估计都是那些老套路。好多公司都会有最佳实践,今天就说说如何自定义协议,一般自定义协议都是公司内部各个部门定义的,当然了我写的比较简单。

注意:

本教程是采用netty-all-5.0.0.Alpha2.jar,netty5的版本,不是网上很多的例子都是采用以前老大版本

自定义协议:

协议 {

  协议头(header)

  消息体(body)

}


header格式 {

  固定头,

  命令码,

  版本号,

  长度

}


自定义协议嘛,都是可以自己定义实现的,内部商量好就可以

package com.nio.netty;

import java.io.Serializable;
import java.util.Arrays;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgHeader implements Serializable{
    //固定头
    private byte startTag;

    //命令码,4位
    private byte[] cmdCode;

    //版本 2位
    private byte[] version;

    private int length;

    public byte[] getVersion() {
        return version;
    }

    public void setVersion(byte[] version) {
        this.version = version;
    }

    public byte[] getCmdCode() {
        return cmdCode;
    }

    public void setCmdCode(byte[] cmdCode) {
        this.cmdCode = cmdCode;
    }

    public byte getStartTag() {
        return startTag;
    }

    public void setStartTag(byte startTag) {
        this.startTag = startTag;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    @Override
    public String toString() {
        return "MsgHeader{" +
                "startTag=" + startTag +
                ", cmdCode=" + Arrays.toString(cmdCode) +
                ", version=" + Arrays.toString(version) +
                ", length=" + length +
                ‘}‘;
    }
}
package com.nio.netty;

/**
 * Created by sdc on 2017/8/26.
 */
public class Message {
    private MsgHeader header;
    private Object body;

    //检验和
//    private byte crcCode;

//    public byte getCrcCode() {
//        return crcCode;
//    }
//
//    public void setCrcCode(byte crcCode) {
//        this.crcCode = crcCode;
//    }

    public MsgHeader getHeader() {
        return header;
    }

    public void setHeader(MsgHeader header) {
        this.header = header;
    }

    public Object getBody() {
        return body;
    }

    public void setBody(Object body) {
        this.body = body;
    }

    @Override
    public String toString() {
        return "Message{" +
                "header=" + header +
                ", body=" + body +
//                ", crcCode=" + crcCode +
                ‘}‘;
    }
}

消息格式定义完成后,需要编码和解码,这里采用ByteToMessageDecoder MessageToByteEncoder,这两个编解码。

package com.nio.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgEncoder extends MessageToByteEncoder {

    public static byte getIndexToByte(int i, int index){
        if(index == 0){
            return (byte)(i % 10);
        }else{
            int num = (int)Math.pow(10, index);
            return (byte)((i / num) % 10);
        }
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf out) throws Exception {
        if (o instanceof Message) {
            try {
                Message msg = (Message)o;
                if (msg == null || msg.getHeader() == null) {
                    throw new Exception("The encode message is null");
                }

                out.writeByte(msg.getHeader().getStartTag());
                out.writeBytes(msg.getHeader().getCmdCode());

                //占位
                byte[] lengthBytes = new byte[]{0, 0, 0, 0};
                out.writeBytes(lengthBytes);

                out.writeBytes(msg.getHeader().getVersion());
                String body = (String) msg.getBody();
                int length = 0;
                if (body != null) {
                    byte[] bodyBytes = body.getBytes();
                    out.writeBytes(bodyBytes);
                    length = bodyBytes.length;

//                    if (Constants.CRCCODE_DEFAULT != msg.getCrcCode()) {
//                        msg.setCrcCode(CRC8.calcCrc8(bodyBytes));
//                    }
//                    msg.setCrcCode();
                }

                //长度从int转换为byte[4]
                byte l1 = getIndexToByte(length, 3);
                byte l2 = getIndexToByte(length, 2);
                byte l3 = getIndexToByte(length, 1);
                byte l4 = getIndexToByte(length, 0);
                lengthBytes = new byte[]{l1, l2, l3, l4};
                out.setBytes(5, lengthBytes);
                System.out.println("encoder:" + msg.getBody());
//                out.writeByte(msg.getCrcCode());
            }catch(Exception e){
                e.printStackTrace();
                throw e;
            }
        }
    }
}
package com.nio.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageCodec;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        try{
            //内部协议这块可以约定一下,超过多大的长度就不可以了。
//            if(in.readableBytes() < 12){
//                return;
//            }
            System.out.println("开始解码消息,消息长度:" + in.readableBytes());

            in.markReaderIndex();
            //设置一些消息的属性
            Message message = new Message();
            MsgHeader header = new MsgHeader();
            header.setStartTag(in.readByte());

            byte[] cmdCode = new byte[4];
            in.readBytes(cmdCode);
            header.setCmdCode(cmdCode);
            System.out.println(new String(cmdCode, "UTF-8"));

            //长度从byte[4]转int
            byte[] lengthBytes = new byte[4];
            in.readBytes(lengthBytes);
            int length = toInt(lengthBytes);
            System.out.println("header:" + length);
            header.setLength(length);
            if(length < 0 || length > 10240){//过长消息或不合法消息
                throw new IllegalArgumentException("wrong message length");
            }

            byte[] version = new byte[2];
            in.readBytes(version);
            header.setVersion(version);
            System.out.println("version:" + new String(version, "UTF-8"));

            if(header.getLength() > 0){
                System.out.println("bytebuffer可读的范围" + in.readableBytes());
                if(in.readableBytes() > length + 1){
                    in.resetReaderIndex();
                    System.out.println("返回了");
                    return;
                }
                //读取body里的内容
                byte[] bodyBytes = new byte[header.getLength()];
                in.readBytes(bodyBytes);
                message.setBody(new String(bodyBytes, "UTF-8"));
            }
            //crccode暂时去掉
//            message.setCrcCode(in.readByte());

            //设置头部
            message.setHeader(header);
            System.out.println("body:" + message.getBody());

            out.add(message);
        }catch(Exception e){
            e.printStackTrace();
            throw e;
        }
    }

    public static int toInt(byte[] bytes){
        int value = 0;
        for(int i=0; i<bytes.length; i++){
            int num = (int)Math.pow(10, bytes.length - 1 - i);
            value += num * bytes[i];
        }
        return value;
    }
}

消息的类已经处理完了,下面来开始写客户端和服务端的代码。

客户端代码:

package com.nio.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgClient {

    public void connect(String ip, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        Message message = new Message();
        String msgStr = "我想发送一条消息";
        MsgHeader header = new MsgHeader();
        header.setStartTag(new Byte("0"));
        header.setCmdCode("1234".getBytes());
        header.setLength(msgStr.length());
        header.setVersion("11".getBytes());

        message.setBody(msgStr);
        message.setHeader(header);
        try {
            Bootstrap bs = new Bootstrap();
            bs.group(workerGroup).channel(NioSocketChannel.class).handler(new ChildChannelHandler(message));
            ChannelFuture f = bs.connect(ip,port).sync();

            //写入消息
            f.channel().writeAndFlush(message).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public static class ChildChannelHandler extends ChannelInitializer {

        Message message;

        public ChildChannelHandler(Message message) {
            this.message = message;
        }

        @Override
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline().addLast(new MsgDecoder())
                    .addLast(new MsgEncoder());
        }
    }

    public static void main(String[] args){
        try {
            new MsgClient().connect("127.0.0.1", 9080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

服务端代码

package com.nio.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgServer {

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            ChannelFuture cf = sb.bind(port).sync();

            cf.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static class ChildChannelHandler extends ChannelInitializer {

        @Override
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline()
                    .addLast(new MsgDecoder())
                    .addLast(new MsgEncoder());
        }
    }

    public static void main(String[] args){
        try {
            new MsgServer().bind(9080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行的时候先运行服务端,否则胡报如下的错误,

-all-5.0.0.Alpha2.jar com.nio.netty.MsgClient
java.net.ConnectException: Connection refused: no further information: /127.0.0.1:9080
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:223)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:276)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:531)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
	at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
	at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
	at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
	at io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
	at io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
	at io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)

Process finished with exit code 0

成功会打印以下信息:

开始解码消息,消息长度:35
开始解码消息,消息长度2222:34
1234
header:24
version:11
可读的范围24
body:我想发送一条消息

Process finished with exit code 1

基本流程已经走完,学框架,我还是习惯先写例子,然后再学概念性的东西,会理解的更深刻一些。

参考了这篇文章,但是这篇文章似乎看不了结果,所以还是自己改了改,写了点东西。

http://www.blogs8.cn/posts/Wx8Sd43


时间: 2024-10-07 04:17:36

Netty 5 自定义协议 教程的相关文章

Netty自定义协议解析原理与应用

目前,大家都选择Netty做为游戏服务器框架网络通信的框架,而且目前也有很多优秀的产品是基于Netty开发的.它的稳定性,易用性和高效率性已得到广泛的认同.在游戏服务器开发中,选择netty一般就意味着我们要使用长连接来建立与客户端的通信,并且是自定义协议,在网络开发中,我们不得不处理断包,粘包的问题,因为Tcp/ip是基于数据流的传输,包与包之间没有明确的界限,而且于由网络路由的复杂性,大包有可能分成小包,小包也有可能被组装成大包进行传输.而Netty就考虑到了这一点,而且它用一个类就帮我们处

swoole入门教程05-Swoole的自定义协议功能的使用

环境说明: 系统:Ubuntu14.04 (安装教程包括CentOS6.5) PHP版本:PHP-5.5.10 swoole版本:1.7.8-alpha 1.为什么要提供自定义协议 熟悉TCP通信的朋友都会知道,TCP是一个流式协议.客户端向服务器发送的一段数据,可能并不会被服务器一次就完整的收到;客户端向服务器发送的多段数据,可能服务器一次就收到了全部的数据.而实际应用中,我们希望在服务器端能一次接收一段完整的数据,不多也不少.传统的TCP服务器中,往往需要由程序员维护一个缓存区,先将读到的数

物联网架构成长之路(35)-利用Netty解析物联网自定义协议

一.前言 前面博客大部分介绍了基于EMQ中间件,通信协议使用的是MQTT,而传输的数据为纯文本数据,采用JSON格式.这种方式,大部分一看就知道是熟悉Web开发.软件开发的人喜欢用的方式.由于我也是做web软件开发的,也是比较喜欢这种方式.阿里的物联网平台,也是推荐这种方式.但是,但是做惯硬件开发,嵌入式开发就比较喜欢用裸TCP-Socket连接.采用的是二进制协议.基于此大部分应用场合为了兼容旧设备,就需要单独开发一个TCP服务器的网关.这里使用以前学过的,也是比较流行的Netty框架. 话不

Mina自定义协议简单实现

因公司需要做个电子秤自动称重系统,需要自定义协议实现,所以就用Mina简单实现了一下,因为比较简单,就直接上代码.有时间的话改成Netty版 服务端 package net.heartma.server; import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import

第13章 TCP编程(3)_基于自定义协议的多进程模型

5. 自定义协议编程 (1)自定义协议:MSG //自定义的协议(TLV:Type length Value) typedef struct{ //协议头部 char head[10];//TLV中的T unsigned int checkNum; //校验码 unsigned int cbSizeContent; //协议体的长度 //协议体部 char buff[512]; //数据 }MSG; (2)自定义读写函数 ①extern int write_msg(int sockfd, cha

Google Chrome 自定义协议(PROTOCOL)问题的处理

最近在使用谷歌浏览器的时候遇到了自定义协议(PROTOCOL)的问题,比较折腾,特此记录,希望我浪费生命换来的结果能够帮助读到此文的朋友少浪费一点宝贵的时间! 由于某些原因,电脑里一直没有安装阿里旺旺,今天由于买一个东西需要和卖家沟通,就安装了一下.结果发现在网页上点击旺旺小图标不会打开旺旺软件,折腾正式开始! 由于打开了很多网页,也没立即重启Chrome测试,好半天终于把所有的网页都处理掉了,关掉Chrome再打开,还是不行! 重启电脑,再打开,还是不行!然后用IE测试了一下发现是正常的,坚决

第13章 TCP编程(4)_基于自定义协议的多线程模型

7. 基于自定义协议的多线程模型 (1)服务端编程 ①主线程负责调用accept与客户端连接 ②当接受客户端连接后,创建子线程来服务客户端,以处理多客户端的并发访问. ③服务端接到的客户端信息后,回显给客户端 (2)客户端编程 ①从键盘输入信息,并发送给服务端 ②接收来自服务端的信息 //msg.h与前一节相同 #ifndef __MSG_H__ #define __MSG_H__ #include <sys/types.h> //求结构体中成员变量的偏移地址 #define OFFSET(T

自定义协议的编码解码

2015.4.1 wqchen. 转载请注明出处 http://www.cnblogs.com/wqchen/p/4385798.html 本文介绍的是一个自定义协议的编码解码工具的实现. 游戏开发中,前端后端协议一般都会协商定制通信协议的格式,统一格式后用程序脚本对应前端和后端的编程语言,分别生成一份协议的编码和解码方案,便于协议的一致性. 这样的工具有很多,比较出名的是google的protobuf,它可以支持很多种编程语言.我也曾试用过protobuf,看过一点它的实现,protobuf完

android4.2串口jni收发数据(基于自定义协议)

代码已经验证过,没问题 ! 代码层次结构: |-----serial_communication_class--- |                     |-------src--------- |                                  |------com------ |                                             |----object----- |