Understanding The Memcached Source Code — Event Driven I

The slab allocator is the core module of the cache system, and it largely determines how efficiently the bottleneck resource, memory, can be utilized. The other three parts, namely the LRU algorithm for entry expiration, the event driven model based on libevent, and consistent hashing for data distribution, are built around it.

Slab I
Slab II
Slab III
LRU I
LRU II
LRU III
Event driven I (this article)

In classic multithreading, a large number of slow, blocking operations (mostly I/O) can easily exhaust the available threads, which severely constrains the maximum number of requests a server can handle per unit of time. More specifically, threads are scheduled out and put to sleep in the middle of procedures that contain blocking I/O, even while request packets pile up in the queues of the network stack. In such a situation, the server shows low throughput, low CPU saturation and high latency.

Here is a post around server side performance. Feel free to check it out.

An introduction to event driven

This is where the asynchronous event driven model comes in. It drops the idea that the context of a session must be coupled with a thread. In this model, session contexts are contained within and managed by a drive machine, and a thread is fully unleashed by non-blocking I/O operations. More specifically, 1) when I/O occurs in the middle of a procedure, the thread does not block but instantly switches to processing another request; and 2) when the I/O completes, the context is picked up by the drive machine to resume the interrupted session. As such, a potentially slow procedure is effectively divided into multiple manageable pieces, with the cutting points marked by I/O operations. This makes a single-threaded architecture more performant than one that employs thousands of threads.
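To make the idea of a context that outlives a blocking point concrete, here is a minimal sketch in plain POSIX C. It is not taken from Memcached: the session struct and the names set_nonblocking and session_resume are hypothetical, and a pipe stands in for a network socket.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical per-session context: it lives outside any thread's stack,
 * so the session can be resumed whenever its fd becomes readable again. */
typedef struct {
    int fd;            /* non-blocking descriptor owned by the session */
    char buf[64];      /* bytes collected so far */
    size_t used;       /* number of valid bytes in buf */
} session;

/* Put a descriptor into non-blocking mode; returns 0 on success. */
static int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Advance the session as far as possible without blocking.
 * Returns 1 if the read would have blocked (the caller is free to serve
 * another session), 0 on end of stream or full buffer, -1 on a real error. */
static int session_resume(session *s) {
    for (;;) {
        if (s->used == sizeof(s->buf)) return 0;
        ssize_t n = read(s->fd, s->buf + s->used, sizeof(s->buf) - s->used);
        if (n > 0) { s->used += (size_t)n; continue; }
        if (n == 0) return 0;
        if (errno == EAGAIN || errno == EWOULDBLOCK) return 1;
        if (errno == EINTR) continue;
        return -1;
    }
}
```

Each call to session_resume picks up exactly where the previous one stopped, because everything the session needs is kept in the struct rather than on a sleeping thread's stack.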

In my understanding, the event driven model is, in essence, yet another instance of “divide and conquer” and “trade space for time”, applied in a not very obvious way.

On the other hand, multithreading can still be used in an event driven model, purely for the purpose of parallelism. Thus, in practice, the number of threads employed does not exceed the number of CPU cores. I will discuss Memcached’s multithreading shortly in thread model.

The drive machine

From a developer’s point of view, there are numerous ways to program an asynchronous event driven server. Memcached adopts an approach called a state machine, in which the logic flow is divided into non-linear fragments identified by states and normally controlled by a huge switch case. The bright side of this approach is that the mentioned breakdown of a slow procedure is faithfully reflected by the logic fragments. The downside is that the code style is a bit different from what most developers are used to.

The following is what the event driven state machine actually looks like.

static void drive_machine(conn *c) {
bool stop = false;
int sfd;
socklen_t addrlen;
struct sockaddr_storage addr;
int nreqs = settings.reqs_per_event;
int res;
const char *str;
...

assert(c != NULL);

while (!stop) {

switch(c->state) {
case conn_listening:
addrlen = sizeof(addr);
...
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
...
if (sfd == -1) {
...
}
...

if (settings.maxconns_fast &&
stats_state.curr_conns + stats_state.reserved_fds >= settings.maxconns - 1) {
...
} else {
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, c->transport);
}

stop = true;
break;

case conn_waiting:
...
stop = true;
break;

case conn_read:
...
break;

case conn_parse_cmd :
...
break;

case conn_new_cmd:
...
break;

case conn_nread:
...
break;
case conn_swallow:
...
break;
case conn_write:
...
case conn_mwrite:
...
break;
case conn_closing:
if (IS_UDP(c->transport))
conn_cleanup(c);
else
conn_close(c);
stop = true;
break;

case conn_closed:
/* This only happens if dormando is an idiot. */
abort();
break;

case conn_watch:
/* We handed off our connection to the logger thread. */
stop = true;
break;
case conn_max_state:
assert(false);
break;
}
}

return;
}

The omitted switch blocks will be discussed in detail in the following posts, so no worries.
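The shape of the loop is easier to see in a toy version. The sketch below is only an illustration of the pattern, not Memcached code: the states, the toy_conn struct and toy_drive_machine are all made up, and "input" stands in for data that has arrived on a socket.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical states; memcached's real enum conn_states is much richer. */
enum toy_state { TOY_READ, TOY_PARSE, TOY_WRITE, TOY_CLOSING };

typedef struct {
    enum toy_state state;
    const char *input;   /* bytes that have "arrived"; NULL means none yet */
    int replies;         /* number of replies produced so far */
} toy_conn;

/* Run state transitions until one of them has to wait for an event. */
static void toy_drive_machine(toy_conn *c) {
    bool stop = false;
    while (!stop) {
        switch (c->state) {
        case TOY_READ:
            if (c->input == NULL) {
                stop = true;           /* nothing to read: yield to the event loop */
            } else {
                c->state = TOY_PARSE;
            }
            break;
        case TOY_PARSE:
            /* a request starting with 'q' closes the connection,
             * anything else gets a reply */
            c->state = (c->input[0] == 'q') ? TOY_CLOSING : TOY_WRITE;
            break;
        case TOY_WRITE:
            c->replies++;
            c->input = NULL;           /* request consumed */
            c->state = TOY_READ;
            break;
        case TOY_CLOSING:
            stop = true;
            break;
        }
    }
}
```

As in the real drive machine, a state either hands over to the next state within the same loop iteration or sets stop to return control to the event loop until the next event arrives.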

Thread model

The thread model of Memcached is quite standard. There is one dispatcher thread and a preconfigured number of worker threads. Each thread runs an independent drive machine as described above. The dispatcher thread, whose responsibility is to distribute requests among worker threads, only executes the code under conn_listening. The actual requests are completed by worker threads running the rest of the states.

Next we go through the bootstrap portion of the main function, which establishes the building blocks of the event driven model as well as the multithreading mechanism. We will also see where the ***_init methods of the sub-systems discussed earlier sit in the whole initialization process.

First things first: all the procedures relevant to system initialization are executed in the dispatcher thread discussed above.

The call stack of this process is

|-main
  |-hash_init (LRU II)
  |-assoc_init (LRU II)
  |-conn_init
  |-slabs_init (Slab I)
  |-memcached_thread_init
    |-setup_thread
    |-create_worker
  |-server_sockets
    |-new_socket
    |-conn_new

System initialization

int main (int argc, char **argv) {
...// scr: ---------------------------------------------------> *)
...// scr: initialize `settings` using default values and command line arguments
...// scr: sanity check

if (hash_init(hash_type) != 0) { // scr: -------------> LRU II
fprintf(stderr, "Failed to initialize hash_algorithm!\n");
exit(EX_USAGE);
}

...// scr: initialize `settings` & sanity check

if (maxcore != 0) { // scr: ------------------------------> 1)
struct rlimit rlim_new;
/*
* First try raising to infinity; if that fails, try bringing
* the soft limit to the hard.
*/
if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
/* failed. try raising just to the old max */
rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
(void)setrlimit(RLIMIT_CORE, &rlim_new);
}
}
/*
* getrlimit again to see what we ended up with. Only fail if
* the soft limit ends up 0, because then no core files will be
* created at all.
*/

if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
fprintf(stderr, "failed to ensure corefile creation\n");
exit(EX_OSERR);
}
}

if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) { // scr: ------> 1)
fprintf(stderr, "failed to getrlimit number of files\n");
exit(EX_OSERR);
} else {
rlim.rlim_cur = settings.maxconns;
rlim.rlim_max = settings.maxconns;
if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
fprintf(stderr, "failed to set rlimit for open files. Try starting as root or requesting smaller maxconns value.\n");
exit(EX_OSERR);
}
}

...// scr: ---------------------------------------------------> *)

main_base = event_init(); // scr: ------------------------> 2)
...// scr: stat
assoc_init(settings.hashpower_init); // scr: ---------> LRU II
conn_init(); // scr: -------------------------------------> 3)
slabs_init(settings.maxbytes, settings.factor, preallocate,
use_slab_sizes ? slab_sizes : NULL); // scr: -> Slab I
...
memcached_thread_init(settings.num_threads, main_base); //scr:4)

...// scr: maintainer threads initialization
...// scr: unix socket

/* create the listening socket, bind it, and init */
if (settings.socketpath == NULL) {
...// scr: not applicable
if (portnumber_filename != NULL) {
...// scr: not applicable
}

errno = 0; // scr: -----------------------------------> 5)
if (settings.port && server_sockets(settings.port, tcp_transport,
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}

errno = 0; // scr: -----------------------------------> 5)
if (settings.udpport && server_sockets(settings.udpport, udp_transport,
portnumber_file)) {
vperror("failed to listen on UDP port %d", settings.udpport);
exit(EX_OSERR);
}
if (portnumber_file) {
...// scr: not applicable
}
} // scr: closes `if (settings.socketpath == NULL)`

usleep(1000);
...
if (event_base_loop(main_base, 0) != 0) { // scr: --------> 6)
retval = EXIT_FAILURE;
}

...// scr: finalization
}

The two most relevant steps are 4) and 5), which will be discussed in the following sections.

1) Raise the limit for core dump file size as well as the number of file descriptors.

2) Call event_init to initialize the libevent framework. The value returned is called an event base.

3) Call conn_init to allocate space for the contexts of all potential connections (each context is located by its file descriptor in the global variable conns). The role of the context in the event driven model has already been discussed in the introduction.

4) Preallocate threads and their associated resources using memcached_thread_init.

5) Set up the socket and the first event listener, conn_listening.

6) Call event_base_loop to start the event loop.

*) Other miscellaneous system operations, such as setting the signal handlers for SIGINT and SIGTERM, calling setbuf to make stderr unbuffered, dropping the root privileges of the process, and daemonizing. If those names do not ring a bell, Advanced Programming in the UNIX Environment (see Reference) is your friend.
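Step 1) follows the usual getrlimit / setrlimit pattern. As a rough sketch of that pattern, the hypothetical helper limit_open_files below adjusts only the soft limit on open files and clamps it to the hard limit. This is a deliberate simplification: the excerpt above sets both rlim_cur and rlim_max to settings.maxconns, which may require extra privileges when it raises the hard limit.

```c
#include <assert.h>
#include <sys/resource.h>

/* Set the soft limit on open files to `wanted`, clamped to the hard limit.
 * Returns the soft limit actually in effect, or -1 on error. The hard
 * limit is left untouched, so no special privileges are needed. */
static long limit_open_files(rlim_t wanted) {
    struct rlimit rlim;
    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) return -1;
    if (rlim.rlim_max != RLIM_INFINITY && wanted > rlim.rlim_max)
        wanted = rlim.rlim_max;
    rlim.rlim_cur = wanted;
    if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) return -1;
    /* read the limit back to see what we ended up with */
    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) return -1;
    return (long)rlim.rlim_cur;
}
```

Reading the limit back after setting it mirrors what the excerpt does for RLIMIT_CORE: the value finally in effect is what matters, not the value requested.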

Threads initialization

The core data structure of the multithreading mechanism is

typedef struct {
pthread_t thread_id; /* unique ID of this thread */
struct event_base *base; /* libevent handle this thread uses */
struct event notify_event; /* listen event for notify pipe */
int notify_receive_fd; /* receiving end of notify pipe */
int notify_send_fd; /* sending end of notify pipe */
...// scr: stat
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
...// scr: cas & log
} LIBEVENT_THREAD;

memcached_thread_init

void memcached_thread_init(int nthreads, struct event_base *main_base) {
...// scr: initialize all sorts of mutexes and condition variables

threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); // scr 1)
if (! threads) {
perror("Can't allocate thread descriptors");
exit(1);
}

dispatcher_thread.base = main_base; // scr: --------------> 2)
dispatcher_thread.thread_id = pthread_self(); // scr: ----> 3)

for (i = 0; i < nthreads; i++) {
int fds[2];
if (pipe(fds)) { // scr: -----------------------------> 4)
perror("Can't create notify pipe");
exit(1);
}

threads[i].notify_receive_fd = fds[0]; // scr: -------> 4)
threads[i].notify_send_fd = fds[1]; // scr: ----------> 4)

setup_thread(&threads[i]); // scr: -------------------> 5)
/* Reserve three fds for the libevent base, and two for the pipe */
stats_state.reserved_fds += 5;
}

/* Create threads after we've done all the libevent setup. */
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]); // scr: -> 6)
}

/* Wait for all the threads to set themselves up before returning. */
pthread_mutex_lock(&init_lock);
wait_for_thread_registration(nthreads);
pthread_mutex_unlock(&init_lock);
}

1) Allocate memory for an array of LIBEVENT_THREAD. The number of threads is num_threads, and each element represents one thread. As described above, num_threads should preferably not exceed the number of cores.

2) Set the event base for dispatcher_thread, which represents the main thread itself. Note that dispatcher_thread is a global variable, so the reference is accessible to all the worker threads.

3) Set the thread id for dispatcher_thread.

4) Initialize the pipe fds for each of the worker threads. Here notify_send_fd is used for communication between the dispatcher thread and the worker threads: whenever the dispatcher thread writes to notify_send_fd, an event is generated on the other side, notify_receive_fd, which the worker thread listens to. Again, Advanced Programming in the UNIX Environment (see Reference) gives more information about pipes.

5) The full method name is supposed to be setup_libevent_for_each_thread. We will examine this method in the next section.

6) Call pthread_create to create the actual worker threads. We will examine this method in create_worker.
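The notification mechanism in step 4) can be tried out in isolation. The following sketch is an assumption-laden stand-in, not Memcached code: it keeps only the two pipe fields of LIBEVENT_THREAD in a hypothetical notify_pipe struct, and it uses poll where Memcached relies on libevent to watch notify_receive_fd.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical stand-in for the two pipe fields of LIBEVENT_THREAD. */
typedef struct {
    int notify_receive_fd;   /* reading end, watched by the worker */
    int notify_send_fd;      /* writing end, used by the dispatcher */
} notify_pipe;

static int notify_pipe_init(notify_pipe *p) {
    int fds[2];
    if (pipe(fds) != 0) return -1;
    p->notify_receive_fd = fds[0];
    p->notify_send_fd = fds[1];
    return 0;
}

/* Dispatcher side: wake the worker with a one-byte, non-zero command. */
static int notify_send(const notify_pipe *p, char cmd) {
    return write(p->notify_send_fd, &cmd, 1) == 1 ? 0 : -1;
}

/* Worker side: wait up to timeout_ms for a command.
 * Returns the command byte, 0 on timeout, -1 on error. */
static int notify_wait(const notify_pipe *p, int timeout_ms) {
    struct pollfd pfd = { .fd = p->notify_receive_fd, .events = POLLIN };
    int ready = poll(&pfd, 1, timeout_ms);
    if (ready < 0) return -1;
    if (ready == 0) return 0;
    char cmd;
    if (read(p->notify_receive_fd, &cmd, 1) != 1) return -1;
    return (unsigned char)cmd;
}
```

The byte itself carries almost no information; its job is to make the receiving fd readable so that the event loop on the other side wakes up.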

setup_thread

static void setup_thread(LIBEVENT_THREAD *me) {
me->base = event_init(); // scr: -------------------------> 1)
if (! me->base) {
fprintf(stderr, "Can't allocate event base\n");
exit(1);
}

/* Listen for notifications from other threads */
event_set(&me->notify_event, me->notify_receive_fd, // scr: 2)
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event); // scr: -------> 2)

if (event_add(&me->notify_event, 0) == -1) { // scr: -----> 2)
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}

me->new_conn_queue = malloc(sizeof(struct conn_queue)); // 3)
if (me->new_conn_queue == NULL) {
perror("Failed to allocate memory for connection queue");
exit(EXIT_FAILURE);
}
cq_init(me->new_conn_queue);

...// scr: stat & cas
}

1) Call event_init to initialize the libevent instance for the worker thread. As discussed in thread model, each worker thread runs its own drive machine.

2) Set thread_libevent_process as the callback for events emitted from the notify_receive_fd discussed above. The major function of thread_libevent_process is to link the actual drive machine to events, which we will see very soon in inter-thread communication.

3) Allocate and initialize the connection queue of the worker thread.
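The connection queue in step 3) is not shown in this post. As a rough idea of what such a queue can look like, here is a minimal mutex-protected linked list. The names toy_item, toy_queue and the toy_cq_* functions are hypothetical and only loosely modelled on CQ_ITEM, conn_queue, cq_init, cq_push and cq_pop; the real implementation may differ.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical item and queue types, loosely modelled on CQ_ITEM / conn_queue. */
typedef struct toy_item {
    int sfd;
    struct toy_item *next;
} toy_item;

typedef struct {
    toy_item *head;
    toy_item *tail;
    pthread_mutex_t lock;   /* the queue is shared by two threads */
} toy_queue;

static void toy_cq_init(toy_queue *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/* Append an item at the tail (called by the producer). */
static void toy_cq_push(toy_queue *cq, toy_item *item) {
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (cq->tail == NULL)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

/* Remove and return the head item, or NULL if the queue is empty. */
static toy_item *toy_cq_pop(toy_queue *cq) {
    pthread_mutex_lock(&cq->lock);
    toy_item *item = cq->head;
    if (item != NULL) {
        cq->head = item->next;
        if (cq->head == NULL)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}
```

The lock is needed because the dispatcher thread pushes while the worker thread pops; items come out in the order they were pushed.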

create_worker

static void create_worker(void *(*func)(void *), void *arg) {
pthread_attr_t attr;
int ret;

pthread_attr_init(&attr);

if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
fprintf(stderr, "Can't create thread: %s\n",
strerror(ret));
exit(1);
}
}

As mentioned, this method calls pthread_create to create the actual worker threads. The callback passed through is worker_libevent, which essentially starts the event loop using event_base_loop, this time on the worker threads rather than the dispatcher thread.

Socket initialization

The methods involved in socket initialization handle both TCP and UDP, while the following discussion covers only the TCP branch. We also assume portnumber_file is not set, so as to focus on the critical path.

Unlike worker threads, which listen to internal (pipe) fds, the dispatcher thread is responsible for events generated from external socket fds (by network requests). The method that initializes sockets is server_sockets.

If no network interface is specified by inter, server_sockets is equivalent to

server_socket

static int server_socket(const char *interface,
int port,
enum network_transport transport,
FILE *portnumber_file) {
int sfd;
struct linger ling = {0, 0};
struct addrinfo *ai;
struct addrinfo *next;
struct addrinfo hints = { .ai_flags = AI_PASSIVE,
.ai_family = AF_UNSPEC };
char port_buf[NI_MAXSERV];
int error;
int success = 0;
int flags =1;

hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

if (port == -1) {
port = 0;
}
snprintf(port_buf, sizeof(port_buf), "%d", port);
error= getaddrinfo(interface, port_buf, &hints, &ai); //scr 1)
if (error != 0) {
...// scr: error handling
}

for (next= ai; next; next= next->ai_next) { // scr: ------> 2)
conn *listen_conn_add;
if ((sfd = new_socket(next)) == -1) { // scr: --------> 3)
/* getaddrinfo can return "junk" addresses,
* we make sure at least one works before erroring.
*/
if (errno == EMFILE) {
/* ...unless we're out of fds */
perror("server_socket");
exit(EX_OSERR);
}
continue;
}
// scr: ------------------------------------------------------> 4)
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
if (IS_UDP(transport)) {
...// scr: not applicable
} else {
error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
if (error != 0)
perror("setsockopt");

error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
if (error != 0)
perror("setsockopt");

error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
if (error != 0)
perror("setsockopt");
}
// scr: ------------------------------------------------------> 5)
if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
...// scr: error handling
} else {
success++;
if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
...// scr: error handling
}
if (portnumber_file != NULL &&
...// scr: not applicable
}
}

if (IS_UDP(transport)) {
...// scr: not applicable
} else { // scr: -------------------------------------> 6)
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
listen_conn_add->next = listen_conn; // scr: -----> 7)
listen_conn = listen_conn_add;
}
}

freeaddrinfo(ai);

/* Return zero iff we detected no errors in starting up connections */
return success == 0;
}

1) Get the list of addresses to bind to. With AI_PASSIVE and no interface given, these are the wildcard addresses of the available address families.

2) Iterate over the addresses and set up a socket for each with the following steps.

3) new_socket encapsulates the creation of the socket as well as setting it to non-blocking.

4) Tweak the newly created socket fd using setsockopt, in which

SO_REUSEADDR allows binding to a port in TIME_WAIT. This is useful for instantly rebooting a server on a “not fresh” TCP port;

SO_KEEPALIVE sends heartbeats to detect an absent client, so that the resources held for the connection in both kernel and user space can be released;

SO_LINGER controls what close does when unsent data remains. Note that ling is {0, 0} here, i.e., l_onoff is 0, so lingering stays disabled and close returns immediately (see Resetting a TCP connection and SO_LINGER in Reference);

TCP_NODELAY disables Nagle’s algorithm to improve latency.

5) bind and start listening to the fd.

6) conn_new initializes the context for the fd and adds it to libevent, with the initial state set to conn_listening and the callback set to event_handler. Here event_handler is another intermediate method that leads to the drive machine on the dispatcher thread. Likewise, this method will be discussed soon in inter-thread communication.

conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base) {
conn *c;
...// scr: initialize context conn
c->state = init_state;
...// scr: initialize context conn

event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;

if (event_add(&c->event, 0) == -1) {
perror("event_add");
return NULL;
}
...// scr: stat
return c;
}

7) Add the context to the head of a global list listen_conn.
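Stripped of the libevent registration and the UDP branch, steps 3) to 5) boil down to a handful of system calls. The sketch below is a simplified illustration under stated assumptions rather than the Memcached code: open_listener is a hypothetical helper, it binds to 127.0.0.1 only instead of iterating over the getaddrinfo results, and it sets just SO_REUSEADDR.

```c
#include <assert.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Create a non-blocking TCP listening socket on 127.0.0.1.
 * Port 0 asks the kernel for any free port. Returns the fd, or -1 on error. */
static int open_listener(unsigned short port, int backlog) {
    int flags = 1;
    struct sockaddr_in addr;

    int sfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sfd == -1) return -1;

    /* counterpart of step 3): accept() must never block the event loop */
    int fl = fcntl(sfd, F_GETFL, 0);
    if (fl == -1 || fcntl(sfd, F_SETFL, fl | O_NONBLOCK) == -1) goto fail;

    /* counterpart of step 4): allow quick restarts on the same port */
    if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags)) != 0)
        goto fail;

    /* counterpart of step 5): bind and start listening */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);
    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) goto fail;
    if (listen(sfd, backlog) == -1) goto fail;
    return sfd;

fail:
    close(sfd);
    return -1;
}
```

The returned fd is what would then be handed to an event library with a read event, which is the role conn_new plays in step 6).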

Next we briefly go through the process that handles a new connection to wrap up.

Inter-thread communication

event_handler

First, once a TCP connection is established, the fd monitored by the dispatcher thread becomes readable and libevent invokes the mentioned event_handler. Next, the logic flow enters the code snippet we saw at the beginning, the drive machine, with the context state initialized to conn_listening in socket initialization.

static void drive_machine(conn *c) {
...
while (!stop) {

switch(c->state) {
case conn_listening:
addrlen = sizeof(addr);
...
// scr: ------------------------------------> 1)
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
...
if (sfd == -1) {
...// scr: error handling
}
...

if (settings.maxconns_fast &&
stats_state.curr_conns + stats_state.reserved_fds >= settings.maxconns - 1) {
...// scr: error handling
} else {
// scr: --------------------------------------> 2)
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, c->transport);
}

stop = true;
break;
...
break;
}
}

return;
}

At this stage, the drive machine 1) accepts the connection and derives another fd that can be read from. It 2) then calls dispatch_conn_new with the new fd and other relevant information including the next state, conn_new_cmd.

dispatch_conn_new

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
CQ_ITEM *item = cqi_new();
char buf[1];
if (item == NULL) {
...// scr: error handling
}

int tid = (last_thread + 1) % settings.num_threads; // scr: 1)

LIBEVENT_THREAD *thread = threads + tid; // scr: ---------> 1)

last_thread = tid; // scr: -------------------------------> 1)

item->sfd = sfd; // scr: ---------------------------------> 2)
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;

cq_push(thread->new_conn_queue, item); // scr: -----------> 3)

MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
buf[0] = 'c';
if (write(thread->notify_send_fd, buf, 1) != 1) { // scr: > 4)
perror("Writing to thread notify pipe");
}
}

1) Pick a worker thread in round-robin order from the threads established in threads initialization.

2) Initialize a CQ_ITEM instance. Here CQ_ITEM is an intermediate object passed to worker threads through the connection queue, so that a worker thread can create a new context based on it.

typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
int sfd;
enum conn_states init_state;
int event_flags;
int read_buffer_size;
enum network_transport transport;
CQ_ITEM *next;
};

3) Push CQ_ITEM to the connection queue.

4) Write to notify_send_fd with the command 'c'.
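The round-robin selection in step 1) is small enough to isolate. In the sketch below, toy_dispatcher and next_worker are hypothetical names, and starting last_thread at -1 is an assumption made here so that the first connection goes to worker 0.

```c
#include <assert.h>

/* Hypothetical dispatcher state; -1 means "no worker chosen yet". */
typedef struct {
    int last_thread;
    int num_threads;
} toy_dispatcher;

/* Pick the next worker index in round-robin order and remember it. */
static int next_worker(toy_dispatcher *d) {
    assert(d->num_threads > 0);
    int tid = (d->last_thread + 1) % d->num_threads;
    d->last_thread = tid;
    return tid;
}
```

Round robin ignores how busy each worker currently is, which keeps the dispatcher cheap: choosing a target costs one addition and one modulo.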

As discussed before, 4) generates an event on the other side of the pipe (on the chosen worker thread), which invokes

thread_libevent_process

static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
char buf[1];
unsigned int timeout_fd;

if (read(fd, buf, 1) != 1) { // scr: ---------------------> 1)
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
return;
}

switch (buf[0]) {
case 'c':
item = cq_pop(me->new_conn_queue); // scr: -----------> 2)

if (NULL != item) {
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport,
me->base); // scr: ------------> 3)
if (c == NULL) {
...// scr: error handling
} else {
c->thread = me; // scr: ----------------------> 4)
}
cqi_free(item);
}
break;
/* we were told to pause and report in */
...// scr: not applicable
break;
}
}

1) Read the command (i.e., 'c') from the pipe.

2) Pop the CQ_ITEM from the connection queue.

3) Call conn_new. From server_socket we know that conn_new establishes the context, this time for the new connection, and adds the accepted fd to libevent. Here on the worker thread, the callback is again event_handler, which essentially connects the drive machine to the upcoming events on the same connection.

4) Record the owning thread in the context.

Reference

W. Richard Stevens. 1992. Advanced Programming in the UNIX Environment. Addison Wesley Longman Publishing Co., Inc., Redwood City, CA, USA.

Resetting a TCP connection and SO_LINGER

Single-process event-driven


Originally published at holmeshe.me.