现在的位置: 首页 > 综合 > 正文

InnoDB存储引擎行锁的实现

2019年05月21日 ⁄ 综合 ⁄ 共 3867字 ⁄ 字号 评论关闭

InnoDB与MyISAM不同,它实现的是一个行级锁,而非MyISAM的表锁。锁的粒度越大,则发生死锁的概率越小、锁机制开销越小,但并发能力会越低。如果锁的粒度变细,则发生死锁的概率也会增大,锁机制的开销会更大,但是并发能力能提高。表锁是如何实现的呢,以MyISAM为例,是在每个表的结构中加入一个互斥变量记录锁状态,像:

struct Table {

Row rows[MAXROWS];

pthread_mutex_t lock;//表锁

};

这样做的好处就是锁非常简单,当操作表的时候,直接锁住整个表就行,锁机制的开销非常小。但是问题也很明显,并发量上不去,因为无论多小的操作,都必须锁整个表,这可能带来其他操作的阻塞。

行锁又是如何实现的呢,Oracle是直接在每个行的block上做标记,而InnoDB则是靠索引来做。InnoDB的主键索引跟一般的索引不太一样,Key后面还跟上了整行的数据,互斥变量也是加载主键索引上的,像

struct PK_Idx {

Row row;

pthread_mutex_t lock;//行锁

};

multimap pk_idx;

这样的形式。

这样做的好处是锁的粒度小,只锁住需要的数据不被更改,但是问题也很明显,锁的开销很大,每个主键索引上都要加上一个标记,因为锁的粒度很小,可能两个不同的操作各锁住一部分行等待对方释放形成死锁,不过这个是有办法解决的,把上锁的操作封装成原子操作就行,不过并发量会受些影响。

下面是类似InnoDB的Next-Key locking算法的演示:

编译需要加-lpthread参数,例如g++ inno.cpp -lpthread -o inno

#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <string>
#include <map>
#include <unistd.h>
#include <time.h>
#include <pthread.h>
#include <windows.h>
 
#define LOCK pthread_mutex_lock(&lock)
#define UNLOCK pthread_mutex_unlock(&lock)
#define PRINT(STR, ...) LOCK;fprintf(stderr, STR,  __VA_ARGS__);UNLOCK
 
#define MAXROWS 100
 
using namespace std;
 
/* 行结构 */
struct Row {
    int     num;
    string  info;
};
 
/* 主键索引结构 */
struct PK_Idx {
    Row     row;
    pthread_mutex_t lock;//行锁
};
 
/* 表结构 */
struct Table {
    multimap<int, PK_Idx> pk_idx;
    multimap<int, int>      num_idx;
    multimap<string, int>   info_idx;
    Row     rows[MAXROWS];
    pthread_mutex_t lock;//表锁
};
 
Table table;
int pid;
//全局锁
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
 
/* 随机字符 */
char randChar() {
    return rand()%26+'A';
}
 
/* 随机字符串 */
void randString(string &col, int len) {
    col = "";
    for(int i=0; i<len; ++i) {
        col += randChar();
    }
}
 
/* 初始化数据 */
void init() {
    pid = 0;
    PK_Idx pk;
 
    srand((unsigned)time(0));
 
    //初始化表数据
    for(int i=0; i<MAXROWS; ++i) {
        pk.row.num = rand()%MAXROWS;
        randString(pk.row.info, rand()%10+1);
        //初始化行锁
        pk.lock = PTHREAD_MUTEX_INITIALIZER;
        //写入表数据
        table.rows[i].num = pk.row.num;
        table.rows[i].info = pk.row.info;
        //写入索引
        table.pk_idx.insert(pair<int, PK_Idx>(i, pk));
        table.num_idx.insert(pair<int, int>(pk.row.num, i));
        table.info_idx.insert(pair<string, int>(pk.row.info, i));
    }
    //初始化表锁
    table.lock = PTHREAD_MUTEX_INITIALIZER;
}
 
/*获取范围数据*/
void select_num(int begin, int end) {
    int id;
    int cur_pid;
    multimap<int,int>::iterator it, itlow, itup;
    PK_Idx *pk;
    /* 按字段范围查找ID */
    itlow = table.num_idx.lower_bound (begin);
    itup = table.num_idx.upper_bound (end);
 
    LOCK;
    cur_pid = pid++;
    UNLOCK;
    PRINT("%d : * Start Select:%d,%d *\n", cur_pid, begin, end);
    for (it=itlow; it!=itup; ++it) {
        id = it->second;
        pk = &(table.pk_idx.find(id)->second);//根据ID去查主键索引
        pthread_mutex_lock(&(pk->lock));//在主键索引上加锁
        PRINT("%d : LOCK Row %d: %d\t%s\n", cur_pid, id, pk->row.num, pk->row.info.c_str());
        Sleep(500);
    }
    for (it=itlow; it!=itup; ++it) {
        id = it->second;
        pk = &(table.pk_idx.find(id)->second);
        PRINT("%d : UNLOCK Row %d\n", cur_pid, id);
        pthread_mutex_unlock(&(pk->lock));//使用完毕依次释放锁
    }
    PRINT("%d : * Select Finished! *\n", cur_pid);
}
 
/*修改范围数据*/
void update_num(int begin, int end) {
    int id;
    int cur_pid;
    multimap<int,int>::iterator it, itlow, itup;
    PK_Idx *pk;
 
    itlow = table.num_idx.lower_bound (begin);
    itup = table.num_idx.upper_bound (end);
 
    LOCK;
    cur_pid = pid++;
    UNLOCK;
    PRINT("%d : * Start Update:%d,%d *\n", cur_pid, begin, end);
    for (it=itlow; it!=itup; ++it) {
        id = it->second;
        pk = &(table.pk_idx.find(id)->second);
        pthread_mutex_lock(&(pk->lock));
        PRINT("%d : LOCK Row %d: %d\t%s\n", cur_pid, id, pk->row.num, pk->row.info.c_str());
        Sleep(500);
    }
    for (it=itlow; it!=itup; ++it) {
        id = it->second;
        pk = &(table.pk_idx.find(id)->second);
        PRINT("%d : UNLOCK Row %d\n", cur_pid, id);
        pthread_mutex_unlock(&(pk->lock));
    }
    PRINT("%d : * Update Finished! *\n", cur_pid);
}
 
 
void* test_select(void *) {
    int begin, end;
    srand((unsigned)time(0));
    while(1) {
        begin = rand()%(MAXROWS/2);
        end = begin+rand()%(MAXROWS/2);
        select_num(begin, end);
        Sleep(500);
    }
}
 
void* test_update(void *) {
    int begin, end;
    srand((unsigned)time(0));
    while(1) {
        begin = rand()%(MAXROWS/5);
        end = begin+rand()%(MAXROWS/5);
        update_num(begin, end);
        Sleep(500);
    }
}
 
void test() {
    pthread_t id[2];
    if(pthread_create(&id[0], NULL, test_select, NULL) != 0)
    {
        PRINT("%s", "Create Thread Error!\n");
    }
 
    if(pthread_create(&id[1], NULL, test_update, NULL) != 0)
    {
        PRINT("%s", "Create Thread Error!\n");
    }
 
    while(1){
		Sleep(500);
	}
}
 
int main() {
    init();
    test();
    return 0;
}

抱歉!评论已关闭.