网络编程 -- RPC实现原理 -- Netty -- 迭代版本V3 -- 编码解码

 网络编程 -- RPC实现原理 -- 目录

  啦啦啦

V2——Netty -- pipeline.addLast(io.netty.handler.codec.MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>) 覆写编码解码方法。

 pipeline相当于拦截器。在pipeline中添加MessageToMessageCodec接口的实现类,该接口的实现类中的encode()方法自动将发送的Object对象转换为ByteBuf,decode()方法自动将接收的ByteBuf对象转换为Object

 

  Class : Server

package lime.pri.limeNio.netty.netty03;

import java.net.InetSocketAddress;
import java.util.Date;
import java.util.List;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.util.CharsetUtil;
import lime.pri.limeNio.netty.netty03.entity.User;

public class Server {

    public static void main(String[] args) throws Exception {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        serverBootstrap.group(boss, worker);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new MessageToMessageCodec<ByteBuf, Object>() {
                    @Override
                    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
                        System.out.println("-- -- 服务端编码");
                        out.add(Unpooled.buffer().writeBytes(JSON.toJSONString(msg,SerializerFeature.WriteClassName).getBytes(CharsetUtil.UTF_8)));
                    }

                    @Override
                    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
                        System.out.println("-- -- 服务端解码");
                        out.add(JSON.parse(msg.toString(CharsetUtil.UTF_8)));
                    }
                }).addLast(new ChannelHandlerAdapter() {

                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        System.out.println("客户端请求数据:" + msg);
                        String request = (String) msg;
                        Object response = "请求参数不正确";
                        if("Query Date".equalsIgnoreCase(request)){
                            response = "当前系统时间:" + new Date().toString();
                        }else if("Query User".equalsIgnoreCase(request)){
                            response = new User(1,"lime",new Date());
                        }
                        ChannelFuture channelFuture = ctx.writeAndFlush(response);
                        channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
                        channelFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                        channelFuture.addListener(ChannelFutureListener.CLOSE);
                    }
                    @Override
                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                        cause.printStackTrace();
                    }

                });
            }

        });
        ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(9999)).sync();
        channelFuture.channel().closeFuture().sync();
        boss.close();
        worker.close();
    }
}

  Class : Client

package lime.pri.limeNio.netty.netty03;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.util.CharsetUtil;

public class Client {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {
            new Thread() {
                {
                    setDaemon(false);
                }

                public void run() {
                    try {
                        client();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                };
            }.start();
            Thread.sleep(1000);
        }
    }

    private static void client() throws Exception {
        Bootstrap bootstrap = new Bootstrap();
        EventLoopGroup worker = new NioEventLoopGroup();
        bootstrap.group(worker);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new MessageToMessageCodec<ByteBuf, Object>() {

                    @Override
                    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
                        System.out.println("-- -- 客户端编码");
                        out.add(Unpooled.buffer().writeBytes(JSON.toJSONString(msg, SerializerFeature.WriteClassName).getBytes(CharsetUtil.UTF_8)));
                    }

                    @Override
                    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
                        System.out.println("-- -- 客户端解码");
                        out.add(JSON.parse(msg.toString(CharsetUtil.UTF_8)));
                    }
                }).addLast(new ChannelHandlerAdapter() {
                    /**
                     * 默认只捕获网络连接异常
                     */
                    @Override
                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                        System.out.println(cause);
                    }

                    /**
                     * 客户端发送经过JSON编码的byteBuf
                     */
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        String request = null;
                        switch ((int) (Math.random() * 10) % 3) {
                        case 0:
                            request = "Query Date";
                            break;
                        case 1:
                            request = "Query User";
                            break;

                        default:
                            request = "Query What?";
                            break;
                        }
                        ChannelFuture channelFuture = ctx.writeAndFlush(request);
                        channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
                        channelFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    }

                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        System.out.println("服务端响应数据 --> " + msg);
                    }

                });
            }
        });
        ChannelFuture channelFuture;

        channelFuture = bootstrap.connect(new InetSocketAddress(9999)).sync();
        channelFuture.channel().closeFuture().sync();
        worker.close();
    }
}

啦啦啦

时间: 2024-12-12 17:03:29

网络编程 -- RPC实现原理 -- Netty -- 迭代版本V3 -- 编码解码的相关文章

网络编程 -- RPC实现原理 -- Netty -- 迭代版本V4 -- 粘包拆包

网络编程 -- RPC实现原理 -- 目录 啦啦啦 V2--Netty -- new LengthFieldPrepender(2) : 设置数据包 2 字节的特征码 new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2) :  65535 :数据包长度.0:分隔符偏移值.2:分隔符长度.0:数据包偏移值.2:数据包长度. Class : Server package lime.pri.limeNio.netty.netty04; import j

网络编程 -- RPC实现原理 -- Netty -- 迭代版本V2 -- 对象传输

网络编程 -- RPC实现原理 -- 目录 啦啦啦 V2--Netty -- 使用序列化和反序列化在网络上传输对象 只能传输( ByteBuf, FileRegion )两种类型,因此必须将对象在发送之前进行序列化,放进ByteBuf中,客户端接收到ByteBuf时,将字节码取出,反序列化成对象. Class : Server package lime.pri.limeNio.netty.netty02.exercise; import java.net.InetSocketAddress; i

网络编程 -- RPC实现原理 -- RPC -- 迭代版本V1 -- 本地方法调用

网络编程 -- RPC实现原理 -- 目录 啦啦啦 V2--RPC -- 本地方法调用:不通过网络 入门 1. RPCObjectProxy rpcObjectProxy = new RPCObjectProxy(new LocalRPCClient()); : 绑定目标对象 2. IUserService userService = (IUserService) rpcObjectProxy.create(IUserService.class); :返回代理类 3. List<User> u

网络编程 -- RPC实现原理 -- RPC -- 迭代版本V2 -- 本地方法调用 整合 Spring

网络编程 -- RPC实现原理 -- 目录 啦啦啦 V2--RPC -- 本地方法调用 + Spring 1. 配置applicationContext.xml文件 注入 bean 及 管理 bean 之间的依赖关系 2. RPCObjectProxy 类 实现 FactoryBean<Object> 接口,通过 public Object getObject() throws Exception 返回代理类 3. List<User> users = userService.qu

网络编程 -- RPC实现原理 -- RPC -- 迭代版本V3 -- 远程方法调用 整合 Spring

网络编程 -- RPC实现原理 -- 目录 啦啦啦 V3--RPC -- 远程方法调用 及 null的传输 + Spring 服务提供商: 1. 配置 rpc03_server.xml 注入 服务提供商 rpcServiceProvider并指定初始化方法 及 服务实例 IUserService 2. 读取 服务消费者 请求的 MethodStaics ,通过反射获取服务端实例方法的返回值.返回值为null值,则映射为NullWritable实例返回.不为null,则不加以约束. 服务代理商:

网络编程 -- RPC实现原理 -- NIO多线程 -- 迭代版本V2

网络编程 -- RPC实现原理 -- 目录 啦啦啦 V2--增加WriteQueue队列,存放selectionKey.addWriteEventToQueue()添加selectionKey并唤醒阻塞的selector.等selector唤醒之后再注册OP_WRITE事件. ( selectionKey.cancel();清除key对应事件之后,由于多线程 main线程和对应的IO线程会抢夺selector资源. 在selector.select()和sc.register(selection

网络编程 -- RPC实现原理 -- NIO多线程 -- 迭代版本V1

网络编程 -- RPC实现原理 -- 目录 啦啦啦 V1--设置标识变量selectionKey.attach(true);只处理一次(会一直循环遍历selectionKeys,占用CPU资源). (由于key没有清除,依旧在selectionKeys中存在,遍历时依旧会检测到对应事件,除非socket关闭或调用selectionKey.cancel();清除对应事件) Class : Service package lime.pri.limeNio.optimize.socket2; impo

Python网络编程04/recv原理/高大上版解决粘包方式

目录 Python网络编程04/recv原理/高大上版解决粘包方式 1.昨日内容回顾 2.recv工作原理 3.高大上版解决粘包方式(自定制报头) 3.1 解决思路: 3.2 服务端 3.3客户端 4.基于UDP协议的socket通信 4.1服务端 4.2客户端 Python网络编程04/recv原理/高大上版解决粘包方式 1.昨日内容回顾 1. 通信循环 2. 链接通信循环 3. 远程执行命令: subprocess.Popen() # bytes: 网络传输, 文件存储时. 4. 粘包现象

Netty 粘包 &amp; 拆包 &amp; 编码 &amp; 解码 &amp; 序列化 介绍

目录: 粘包 & 拆包及解决方案 ByteToMessageDecoder 基于长度编解码器 基于分割符的编解码器 google 的 Protobuf 序列化介绍 其他的 前言 Netty 作为一个网络框架,对 TCP 连接中的问题都做了全面的考虑,比如粘包拆包导致的半包问题,如何编解码,如何实现私有协议,序列化等等.本文主要针对这些问题做一个简单介绍,目的是想对整个 Netty 的编解码框架做一个全盘的审视,以确保在后面的源码学习中不会一叶障目不见泰山. 1. 粘包 & 拆包及解决方案