原创:Twemproxy源码分析之三:其进程以及时间模型

概述:本来自己写了一篇,但是看到http://www.tuicool.com/articles/NfqeEf这篇更好,于是就算转载过来了吧。下一篇我们会讲到twemproxy的核心,即其转发响应转发请求。

作为一个proxy服务,我们应该深入了解它的进程和事件模型。

进程模型:

通过grep代码,查找fork()和pthread_create()函数的调用,并分析调用者两个函数的代码。我们可以看到twemproxy使用的是单进程单线程来处理请求,另外有一个线程来处理统计数据,但并不参与处理请求功能。

对于单进程单线程的程序,我们就不用去关心线程间通信等内容,可以把重点放在事件驱动模型上来。

事件模型:

Twemproxy自己封装了一个简单的事件驱动模型,相关代码在event目录下。

我们可以先看一下event/nc_event.h:

ifndef _NC_EVENT_H_

#define _NC_EVENT_H_

#include

#define EVENT_SIZE  1024

#define EVENT_READ  0x0000ff

#define EVENT_WRITE 0x00ff00

#define EVENT_ERR   0xff0000

typedef int ( * event_cb_t ) ( void * , uint32_t ) ;

typedef void ( * event_stats_cb_t ) ( void * , void * ) ;

   

文件开头定义了几个宏,然后定义了两个回调函数指针,分别是event的回调函数event_cb_t和状态统计的回调函数event_stats_cb_t。紧接下来是一个条件宏定义:

 

Twemproxy针对不同的平台定义了不同的event_base结构,因为我们用的是Linux,所以只关心epoll下的代码就行了:

#elif NC_HAVE_EPOLL
 
struct event_base {

int                  ep ;

struct epoll_event * event ;

int                  nevent ;

event_cb_t         cb ;

} ;

epoll下的event_base定义很简单:一个epoll句柄ep,一个指向epoll_event数组的指针*event,一个event计数nevent,一个callback函数。

那么需要实现哪些函数呢?在event/nc_event.h的最底下有声明:

struct event_base * event_base_create ( int size , event_cb_t cb ) ;

void event_base_destroy ( struct event_base * evb ) ;

int event_add_in ( struct event_base * evb , struct conn * c ) ;

int event_del_in ( struct event_base * evb , struct conn * c ) ;

int event_add_out ( struct event_base * evb , struct conn * c ) ;

int event_del_out ( struct event_base * evb , struct conn * c ) ;

int event_add_conn ( struct event_base * evb , struct conn * c ) ;

int event_del_conn ( struct event_base * evb , struct conn * c ) ;

int event_wait ( struct event_base * evb , int timeout ) ;

void event_loop_stats ( event_stats_cb_t cb , void * arg ) ;

 

创建:

我们先看看如何创建event_base,下面是event/nc_epoll.c当中的event_base_create()函数定义:

 struct event_base *

event_base_create ( int nevent , event_cb_t cb )

{

struct event_base * evb ;

int status , ep ;

struct epoll_event * event ;

ASSERT ( nevent > 0 ) ;

ep = epoll_create ( nevent ) ;

if ( ep < 0 ) {

log_error ( "epoll create of size %d failed: %s" , nevent , strerror ( errno ) ) ;

return NULL ;

}

event = nc_calloc ( nevent , sizeof ( * event ) ) ;

if ( event == NULL ) {

status = close ( ep ) ;

if ( status < 0 ) {

log_error ( "close e %d failed, ignored: %s" , ep , strerror ( errno ) ) ;

}

return NULL ;

}

evb = nc_alloc ( sizeof ( * evb ) ) ;

if ( evb == NULL ) {

nc_free ( event ) ;

status = close ( ep ) ;

if ( status < 0 ) {

log_error ( "close e %d failed, ignored: %s" , ep , strerror ( errno ) ) ;

}

return NULL ;

}

evb -> ep = ep ;

evb -> event = event ;

evb -> nevent = nevent ;

evb -> cb = cb ;

log_debug ( LOG_INFO , "e %d with nevent %d" , evb -> ep , evb -> nevent ) ;

return evb ;

}

创建event_base的过程,就是初始化前面定义的event_base结构体的内容:

  1. 调用epoll_create()创建epoll句柄
  2. 申请nevent * sizeof(struct epoll_event)大小的空间,存放epoll_event指针数组
  3. 申请sizeof(struct event_base),存放event_base
  4. 对event_base中的ep、event、nevent、cb进行赋值。

调用event_base_create的地点在nc_core.c当中:

1

2

ctx -> evb = event_base_create ( EVENT_SIZE , & core_core ) ;

其中EVENT_SIZE刚才nc_event.h的代码里有定义(1024),而core_core函数则在nc_core.c当中定义。调用event_base_create()之后,我们便得到了一个可以容纳1024个event的event_base。

添加event:

创建完event_base之后,便要向event_base中添加event,需要用到event_add_*函数,我们先来看看event_add_conn()函数:

int

event_add_conn ( struct event_base * evb , struct conn * c )

{

int status ;

struct epoll_event event ;

int ep = evb -> ep ;

ASSERT ( ep > 0 ) ;

ASSERT ( c != NULL ) ;

ASSERT ( c -> sd > 0 ) ;

event . events = ( uint32_t ) ( EPOLLIN | EPOLLOUT | EPOLLET ) ;

event . data . ptr = c ;

status = epoll_ctl ( ep , EPOLL_CTL_ADD , c -> sd , & event ) ;

if ( status < 0 ) {

log_error ( "epoll ctl on e %d sd %d failed: %s" , ep , c -> sd ,

strerror ( errno ) ) ;

} else {

c -> send_active = 1 ;

c -> recv_active = 1 ;

}

return status ;

}

conn的结构非常复杂,我们这里不去细看,因为这个函数只用到了conn->sd、conn->send_active和conn->recv_active,其中sd是要注册的文件描述符,而send_active和read_active只是布尔型的标记位,用来标记当前连接是否开启了读/写。

初始化event的时候,加入了EPOLLIN和EPOLLOUT事件,并指定了边缘触发EPOLLET模式,即当文件描述符的可读/可写状态切换的时候返回,成功后设置conn的send_active和read_active为1。

event_add_in()和event_add_out()其实是修改event的,具体分别将其修改为只读或者读写,同时修改send_active或者read_active。

那么什么时候添加event呢,一般是在创建连接之后,我们检索一下event_add_conn的调用位置:

event / nc_epoll .c : event_add_conn ( struct event_base * evb , struct conn * c )

event / nc_kqueue .c : event_add_conn ( struct event_base * evb , struct conn * c )

event / nc_event .h : int event_add_conn ( struct event_base * evb , struct conn * c ) ;

event / nc_evport .c : event_add_conn ( struct event_base * evb , struct conn * c )

nc_proxy .c :      status = event_add_conn ( ctx -> evb , p ) ;

nc_proxy .c :      status = event_add_conn ( ctx -> evb , c ) ;

nc_server .c :      status = event_add_conn ( ctx -> evb , conn ) ;

其中nc_server.c中的调用是在Twemproxy创建了与后端Memcached Server的链接之后,而nc_proxy.c中的调用则分别是在创建listen端口之后,以及accept客户端的链接之后,这符合我们的预期。接下来我们分别来看看这3处。

监听端口:

针对每一个server pool,Twemproxy会创建一个proxy,每个proxy监听一个指定的端口,监听后即需要像event_base添加事件,相关代码在nc_proxy.c的proxy_listen()函数中。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

static rstatus_t

proxy_listen ( struct context * ctx , struct conn * p )

{

……

status = event_add_conn ( ctx -> evb , p ) ;

if ( status < 0 ) {

log_error ( "event add conn p %d on addr ‘%.*s‘ failed: %s" ,

p -> sd , pool -> addrstr . len , pool -> addrstr . data ,

strerror ( errno ) ) ;

return NC_ERROR ;

}

status = event_del_out ( ctx -> evb , p ) ;

if ( status < 0 ) {

log_error ( "event del out p %d on addr ‘%.*s‘ failed: %s" ,

p -> sd , pool -> addrstr . len , pool -> addrstr . data ,

strerror ( errno ) ) ;

return NC_ERROR ;

}

return NC_OK ;

}

代码已经省略掉了不相关的部分,可以看出,proxy_listen()函数先调用event_add_conn添加一个事件,然后将其修改为只读(等待客户端连接)。

等待连接:

accept的代码在nc_proxy.c的proxy_accept()函数中:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

static rstatus_t

proxy_accept ( struct context * ctx , struct conn * p )

{

……

status = event_add_conn ( ctx -> evb , c ) ;

if ( status < 0 ) {

log_error ( "event add conn from p %d failed: %s" , p -> sd ,

strerror ( errno ) ) ;

c -> close ( ctx , c ) ;

return status ;

}

log_debug ( LOG_NOTICE , "accepted c %d on p %d from ‘%s‘" , c -> sd , p ->sd ,

nc_unresolve_peer_desc ( c -> sd ) ) ;

return NC_OK ;

}

同样省略了不相关的部分,可以看出在accept到客户端的文件描述符后,也是通过event_add_conn添加事件,不过这时没有调用event_del_out(),因为我们是需要向客户端写回数据的。

连接Memcached:

连接后端Memcached的代码在nc_server.c当中的server_connect()函数:

1

2

3

4

5

6

7

8

9

10

11

12

13

rstatus_t

server_connect ( struct context * ctx , struct server * server , struct conn * conn )

{

……

status = event_add_conn ( ctx -> evb , conn ) ;

if ( status != NC_OK ) {

log_error ( "event add conn s %d for server ‘%.*s‘ failed: %s" ,

conn -> sd , server -> pname . len , server -> pname . data ,

strerror ( errno ) ) ;

goto error ;

}

……

}

初始化连接之后,调用event_add_conn()来添加事件,然后连接Memcached,因为连接需要读写,所以没有调用event_del_out()。

从上面的分析看,添加事件的过程基本上是一致的。

事件循环:

完成了event_base的创建和event的添加之后,就进入事件循环,前一节我们讲到时间循环是在core_loop()中做的:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

rstatus_t

core_loop ( struct context * ctx )

{

int nsd ;

nsd = event_wait ( ctx -> evb , ctx -> timeout ) ;

if ( nsd < 0 ) {

return nsd ;

}

core_timeout ( ctx ) ;

stats_swap ( ctx -> stats ) ;

return NC_OK ;

}

core_loop()调用了event_wait()函数做事件循环,event_wait()的代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

int

event_wait ( struct event_base * evb , int timeout )

{

int ep = evb -> ep ;

struct epoll_event * event = evb -> event ;

int nevent = evb -> nevent ;

ASSERT ( ep > 0 ) ;

ASSERT ( event != NULL ) ;

ASSERT ( nevent > 0 ) ;

for ( ; ; ) {

int i , nsd ;

nsd = epoll_wait ( ep , event , nevent , timeout ) ;

if ( nsd > 0 ) {

for ( i = 0 ; i < nsd ; i ++ ) {

struct epoll_event * ev = & evb -> event [ i ] ;

uint32_t events = 0 ;

log_debug ( LOG_VVERB , "epoll " PRIX32 " triggered on conn %p",

ev -> events , ev -> data . ptr ) ;

if ( ev -> events & EPOLLERR ) {

events |= EVENT_ERR ;

}

if ( ev -> events & ( EPOLLIN | EPOLLHUP ) ) {

events |= EVENT_READ ;

}

if ( ev -> events & EPOLLOUT ) {

events |= EVENT_WRITE ;

}

if ( evb -> cb != NULL ) {

evb -> cb ( ev -> data . ptr , events ) ;

}

}

return nsd ;

}

if ( nsd == 0 ) {

if ( timeout == - 1 ) {

log_error ( "epoll wait on e %d with %d events and %d timeout "

"returned no events" , ep , nevent , timeout ) ;

return - 1 ;

}

return 0 ;

}

if ( errno == EINTR ) {

continue ;

}

log_error ( "epoll wait on e %d with %d events failed: %s" , ep , nevent ,

strerror ( errno ) ) ;

return - 1 ;

}

NOT_REACHED ( ) ;

}

epoll_wait()返回当前I/O就绪的句柄数nsd,同时将epoll_event放入*event指向的数组中,随后就可以根据nsd和*event去依次访问每一个句柄,调用event_base的cb函数进行处理。

事件触发:

前面我们分析event_base_create()的时候了解到,event_base_create()这个函数仅在nc_core.c当中调用,也就是说全局只有一个event_base,而这个event_base上所有的事件触发的回调函数event_base->cb都是指向调用event_base_create()函数时所设置的core_core()。那么问题来了,如何区分不同的连接(后端Memcached连接句柄,客户端连接句柄以及proxy的监听端口句柄)呢?

我们需要具体看一下core_core()这个函数的代码,这个函数非常短:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

rstatus_t

core_core ( void * arg , uint32_t events )

{

rstatus_t status ;

struct conn * conn = arg ;

struct context * ctx = conn_to_ctx ( conn ) ;

log_debug ( LOG_VVERB , "event " PRIX32 " on %c %d" , events ,

conn -> client ? ‘c‘ : ( conn -> proxy ? ‘p‘ : ‘s‘ ) , conn -> sd ) ;

conn -> events = events ;

if ( events & EVENT_ERR ) {

core_error ( ctx , conn ) ;

return NC_ERROR ;

}

if ( events & EVENT_READ ) {

status = core_recv ( ctx , conn ) ;

if ( status != NC_OK || conn -> done || conn -> err ) {

core_close ( ctx , conn ) ;

return NC_ERROR ;

}

}

if ( events & EVENT_WRITE ) {

status = core_send ( ctx , conn ) ;

if ( status != NC_OK || conn -> done || conn -> err ) {

core_close ( ctx , conn ) ;

return NC_ERROR ;

}

}

return NC_OK ;

}

可以看出,针对不同的事件:EVENT_READ、EVENT_WRITE,core_core()函数只是分别去调用了core_recv()和core_send()函数,而EVENT_ERR事件则报错。

那么core_recv()和core_send()函数做了什么呢?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

static rstatus_t

core_recv ( struct context * ctx , struct conn * conn )

{

rstatus_t status ;

status = conn -> recv ( ctx , conn ) ;

if ( status != NC_OK ) {

log_debug ( LOG_INFO , "recv on %c %d failed: %s" ,

conn -> client ? ‘c‘ : ( conn -> proxy ? ‘p‘ : ‘s‘ ) , conn -> sd ,

strerror ( errno ) ) ;

}

return status ;

}

static rstatus_t

core_send ( struct context * ctx , struct conn * conn )

{

rstatus_t status ;

status = conn -> send ( ctx , conn ) ;

if ( status != NC_OK ) {

log_debug ( LOG_INFO , "send on %c %d failed: %s" ,

conn -> client ? ‘c‘ : ( conn -> proxy ? ‘p‘ : ‘s‘ ) , conn -> sd ,

strerror ( errno ) ) ;

}

return status ;

}

可以看出他们只是分别调用了conn->recv()和conn->send(),我们看一下conn这个结构体相关部分的定义:

1

2

3

4

5

6

7

8

9

10

11

12

struct conn {

……

conn_recv_t         recv ;

conn_recv_next_t   recv_next ;

conn_recv_done_t   recv_done ;

conn_send_t         send ;

conn_send_next_t   send_next ;

conn_send_done_t   send_done ;

conn_close_t       close ;

conn_active_t       active ;

……

}

省略掉了无关的部分,可以看到conn包含了若干handler的函数指针,这其中就包含recv和send。那么这些字段是在创建conn的时候初始化的。针对不同的conn,为他们初始化不同的handler即可。

那么,首先我们看看listen的过程,调用proxy_listen()函数的位置是在nc_proxy.c的proxy_each_init()当中:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

rstatus_t

proxy_each_init ( void * elem , void * data )

{

rstatus_t status ;

struct server_pool * pool = elem ;

struct conn * p ;

p = conn_get_proxy ( pool ) ;

if ( p == NULL ) {

return NC_ENOMEM ;

}

status = proxy_listen ( pool -> ctx , p ) ;

if ( status != NC_OK ) {

p -> close ( pool -> ctx , p ) ;

return status ;

}

log_debug ( LOG_NOTICE , "p %d listening on ‘%.*s‘ in %s pool %" PRIu32 " ‘%.*s‘"

" with %" PRIu32 " servers" , p -> sd , pool -> addrstr . len ,

pool -> addrstr . data , pool -> redis ? "redis" : "memcache" ,

pool -> idx , pool -> name . len , pool -> name . data ,

array_n ( & pool -> server ) ) ;

return NC_OK ;

}

在调用proxy_listen()之前,调用了conn_get_proxy()来创建并初始化conn:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

struct conn *

conn_get_proxy ( void * owner )

{

struct server_pool * pool = owner ;

struct conn * conn ;

conn = _conn_get ( ) ;

if ( conn == NULL ) {

return NULL ;

}

conn -> redis = pool -> redis ;

conn -> proxy = 1 ;

conn -> recv = proxy_recv ;

conn -> recv_next = NULL ;

conn -> recv_done = NULL ;

conn -> send = NULL ;

conn -> send_next = NULL ;

conn -> send_done = NULL ;

conn -> close = proxy_close ;

conn -> active = NULL ;

conn -> ref = proxy_ref ;

conn -> unref = proxy_unref ;

conn -> enqueue_inq = NULL ;

conn -> dequeue_inq = NULL ;

conn -> enqueue_outq = NULL ;

conn -> dequeue_outq = NULL ;

conn -> ref ( conn , owner ) ;

log_debug ( LOG_VVERB , "get conn %p proxy %d" , conn , conn -> proxy ) ;

return conn ;

}

通过这个函数定义,我们看到为conn初始化了recv、close、ref和unref这4个handler,其中recv的handler是proxy_recv,那么也就是说,当监听句柄由不可读变为可读时,将会触发proxy_recv这个回调函数:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

rstatus_t

proxy_recv ( struct context * ctx , struct conn * conn )

{

rstatus_t status ;

ASSERT ( conn -> proxy && ! conn -> client ) ;

ASSERT ( conn -> recv_active ) ;

conn -> recv_ready = 1 ;

do {

status = proxy_accept ( ctx , conn ) ;

if ( status != NC_OK ) {

return status ;

}

} while ( conn -> recv_ready ) ;

return NC_OK ;

}

果不其然,这个函数调用了proxy_accept():

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

static rstatus_t

proxy_accept ( struct context * ctx , struct conn * p )

{

rstatus_t status ;

struct conn * c ;

int sd ;

ASSERT ( p -> proxy && ! p -> client ) ;

ASSERT ( p -> sd > 0 ) ;

ASSERT ( p -> recv_active && p -> recv_ready ) ;

for ( ; ; ) {

sd = accept ( p -> sd , NULL , NULL ) ;

if ( sd < 0 ) {

if ( errno == EINTR ) {

log_debug ( LOG_VERB , "accept on p %d not ready - eintr" , p -> sd ) ;

continue ;

}

if ( errno == EAGAIN || errno == EWOULDBLOCK ) {

log_debug ( LOG_VERB , "accept on p %d not ready - eagain" , p -> sd );

p -> recv_ready = 0 ;

return NC_OK ;

}

log_error ( "accept on p %d failed: %s" , p -> sd , strerror ( errno ) ) ;

return NC_ERROR ;

}

break ;

}

c = conn_get ( p -> owner , true , p -> redis ) ;

if ( c == NULL ) {

log_error ( "get conn for c %d from p %d failed: %s" , sd , p -> sd ,

strerror ( errno ) ) ;

status = close ( sd ) ;

if ( status < 0 ) {

log_error ( "close c %d failed, ignored: %s" , sd , strerror ( errno ) ) ;

}

return NC_ENOMEM ;

}

c -> sd = sd ;

……

}

代码已经滤掉了无关部分,proxy_accept在accept到客户端的句柄后,通过调用conn_get()创建并初始化conn,在conn_get()中对conn做了如下初始化:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

conn -> recv = msg_recv ;

conn -> recv_next = req_recv_next ;

conn -> recv_done = req_recv_done ;

conn -> send = msg_send ;

conn -> send_next = rsp_send_next ;

conn -> send_done = rsp_send_done ;

conn -> close = client_close ;

conn -> active = client_active ;

conn -> ref = client_ref ;

conn -> unref = client_unref ;

conn -> enqueue_inq = NULL ;

conn -> dequeue_inq = NULL ;

conn -> enqueue_outq = req_client_enqueue_omsgq ;

conn -> dequeue_outq = req_client_dequeue_omsgq ;

而在创建Memcached连接时,是这么初始化的:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

conn -> recv = msg_recv ;

conn -> recv_next = rsp_recv_next ;

conn -> recv_done = rsp_recv_done ;

conn -> send = msg_send ;

conn -> send_next = req_send_next ;

conn -> send_done = req_send_done ;

conn -> close = server_close ;

conn -> active = server_active ;

conn -> ref = server_ref ;

conn -> unref = server_unref ;

conn -> enqueue_inq = req_server_enqueue_imsgq ;

conn -> dequeue_inq = req_server_dequeue_imsgq ;

conn -> enqueue_outq = req_server_enqueue_omsgq ;

conn -> dequeue_outq = req_server_dequeue_omsgq ;

这些handler的作用,这里暂时不细究,只需要理解事件触发回调函数的过程就可以了。

小结:

通过上面的分析,我们知道了Twemproxy使用了单进程模型,并且自己封装了事件模型,这与redis是类似的。这也就意味着,Twemproxy不适合做CPU密集的操作,仅作为proxy来用是OK的,若要对Twemproxy的功能进行扩展,要特别注意这一点,以免影响Twemproxy的吞吐量。

时间: 2024-10-17 04:44:53

原创:Twemproxy源码分析之三:其进程以及时间模型的相关文章

原创:Twemproxy源码分析之一 入口函数及启动过程

最近开始研究twemproxy先将其中的知识点归纳整理一下.作为一个系列的知识点. 一.Twemproxy简介 Twemproxy是memcache与redis的代理,由twitter公司开发并且目前已经开源.研究这个对于理解网络通信有很大的帮助. 亮点有以下: 1.twemproxy自己创建并维护和后端server(即reids实例)的长连接,保证长连接对于来自不同client但去向同一server的复用. 2.自动识别异常状态的server,保证之后的请求不会被转发到该异常server上.但

原创 :twemproxy源码分析之二 nc_run

nc_run函数一共没几行.主要的函数有core_start, core_loop以及core_stop. 其中core_start用来初始化conn,mbuf,msg这些重要数据结构的基本参数值,更重要的是根据配置文件以及命令行参数设置该实例中的context变量(上一节我们提到过一个twemproxy实例对应于一个context变量ctx) core_loop函数主要是在调用event_wait等待io事件的发生. core_stop则是对于core_start函数的反向操作 1.core_

twemproxy源码分析之五:zero copy

先给出msg的数据结构: truct msg { TAILQ_ENTRY(msg)     c_tqe; TAILQ_ENTRY(msg)     s_tqe; TAILQ_ENTRY(msg)     m_tqe; uint64_t             id; int                  route_idx; struct msg           *peer; struct conn          *owner; struct rbnode        tmo_rb

twemproxy源码分析之四:处理流程ji(内容属于转载。

nc_connection.c 很赞的注释: * nc_connection.[ch] * Connection (struct conn) * + + + * | | | * | Proxy | * | nc_proxy.[ch] | * / * Client Server * nc_client.[ch] nc_server.[ch] messsage.c * nc_message.[ch] * _message (struct msg) * + + . * | | . * / \ . *

spark 源码分析之三 -- LiveListenerBus介绍

LiveListenerBus 首先,它定义了 4 个 消息堵塞队列,队列的名字分别为shared.appStatus.executorManagement.eventLog.队列的类型是 org.apache.spark.scheduler.AsyncEventQueue#AsyncEventQueue,保存在 queues 变量中.每一个队列上都可以注册监听器,如果队列没有监听器,则会被移除. 它有启动和stop和start两个标志位来指示 监听总线的的启动停止状态. 如果总线没有启动,有事

Thrift源码分析(四)-- 方法调用模型分析

RPC调用本质上就是一种网络编程,客户端向服务器发送消息,服务器拿到消息之后做后续动作.只是RPC这种消息比较特殊,它封装了方法调用,包括方法名,方法参数.服务端拿到这个消息之后,解码消息,然后要通过方法调用模型来完成实际服务器端业务方法的调用. 这篇讲讲Thrfit的方法调用模型.Thrift的方法调用模型很简单,就是通过方法名和实际方法实现类的注册完成,没有使用反射机制,类加载机制. 和方法调用相关的几个核心类: 1. 自动生成的Iface接口,是远程方法的顶层接口 2. 自动生成的Proc

NHibernate源码分析

NHibernate源码分析之开篇: 计划和安排 只从使用NHibernate以来,请被其强大的功能和使用的简洁所吸引. 为了进一步研究NHibernate,决定分析其源代码,如有感兴趣者,欢迎一起研究. 这里列出了将要分析的部分: 1.        NHibernate配置和持久对象映射文件 2.        NHibernate架构分析(uml图) 3.        NHibernate源码分析之一: 配置信息 4.        NHibernate源码分析之一(续): 对象映射 5

Spark 源码分析系列

如下,是 spark 源码分析系列的一些文章汇总,持续更新中...... Spark RPC spark 源码分析之五--Spark RPC剖析之创建NettyRpcEnv spark 源码分析之六--Spark RPC剖析之Dispatcher和Inbox.Outbox剖析 spark 源码分析之七--Spark RPC剖析之RpcEndPoint和RpcEndPointRef剖析 spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClie

Spark源码分析之八:Task运行(二)

在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤: Step1:Task及其运行时需要的辅助对象构造,主要包括: 1.当前线程设置上下文类加载器: 2.获取序列化器ser: 3.更新任务状态TaskState: 4.计算垃圾回收时间: 5.反