Look under the start
memcahced threading process
memcached multi-threaded mainly by instantiating multiple
libevent, are a main thread and n workers thread is the main thread or workers
thread all through the the libevent management network event, in fact, each
thread is a separate libevent instance
The main thread is responsible for monitoring the client to
establish a connection request and accept
connections
workers thread
to handle events such as read and write has established good
connections
Look at
general icon:
First
look main data
structures (thread.c):
C
code
- /
* An item in the connection queue. * / - typedef struct conn_queue_item CQ_ITEM;
- struct conn_queue_item {
- int sfd
out of the; - int init_state;
- The
int event_flags; - int read_buffer_size;
- int is_udp;
- CQ_ITEM * next;
- };
CQ_ITEM is actually the main thread accept
returned package to establish a
connection fd
C
code
- / *
A connection queue. * / - typedef struct conn_queue CQ;
- struct conn_queue {
- CQ_ITEM * head;
- CQ_ITEM * tail;
- pthread_mutex_t lock;
- pthread_cond_t cond;
- };
CQ is a CQ_ITEM the singly linked
list
C
code
- typedef struct {
- pthread_t
thread_id; / * unique ID
of this thread *
/ - struct event_base * base; / * libevent handle this
thread uses * / - struct event notify_event; /
* listen event for
notify pipe * / - int notify_receive_fd; / * receiving end of notify pipe *
/ - int notify_send_fd; / * sending end of notify pipe *
/ - CQ
new_conn_queue; / * queue of new
connections to handle * / - }
LIBEVENT_THREAD;
Memcached thread structure package, you
can see each thread contains a CQ queue, a notification pipe
pipe
The instance event_base
and a libevent
Another
important structure is the most important for each network
connection Package conn
C
code
- typedef struct {
- int sfd
out of the; - int State;
- struct event
event; - short which;
- The char *
RBUF; - ... / / Eliminating the status flag and read-write buf
information - }
Conn;
memcached mainly through Settings / conversion
connected to different states to handle the event (core function
drive_machine)
See next thread initialization process:
The main function of the memcached.c, first initialized on the
main thread libevent
C
code
- / *
Initialize main thread libevent instance * / - main_base = event_init ();
Then initialize all workers thread and start
the startup process will be described in detail
later
C
code
- /
* Start up worker threads if
MT mode * / - thread_init (settings.num_threads,
main_base);
Then the main thread calls (only analyze the the
tcp situation, memcached support udp)
C
code
- server_socket (settings.port, 0)
This method encapsulates create listening socket bound
address, set the non-blocking mode and register the listening
socket
the libevent read
event, a series of operations
Then the main thread calls
C
code
- / *
Enter the event loop * / - event_base_loop (main_base, 0);
At this time the main thread start libevent to
accept the the external connection request, the entire start-up process is
completed
Let‘s look at
how to start thread_init all
workers thread, look at the core code
thread_init
C
code
- void
thread_init (int nthreads, struct event_base * main_base) { - /
/. . . Omission - threads = malloc (sizeof (LIBEVENT_THREAD) * nthreads);
- if (threads)
{ - perror ("Can‘t allocate thread
descriptors"); - Exit
(1); - }
- threads [0]. base = main_base;
- threads [0]. thread_id = pthread_self ();
- for (i = 0;
i <nthreads; i + +) { - int fds
[2]; - if (pipe
(fds)) { - perror ("Can‘t create notify pipe");
- Exit
(1); - }
- threads [i notify_receive_fd = fds [0];
- threads [i]. notify_send_fd = the FDS [1];
- setup_thread (& threads [i]);
- }
- /
* Create threads after we‘ve done all the libevent setup. *
/ - for (i = 1;
i <nthreads; i + +) { - create_worker (worker_libevent, & threads
[i]); - }
- }
threads
statement
static
LIBEVENT_THREAD * threads;
The
thread_init first malloc thread space, and then the first threads as the main
thread, the rest are workers thread is then created for each thread a pipe, this
pipe is used as the main thread to inform the workers thread a new connection
arrives
Following
setup_thread for
C
code
- static void setup_thread
(LIBEVENT_THREAD * me) { - if (!
me-> base) { - me-> base = event_init ();
- if (!
me-> base) { - fprintf
(stderr, "Can‘t allocate event base
\ n"); - Exit
(1); - }
- }
- /
* Listen for notifications from other threads * / - event_set (& me-> notify_event, me->
notify_receive_fd, - EV_READ | EV_PERSIST, thread_libevent_process,
me); - event_base_set (me-> base, & me->
notify_event); - if (event_add (& me-> notify_event, 0) == -1) {
- fprintf
(stderr, "Can‘t monitor libevent
notify pipe \ n"); - Exit
(1); - }
- cq_init (& me-> new_conn_queue);
- }
the create setup_thread libevent instance of all
workers thread (the libevent main thread instance has been established in the
main function)
Since the
threads before [0] base = main_base; first thread (the main thread) will not be
here execution event_init ()
In this method, then is to
register all workers thread pipe read end the libevent of read
events, wait for the main thread last all workers
CQ initialization
the create_worker actually is the real start of the thread
pthread_create call worker_libevent method, this method is
executed
event_base_loop start the thread
libevent
Here we need to
remember that each workers thread only data from the read end of the pipe in its
own thread readable trigger, and call
thread_libevent_process
methods
Look at this
function
C
code
- static void
thread_libevent_process (int fd, short which, void * arg) { - LIBEVENT_THREAD * me = arg;
- CQ_ITEM * item;
- char buf
[1]; - if (read
(fd, buf, 1)! = 1) - if (settings.verbose,> 0)
- fprintf
(stderr, "Can‘t read from libevent
pipe \ n"); - item
= cq_peek (& me-> new_conn_queue); - if (NULL! =
item) { - conn * c = conn_new (item-> sfd, item-> init_state,
item-> event_flags, - item-> read_buffer_size, item-> is_udp, me->
base); - . . . / /
Omitted - }
- }
The fd function parameters pipe read end of
the thread descriptor first 1 byte of the pipeline notification signal readout
(this is necessary in the level trigger mode, if does not handle the event, it
will be loop notification know the event to be
treated)
the cq_peek
from the thread CQ queue take the head of the queue a CQ_ITEM, this CQ_ITEM is
thrown into the main thread in the queue, item-> SFD is already established
connection descriptor, by conn_new function of the descriptor registration the
libevent read event, me-> Base on behalf of a thread structure, that is the
descriptor event processing the to this workersThreading, the most important
elements of conn_new method is:
C
code
- the Conn *
conn_new (const int SFD, const int init_state, const int event_flags - const int read_buffer_size, const bool is_udp, struct event_base * base) {
- . . .
- event_set (& c-> event, sfd, event_flags,
event_handler, (void *)
c); - event_base_set (base, & c-> event);
- c-> ev_flags = event_flags;
- if (event_add (& c-> event, 0) == -1)
{ - the
if (conn_add_to_freelist (c)) { - conn_free (c);
- }
- perror ("event_add");
- return NULL;
- }
- . . .
- }
You can see the new connection is registered to an
event (actually EV_READ | EV_PERSIST), processed by the current thread (because
event_base here the workers thread)
When the connection readable data callback event_handler
function, actually event_handler in the main call memcached
core method drive_machine of
Finally, look at the main thread is how to notify workers
thread to handle the new connection, the main thread libevent registered
readable event listening socket descriptor word, that is, when to establish a
connection request, the main thread will handle the callback function is also
the event_handler readable event (in fact, the main thread is initialized by
conn_new listening socket libevent)
Last look at the most central part of the memcached network
event processing - drive_machine
Need
to keep in mind is drive_machine perform multi-threaded environment, the main
thread and the workers will executive
drive_machine,
C
code
- static void drive_machine (conn * c)
{ - bool stop = false;
- int SFD,
flags = 1; - socklen_t addrlen;
- struct sockaddr_storage addr;
- int res;
- assert (c! = NULL);
- while (!
stop) { - switch (c-> state) {
- case
conn_listening: - addrlen = sizeof (addr);
- if ((sfd =
accept (c-> sfd, (struct sockaddr *) & addr, & addrlen)) ==
-1) { - / /
Save n error handling - break;
- }
- if ((flags =
fcntl (sfd, F_GETFL, 0)) <0 | | - fcntl (sfd, F_SETFL, flags | O_NONBLOCK) <0) {
- perror ("setting O_NONBLOCK");
- close (SFD);
- break;
- }
- dispatch_conn_new (sfd, conn_read, EV_READ |
EV_PERSIST, - DATA_BUFFER_SIZE, false);
- break;
- case
conn_read: - the
if (try_read_command, (c) = 0) { - continue;
- }
- .... / / Omitted
- }
- }
First of all, less than in fact be the while loop
misleading (most do java students will immediately think of a cycle of the loop)
while usually to meet a
will
break in the case, while taking into account the vertical trigger mode, you must
read the error of EWOULDBLOCK
Closer to home, drive_machine mainly by the current connection
state to determine the what, by to libevent registered callback after the read
and write time are the core function, so we registered libevent event, while the
event state is written to the conn structure libevent callback will the conn
structure as an argument over the method
parameter
the memcached
connected through an enum declaration
C
code
- enum conn_states to {
- conn_listening, /
** the socket which listens for connections *
/ - conn_read, / ** reading in a command line *
/ - conn_write, / ** writing out a simple response *
/ - conn_nread, / ** reading in a fixed number of bytes *
/ - conn_swallow, / **
swallowing unnecessary bytes w / o storing * / - conn_closing, / ** closing this connection *
/ - conn_mwrite, / ** writing out many items sequentially *
/ - };
Actual for case conn_listening: This is the
main thread to deal with their own Workers threads never do this branch we see
the main thread to accept calls
dispatch_conn_new
(sfd, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE,
false);
This function is
to inform workers thread to see
C
code
- void dispatch_conn_new (int sfd, int init_state, int event_flags,
- int read_buffer_size, int is_udp) {
- CQ_ITEM * item = cqi_new ();
- int thread = (last_thread + 1)% settings.num_threads;
- last_thread = thread;
- item-> sfd = sfd;
- item-> init_state = init_state;
- item-> event_flags = event_flags;
- item-> read_buffer_size = read_buffer_size;
- item-> is_udp = is_udp;
- cq_push (& threads [thread]. new_conn_queue,
item); - MEMCACHED_CONN_DISPATCH (sfd, threads [thread]. Thread_id);
- if (write
(threads [thread].
notify_send_fd, "", 1)! = 1) { - perror ("Writing to thread notify pipe");
- }
- }
You can clearly see, the main thread first
create a new CQ_ITEM, then select a thread through the round robin
strategy
And cq_push this
CQ_ITEM into the thread CQ queue, the corresponding workers thread is how do you
know it
Is through
this
write (threads [thread].
notify_send_fd, "", 1)
Write
1-byte data to this thread pipe, then the thread‘s the libevent immediate
callback the thread_libevent_process method (already described
above)
Then the thread
remove the item, Register read time, when the data connection of that section
will eventually callback drive_machine method, that
is,
method case conn_read
drive_machine: all workers deal with the main thread only processing
conn_listening establish a connection to
this
This part of the
code is indeed more, can not all posted, please refer to the source code, the
latest version 1.2.6, the province went to a lot of optimization such as, each
CQ_ITEM is malloc will malloc a lot to reduce debris generation and so the
details.
reference
from:http://www.cprogramdevelop.com/408519/
Memcached source code analysis (threading
model)--reference,码迷,mamicode.com
Memcached source code analysis (threading
model)--reference