rpc框架yar之源码解析- 协议和传输

Yar 协议头和传输源码分析

 这篇博客主要学习rpc框架yar的协议头和传输的实现, 能力有限,有些语句没有看懂,所以猜测了一部分。

yar_header_t的实现

_yar_header的定义主要在yar_protocol.h和yar_protocol.c里面,下面介绍这两个文件里的源码。
借用网上的一幅图片,请求体包括 yar_header + packager_name + yar_request_t 这三个部分,返回类似。
下面主要介绍yar_header的部分。

?

yar_protocol.h


// 定义yar_header_t这个结构体的内容,一共82个字节。
typedef struct _yar_header {
    unsigned int   id;                        // 4
    unsigned short version;             // 2
    unsigned int   magic_num;        // 4
    unsigned int   reserved;            // 4
    unsigned char  provider[32];    // 32
    unsigned char  token[32];        // 32
    unsigned int   body_len;          //  4
}

// 下面是声明两个函数,稍后在.c文件中说明具体方法
yar_header_t * php_yar_protocol_parse(char *payload);
void php_yar_protocol_render(yar_header_t *header, uint id, char *provider, char *token, uint body_len, uint reserved);

yar_protocol.c


// 这个函数的作用应该是给 header指向的结构体赋值
/* htonl函数介绍:将主机数转换成无符号长整型的网络字节顺序。
 *                           本函数将一个32位数从主机字节顺序转换成网络字节顺序。
 */
void php_yar_protocol_render(yar_header_t *header, uint id, char *provider, char *token, uint body_len, uint reserved) /* {{{ */ {
    header->magic_num = htonl(YAR_PROTOCOL_MAGIC_NUM);
    header->id = htonl(id);
    header->body_len = htonl(body_len);
    header->reserved = htonl(reserved);
    if (provider) {
        memcpy(header->provider, provider, strlen(provider));
    }
    if (token) {
        memcpy(header->token, token, strlen(token));
    }
    return;
} /* }}} */

// 这个函数是将payload指向的字符串,转成header结构体对应的内容 然后返回。
yar_header_t * php_yar_protocol_parse(char *payload) /* {{{ */ {
    yar_header_t *header = (yar_header_t *)payload;

    header->magic_num = ntohl(header->magic_num);

    if (header->magic_num != YAR_PROTOCOL_MAGIC_NUM) {
        header->magic_num = htonl(header->magic_num);
        return NULL;
    }

    header->id = ntohl(header->id);
    header->body_len = ntohl(header->body_len);
    header->reserved = ntohl(header->reserved);

    return header;
} /* }}} */

Q: 下面的这句 我没有看明白, 感觉直接返回就可以了。

if (header->magic_num != YAR_PROTOCOL_MAGIC_NUM) {
        header->magic_num = htonl(header->magic_num);
        return NULL;
    }


yar传输源码分析

关于yar传输方面源码分析,还是先从下面的图开始吧。感谢作图的人,比我说的直观。

传输层主要涉及到如下几个文件:
transports/curl.c、transports/socket.c 、yar_transport.h、yar_transport.c 4个文件。

yar_transport.h



yar_transport.h 主要定义两个函数和一些结构体。

// 这个应该是传输方式的方法定义,后面会在curl.c socket.c中实现下面的内容
typedef struct _yar_transport_interface {
    void *data;
    int  (*open)(struct _yar_transport_interface *self, zend_string *address, long options, char **msg);
    int  (*send)(struct _yar_transport_interface *self, struct _yar_request *request, char **msg);
    struct _yar_response * (*exec)(struct _yar_transport_interface *self, struct _yar_request *request);
    int  (*setopt)(struct _yar_transport_interface *self, long type, void *value, void *addition);
    int  (*calldata)(struct _yar_transport_interface *self, yar_call_data_t *calldata);
    void (*close)(struct _yar_transport_interface *self);
} yar_transport_interface_t;

typedef struct _yar_transport {
    const char *name;
    struct _yar_transport_interface * (*init)();
    void (*destroy)(yar_transport_interface_t *self);
    yar_transport_multi_t *multi;
} yar_transport_t;

// 下面是并行调用时的结构体定义
typedef struct _yar_transport_multi_interface {
    void *data;
    int (*add)(struct _yar_transport_multi_interface *self, yar_transport_interface_t *cp);
    int (*exec)(struct _yar_transport_multi_interface *self, yar_concurrent_client_callback *callback);
    void (*close)(struct _yar_transport_multi_interface *self);
} yar_transport_multi_interface_t;

typedef struct _yar_transport_multi {
    struct _yar_transport_multi_interface * (*init)();
} yar_transport_multi_t;

// 根据名称获得对应的传输方式,curl / sock
PHP_YAR_API const yar_transport_t * php_yar_transport_get(char *name, int nlen);

// 注册传输方式到yar_transports_list
PHP_YAR_API int php_yar_transport_register(const yar_transport_t *transport);

yar_transport.c



这个文件主要实现上面两个函数,即注册curl和socket两种方式,然后根据name获得传输方式。

// 定义list
struct _yar_transports_list {
    unsigned int size;
    unsigned int num;
    const yar_transport_t **transports;
} yar_transports_list;

// 注册方法到list里面
PHP_YAR_API int php_yar_transport_register(const yar_transport_t *transport) /* {{{ */ {

    if (!yar_transports_list.size) {
       yar_transports_list.size = 5;
       yar_transports_list.transports = (const yar_transport_t **)malloc(sizeof(yar_transport_t *) * yar_transports_list.size);
    } else if (yar_transports_list.num == yar_transports_list.size) {
       yar_transports_list.size += 5;
       yar_transports_list.transports = (const yar_transport_t **)realloc(yar_transports_list.transports, sizeof(yar_transport_t *) * yar_transports_list.size);
    }
    yar_transports_list.transports[yar_transports_list.num] = transport;

    return yar_transports_list.num++;
} /* }}} */

// 根据name获得方式
PHP_YAR_API const yar_transport_t * php_yar_transport_get(char *name, int nlen) /* {{{ */ {
    int i = 0;
    for (;i<yar_transports_list.num;i++) {
        if (strncmp(yar_transports_list.transports[i]->name, name, nlen) == 0) {
            return yar_transports_list.transports[i];
        }
    }

    return NULL;
} /* }}} */

// 下面这两句没有看懂什么意思
le_calldata = zend_register_list_destructors_ex(php_yar_calldata_dtor, NULL, "Yar Call Data", module_number);
le_plink = zend_register_list_destructors_ex(NULL, php_yar_plink_dtor, "Yar Persistent Link", module_number);

transports/socket.c



主要是socket方式需要的方法的具体实现, 先看下面方法
用yar_transport_t 定义一个常量

const yar_transport_t yar_transport_socket = {
    "sock",
    php_yar_socket_init,
    php_yar_socket_destroy,
    NULL
}; /* }}} */

php_yar_socket_init的定义如下

yar_transport_interface_t * php_yar_socket_init() /* {{{ */ {
    yar_socket_data_t *data;
    yar_transport_interface_t *self;

    self = emalloc(sizeof(yar_transport_interface_t));
    self->data = data = ecalloc(1, sizeof(yar_socket_data_t));

    self->open      = php_yar_socket_open;
    self->send      = php_yar_socket_send;
    self->exec      = php_yar_socket_exec;
    self->setopt    = php_yar_socket_setopt;
    self->calldata  = NULL;
    self->close     = php_yar_socket_close;

    return  self;
} /* }}} */

然后我们再看yar_socket_data_t的定义, 其实主要使用php_stream这个流类型 底层的具体实现不追了。

typedef struct _yar_socket_data_t {
    char persistent;
    php_stream *stream;  // 主要
} yar_socket_data_t;

下面是php_yar_socket_open函数的实现

php_stream *stream = NULL;
...
// 这个是核心代码,应该是根据地址和配置创建一个流的对象。
stream = php_stream_xport_create(ZSTR_VAL(address), ZSTR_LEN(address), 0, STREAM_XPORT_CLIENT|STREAM_XPORT_CONNECT, persistent_key, &tv, NULL, &errstr, &err);

php_yar_socket_close, 这个函数就是关闭流,然后释放空间

if (!data->persistent && data->stream) {
        php_stream_close(data->stream);
    }

php_yar_socket_send 这个函数主要是把需要发送的内容,按照一定格式pack,然后放到stream中。
里面有几个函数没有看懂,以后在详细查找:

// 先构造header的内容
php_yar_protocol_render(&header, request->id, "Yar PHP Client", NULL, ZSTR_LEN(payload), data->persistent? YAR_PROTOCOL_PERSISTENT : 0);

// 复制到buf这个变量中
memcpy(buf, (char *)&header, sizeof(yar_header_t));

// 这个地方 我不太了解为什么要用goto
wait_io:
        if (bytes_left) {
            retval = php_select(fd+1, NULL, &rfds, NULL, &tv);

            if (retval == -1) {
                zend_string_release(payload);
                spprintf(msg, 0, "select error ‘%s‘", strerror(errno));
                return 0;
            } else if (retval == 0) {
                zend_string_release(payload);
                spprintf(msg, 0, "select timeout %ldms reached", YAR_G(timeout));
                return 0;
            }

            if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
                if ((ret = php_stream_xport_sendto(data->stream, ZSTR_VAL(payload) + bytes_sent, bytes_left, 0, NULL, 0)) > 0) {
                    bytes_left -= ret;
                    bytes_sent += ret;
                }
            }
            goto wait_io;
        }
    }

transports/curl.c



curl.c跟socket类似,依赖curl/curl.h文件里的内容,如

CURL        *cp;

原文地址:http://blog.51cto.com/9681602/2160503

时间: 2024-10-12 21:29:03

rpc框架yar之源码解析- 协议和传输的相关文章

Volley框架使用及源码解析

1. Volley特点 (1) 特别适合数据量小,通信频繁的网络操作. (2) 扩展性强.Volley 中大多是基于接口的设计,可根据需要自行定制. (3) 一定程度符合 Http 规范,包括返回 ResponseCode(2xx.3xx.4xx.5xx)的处理,请求头的处理, 缓存机制的支持等.并支持重试及优先级定义. (4) 提供简便的图片加载工具 GitHub地址:https//github.com/mcxiaoke/android-volley 2. 概念介绍 Request:表示一个请

NIO框架之MINA源码解析(五):NIO超级陷阱和使用同步IO与MINA通信

1.NIO超级陷阱 之所以说NIO超级陷阱,就是因为我在本系列开头的那句话,因为使用缺陷导致客户业务系统瘫痪.当然,我对这个问题进行了很深的追踪,包括对MINA源码的深入了解,但其实之所以会出现这个问题,它的根不是MINA的原因,而是JDK底层的问题. JDK底层在实现nio时,为了能够唤醒等待在io上的线程,在windows平台使用了两个端口建立连接发消息实现.看如下代码: [java] view plain copy print? public class NIOTest { @Test p

NIO框架之MINA源码解析(一):背景

?? "你们的agent占了好多系统的端口,把我们的很多业务系统都给整死了,给我们造成了很大的损失,要求你们的相关领导下周过来道歉"   --   来自我们的一个客户. 怎么可能呢,我们都不相信,我们的agent只占一个端口啊! 事实胜过雄辩,经过查证,确实是由于我们的agent占了好多系统的端口,我看了一下日志,基本把系统可用的端口占完了! 为什么呢?MINA框架私自开的! 由于我们的agent端使用了NIO通信框架MINA,但并没有使用好,造成了这一几乎毁灭行的灾难. 还是先看代码

NIO框架之MINA源码解析(四):粘包与断包处理及编码与解码

1.粘包与段包 粘包:指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾.造成的可能原因: 发送端需要等缓冲区满才发送出去,造成粘包 接收方不及时接收缓冲区的包,造成多个包接收 断包:也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全. 2.消息传输的格式 消息长度+消息头+消息体  即前N个字节用于存储消息的长度,用于判断当前消息什么时候结束. 消息头+消息体    即固定长度的消息,前几个字节为消息

NIO框架之MINA源码解析(二):mina核心引擎

MINA的底层还是利用了jdk提供了nio功能,mina只是对nio进行封装,包括MINA用的线程池都是jdk直接提供的. MINA的server端主要有accept.processor.session三部分组成的.其中accept主要负责在指定的端口监听,若有新连接则建立一个新的session:processor则负责处理session对应的发送数据和接收数据并调用上层处理:session则缓存当前连接数据. MINA采用了线程懒启动的技术,即最少启动线程,在MINA server启动的时候,

NIO框架之MINA源码解析(三):底层通信与责任链模式应用

本文主要介绍下在mina中责任链模式的应用以及mina对于数据读写的处理. 在mina中,对数据的读操作是在processor类里面触发的,收到新消息后就触发读数据链去处理新消息直到自己的业务逻辑代码(IoHandler). 在mina中,数据的写(write)和发(send)差别相对较大,mina中的写消息最终的结果只是把要写的消息经过写数据链处理后的最终结果放在了一个缓存中,并把当前session标记为可发. 数据的发送就是传统中我们所说的发消息,就是把写消息最终处理的结果发送到客户端,待发

NIO框架之MINA源码解析(转)

http://blog.csdn.net/column/details/nio-mina-source.html http://blog.csdn.net/chaofanwei/article/details/38848085 http://blog.csdn.net/chaofanwei/article/details/38871115 http://blog.csdn.net/chaofanwei/article/details/38920963 http://blog.csdn.net/c

iOS开发之Masonry框架源码解析

Masonry是iOS在控件布局中经常使用的一个轻量级框架,Masonry让NSLayoutConstraint使用起来更为简洁.Masonry简化了NSLayoutConstraint的使用方式,让我们可以以链式的方式为我们的控件指定约束.本篇博客的主题不是教你如何去使用Masonry框架的,而是对Masonry框架的源码进行解析,让你明白Masonry是如何对NSLayoutConstraint进行封装的,以及Masonry框架中的各个部分所扮演的角色是什么样的.在Masonry框架中,仔细

神经网络caffe框架源码解析--softmax_layer.cpp类代码研究

// Copyright 2013 Yangqing Jia // #include <algorithm> #include <vector> #include "caffe/layer.hpp" #include "caffe/vision_layers.hpp" #include "caffe/util/math_functions.hpp" using std::max; namespace caffe { /**