Dubbo中编码和解码的解析

(这里做的解析不是很详细,等到走完整个流程再来解析)Dubbo中编解码的工作由Codec2接口的实现来处理,回想一下第一次接触到Codec2相关的内容是在服务端暴露服务的时候,根据具体的协议去暴露服务的步骤中,在DubboProtocol的createServer方法中:

1234567891011
private ExchangeServer createServer(URL url) {	。。。    //这里url会添加codec=dubbo    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);    ExchangeServer server;    try {        server = Exchangers.bind(url, requestHandler);    }    。。。    return server;}

紧接着进入Exchangers.bind(url, requestHandler);

12345
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {	//如果url中没有codec属性,就会添加codec=exchange    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");    return getExchanger(url).bind(url, handler);}

然后会继续进入HeaderExchanger的bind方法:

123
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));}

在这里会创建一个DecodeHandler实例。继续跟踪Transporters的bind方法,会发现直接返回一个NettyServer实例,在NettyServer的父类AbstractEndpoint构造方法初始的时候,会根据url获取一个ChannelCodec,并将其赋值给codec存放到NettyServer的实例中。

我们先看下getChannelCodec(url);方法:

1234567891011121314
protected static Codec2 getChannelCodec(URL url) {	//获取codecName,不存在的话,默认为telnet    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");    //先看下是不是Codec2的实现,是的话就根据SPI扩展机制获得Codec2扩展的实现    //我们这里默认使用的是DubboCountCodec    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);    } else {    	//如果不是Codec2的实现,就去查找Codec的实现        //然后使用CodecAdapter适配器类来转换成Codec2        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)                                           .getExtension(codecName));    }}

这里返回的是Codec2,而Codec这个接口已经被标记为过时。到这里的话,在NettyServer中就会存在一个Codec2的实例了。

在继续往下看到NettyServer中的doOpen()方法,这里是使用Netty的逻辑打开服务并绑定监听服务的地方:

123456789101112131415161718192021222324
protected void doOpen() throws Throwable {    NettyHelper.setNettyLoggerFactory();    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));    bootstrap = new ServerBootstrap(channelFactory);

final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);    channels = nettyHandler.getChannels();    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {        public ChannelPipeline getPipeline() {        	//这里的getCodec方法获取到的codec就是在AbstractEndpoint中我们获取到的codec            //NettyCodecAdapter,适配器类            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);            ChannelPipeline pipeline = Channels.pipeline();            pipeline.addLast("decoder", adapter.getDecoder());//SimpleChannelUpstreamHandler            pipeline.addLast("encoder", adapter.getEncoder());//OneToOneEncoder            pipeline.addLast("handler", nettyHandler);            return pipeline;        }    });    // bind    channel = bootstrap.bind(getBindAddress());}

这里就在Netty的pipeline中添加了编解码器。这里涉及到Netty的相关流程,可以先了解下Netty3服务端流程简介

decoder为解码器,是一个SimpleChannelUpstreamHandler,从Socket到Netty中的时候,需要解码,也就是服务提供端接收到消费者的请求的时候,需要解码。

encoder是编码器,是OneToOneEncoder,这个类实现了ChannelDownstreamHandler,从服务提供端发送给服务消费者的时候,需要编码。

nettyHandler实现了ChannelUpstreamHandler, ChannelDownstreamHandler两个,上下的时候都需要处理。

接收到服务消费者的请求的时候,会先执行decoder,然后执行nettyHandler。

发送给消费者的时候,会先执行nettyHandler,然后执行encoder。

dubbo协议头

协议头是16字节的定长数据:

  • 2字节short类型的Magic
  • 1字节的消息标志位
    • 5位序列化id
    • 1位心跳还是正常请求
    • 1位双向还是单向
    • 1位请求还是响应
  • 1字节的状态位
  • 8字节的消息id
  • 4字节数据长度

编码的过程

首先会判断是请求还是响应,代码在ExchangeCodec的encode方法:

123456789
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {    if (msg instanceof Request) {//Request类型        encodeRequest(channel, buffer, (Request) msg);    } else if (msg instanceof Response) {//Response类型        encodeResponse(channel, buffer, (Response) msg);    } else {//telenet类型的        super.encode(channel, buffer, msg);    }}

服务提供者对响应信息编码

在服务提供者端一般是对响应来做编码,所以这里重点看下encodeResponse。

encodeResponse:

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {    try {    	//序列化方式        //也是根据SPI扩展来获取,url中没指定的话默认使用hessian2        Serialization serialization = getSerialization(channel);        //长度为16字节的数组,协议头        byte[] header = new byte[HEADER_LENGTH];        //魔数0xdabb        Bytes.short2bytes(MAGIC, header);        //序列化方式        header[2] = serialization.getContentTypeId();        //心跳消息还是正常消息        if (res.isHeartbeat()) header[2] |= FLAG_EVENT;        //响应状态        byte status = res.getStatus();        header[3] = status;        //设置请求id        Bytes.long2bytes(res.getId(), header, 4);		//buffer为1024字节的ChannelBuffer        //获取buffer的写入位置        int savedWriteIndex = buffer.writerIndex();        //需要再加上协议头的长度之后,才是正确的写入位置        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);        // 对响应信息或者错误消息进行编码        if (status == Response.OK) {            if (res.isHeartbeat()) {            	//心跳                encodeHeartbeatData(channel, out, res.getResult());            } else {            	//正常响应                encodeResponseData(channel, out, res.getResult());            }        }        //错误消息        else out.writeUTF(res.getErrorMessage());        out.flushBuffer();        bos.flush();        bos.close();		//写出去的消息的长度        int len = bos.writtenBytes();        //查看消息长度是否过长        checkPayload(channel, len);        Bytes.int2bytes(len, header, 12);        //重置写入的位置        buffer.writerIndex(savedWriteIndex);        //向buffer中写入消息头        buffer.writeBytes(header); // write header.        //buffer写出去的位置从writerIndex开始,加上header长度,加上数据长度        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);    } catch (Throwable t) {        // 发送失败信息给Consumer,否则Consumer只能等超时了        if (! res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {            try {                // FIXME 在Codec中打印出错日志?在IoHanndler的caught中统一处理?                logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);

Response r = new Response(res.getId(), res.getVersion());                r.setStatus(Response.BAD_RESPONSE);                r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));                channel.send(r);

return;            } catch (RemotingException e) {                logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);            }        }

// 重新抛出收到的异常        if (t instanceof IOException) {            throw (IOException) t;        } else if (t instanceof RuntimeException) {            throw (RuntimeException) t;        } else if (t instanceof Error) {            throw (Error) t;        } else  {            throw new RuntimeException(t.getMessage(), t);        }    }}

服务消费者对请求信息编码

消费者端暂先不做解析

解码的过程

服务提供者对请求消息的解码

decode方法一次只会解析一个完整的dubbo协议包,但是每次收到的协议包不一定是完整的,或者有可能是多个协议包。看下代码解析,首先看NettyCodecAdapter的内部类InternalDecoder的messageReceived方法:

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {    Object o = event.getMessage();    if (! (o instanceof ChannelBuffer)) {        ctx.sendUpstream(event);        return;    }

ChannelBuffer input = (ChannelBuffer) o;    int readable = input.readableBytes();    if (readable <= 0) {        return;    }

com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;    if (buffer.readable()) {        if (buffer instanceof DynamicChannelBuffer) {            buffer.writeBytes(input.toByteBuffer());            message = buffer;        } else {            int size = buffer.readableBytes() + input.readableBytes();            message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(                size > bufferSize ? size : bufferSize);            message.writeBytes(buffer, buffer.readableBytes());            message.writeBytes(input.toByteBuffer());        }    } else {        message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(            input.toByteBuffer());    }

NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);    Object msg;    //读索引    int saveReaderIndex;    try {        do {            saveReaderIndex = message.readerIndex();            try {            //解码                msg = codec.decode(channel, message);            } catch (IOException e) {                buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;                throw e;            }            //不完整的协议包            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {            	//重置读索引                message.readerIndex(saveReaderIndex);                //跳出循环,之后在finally中把message赋值给buffer保存起来,等到下次接收到数据包的时候会追加到buffer的后面                break;            } else {//有多个协议包,触发messageReceived事件                if (saveReaderIndex == message.readerIndex()) {                    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;                    throw new IOException("Decode without read data.");                }                if (msg != null) {                    Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());                }            }        } while (message.readable());    } finally {        if (message.readable()) {            message.discardReadBytes();            buffer = message;        } else {            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;        }        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());    }}

继续看codec.decode(channel, message);这里是DubboCountCodec的decode方法:

1234567891011121314151617181920212223242526
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {	//当前的读索引记录下来    int save = buffer.readerIndex();    //多消息    MultiMessage result = MultiMessage.create();    do {    	//解码消息        Object obj = codec.decode(channel, buffer);        //不是完整的协议包        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {            buffer.readerIndex(save);            break;        } else {//多个协议包            result.addMessage(obj);            logMessageLength(obj, buffer.readerIndex() - save);            save = buffer.readerIndex();        }    } while (true);    if (result.isEmpty()) {        return Codec2.DecodeResult.NEED_MORE_INPUT;    }    if (result.size() == 1) {        return result.get(0);    }    return result;}

继续看ExchangeCodec的decode方法:

123456789
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {	//可读字节数    int readable = buffer.readableBytes();    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];    //协议头    buffer.readBytes(header);    //解码    return decode(channel, buffer, readable, header);}

解码decode:

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {    //检查魔数.    if (readable > 0 && header[0] != MAGIC_HIGH             || readable > 1 && header[1] != MAGIC_LOW) {        int length = header.length;        if (header.length < readable) {            header = Bytes.copyOf(header, readable);            buffer.readBytes(header, length, readable - length);        }        for (int i = 1; i < header.length - 1; i ++) {            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {                buffer.readerIndex(buffer.readerIndex() - header.length + i);                header = Bytes.copyOf(header, i);                break;            }        }        //telenet        return super.decode(channel, buffer, readable, header);    }    //不完整的包    if (readable < HEADER_LENGTH) {        return DecodeResult.NEED_MORE_INPUT;    }

//数据长度    int len = Bytes.bytes2int(header, 12);    checkPayload(channel, len);

int tt = len + HEADER_LENGTH;    if( readable < tt ) {        return DecodeResult.NEED_MORE_INPUT;    }

// limit input stream.    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

try {    	//解码数据        return decodeBody(channel, is, header);    } finally {        if (is.available() > 0) {            try {                StreamUtils.skipUnusedStream(is);            } catch (IOException e) { }        }    }}

decodeBody解析数据部分:

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);    //获取序列化方式    Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);    //反序列化    ObjectInput in = s.deserialize(channel.getUrl(), is);    //获取请求id    long id = Bytes.bytes2long(header, 4);    //这里是解码响应数据    if ((flag & FLAG_REQUEST) == 0) {        //response的id设为来时候的Request的id,这样才能对上暗号        Response res = new Response(id);        //判断是什么类型请求        if ((flag & FLAG_EVENT) != 0) {            res.setEvent(Response.HEARTBEAT_EVENT);        }        //获取状态        byte status = header[3];        res.setStatus(status);        if (status == Response.OK) {            try {                Object data;                if (res.isHeartbeat()) {                	//解码心跳数据                    data = decodeHeartbeatData(channel, in);                } else if (res.isEvent()) {                	//事件                    data = decodeEventData(channel, in);                } else {                	//响应                    data = decodeResponseData(channel, in, getRequestData(id));                }                res.setResult(data);            } catch (Throwable t) {                res.setStatus(Response.CLIENT_ERROR);                res.setErrorMessage(StringUtils.toString(t));            }        } else {            res.setErrorMessage(in.readUTF());        }        return res;    } else {//这是解码请求数据        // request的id        Request req = new Request(id);        req.setVersion("2.0.0");        req.setTwoWay((flag & FLAG_TWOWAY) != 0);        if ((flag & FLAG_EVENT) != 0) {            req.setEvent(Request.HEARTBEAT_EVENT);        }        try {            Object data;            if (req.isHeartbeat()) {            	//心跳                data = decodeHeartbeatData(channel, in);            } else if (req.isEvent()) {            	//事件                data = decodeEventData(channel, in);            } else {            	//请求                data = decodeRequestData(channel, in);            }            req.setData(data);        } catch (Throwable t) {            // bad request            req.setBroken(true);            req.setData(t);        }        return req;    }}

具体的解码细节交给底层解码器,这里是使用的hessian2。

服务消费者对响应消息的解码

暂先不做解释。

时间: 2024-08-04 12:38:41

Dubbo中编码和解码的解析的相关文章

Dubbo中暴露服务的过程解析

dubbo暴露服务有两种情况,一种是设置了延迟暴露(比如delay="5000"),另外一种是没有设置延迟暴露或者延迟设置为-1(delay="-1"): 设置了延迟暴露,dubbo在Spring实例化bean(initializeBean)的时候会对实现了InitializingBean的类进行回调,回调方法是afterPropertySet(),如果设置了延迟暴露,dubbo在这个方法中进行服务的发布. 没有设置延迟或者延迟为-1,dubbo会在Spring实例

Dubbo中消费者初始化的过程解析

首先还是Spring碰到dubbo的标签之后,会使用parseCustomElement解析dubbo标签,使用的解析器是dubbo的DubboBeanDefinitionParser,解析完成之后返回BeanDefinition给Spring管理. 服务消费者端对应的是ReferenceBean,实现了ApplicationContextAware接口,Spring会在Bean的实例化那一步回调setApplicationContext方法.也实现了InitializingBean接口,接着会

python3中编码和解码

------------恢复内容开始------------ 一.python3与python2.7在字符编码上的差异 python2.7 默认编码是和所在的操作系统一致,windows 是 GBK,macOS是 utf8 ,linux是utf8 python3 编码统一规定默认编码为 Unicode 二.python3中str和bytes的区别 (1)python3有两种数据类型:str 和bytes. 1.编码为Unicode的文本显示为str: 2.其他编码(比如gbk,utf8等)显示为

python中编码和解码decode和encode的使用

python 在处理字符串时经常遇到编码错误,导致乱码,且python 2.x和 python 3.x之间有很大的不同,先在这里记录一下,以后整理; 转载 文章一篇: http://www.cnblogs.com/evening/archive/2012/04/19/2457440.html

Javascript编码与解码函数

Javascript编码与解码 在javascript中编码和解码主要有6个函数,这几个函数在解决前台传中文字符给后台,以及解决乱码问题很有帮助.下面一一介绍.(为了不以讹传讹请自己验证) 1.编码函数:escape(),对应的解码函数:unescape(),基本不用了. escape(charString),charstring 参数是要编码的任意 String 对象或文字. 说明 escape 方法返回一个包含了 charstring 内容的字符串值(Unicode 格式). escape不

java、js的编码、解码

如果在地址栏挂载参数,特别是包含中文,往往要进行编码,取值时再解码,以下是java和js中编码.解码的各自方法. java: @Test public void test3() throws UnsupportedEncodingException{ System.out.println(URLEncoder.encode("我", "UTF-8"));//%E6%88%91 System.out.println(URLDecoder.decode("%E

了解URL编码的基本概念,在javascript和java程序中使用内置的API进行编码和解码

1.URL编码的基本概念 URL只能使用US-ASCII 字符集来通过因特网进行发送.由于URL常常会包含 ASCII 集合之外的字符,URL必须转换为有效的 ASCII 格式.URL 编码使用 "%" 其后跟随两位的十六进制数来替换非 ASCII 字符.URL 不能包含空格,URL 编码通常使用 + 来替换空格.所谓URL编码,就是将非US-ASCII字符和US-ASCII中的特殊字符,用相应的字符集编码来表示.比如,汉字"你",如果用UTF-8编码,出现在URL

http协议中的编码和解码

http://www.csdn1 2 3.com/html/itweb/20130730/29422_29378_29408.htm ****************************** 一.字符集与文字编码简介 1. 计算机如何显示文字 我们知道,计算机是以二进制的“形式”来保存和处理数据的,也 就是说,不管我们使用键盘进行输入,还是让计算机去读取一个文本文件,计算机得到的原始内容是一些二进制序列,当需要对这些二进制序列进行显示时,计算机 会依照某种“翻译机制”(也就是编码方式),取到

SNMP++ 02-SNMP中INTEGER的BER编码与解码

阅读完本文你可以学到: (1)BER 中 INTEGER 的编码规则(其中1.2.3主要引自<ASN.1编码规则详解.doc>(作者不详,该文档可在CSDN资源中搜索到)). (2)SNMP 中 INTEGER 的编码及解码实现(主要参考 net-snmp源码和 snmp++源码).本文仅对编码相关函数进行了详细的解释.理解它,或许是我们走向自己实现 SNMP 协议的第一步. 特别声明: (1)感谢<ASN.1编码规则详解.doc>作者所做的工作. (2)感谢所有为 net-snm