注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

Tsecer的回音岛

Tsecer的博客

 
 
 

日志

 
 

386内核的__ticket_spin_lock实现方法及意义  

2012-02-22 23:00:11|  分类: Linux内核 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
一、spin_lock
在驱动程序中,很多操作都是很短暂的,但是会存在竞争,不过解决的办法不应该是使用传统的“可休眠锁”。至于为什么,我在之前的一些文章中有所描述,大致的意思就是:中断的发生是随机的,它和当前运行任务没有任何必然联系,而所谓的“休眠”是指当前任务交出系统执行权(CPU使用权),所以这种做法是不合理的。我看这个问题的原因是在看串口驱动的时候遇到了这个锁,然后一路看了下来,看到386这个有点意思的实现方式。再具体的说就是在linux-2.6.21\drivers\serial\8250.c文件的serial8250_interrupt函数里,开始就有一个
    spin_lock(&i->lock);
这里可以看到,中断的到来是蛮不讲理的,说来就来,不打招呼,所以此时就要保护这个数据,使用的就是spin_lock。它的实现方式就是“一根筋”的做法,等待一个锁的时候,如果暂时被别人占用,我就在在原地撒泼打滚,不断的查看这个锁是否被释放,直到锁被释放之后才继续执行,至于具体怎么打转,接下来再讲。这里如果打转时间太长,会不会把自己转晕,那是有可能的,所以这类锁在设置的时候应用场景就要控制,就是这个锁内部的操作都很简单,并且不会睡眠。
二、早期内核自旋锁实现
linux-2.6.21\include\asm-i386\spinlock.h
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
    asm volatile("\n1:\t"
             LOCK_PREFIX " ; decb %0\n\t"锁总线之后执行对内存变量的递减操作,锁总线保证多处理器下内存访问一致性
             "jns 3f\n" 如果减一之后内存值大于等于1,那么认为已经获得锁,可以从spin_lock返回
             "2:\t"    空转循环前向跳转锚点
             "rep;nop\n\t"相当于386处理P4之后添加的pause指令
             "cmpb $0,%0\n\t" 内存值和0比较,
             "jle 2b\n\t"小于等于零继续打转
             "jmp 1b\n"如果大于0,则回到开始,再次递减内存值,重复该过程。此处跳转和jle 2b的区别在于,jle 2b时锁的持有者还没有释放锁,粗俗的说就是执行完了spin_lock但是还没有执行spin_unlock。而jmp 1b则说明执行过spin_lock的任务执行完了spin_unlock,此时锁又可用了,秦人失其鹿,天下共逐之。在多核处理器下,可能有多个CPU上的任务都在等待这个锁,也就是说都在执行这个jle 2b循环,直到有一个眼疾手快,通过jle发现这个锁可用了,所以它就调回到开始递减内存值,然后bingo,它得到了锁。那些其他后知后觉的CPU可能在这个过程中(即新的CPU执行decb %0之前)也可能会知道这个小道消息,所以它也会到开始执行那个decb,不幸的时,有人捷足先登了,所以希望马上破灭,继续打转
             "3:\n\t"
             : "+m" (lock->slock) : : "memory");
}

static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
    asm volatile("movb $1,%0" : "+m" (lock->slock) :: "memory");
}
这里的现象:
当lock的时候,递减锁数值;当unlock的时候,是不管三七二十一,直接把锁的数值修改为1而不是累加锁的值。这是一个不对称的过程。这也容易理解,当出现零或者负值的时候,说明是锁已经被人攫取了,因为这锁一次只能有一个人持有,所以只要知道是不是有人占有(锁值是否小于等于零)就可以了,如果已经被占有,那打转是必须的。对于unlock来说,它直接置锁值为1,从而表示锁已经可用,这样大家就又可以一窝蜂的来抢这个锁了。这就是操作系统中常说的“惊群效应”,也就是说大家一窝蜂的来抢,但是只有一个能抢到,而且这一个抢到的到底是谁,那不好意思,没准儿。
三、glibc 计数信号量实现
这个和自旋锁的实现一定是不同的,因为它有一个初始化值,也就是一个初始资源数,假设一个资源有5个,那么前五个试图获得这种资源的参与者都是可以获得这个的(耳边响起了那句:快拿起你的电话订购吧!),但是第六个可能就没有那么幸运*(如果前五个都抓住不放的话)。这里就需要记录睡眠者或者说等待者的数量,这一点是在C库实现的。我们看一下C库的sem信号量是如何实现的:
struct new_sem
{
  unsigned int value;这个就是资源计数,它的初始值代表了系统最开始这种资源有多少了。
  int private;这个是信号灯是否在进程间共享的一个标志,将会影响futex系统调用的参数,可以忽略。
  unsigned long int nwaiters; 等待者的数量。由于资源比较丰富,并不是有人获得了锁其它人就没机会了,但是又不是人人都有机会。
};
这里最大的感觉就是多了个nwaiters成员,这是自旋锁锁没有的。由于386的sem_wait和sem_post都是用汇编实现的,所以看起来比较麻烦,大致看一下其实现思路。
在sem_wait的时候,递减value的值,也就是递减资源数,如果资源数小于等于零,说明已经没有资源了,此时需要通过futex进行等待。但是我看了一下代码,并不是这么实现的,它是首先判断这个资源数是否为零,如果为零,则不再递减该值,而是递增nwaiters的值之后通过futex进行休眠,如果不等于零,则进行递减操作并直接返回。
在sem_post的时候,该函数会开始就直接递增资源数,而没有进行原始资源值的判断;然后判断nwaiters的值,如果大于零,则通过futex做唤醒操作(这里并不递减nwaiters的值,这个值有在sem_wait中递减,并且还要考虑被其它进程抢占的情况)。
其实这些都不重要,重要的是从代码中可以看到,如果初始资源为2,在未执行sem_wait的情况下先执行3次sem_post,那么之后可以连续执行2+3=5次sem_wait而不阻塞。
测试代码:
[tsecer@Harry sempost]$ cat sem.c
#include <stdio.h>
#include <semaphore.h>

void sem_take(sem_t * sem)
{
    printf("sem_taking\n");
    sem_wait(sem);
    sem_take(sem);
}
int main()
{
    sem_t mysem;
    int i;
    sem_init(&mysem,0,2);
    for(i = 0 ; i < 3 ; i++)
        sem_post(&mysem);
    sem_take(&mysem);
}
[tsecer@Harry sempost]$ cat Makefile
default:
    gcc  sem.c -o sem.exe -lrt
[tsecer@Harry sempost]$ make
gcc  sem.c -o sem.exe -lrt
[tsecer@Harry sempost]$ ./sem.exe
sem_taking
sem_taking
sem_taking
sem_taking
sem_taking
sem_taking
四、内核中ticket_spin_lock实现
这个是在比较新的内核里有的,并且在2.6.37中可能只有386实现了这个功能,它的特征就是有一个排序的功能,相对公平一些。就是说唤醒的顺序和等待的顺序相同,先等待的先被唤醒,后等待的后被唤醒,这一点是之前spin_lock实现所不具备的一个特征。
linux-2.6.37.1\arch\x86\include\asm\spinlock.h
文件的开始有一段注释,作者写的注释大家还是要看看,因为写这么多肯定不是吃多了撑着了。
/*
 * Ticket locks are conceptually two parts, one indicating the current head of  抽象的说,就是一个锁被分成了两个部分,一部分指示等待队列的头
 * the queue, and the other indicating the current tail. The lock is acquired      另一部分指向队列的尾部。再具体的说,是将一个锁类型等分了两部分
 * by atomically noting the tail and incrementing it by one (thus adding            高部分做等待队列的尾部,低半部分作为队列的头部
 * ourself to the queue and noting our position), then waiting until the head
 * becomes equal to the the initial value of the tail.
 *
 * We use an xadd covering *both* parts of the lock, to increment the tail and 使用了386特有的xadd指令,它增加尾部指针并且还会加载头部指针值
 * also load the position of the head, which takes care of memory ordering       是一条典型的CISC指令,属于多才多艺、任劳任怨的一条指令,这
 * issues and should be optimal for the uncontended case. Note the tail must be 指令还会保证不会乱序执行大家可以理解为内置内存栅
 * in the high part, because a wide xadd increment of the low part would carry
 * up and contaminate the high part.
 *
 * With fewer than 2^8 possible CPUs, we can use x86's partial registers to
 * save some instructions and make the code more elegant. There really isn't
 * much between them in performance though, especially as locks are out of line.
下面是实现
static __always_inline void __ticket_spin_lock(arch_spinlock_t *lock)
{
    int inc = 0x00010000;
    int tmp;
//其中xadd x,y的意思是把x和y的值互换,然后把它们的和放入y中。这和普通的add操作区别在于可以把目的地址原始值保存下来
    asm volatile(LOCK_PREFIX "xaddl %0, %1\n"  将锁的值增加inc=0x0001000,即高word递增,并把锁原始值放入%0,即inc变量中,所以inc变量不能声明为常量。
             "movzwl %w0, %2\n\t"将锁原始值的低word放入tmp变量中
             "shrl $16, %0\n\t"将锁变量高word原始值放入inc变量中
             "1:\t"
             "cmpl %0, %2\n\t"比较tmp和高word值是否相同
             "je 2f\n\t"  相同则获得锁,可以返回
             "rep ; nop\n\t" 延时
             "movzwl %1, %2\n\t" 将当前锁变量值低word放入tmp变量,重新执行tmp和inc值比较,相等则推出等待
             /* don't need lfence here, because loads are in-order */
             "jmp 1b\n"
             "2:"
             : "+r" (inc), "+m" (lock->slock), "=&r" (tmp)
             :
             : "memory", "cc");
}
只看上面代码可能有些晕,所以要结合unlock的代码分析一下
static __always_inline void __ticket_spin_unlock(arch_spinlock_t *lock)
{
    asm volatile(UNLOCK_LOCK_PREFIX "incw %0"
             : "+m" (lock->slock)
             :
             : "memory", "cc");
}
代码很简单,就是递增锁变量低word值。
思路整理是这样的:
每个CPU执行这个路径的时候,它递增锁变量高word的值,这个值作为自己这次锁动作的标识,值的大小关系可以决定执行lock的顺序,所以可以认为是一个ticket。只要有人来lock,就增加这个值,从而更新ticket的值,也就是作者所说的queue tail。不同CPU将自己分配到的值(xadd中x代表的eXchange执行该操作)保存在tmp中,而锁变量的值将会随着lock/unlock动作而实时变化。
在unlock的时候,它增加低word值,当锁变量的低word更新到和自己分到的ticket(保存在tmp)相同时,则表示之前的所有lock都执行过了unlock,所以自己可以获得这个锁了,这样就保证了唤醒顺序和阻塞顺序一致。
这里可以看到,全部是递增操作,没有递减,这里的溢出是如何处理的呢?事实上,汇编中的指令都是以word为操作单位,所以值并不会向高word溢出。另一方面,当word溢出之后也没有关系因为这个是一个比较关系,当tail赶上head的时候,需要经过2^16次锁,也就是这么多个CPU,可以认为在实际中不存在。
如果大家理解了上面的思路,可以尝试理解一下如何判断当前锁是否有被占用的实现,代码如下:
static inline int __ticket_spin_is_locked(arch_spinlock_t *lock)
{
    int tmp = ACCESS_ONCE(lock->slock);

    return !!(((tmp >> TICKET_SHIFT) ^ tmp) & ((1 << TICKET_SHIFT) - 1));
}
  评论这张
 
阅读(1447)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017