Understanding The Memcached Source Code — LRU II

Photo by Brina Blum on Unsplash
Slab allocator is the core module of the cache system, which largely determines how efficient the bottleneck resource, memory, can be utilized. The other 3 parts, namely, LRU algorithm for entry expiration; and an event driven model based on libevent; and the consistent harsh for data distribution, are built around it.

Slab I
Slab II
Slab III
LRU I
LRU II (this article)
LRU III
Event driven I

More often than not, the LRU algorithm is combined with a hash map, and is referred to as a LRU cache.

In a LRU-cache, the hash map enables fast accessing of cached objects; and LRU avoids the cache to grow infinitely by marking expired, or so called, least recently used objects. This time we examine the memcached‘s implementation of hash map.

Overview (textbook overlapped, skip)

Hash map is basically a fixed-sized array that indexes values with integers hashed from keys. In hash map an array entry is referred to as a bucket. If the hash value exceeds the number of buckets (i.e., array size), it rolls over using ‘mod’ (%). Collision occurs when more than two keys result in the same hash value or different hash values roll over to the same bucket, then a *linked list is formed on the bucket in collision.

Collision slows down lookups speed for the sequential access of linked list, hence it is required to increase the bucket number, and to rehash entries using the new bucket number before the performance goes too bad. This process will be discussed soon.

Module initialization

The first relevant method is

hash_init

which simply determines the hash algorithm type.

int hash_init(enum hashfunc_type type) {
switch(type) {
case JENKINS_HASH:
hash = jenkins_hash;
settings.hash_algorithm = "jenkins";
break;
case MURMUR3_HASH:
hash = MurmurHash3_x86_32;
settings.hash_algorithm = "murmur3";
break;
default:
return -1;
}
return 0;
}

This method is called from here as one of the init steps before the logic enters the main event loop.

The parameter hash_type is set to MURMUR3_HASH by the mentioned command-line argument modern.

The second method

assoc_init

allocates the fixed sized array mentioned in the beginning.

void assoc_init(const int hashtable_init) {
if (hashtable_init) {
hashpower = hashtable_init;
}
primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
if (! primary_hashtable) {
fprintf(stderr, "Failed to init hashtable.\n");
exit(EXIT_FAILURE);
}
...// scr: stat
}

This method is called in a similar location as hash_init as part of the system bootstrap process.


assoc_init(settings.hashpower_init);

And the actual initial size is determined by the command-line argument hashpower.

...
case HASHPOWER_INIT:
if (subopts_value == NULL) {
fprintf(stderr, "Missing numeric argument for hashpower\n");
return 1;
}
settings.hashpower_init = atoi(subopts_value);
if (settings.hashpower_init < 12) {
fprintf(stderr, "Initial hashtable multiplier of %d is too low\n",
settings.hashpower_init);
return 1;
} else if (settings.hashpower_init > 64) {
fprintf(stderr, "Initial hashtable multiplier of %d is too high\n"
"Choose a value based on \"STAT hash_power_level\" from a running instance\n",
settings.hashpower_init);
return 1;
}
break;
...

As said before, the array can be replaced with a newly allocated larger one if the performance drops due to excessive collision. Next we discuss the process of

Scale up & entry migration

In memcached, the threshold is 1.5, meaning, if the items number exceeds 1.5 * bucket number, the mentioned expanding process starts.

...
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
assoc_start_expand();
}
...

The assoc_start_expand simply set a flag (i.e., do_run_maintenance_thread), and send a signal to awake a maintenance thread that does the actual job.

static void assoc_start_expand(void) {
if (started_expanding)
return;
started_expanding = true;
pthread_cond_signal(&maintenance_cond);
}

Maintenance thread main loop

static void *assoc_maintenance_thread(void *arg) {
mutex_lock(&maintenance_lock);
while (do_run_maintenance_thread/* scr: the flag*/) {
int ii = 0;
/* There is only one expansion thread, so no need to global lock. */
for (ii = 0; ii < hash_bulk_move && expanding; ++ii) { // 2)
item *it, *next;
int bucket;
void *item_lock = NULL;
/* bucket = hv & hashmask(hashpower) =>the bucket of hash table
* is the lowest N bits of the hv, and the bucket of item_locks is
* also the lowest M bits of hv, and N is greater than M.
* So we can process expanding with only one item_lock. cool! */
if ((item_lock = item_trylock(expand_bucket))) { // scr: 3)
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
next = it->h_next; // scr: ------------------------> 4)
bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
it->h_next = primary_hashtable[bucket];
primary_hashtable[bucket] = it;
}
old_hashtable[expand_bucket] = NULL; // scr: --------------> 4.1)
expand_bucket++; // scr: ------------------------------------> 5)
if (expand_bucket == hashsize(hashpower - 1)) { // 6)
expanding = false;
free(old_hashtable);
... // scr: -------------------------------------------> stat & log
}
} else {
usleep(10*1000); // scr: --------------------------> 3.1)
}
if (item_lock) { // scr: ----------------------------------> 3.2)
item_trylock_unlock(item_lock);
item_lock = NULL;
}
}
if (!expanding) {
/* We are done expanding.. just wait for next invocation */
started_expanding = false;
pthread_cond_wait(&maintenance_cond, &maintenance_lock); // scr: --------------------------------------------------------> 0)
/* assoc_expand() swaps out the hash table entirely, so we need
* all threads to not hold any references related to the hash
* table while this happens.
* This is instead of a more complex, possibly slower algorithm to
* allow dynamic hash table expansion without causing significant
* wait times.
*/
pause_threads(PAUSE_ALL_THREADS);
assoc_expand(); // scr: -------------------------------> 1)
pause_threads(RESUME_ALL_THREADS);
}
}
return NULL;
}
{% endcodeblock %}

0) This is where the thread waits up from sleep and start working, and goes to sleep when there is nothing to be done.

1) assoc_expand allocates the resource for the new hash map which is meant to replace the old one initialized from here.

/* grows the hashtable to the next power of 2. */
static void assoc_expand(void) {
old_hashtable = primary_hashtable;
primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
if (primary_hashtable) {
... // scr: log
hashpower++;
expanding = true;
expand_bucket = 0;
... // scr: stat
} else {
primary_hashtable = old_hashtable;
/* Bad news, but we can keep running. */
}
}

2) Only migrate a certain number of items in one batch. hash_bulk_move avoids the thread hanging around too long when stop_assoc_maintenance_thread is called. In contrast to the discussed assoc_start_expand, stop_assoc_maintenance_thread reset the flag do_run_maintenance_thread and send the signal to wake up the thread to exit.

void stop_assoc_maintenance_thread() {
mutex_lock(&maintenance_lock);
do_run_maintenance_thread = 0;
pthread_cond_signal(&maintenance_cond);
mutex_unlock(&maintenance_lock);
/* Wait for the maintenance thread to stop */
pthread_join(maintenance_tid, NULL);
}

3) (The “item lock” actually works on the whole bucket hence I will call it bucket lock instead) Use low priority item_trylock (i.e., pthread_mutex_trylock) to access the bucket lock; 3.1) sleep for 10 sec when the the item is not available; and 3.2) release the lock using item_trylock_unlock when the migration (of this bucket) completes.

void *item_trylock(uint32_t hv) {
pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
if (pthread_mutex_trylock(lock) == 0) {
return lock;
}
return NULL;
}

4) Rehash all the items in the bucket, and migrate them to the new hash map.

5) Move on to the next bucket.

6) Last bucket reached -> go to 0)

Maintenance thread start

int start_assoc_maintenance_thread() {
int ret;
char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
if (env != NULL) {
hash_bulk_move = atoi(env);
if (hash_bulk_move == 0) {
hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
}
}
pthread_mutex_init(&maintenance_lock, NULL);
if ((ret = pthread_create(&maintenance_tid, NULL,
assoc_maintenance_thread, NULL)) != 0) {
fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
return -1;
}
return 0;
}

Similar to initialization methods, it is called during system bootstrap.

Maintenance thread stop

This method is called in system shutdown process, hence it is opposite in logic to start_assoc_maintenance_thread. Nevertheless, the operations of this method are opposite that of assoc_start_expand mechanism wise.

void stop_assoc_maintenance_thread() {
mutex_lock(&maintenance_lock);
do_run_maintenance_thread = 0;
pthread_cond_signal(&maintenance_cond);
mutex_unlock(&maintenance_lock);
/* Wait for the maintenance thread to stop */
pthread_join(maintenance_tid, NULL);
}

As said before, the expanding & migration process discussed here has an impact on the logic of all hash map related operations. In the next section, we look at these operations.

CRUD

N.b., assoc_delete has been discussed in the previous post; and in a key-value system update and insert are essentially the same, thus, this section will discuss the operations of C (create) and R (read) only.

assoc_insert

int assoc_insert(item *it, const uint32_t hv) {
unsigned int oldbucket;
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it->h_next = old_hashtable[oldbucket]; // scr: -------> 1)
old_hashtable[oldbucket] = it;
} else {
it->h_next = primary_hashtable[hv & hashmask(hashpower)]; // scr: --------------------------------------------------------------> 2)
primary_hashtable[hv & hashmask(hashpower)] = it;
}
pthread_mutex_lock(&hash_items_counter_lock);
hash_items++; // scr: --------------------------------------> 3)
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
assoc_start_expand();
}
pthread_mutex_unlock(&hash_items_counter_lock);
    return 1;
}

1) If expanding process is undergoing and the hash key associated bucket has not been migrated, insert the item to old_hashtable. Note that here we use the old bucket number (i.e., hashmask(hashpower - 1))) to calculate the hash index.

2) Otherwise, insert the item to primary_hashtable directly.

3) Increase the global variable hash_items (number of items). If it exceeds the threshold after the item is added, start expanding & migration process. Note that this is also the preamble of the previous section.

assoc_find

item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
item *it;
unsigned int oldbucket;
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it = old_hashtable[oldbucket]; // scr: ---------------> 1)
} else {
it = primary_hashtable[hv & hashmask(hashpower)]; // 2)
}
item *ret = NULL;
int depth = 0;
while (it) { // scr: -------------------------------------> 3)
if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
ret = it;
break;
}
it = it->h_next;
++depth;
}
MEMCACHED_ASSOC_FIND(key, nkey, depth);
return ret;
}

1) Similar to that of assoc_insert, this step locates the bucket from old_hashtable when the key is yet to be rehashed.

2) Use primary_hashtable directly otherwise.

3) Go through the linked list and compare the key (instead of the hash index) directly to lookup the item in the case of Collision.

One thing worth noting is that assoc_find is very similar to _hashitem_before which has been discussed in the previous post. The difference here is, _hashitem_before returns the address of the next member of the element before the found one (pos = &(*pos)->h_next;), which is required when removing entries from a singly linked list; whilst this method returns the element found directly (ret = it;).


Originally published at holmeshe.me.