Memcached source code analysis (threading model)--reference

Let's start with a look at how memcached's threads are set up at startup.

memcached implements multithreading mainly by instantiating multiple
libevent instances: there is one main thread and n worker threads. Both the
main thread and the worker threads manage network events through libevent; in
fact, each thread is a separate libevent instance.

The main thread is responsible for listening for client connection requests
and accepting connections.
The worker threads handle events, such as reads and writes, on connections
that have already been established.

Here is the overall diagram:

First, look at the main data structures (thread.c):

C code

    /* An item in the connection queue. */
    typedef struct conn_queue_item CQ_ITEM;
    struct conn_queue_item {
        int     sfd;               /* descriptor of the accepted connection */
        int     init_state;        /* initial state for the new connection */
        int     event_flags;       /* libevent flags to register with */
        int     read_buffer_size;
        int     is_udp;
        CQ_ITEM *next;             /* next item in the queue */
    };

CQ_ITEM is really just a wrapper around the fd of a connection returned by
accept in the main thread.

C code

    /* A connection queue. */
    typedef struct conn_queue CQ;
    struct conn_queue {
        CQ_ITEM *head;
        CQ_ITEM *tail;
        pthread_mutex_t lock;
        pthread_cond_t  cond;
    };

CQ is a singly linked list of CQ_ITEMs.
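To make the linked-list idea concrete, here is a minimal sketch of such a queue in C. It is not memcached's code: the names demo_item, demo_queue, dq_init, dq_push and dq_pop are invented for this example. It assumes only what the structures above show, namely head and tail pointers guarded by a pthread mutex; the condition variable is left out.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-ins for CQ_ITEM and CQ; not memcached's own definitions. */
typedef struct demo_item {
    int sfd;                 /* descriptor carried by this item */
    struct demo_item *next;  /* next item in the queue */
} demo_item;

typedef struct {
    demo_item *head;         /* oldest item, removed first */
    demo_item *tail;         /* newest item, appended last */
    pthread_mutex_t lock;    /* protects head and tail */
} demo_queue;

static void dq_init(demo_queue *q) {
    assert(q != NULL);
    pthread_mutex_init(&q->lock, NULL);
    q->head = NULL;
    q->tail = NULL;
}

/* Append an item at the tail of the queue. */
static void dq_push(demo_queue *q, demo_item *item) {
    item->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->tail == NULL)
        q->head = item;          /* queue was empty */
    else
        q->tail->next = item;
    q->tail = item;
    pthread_mutex_unlock(&q->lock);
}

/* Remove and return the head item, or NULL if the queue is empty. */
static demo_item *dq_pop(demo_queue *q) {
    demo_item *item;
    pthread_mutex_lock(&q->lock);
    item = q->head;
    if (item != NULL) {
        q->head = item->next;
        if (q->head == NULL)
            q->tail = NULL;      /* queue became empty */
    }
    pthread_mutex_unlock(&q->lock);
    return item;
}
```

One thread can call dq_push while another calls dq_pop; items come out in the order they were pushed.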

C code

    typedef struct {
        pthread_t thread_id;        /* unique ID of this thread */
        struct event_base *base;    /* libevent handle this thread uses */
        struct event notify_event;  /* listen event for notify pipe */
        int notify_receive_fd;      /* receiving end of notify pipe */
        int notify_send_fd;         /* sending end of notify pipe */
        CQ  new_conn_queue;         /* queue of new connections to handle */
    } LIBEVENT_THREAD;

This is memcached's wrapper structure for a thread. As you can see, each
thread holds a CQ queue, a notification pipe, and a libevent instance
(event_base).

Another important structure, and in fact the most important one, is conn,
which wraps each network connection.

C code

    typedef struct {
        int    sfd;
        int    state;
        struct event event;
        short  which;
        char   *rbuf;
        /* ... status flags and read/write buffer fields omitted ... */
    } conn;

memcached handles events mainly by setting each connection's state and
moving it from one state to another (the core function is
drive_machine).
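The state-driven approach can be sketched with a toy example. The states DEMO_READ, DEMO_WRITE and DEMO_CLOSING, the demo_conn type and the demo_drive function are all made up for illustration and are far simpler than memcached's real conn and drive_machine. The sketch only shows the pattern: a loop that switches on the connection's current state.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical states for illustration only. */
enum demo_state { DEMO_READ, DEMO_WRITE, DEMO_CLOSING };

typedef struct {
    enum demo_state state;  /* current state of this connection */
    int steps;              /* number of state transitions performed */
} demo_conn;

/* Run the connection through its states until one of them stops the loop. */
static void demo_drive(demo_conn *c) {
    bool stop = false;

    assert(c != NULL);

    while (!stop) {
        switch (c->state) {
        case DEMO_READ:
            /* pretend a full command was read, so move on to replying */
            c->state = DEMO_WRITE;
            c->steps++;
            break;
        case DEMO_WRITE:
            /* pretend the reply was sent, so close the connection */
            c->state = DEMO_CLOSING;
            c->steps++;
            break;
        case DEMO_CLOSING:
            /* nothing more to do for this connection */
            stop = true;
            break;
        }
    }
}
```

Starting from DEMO_READ, one call to demo_drive walks the connection through two transitions and stops in DEMO_CLOSING. What happens next depends only on the state stored in the structure, not on who called the function.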

Next, let's look at the thread initialization process.

In the main function in memcached.c, the main thread's libevent instance is
initialized first:

C code

    /* Initialize main thread libevent instance */
    main_base = event_init();

Then all the worker threads are initialized and started; the startup process
is described in detail later.

C code

    /* Start up worker threads if MT mode */
    thread_init(settings.num_threads, main_base);

Then the main thread calls the following (only the TCP case is analyzed here;
memcached also supports UDP):

C code

    server_socket(settings.port, 0)

This function wraps a series of operations: creating the listening socket,
binding the address, setting non-blocking mode, and registering a libevent
read event for the listening socket.
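A rough sketch of the socket part of these steps, using plain POSIX calls, might look like the following. The function name demo_listen_socket, the loopback address and the backlog of 1024 are assumptions made for the example, and the libevent registration is only indicated by a comment, so this is not the real server_socket.

```c
#include <assert.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Hypothetical helper, not memcached's server_socket(): creates a
 * non-blocking TCP socket listening on 127.0.0.1:port.
 * Returns the descriptor, or -1 on failure.
 */
static int demo_listen_socket(unsigned short port) {
    struct sockaddr_in addr;
    int flags, on = 1;
    int sfd = socket(AF_INET, SOCK_STREAM, 0);

    if (sfd == -1)
        return -1;

    /* allow quick restarts on the same port */
    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

    /* switch the descriptor to non-blocking mode */
    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        close(sfd);
        return -1;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);

    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1 ||
        listen(sfd, 1024) == -1) {
        close(sfd);
        return -1;
    }

    /* the caller would now register sfd for read events with its event loop */
    return sfd;
}
```

Passing port 0 lets the kernel choose a free port, which is convenient for trying the helper out.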

Then the main thread calls

C code

    /* Enter the event loop */
    event_base_loop(main_base, 0);

At this point the main thread's libevent loop starts accepting external
connection requests, and the entire startup process is complete.

Now let's see how thread_init starts all the worker threads. Here is the core
code of thread_init:

C code

    void thread_init(int nthreads, struct event_base *main_base) {
        /* ... omitted ... */
        threads = malloc(sizeof(LIBEVENT_THREAD) * nthreads);
        if (!threads) {
            perror("Can't allocate thread descriptors");
            exit(1);
        }

        /* the first entry describes the main thread itself */
        threads[0].base = main_base;
        threads[0].thread_id = pthread_self();

        for (i = 0; i < nthreads; i++) {
            int fds[2];
            if (pipe(fds)) {
                perror("Can't create notify pipe");
                exit(1);
            }

            threads[i].notify_receive_fd = fds[0];
            threads[i].notify_send_fd = fds[1];

            setup_thread(&threads[i]);
        }

        /* Create threads after we've done all the libevent setup. */
        for (i = 1; i < nthreads; i++) {
            create_worker(worker_libevent, &threads[i]);
        }
    }

threads is declared as:

    static LIBEVENT_THREAD *threads;

thread_init first mallocs space for the thread structures, then treats the
first entry as the main thread and the rest as worker threads. It then creates
a pipe for each thread; the main thread uses this pipe to notify a worker
thread that a new connection has arrived.

Next comes setup_thread:

C code

    static void setup_thread(LIBEVENT_THREAD *me) {
        if (!me->base) {
            me->base = event_init();
            if (!me->base) {
                fprintf(stderr, "Can't allocate event base\n");
                exit(1);
            }
        }

        /* Listen for notifications from other threads */
        event_set(&me->notify_event, me->notify_receive_fd,
                  EV_READ | EV_PERSIST, thread_libevent_process, me);
        event_base_set(me->base, &me->notify_event);

        if (event_add(&me->notify_event, 0) == -1) {
            fprintf(stderr, "Can't monitor libevent notify pipe\n");
            exit(1);
        }

        cq_init(&me->new_conn_queue);
    }

setup_thread creates the libevent instance for every worker thread (the main
thread's libevent instance has already been created in the main
function).

Because threads[0].base = main_base was set earlier, the first thread (the
main thread) does not execute event_init() here.
The function then registers a libevent read event on the read end of each
worker thread's pipe, so that the worker can wait for notifications from the
main thread. Finally, it initializes each worker's CQ.

create_worker is where the threads are actually started: it calls
pthread_create with worker_libevent as the thread function, and
worker_libevent runs event_base_loop to start that thread's libevent loop.

What we need to remember here is that each worker thread is triggered only
when data becomes readable on the read end of its own pipe, at which point it
calls thread_libevent_process.

Let's look at this function:

C code

    static void thread_libevent_process(int fd, short which, void *arg) {
        LIBEVENT_THREAD *me = arg;
        CQ_ITEM *item;
        char buf[1];

        /* consume the one-byte notification */
        if (read(fd, buf, 1) != 1)
            if (settings.verbose > 0)
                fprintf(stderr, "Can't read from libevent pipe\n");

        item = cq_peek(&me->new_conn_queue);

        if (NULL != item) {
            conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                               item->read_buffer_size, item->is_udp, me->base);
            /* ... omitted ... */
        }
    }

The fd parameter is the descriptor of the read end of this thread's pipe. The
function first reads the 1-byte notification signal out of the pipe. This is
necessary in level-triggered mode: if the event is not consumed, the
notification keeps firing in a loop until the event has been
handled.
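The level-triggered behaviour can be observed without libevent. The sketch below uses poll(2), which is also level-triggered, on an ordinary pipe. demo_is_readable and demo_level_trigger are hypothetical helper names introduced only for this example.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper: 1 if fd is readable right now, 0 if not, -1 on error. */
static int demo_is_readable(int fd) {
    struct pollfd p;
    int n;

    p.fd = fd;
    p.events = POLLIN;
    p.revents = 0;

    /* timeout 0: report the current state without waiting */
    n = poll(&p, 1, 0);
    if (n < 0)
        return -1;
    return (n == 1 && (p.revents & POLLIN)) ? 1 : 0;
}

/*
 * Writes one notification byte into a pipe and checks readiness before and
 * after draining it. Returns 0 if the pipe stayed readable until the byte
 * was read and stopped being readable afterwards, -1 otherwise.
 */
static int demo_level_trigger(void) {
    int fds[2];
    char buf[1];
    int ok;

    if (pipe(fds) == -1)
        return -1;
    if (write(fds[1], "", 1) != 1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }

    /* readiness is reported again and again while the byte is unread */
    ok = demo_is_readable(fds[0]) == 1 && demo_is_readable(fds[0]) == 1;

    /* consuming the byte clears the condition */
    ok = ok && read(fds[0], buf, 1) == 1 && demo_is_readable(fds[0]) == 0;

    close(fds[0]);
    close(fds[1]);
    return ok ? 0 : -1;
}
```

If the read were skipped, every later poll would keep reporting the pipe as readable. That is the repeated notification the paragraph above warns about.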

cq_peek takes one CQ_ITEM from the head of the thread's CQ queue; this CQ_ITEM
was put into the queue by the main thread. item->sfd is the descriptor of the
already established connection. conn_new registers a libevent read event for
this descriptor. me->base is the event_base of the current thread structure,
which means that event handling for this descriptor is handed over to this
worker thread. The most important part of conn_new is:

C code

    conn *conn_new(const int sfd, const int init_state, const int event_flags,
                   const int read_buffer_size, const bool is_udp,
                   struct event_base *base) {
        /* ... */
        event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
        event_base_set(base, &c->event);
        c->ev_flags = event_flags;

        if (event_add(&c->event, 0) == -1) {
            if (conn_add_to_freelist(c)) {
                conn_free(c);
            }
            perror("event_add");
            return NULL;
        }
        /* ... */
    }

You can see that an event (actually EV_READ | EV_PERSIST) is registered for
the new connection and is handled by the current thread (because the
event_base here belongs to the worker thread).
When the connection has readable data, the event_handler function is called
back; event_handler in turn mainly calls memcached's core function,
drive_machine.

Finally, let's see how the main thread notifies a worker thread to handle a
new connection. The main thread's libevent instance has a readable event
registered for the listening socket descriptor, so when a connection request
arrives, the main thread handles it. The callback for that readable event is
also event_handler (in fact, the main thread also goes through conn_new to
register the listening socket with libevent).

Lastly, let's look at the most central part of memcached's network event
handling: drive_machine.
Keep in mind that drive_machine runs in a multithreaded environment: both the
main thread and the workers execute drive_machine.

C code

    static void drive_machine(conn *c) {
        bool stop = false;
        int sfd, flags = 1;
        socklen_t addrlen;
        struct sockaddr_storage addr;
        int res;

        assert(c != NULL);

        while (!stop) {
            switch (c->state) {
            case conn_listening:
                addrlen = sizeof(addr);
                if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                    /* error handling omitted */
                    break;
                }
                if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                    fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
                    perror("setting O_NONBLOCK");
                    close(sfd);
                    break;
                }
                dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                  DATA_BUFFER_SIZE, false);
                break;

            case conn_read:
                if (try_read_command(c) != 0) {
                    continue;
                }
                /* ... omitted ... */
            }
        }
    }

First of all, do not be misled by the while loop (most people coming from Java
will immediately think of an endless loop). In practice the loop usually ends
as soon as one of the cases breaks out of it. The loop is there because, once
edge-triggered mode is taken into account, you must keep reading until the
EWOULDBLOCK error is returned.
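Reading until EWOULDBLOCK can be illustrated in isolation. In the sketch below, demo_drain is a hypothetical helper, not a memcached function, and the 4-byte buffer is chosen only so that several read calls are needed. It assumes the descriptor has already been put into non-blocking mode.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/*
 * Hypothetical helper: reads from a non-blocking descriptor until the kernel
 * reports that no more data is available. Returns the number of bytes read,
 * or -1 on a real error.
 */
static long demo_drain(int fd) {
    char buf[4];   /* deliberately tiny so that several reads are needed */
    long total = 0;

    for (;;) {
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) {
            total += n;
            continue;           /* there may be more, try again */
        }
        if (n == 0)
            return total;       /* end of file: the writer closed its end */
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return total;       /* nothing left for now */
        if (errno == EINTR)
            continue;           /* interrupted by a signal, retry */
        return -1;              /* any other error */
    }
}
```

The loop ends on the "would block" error rather than after a fixed number of reads, which is the same reason the state machine keeps looping instead of handling a single read per callback.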

Back to the point: drive_machine decides what to do mainly from the state of
the current connection. It is the core function behind the callbacks that
libevent fires on read and write events. So when we register a libevent
event, we also record the state in the conn structure, and the libevent
callback then passes that conn structure to the handler as an
argument.

memcached declares the connection states through an enum:

C code

    enum conn_states {
        conn_listening,  /** the socket which listens for connections */
        conn_read,       /** reading in a command line */
        conn_write,      /** writing out a simple response */
        conn_nread,      /** reading in a fixed number of bytes */
        conn_swallow,    /** swallowing unnecessary bytes w/o storing */
        conn_closing,    /** closing this connection */
        conn_mwrite,     /** writing out many items sequentially */
    };

As for case conn_listening: this branch is handled by the main thread itself;
worker threads never execute it. We can see that after accept, the main thread
calls

    dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, false);

This function is what notifies the worker thread. Let's take a look:

C code

    void dispatch_conn_new(int sfd, int init_state, int event_flags,
                           int read_buffer_size, int is_udp) {
        CQ_ITEM *item = cqi_new();
        /* round robin: pick the thread after the one used last time */
        int thread = (last_thread + 1) % settings.num_threads;

        last_thread = thread;

        item->sfd = sfd;
        item->init_state = init_state;
        item->event_flags = event_flags;
        item->read_buffer_size = read_buffer_size;
        item->is_udp = is_udp;

        cq_push(&threads[thread].new_conn_queue, item);

        MEMCACHED_CONN_DISPATCH(sfd, threads[thread].thread_id);
        /* wake the chosen thread by writing one byte to its notify pipe */
        if (write(threads[thread].notify_send_fd, "", 1) != 1) {
            perror("Writing to thread notify pipe");
        }
    }

You can clearly see that the main thread first creates a new CQ_ITEM, then
selects a thread using a round-robin strategy, and uses cq_push to put the
CQ_ITEM into that thread's CQ queue. How does the corresponding worker thread
find out about it?

Through this call:

    write(threads[thread].notify_send_fd, "", 1)

It writes 1 byte of data to that thread's pipe, and the thread's libevent
instance then immediately calls back thread_libevent_process (already
described above).
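The whole hand-off (choose a worker in turn, queue the descriptor, write one byte to wake the worker) can be walked through in a small single-threaded sketch. Everything named demo_ here is hypothetical. A tiny fixed-size array stands in for the CQ, there is no locking because only one thread is involved, and the worker side is called directly instead of being triggered by libevent.

```c
#include <assert.h>
#include <unistd.h>

#define DEMO_NTHREADS  3
#define DEMO_QUEUE_LEN 8

/* Hypothetical per-worker state: a notify pipe plus a small queue of descriptors. */
typedef struct {
    int notify_receive_fd;
    int notify_send_fd;
    int pending[DEMO_QUEUE_LEN];
    int head, tail;
} demo_worker;

static demo_worker demo_workers[DEMO_NTHREADS];
static int demo_last = -1;   /* index of the worker chosen last time */

/* Create one notify pipe per worker. Returns 0 on success, -1 on failure. */
static int demo_init(void) {
    int i, fds[2];
    for (i = 0; i < DEMO_NTHREADS; i++) {
        if (pipe(fds) == -1)
            return -1;
        demo_workers[i].notify_receive_fd = fds[0];
        demo_workers[i].notify_send_fd = fds[1];
        demo_workers[i].head = demo_workers[i].tail = 0;
    }
    return 0;
}

/* Dispatcher side: returns the index of the chosen worker, or -1 on failure. */
static int demo_dispatch(int sfd) {
    int t = (demo_last + 1) % DEMO_NTHREADS;   /* round robin */
    demo_worker *w = &demo_workers[t];

    if (w->tail - w->head == DEMO_QUEUE_LEN)
        return -1;                             /* queue full */
    demo_last = t;
    w->pending[w->tail++ % DEMO_QUEUE_LEN] = sfd;

    /* one byte on the pipe tells the worker that an item is waiting */
    if (write(w->notify_send_fd, "", 1) != 1)
        return -1;
    return t;
}

/*
 * Worker side: consume one wake-up byte, then take one queued descriptor.
 * Only call this after a dispatch to this worker, since the read blocks
 * on an empty pipe.
 */
static int demo_worker_process(demo_worker *w) {
    char buf[1];
    if (read(w->notify_receive_fd, buf, 1) != 1)
        return -1;
    if (w->head == w->tail)
        return -1;
    return w->pending[w->head++ % DEMO_QUEUE_LEN];
}
```

In this sketch, four dispatches in a row go to workers 0, 1, 2 and then 0 again, and each worker later gets back exactly the descriptors queued for it, in order.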

The thread then takes the item out of the queue and registers a read event.
When data arrives on that connection, drive_machine is eventually called back
and runs its case conn_read branch. In other words, all of this is handled by
the workers; the main thread only handles conn_listening, that is,
establishing connections.

There is a lot of code in this part and it cannot all be posted here; please
refer to the source code. The latest version is 1.2.6. Many optimizations have
been left out of this walkthrough. For example, whenever CQ_ITEMs need to be
allocated, many are malloc'ed at once to reduce fragmentation, along with
other details of that kind.

Reference from: http://www.cprogramdevelop.com/408519/


Time: 2024-08-26 12:24:55
