天天看点

Redis(三)跳跃表介绍及源码一.理想跳跃表二.近似理想跳跃表三.Redis中的跳跃表四.跳跃表部分源码

一.理想跳跃表

       查询链表的时间复杂度O(n),即使该链表的是有序的,但若我们在链表上在加一层链,且每次跳过一个节点(即进行一次二分),如下图所示:

Redis(三)跳跃表介绍及源码一.理想跳跃表二.近似理想跳跃表三.Redis中的跳跃表四.跳跃表部分源码

如此一来,从L2链开始搜索,要查询8只需4次即可,而查询7时,在搜锁到节点6后可知下一个节点8大于7因此必在节点6和节点8之间,从而进入L1链继续查询,总共需4次即可。

    若在L2链的基础上增加一层链,在每次跳过L2上的一个节点(即在L2链上进一步二分),那么我便可以进一步增加搜索速度,如下图所示:

Redis(三)跳跃表介绍及源码一.理想跳跃表二.近似理想跳跃表三.Redis中的跳跃表四.跳跃表部分源码

此时查询8只需在L3上搜索2次,查询7需要3次(从L3搜索到节点4,转至L2搜索至节点6,转至L3搜索至节点7)。

    最后再在L3基础上增加一层链,即再进行一次二分,如下图所示:

Redis(三)跳跃表介绍及源码一.理想跳跃表二.近似理想跳跃表三.Redis中的跳跃表四.跳跃表部分源码

       这便是理想跳跃表,在链表的基础上不断进行二分,共logn层(n为节点个数),查询的时间为2logn,因为每一层至多进行2次比较。由此可知为什么说跳跃表的性能可以和平衡树相媲美。

       可是这样一来为了维护一个理想跳跃表,删除和添加时都要调整各层链表,而这十分耗时。因此大多跳跃表都利用概率论实现近似理想跳跃表。

二.近似理想跳跃表

       由第一节可知每一层链都是在前一次链的基础上进行的二分,每两个节点链入一个,即链入1/2的节点,而这可以通过概率来近似,即每个链入Li层的节点有1/2的概率链入L(i + 1)层。从而可知一个节点链入L1的概率为1,链入L2的概率为1/2,链入L3的概率为1/4,一次类推1/8,1/16,1/32....。

       在程序中如何实现呢?可以采用随机数的方式,随机产生[ 1 ~ 2^MaxLevel ]的数字,那么L1概率对应的范围为1~2^MaxLevel,即概率为1;L2概率对应的范围为2^(MaxLevel  - 1) ~ 2^MaxLevel,即概率为1/2;L3层对应的范围为2(MaxLevel  - 2) ~ 2^(MaxLevel  - 1),即概率为1/4,依次类推,如下图所示。当数据量较少时不一定十分近似于理想跳跃表,但当数据量较大时根据概率论可知会近似于理想跳跃表。

Redis(三)跳跃表介绍及源码一.理想跳跃表二.近似理想跳跃表三.Redis中的跳跃表四.跳跃表部分源码

三.Redis中的跳跃表

1.跳跃表结构

     Redis中跳跃表的结构如下所示:

// 跳跃表节点
typedef struct zskiplistNode {
    // 后退指针,指向紧邻的前一个节点
    struct zskiplistNode* backNode;

    // 分值,根据该值将节点有小到达排序,多个节点可以有相同的分值
    double score;

    // 指向保存的成员对象的指针,指向的是一个字符串对象SDS
    robj *obj; 

    // 层,即前两节所述的链表层L1,L2...
    struct zskiplistLevel{
        // 某一层链表的前进指针
        struct zskiplistNode *forward;

        // 跨度,即到下一个节点跨越了几个节点
        unsigned int span;
    } level[];

}zskiplistNode;

typedef struct zskiplist {
    // 分别指向头节点和尾节点
    struct zskiplistNode *header, *tail;

    // 跳跃表长度
    unsigned long length;

    // 跳跃表层数
    int level;
} zskiplist;
           

      当创建一个跳跃表节点时,程序都要根据幂次定律(越大的数出现的概率越小)随机生成一个介于1~32之间的值作为level数组的大小。

【注1】:以上这句是书中给出的,其实就是根据第二节中的方式取出一个随机数,从而决定level的大小,也就是要链入前几层链表中。比如,若产生的随机数在2^(MaxLevel  - 1)~2^MaxLevel 之间则level大小为2,需链入L1,L2;若产生的随机数在2^(MaxLevel  - 2) ~ 2^(MaxLevel  - 1)之间则level大小为3,需要链入L1,L2,L3。

四.跳跃表部分源码

1.跳跃表的初始化与析构

// 创建跳跃表节点
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); // 根据跳跃表层数,开辟跳跃表节点空间
    zn->score = score; // 设置节点分值
    zn->obj = obj; // 设置节点成员对象(robj为 redisObject),用于保存一个redis对象
    return zn;
}

// 创建跳跃表
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1; // 跳跃表层数(即链表层数)初始化为1
    zsl->length = 0; // 跳跃表长度
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { // 初始化前向指针和跨度
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

// 释放跳跃表节点
void zslFreeNode(zskiplistNode *node) {
    decrRefCount(node->obj); // 若引用计数为1,则释放该redis对象,否则减少引用计数
    zfree(node); // 释放跳跃表节点空间
}

// 释放跳跃表
void zslFree(zskiplist *zsl) {
    zskiplistNode *node = zsl->header->level[0].forward, *next;

    zfree(zsl->header); // 释放首节点空间

    // 根据引用计数决定是否释放每个节点中的redis成员
    while(node) {
        next = node->level[0].forward;
        zslFreeNode(node); 
        node = next;
    }
    zfree(zsl);
}
           

2.一个新的节点链入哪些层中的概率算法

int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) // 链入下一层的概率是前一层的1/4
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
           

3.跳跃表节点的插入操作

     如何向跳跃表中插入一个新的节点

// 向跳跃表中添加一个新的节点,分值为score,存储的redis对象为obj
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    redisAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        // rank[i]记录了如链入第i层,则需要链入的位置(跨度)
        // rang[i] =ramk[i+1]是由于直接从该节点进入下一次继续查找
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        // update[i]记录了链入第i层的哪个节点之后
        update[i] = x;
    }
    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */
    level = zslRandomLevel();  // 根据概率计算应该链入跳跃表的哪几层
    if (level > zsl->level) {  // 若要增加使用层数,则初始化增加层
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj); // x指向新增节点
    for (i = 0; i < level; i++) { // 将x节点链入要加入的层
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    // 对于更高层(为链入的层)要更新部分节点的跨度
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    // 设置后向指针
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
           

【问】:为什么不先计算链入哪些层,再计算再这些层中的链入位置?

【答】:因为更高层也需要计算跨度

4.跳跃表的节点删除

   在跳跃表中删除某节点

/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
// 删除x指向的节点
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;  // 计算新的跨度
            update[i]->level[i].forward = x->level[i].forward; // 在i层断开x节点
        } else {
            update[i]->level[i].span -= 1; // 减少跨度
        }
    }
    if (x->level[0].forward) { // 重置后退指针
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}

int zslDelete(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    // 查找要删除节点在每一层中的位置,并将其在每一层中的前向节点保存在update中
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0)))
            x = x->level[i].forward;
        update[i] = x;
    }

    x = x->level[0].forward; // 此时x指向要删除的节点(原先指向前一个节点),0层的跨度皆为1
    if (x && score == x->score && equalStringObjects(x->obj,obj)) {
        zslDeleteNode(zsl, x, update);
        zslFreeNode(x);
        return 1;
    }
    return 0; /* not found */
}
           

继续阅读