序言
期待读者
本文期待读者有C语言编程基础,后文中要分析代码,
如何阅读代码
就以rt-thread内核代码为例(注,指rt-
- MDK/IAR/其他集成开发环境,最好支持软件仿真,
我使用MDK4.0 - 强大的代码阅读软件source insight
- 一个笔记本,随时用来记录自己的一些想法,感悟,或者困惑
首先使用source insight创建一个rt-thread的工程,
一切就绪有以后,面对浩如烟海的代码(8500行),
rt-thread的内核调度算法
rt-thread的内核调度算法采用位图(bitmap)
rt-
《rt-thread编程指南》中已经大致介绍了,rt-
当系统存在多个线程时,可能的情况是,
在上面的情形中,摆在rt-thread面前的问题就,
线程结构存储
实际上,寻找当前线程优先级最高的线程并调度执行,
现在让我们做几点说明:
- 每一个线程的信息用线程控制块来表示,线程控制块,即Thread Control-Block,缩写为TCB,它是在rtdef.
h中定义一个struct结构体, 这个结构体的作用就是用来描述一个线程所有必要信息。 - 线程的优先级别用非负整数(即无符号整数)表示,
并且优先级越高,对应的数越小 - 系统的线程优先级的数目固定,最多支持256级
- 系统中的线程数目不做任何限制,
线程的数目仅受限于系统RAM的大小。 重点来考虑最后两点,我们来思考一下,当系统存在多个的线程时, 也就是说会有多个TCB时,我们怎么来“排列” 或者存储这些TCB,才能实现上面的这两点要求?
线程的优先级别数目固定,显然我们可以使用一个数组来定义,
线程数目不受限制,那当某个线程优先级上存在多个线程时,
这样我们就可以达到上面提及的两点设计要求,
scheduler.c 中
... (1) rt_list_t rt_thread_priority_table[RT_THREAD_PRIORITY_MAX]; (2) struct rt_thread *rt_current_thread; (3) rt_uint8_t rt_current_priority; #if RT_THREAD_PRIORITY_MAX > 32 /* maximun priority level, 256 */ (4) rt_uint32_t rt_thread_ready_priority_group; (5) rt_uint8_t rt_thread_ready_table[32]; #else /* maximun priority level, 32 */ (6)rt_uint32_t rt_thread_ready_priority_group; #endif ...这里我们假定RT_THREAD_PRIORITY_
MAX这个宏为256,即条件编译语句会编译#if和# else之间的语句。
- 语句(1)即定义了线程TCB数组。
该数组的每一个元素是一个rt_list_t类型的元素, 实际上这就是一个链表的数据结构。 - 语句(2)中定义了一个指针,从名称上来看,即当前线程,
struct rt_thread就是线程TCB数据结构类型。 - 语句(3)定了当前的优先级。
- 语句(4)当前的ready优先级组。
- 语句(5)定了一个数组,语句(6)定义了一个u32的变量,
它们是做什么用的呢?我们稍后分析。
rt-thread中的线程数据结构的存储问题已经解决,
位图调度算法
调度算法首先要找出所有线程优先级中优先级最高的那个线程优先级
调度算法1
for(i=0; i<256; i++) { if(rt_thread_priority_table[i] != NULL) break; } highest_ready_priority = i;上面这种做法是可以正确调度最高优先级的线程,
但是它有一个问题, 如果当前系统中具有最高优先级的线程对应的优先级的数字( 根据上面的分析,数字越大, 线程TCB越存储在TCB数组的后面,其优先级别越低) 如果为0级,显然我们一次就可以找出,如果很不幸, 这个从0级到254级上都没有就绪的线程, 仅在255级上有就绪的线程, 我们却不得不在检查了数组这256个元素之后, 才能找出可以运行的线程。 因此,我们要寻找一种具有恒定执行时间的调度算法 。
首先来考虑,每一个优先级上是否存在线程,这是一个是/否问题,
即要么存在线程,要么不存在线程,这可以用一个bit位来表示。 我们规定这个bit为1表示存在线程,为0表示不存在线程。 对于256级的线程,则共需要256个bit位。理想的情况是,
我们创建一个具有256个bit的变量, 然后操作系统使用这个变量来维护整个系统所有对应优先级上是否存 在活动的线程。显然,C语言不支持:-(, 但是256个bit也就是32个字节, 我们定义一个32字节长的数组即可, 然后将这个数组看成一个整体。 现在需要约定,这32个字节即256个bit,
和256个线程优先级的对应关系。一个字节的最高位为bit7, 最低位为bit0,和上面的说明一致的是, 我们用bit0表示更高的优先级,用BIT7表示稍低的优先级。 来考虑这32个字节中的第一个字节。
第一个字节的bit0用来表示优先级0,bit7表示优先级7。 第二个字节bit0表示优先级8,bit7表示优先级15。 其他依次类推。可以参考的如下表格, 它描述了这32个字节的各个bit是和系统的256个优先级的对 应关系。 单元格中的内容表示对应的优先级。 每一行为对应的一个字节,每一列为各个bit位。
bit7 6 5 4 3 2 1 0 byte0 |007|006|005|004|003|002|001|000| byte1 |0l5|014|013|012|011|010|009|008| ................................. byte32|255|254|253|252|251|250|249|248|上面这32个字节所组成的256个bit,
他们的排列方式很像一张图(map), 所以这种方式就别称为位图(bit map)。这张图就是前面scheduler. c中定义的32个字节的数组。如下
(5) rt_uint8_t rt_thread_ready_table[32];举个例子,我们创建了一个线程,并且指定了它的优先级是125,
然后将它设置为就绪(READY), 实际上在我们在调用函数将它变为READY的函数中, RTT就会去上面这256个bit中(也即是这32个字节), 找到第125个bit,我称之为位图的BIT125, 也即是字节15 (125/ 8 = 15,125%8 = 5)的第5个bit,将这个bit置1。 即位图的BIT125
就是rt_thread_ready_table[125/8]的BIT5.我们可以用位代码表示为 BITMPA.BIT_125 = rt_thread_ready_table[125/8]. BIT5 优先级125 对应那个字节的哪个bit呢?
这里有个换算关系。其计算公式 :
(优先级别 / 8 )商取整数即对应位图中的字节 (优先级别 % 8 )就是对应位图字节中的bit位即优先级125, 125 / 8 = 15 , 125 %8 = 5. 位图的BIT125就是 rt_thread_ready_table[15]的BIT5
为了下面叙述的方便,做如下说明:
- 位图,就指的是数组rt_uint8_t rt_thread_ready_table[32]
这32个字节组成的256个bit。
我们的系统需要根据各个线程的状态,实时的更新这个位图。
自然,我们面临的问题是,寻找优先级最高的线程的问题,
下面是一种显然调度思路的思路,即依次遍历数组rt_
调度算法2
for(i=0; i<32; i++) { for(j=0; j<8; j++) { if (rt_thread_priority_table[i] & (1<<j) ) break;//这就是找到最低的那个为1的bit位置了。 } //下面就是我们找到的最高优先级 highest_ready_priority = i * 8 + j; }算法2可以工作,但依然存在问题,
双层for循环可能只循环一次,也可能会循环256次, 这取决于位图中位图中为1的最低BIT的位置。 如果BIT0为1,则执行一次即跳出循环,如果BIT0- BIT254都是0,仅BIT255为1,则循环256次。 平均来说, 双层for循环的次数大约是 255/2 次。即与优先级数目N成正比。 算法2的问题就是,每次调度函数执行的时间不恒定,
取决于当前线程的优先级分布状况。 这种调度策略从整体上说执行的时间是O(n)的, 即调度算法的平均执行时间跟优先级数目成正比。 这种方式本质上跟调度算法1一样, 依然不能实现在恒定时间完成调度的目标。 我们来改进调度算法2,它之所以耗费时间为O(N),
就是因为每次我们都要对位图的各个bit位做检验。 现在我们将位图看作一个变量,并假定当前优先级别为8,
则位图变量可以用一个字节表示。考虑位图变量的取值范围, 当位图所有BIT0全为0时,位图变量的值就是0, 当位图所有BIT位都是1时( 表示所有线程优先级上都存在就绪的线程,此时最高优先级为0级) ,则位图变量的值是255。反过来,如果当位图变量为1时, 此时位图的BIT0为1,即最高优先级为优先级0,同样, 位图变量为255时,最高优先级依然是0。
当位图变量为6时,BIT2=1,BIT1=1,即最高优先级为1。因此当位图变量取0- 255之间的任意一个数字时, 它的最低为1的BIT位置都是预知的。 我们可以预先将这位图变量的所有取值所对应的最高优先级计算出来 ,并存成一张表格,然后就可以避免算法2中的for循环, 而只需要查表即可,这个执行时间自然是恒定的。实际上, 查表法就是一种常用的用空间换取时间的方法。 位图取值 最低为1的bit位
- 0x01 0 (第0个bit位为1)
- 0x02 1 (第1个bit位为1)
- 0x03 0 (第0个bit位为1)
- ....
- 0xff 0 (第0个bit为1)
注意0x0比较特殊,全部bit位都是0,我们返回0,
我们可以写一个简单的程序来生成这个表格。
gettab.py
#coding=gbk #打印一个字节的最低bit位,可能的值为0,1,2,3,4,5,6,7 samples = 256 def getlowbit(byte): c = 0 for i in range(0,8): if(byte & 0x01): return c c = c+1 byte = byte >> 1 return 0 line ="" for i in range(0,samples): print "%d," %getlowbit(i), if((i+1)%16 == 0): print "\n"就可以得到如下的表了:
const rt_uint8_t rt_lowest_bitmap[] = { /* 00 */ 0, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 10 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 20 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 30 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 40 */ 6, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 50 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 60 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 70 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 80 */ 7, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* 90 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* A0 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* B0 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* C0 */ 6, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* D0 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* E0 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0, /* F0 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0 };注意,我们的问题依然没有解决,当进程优先级为8时,
我们可以查表直接解决,当系统存在32个优先级时, 如果直接制作表格的话,这个表格的元素个数将是 2**32 = 4294967296L= 4G字节。显然这是不可接受的。 32个优先级,即优先级位图变量可以使用u32型,
也就是等价于4个字节, 我们可以对这4个字节从字节0开始依次查表,如果字节0中非0, 则最高优先级一定存在于字节0中,我们对字节0查表rt_ lowest_bitmap,即可以得到最高优先级。 如果字节0为0,字节1非0, 我们对字节1查表得到的是字节1中为1的最低bit位, 然后加上8,就是系统的最高优先级。对字节2,字节3同样处理。
- 假定当前u32 rt_thread_priority_
bitmap维护着当前系统优先级位图。
调度算法3
/* * rt_thread_priority_bitmap 用来表示当前系统优先级位图。 * highest_ready_priority表示当前系统中最高优先级 */ if (rt_thread_priority_bitmap & 0xff) { highest_ready_priority = rt_lowest_bitmap[rt_thread_priority_bitmap & 0xff]; } else if (rt_thread_priority_bitmap & 0xff00) { highest_ready_priority = rt_lowest_bitmap[(rt_thread_priority_bitmap >> 8) & 0xff] + 8; } else if (rt_thread_priority_bitmap & 0xff0000) { highest_ready_priority = rt_lowest_bitmap[(rt_thread_priority_bitmap >> 16) & 0xff] + 16; } else { highest_ready_priority = rt_lowest_bitmap[(rt_thread_priority_bitmap >> 24) & 0xff] + 24; }现在我们解决了32个系统优先级时的调度问题,
现在来考虑线程优先级为256的情况。读者可能会想了, 这没什么不同,256个bit=32个字节, 依然采用算法3的思路,对着32个字节依次查表。问题是, 当位图变量有32个字节时, 对这32个字节依次查表耗费的时间就不可以忽略了, 为了提升系统实时调度的性能,我们需要对算法3进行改进。 为了解决这个问题,我们使用二级位图。
即,256个bit由32个字节存储,
每一个字节的8个bit代表着位图变量中的8个优先级, 如果某个字节非0,则表示其中必有非0的bit位。 rtt中对应的数组为rt_uint8_t rt_thread_ready_table[32]
所谓二级位图,即我们先确定32个字节中最低的非0的字节。
为了实现这个效果, 我们需要对这32个字节引入一个32个bit的位图变量, 每一个bit位表示对应的字节是否为0。例如, 这个32bit的位图变量的BIT5为0, 表示系统线程优先级256bit所分成的32个字节中的 字节5 非0。 为了区分,称这个32个bit的位图变量 字节位图变量 ,rt-thread中使用的是rt_thread_ ready_priority_group.
显然我们查找系统系统最高优先级时,先确定非0的最低字节,这实际上依然是算法3,然后再对该字节进行查表, 即得到该字节内最低为1的bit位,然后两者叠加( 注意不是简单的加)即可。 根据上面的分析,要想使用这个二级位图算法,
rtt在跟踪线程的状态转换时, 不仅需要维护256bit的位图变量数组rt_thread_ ready_table[thread->number] |= thread->high_mask,还需要维护32bit的 字节位图变量 rt_thread_ready_priority_ group。参看如下代码。 thread.c
rt_err_t rt_thread_startup(rt_thread_t thread) { ... /* set current priority to init priority */ thread->current_priority = thread->init_priority; (1) thread->number = thread->current_priority >> 3; /* 5bit */ (2) thread->number_mask = 1L << thread->number; (3) thread->high_mask = 1L << (thread->current_priority & 0x07); /* 3bit */ ... } void rt_schedule_insert_thread(struct rt_thread *thread) { ... #if RT_THREAD_PRIORITY_MAX > 32 (4) rt_thread_ready_table[thread->number] |= thread->high_mask; #endif (5) rt_thread_ready_priority_group |= thread->number_mask; .... }初始化线程时,我们指定了一个线程的优先级别thread->
init_priority,由于线程优先级为0到255, 一个字节就可以表示。但是我们的bitmap是32个字节。 为了调高效率,我们最好能快速向位图的对应的bit写1。
- 语句(1)thread->current_priority >> 3,这里的>>3就是除以8,因为一个字节表示8个优先级。
这样就可以得到当前这个优先级对应的位图32个字节中的第几个字 节,这里用thread->number表示,显然, number范围是0到31。这里为了提高效率, 采用移位完成除法。 - 上面除法的余数,就表示这个优先级在上面字节中的第几个bit。
这个余数可以使用 (thread->current_priority & 0x07)来表示。 - 语句(3)是得到该bit对应的权值。
例如一个字节的bit7对应的权值即 (1<<7),这样做是为了使用“位与,或,非”等位运算, 可以提高运行速度,即语句(4)。 - 语句(4)清楚表示了这几个变量作用。可见,
根据某个表示优先级的数字向位图中相应的bit位写入了1。 - 那么语句(2)和(5)是做什么用的呢? 这个number_
mask实际上是为了加快查找位图的速度而创建的。它将在rt_ schedule函数中发挥作用。
上文已说明,thread->
rt_thread_ready_priority_
概括起来就是,rtt首先确定32个字节的位图中,
下面附上rt_schedule的代码。非必要的代码被我隐去。
void rt_schedule(void) { .... register rt_ubase_t highest_ready_priority; #if RT_THREAD_PRIORITY_MAX == 8 highest_ready_priority = rt_lowest_bitmap[rt_thread_ready_priority_group]; #else register rt_ubase_t number; /* find out the highest priority task */ if (rt_thread_ready_priority_group & 0xff) { number = rt_lowest_bitmap[rt_thread_ready_priority_group & 0xff]; } else if (rt_thread_ready_priority_group & 0xff00) { number = rt_lowest_bitmap[(rt_thread_ready_priority_group >> 8) & 0xff] + 8; } else if (rt_thread_ready_priority_group & 0xff0000) { number = rt_lowest_bitmap[(rt_thread_ready_priority_group >> 16) & 0xff] + 16; } else { number = rt_lowest_bitmap[(rt_thread_ready_priority_group >> 24) & 0xff] + 24; } highest_ready_priority = (number << 3) + rt_lowest_bitmap[rt_thread_ready_table[number]]; .... }