理解 Memcached 源码 — LRU II
多半情况下,LRU 会和 哈希表 一起使用,然后我们把这个组合称为 LRU-cache。
在 LRU-cache 中, 哈希表 提供快速索引的功能; 而 LRU 则是将 最近最少使用 (least recently used) 的元素在适当的时机标记为过期,并予以删除,用来防止缓存无限增大。本篇我们主要看 哈希表 的实现。
概述 (课本都讲过了, 跳过)
哈希表 本质上是一块 固定大小 的数组。数组的索引是由键被哈希生成的整形。在 哈希表 中,这个数组的元素也被称作 桶,以区别于 哈希表 本身的元素,item。当然,如果生成的索引超过 桶 的数量,则取模之。如果两个 键值 被 哈希 成同一个整形索引值,或者两个索引值被取模后的值刚好相同,则 冲突 发生。 所以一般做法是在每个 桶 上建立一个链表来存储所有 冲突 于此的键值对。
需要提一下,冲突 会降低 哈希表 的查找速度,原因很简单,因为要遍历链表 O(n)。处理方法也很简单,把数组大小扩容,然后重新对所有的元素进行 重新哈希,因为 桶 的数量增加了,冲突 也就减少了,而由 冲突 所形成的链表也就变少或变短了。 当然,重新哈希 的代价很大,所以一般是在性能实际下降时严重时才会发起。具体过程很快会就会讲到。
模块初始化
第一个方法
hash_init
用来确定 哈希算法
int hash_init(enum hashfunc_type type) {
switch(type) {
case JENKINS_HASH:
hash = jenkins_hash;
settings.hash_algorithm = "jenkins";
break;
case MURMUR3_HASH:
hash = MurmurHash3_x86_32;
settings.hash_algorithm = "murmur3";
break;
default:
return -1;
}
return 0;
}hash_init@hash.c
这个方法在 初始化过程中 被调用。
其中参数 hash_type
被设置为 MURMUR3_HASH
。这个是由命令行参数 modern 决定的。
第二个方法
assoc_init
为上述固定大小数组分配内存。
void assoc_init(const int hashtable_init) {
if (hashtable_init) {
hashpower = hashtable_init;
}
primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
if (! primary_hashtable) {
fprintf(stderr, "Failed to init hashtable.\n");
exit(EXIT_FAILURE);
}
...// scr: stat
}hash_init@hash.c
本方法也是在初始化过程中被调用
而初始大小则是由命令行参数hashpower确定。
如上所述,如果 冲突 过多而导致查询性能下降,我们需要用更大的数组来替换当前数组。接下来我们看一下这个过程
扩容和迁移
在 memcached, 这个扩容的阈值是 1.5, 即, 当 item(或者是 哈希表 元素) 的数量超过 桶 数量的 1.5 倍,上述机制会被激活。
Expand hash map
...
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
assoc_start_expand();
}
...assoc_insert@assoc.c:173
这里, assoc_start_expand
仅仅是设置了一个标志位 (i.e., do_run_maintenance_thread
), 然后给辅助维护线程发信号并唤起之。然后由这个线程进行真正的扩容迁移工作。
static void assoc_start_expand(void) {
if (started_expanding)
return;
started_expanding = true;
pthread_cond_signal(&maintenance_cond);
}assoc_insert@assoc.c:173
辅助维护线程主循环
static void *assoc_maintenance_thread(void *arg) {
mutex_lock(&maintenance_lock);
while (do_run_maintenance_thread/* scr: the flag*/) {
int ii = 0;
/* There is only one expansion thread, so no need to global lock. */
for (ii = 0; ii < hash_bulk_move && expanding; ++ii) { // scr: ----> 2)
item *it, *next;
int bucket;
void *item_lock = NULL;
/* bucket = hv & hashmask(hashpower) =>the bucket of hash table
* is the lowest N bits of the hv, and the bucket of item_locks is
* also the lowest M bits of hv, and N is greater than M.
* So we can process expanding with only one item_lock. cool! */
if ((item_lock = item_trylock(expand_bucket))) { // scr: --------> 3)
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
next = it->h_next; // scr: ----------------------------------> 4)
bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
it->h_next = primary_hashtable[bucket];
primary_hashtable[bucket] = it;
}
old_hashtable[expand_bucket] = NULL; // scr: ----------------> 4.1)
expand_bucket++; // scr: --------------------------------------> 5)
if (expand_bucket == hashsize(hashpower - 1)) { // scr: -------> 6)
expanding = false;
free(old_hashtable);
... // scr: ---------------------------------------------------> stat & log
}
} else {
usleep(10*1000); // scr: ------------------------------------> 3.1)
}
if (item_lock) { // scr: --------------------------------------> 3.2)
item_trylock_unlock(item_lock);
item_lock = NULL;
}
}
if (!expanding) {
/* We are done expanding.. just wait for next invocation */
started_expanding = false;
pthread_cond_wait(&maintenance_cond, &maintenance_lock); // scr: > 0)
/* assoc_expand() swaps out the hash table entirely, so we need
* all threads to not hold any references related to the hash
* table while this happens.
* This is instead of a more complex, possibly slower algorithm to
* allow dynamic hash table expansion without causing significant
* wait times.
*/
pause_threads(PAUSE_ALL_THREADS);
assoc_expand(); // scr: -----------------------------------------> 1)
pause_threads(RESUME_ALL_THREADS);
}
}
return NULL;
}
assoc_maintenance_thread@assoc.c
0) 从这里开始,线程被唤起并开始工作;而当整个工作完成后,或者根本就没有迁移的需要时,此线程也是在这里被挂起。
1) assoc_expand 为新的 哈希表 分配内存。这个新的 哈希表 后面会完全替换由初始化阶段建立的既有 哈希表。
2) 一个批次仅迁移固定数量 (hash_bulk_move)的 items。这样做的好处是在stop_assoc_maintenance_thread被调用时,辅助维护线程可以在完成当前批次立即停止,而不是要等到全量迁移完成。
3) (这里 “item lock” 实际上作用于 桶 上,所以bucket lock应该是个更合适的名字) 用 item_trylock (i.e., pthread_mutex_trylock
) 来访问 桶; 3.1) 如果 桶 不可用,线程休眠10秒; 3.2) 当 桶 迁移完成时释放锁 (item_trylock_unlock)。
4) 对 bucket 上的所有 item 进行 重新哈希,并迁移至新 哈希表。
5) 继续对下一个 桶 进行迁移。
6) 所有的 桶 都迁移完成 -> 回到 0)。
辅助迁移线程启动
int start_assoc_maintenance_thread() {
int ret;
char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
if (env != NULL) {
hash_bulk_move = atoi(env);
if (hash_bulk_move == 0) {
hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
}
}
pthread_mutex_init(&maintenance_lock, NULL);
if ((ret = pthread_create(&maintenance_tid, NULL,
assoc_maintenance_thread, NULL)) != 0) {
fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
return -1;
}
return 0;
}start_assoc_maintenance_thread@assoc.c
辅助维护线程终止
stop_assoc_maintenance_thread
是在系统终止时被调用的。语义上来看本方法对应的是 start_assoc_maintenance_thread
,而功能上看,则和 assoc_start_expand
里的操作正好相反。
void stop_assoc_maintenance_thread() {
mutex_lock(&maintenance_lock);
do_run_maintenance_thread = 0;
pthread_cond_signal(&maintenance_cond);
mutex_unlock(&maintenance_lock);
/* Wait for the maintenance thread to stop */
pthread_join(maintenance_tid, NULL);
}stop_assoc_maintenance_thread@assoc.c
如上述,扩容和迁移的状态对所有的 哈希表 相关操作都有影响。下面我们具体看看有哪些影响。
CRUD
assoc_insert
int assoc_insert(item *it, const uint32_t hv) {
unsigned int oldbucket;
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it->h_next = old_hashtable[oldbucket]; // scr: -------------------> 1)
old_hashtable[oldbucket] = it;
} else {
it->h_next = primary_hashtable[hv & hashmask(hashpower)]; // scr: > 2)
primary_hashtable[hv & hashmask(hashpower)] = it;
}
pthread_mutex_lock(&hash_items_counter_lock);
hash_items++; // scr: ------------------------------------------------> 3)
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
assoc_start_expand();
}
pthread_mutex_unlock(&hash_items_counter_lock);
MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
return 1;
}assoc_insert@assoc.c
1) 如果正在扩容中,并且指定键对应的 桶 还没有被迁移,则将 item 插入至 old_hashtable
。这里注意我们是用老的 桶 数(i.e., hashmask(hashpower - 1))
)来计算下标。
2) 如果不满足以上情况, 将 item 插入 primary_hashtable
。
3) 增加全局计数 hash_items (即item 数)。 如果 hash_items
超过了阈值, 则启动上文描述的扩容过程。
assoc_find
item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
item *it;
unsigned int oldbucket;
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it = old_hashtable[oldbucket]; // scr: ---------------------------> 1)
} else {
it = primary_hashtable[hv & hashmask(hashpower)]; // scr: --------> 2)
}
item *ret = NULL;
int depth = 0;
while (it) { // scr: -------------------------------------------------> 3)
if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
ret = it;
break;
}
it = it->h_next;
++depth;
}
MEMCACHED_ASSOC_FIND(key, nkey, depth);
return ret;
}assoc_find@assoc.c
1) 和assoc_insert类似,这步用于定位尚未被重新哈希的键。并从 old_hashtable
取出对应的 桶。
2) 同样,如果没有进行扩容或者已经完成迁移,则从 primary_hashtable
中取。
3) 遍历链表并找到键对应的 item。这个也是标准操作了。
assoc_delete
基于上篇的讲解,当考虑到扩容迁移时,这里唯一需要重新讨论的函数是 _hashitem_before
。和assoc_find类似,这里的变化仅仅是,当对应的键尚未完成迁移时从old_hashtable
读取。
static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
item **pos;
unsigned int oldbucket;
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
pos = &old_hashtable[oldbucket]; // scr: ---------------------------> diff)
} else {
pos = &primary_hashtable[hv & hashmask(hashpower)];
}
while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
pos = &(*pos)->h_next;
}
return pos;
}_hashitem_before@assoc.c