注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

Tsecer的回音岛

Tsecer的博客

 
 
 

日志

 
 

Linux下robust互斥锁实现  

2012-04-11 22:54:56|  分类: Linux系统编程 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
一、robust互斥锁
这种类型的锁可能不是POSIX标准中规定的锁,但是也有可能是,这个不太确定,暂时不管。这种类型的锁主要是解决当一个持有互斥锁的线程退出之后这个锁成为不可用状态的一个问题来的。可以想象,一个线程可能持有很多的锁,也可能没有,如果此时有一个外部(被其它任务kill)或者内部(出现访问问题或者直接pthread_exit)原因而推出线程,此时虽然这个线程退出是一了百了了,但是其它的线程还是希望能够坚强的运行下去,此时就需要有一些机制来保证这个线程持有的那些希望被自动释放的锁有一种方法能够被释放,而这个工作就责无旁贷的落在了内核的身上。
从具体实现上来看,它打破了我们常规的“用户态VS内核态”的模式,模糊了用户和内核的概念,因为内核会遍历用户态提供的链表,这个是一个危险的操作,因为这个链表对内核来说是可能出现环路的,内存访问错误倒是小事。在魔兽中,兽族的斧头兵在冲锋陷阵的时候喊得口号是“为了部落”,而内核的程序员在写程序的时候喊的口号一定是“为了效率”。
二、信号类型的声明
这些锁的类型一般中间有一个ROBUST关键字,如果同学们有glibc的代码,看一下glibc-2.7\nptl\pthreadP.h文件中那些ROBUST类型的类型声明就可以知道它们的家族了,和普通的锁类型相同,也有一些ERROR_CHECK类型的互斥锁,当然可能不是POSIX标准,如果对跨平台要求比较严格,请慎用。
这个robust属性的设置需要使用专门的接口pthread_mutexattr_setrobust_np函数来设置,对于这些NP后缀的,我一直怀疑他们是NonPosix的缩写,未经证实,但是也有可能是POSIX的某个版本之后追加的一些性能也不好说。
三、向内核注册链表首地址
内核为了实现这种锁,专门提供了两个系统调用,分别是sys_set_robust_list和sys_get_robust_list,由于或者对于我们这次分析没有什么影响,所以我们只看一下注册的接口:
asmlinkage long
sys_set_robust_list(struct robust_list_head __user *head,
            size_t len)
{
    /*
     * The kernel knows only one size for now:
     */
    if (unlikely(len != sizeof(*head)))
        return -EINVAL;

    current->robust_list = head;内核还专门为每个线程的控制结构中添加了一个robust_list成员来记录用户态注册的链表头,事实上他是一个robust_list_head类型的结构

    return 0;
}
四、用户态注册
1、相关结构的分配与初始化
glibc-2.7\nptl\allocatestack.c
allocate_stack (const struct pthread_attr *attr, struct pthread **pdp,  ALLOCATE_STACK_PARMS)
  /* The robust mutex lists also need to be initialized
     unconditionally because the cleanup for the previous stack owner
     might have happened in the kernel.  */
  pd->robust_head.futex_offset = (offsetof (pthread_mutex_t, __data.__lock)
                  - offsetof (pthread_mutex_t,
                          __data.__list.__next));这个偏移量是编译时确定的,它的值是pthread_mutex_t结构中__lock成员和__list成员之间的相对字节偏移处,之前说过,内核是需要遍历用户态链表的,根据最为原始的链表(就是我们上任何一个数据结构中都会提到的)就是一个 struct node {struct node *next; type var;}类型的结构,但是内核说为了给用户态任务更大的自由度,内核不能假设这个__lock就是这么正正经经的紧邻着链表成员,所以提供了一个offset,这样两者之间可以有更多的控制结构,事实上C库也就是这么实现的,它们的确不是连续的。有人问,问什么这么巧呢?因为C库中相关功能的开发是和内核同步的,可以说是C库的需求促使了内核该功能的添加,是一个订制功能
  pd->robust_head.list_op_pending = NULL;
#ifdef __PTHREAD_MUTEX_HAVE_PREV
  pd->robust_prev = &pd->robust_head;
#endif
  pd->robust_head.list = &pd->robust_head;初始情况下下,链表自回环,为空
2、向内核注册
glibc-2.7\nptl\pthread_create.c
start_thread (void *arg)
#ifdef __NR_set_robust_list
# ifndef __ASSUME_SET_ROBUST_LIST
  if (__set_robust_list_avail >= 0)
# endif
    {
      INTERNAL_SYSCALL_DECL (err);
      /* This call should never fail because the initial call in init.c
     succeeded.  */
      INTERNAL_SYSCALL (set_robust_list, err, 2, &pd->robust_head,这个接口没什么好说的,和内核接口直接对应,用户态和内核态共享结构
            sizeof (struct robust_list_head));
    }
#endif
五、用户态如何获得ROBUST锁
1、robust锁的获取操作
对于robust锁,内核和用户态约定了这个__lock的意义,它的低30bits用来记录当前锁持有者的线程id号,剩余高2bits记录FUTEX_WAITERS和FUTEX_OWNER_DIED标志bit。
glibc-2.7\nptl\pthread_mutex_lock.c中__pthread_mutex_lock (mutex)函数

 THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending,首先设置内核和用户态共享的list_op_pending结构,注意的是,这是一个指针,而不是链表,它只有一个元素,这和前面说的 robust_list list成员的意义不同,至于为什么有这个成员,我们之后再解释
             &mutex->__data.__list.__next);

      oldval = mutex->__data.__lock;
      do
    {
    again:
      if ((oldval & FUTEX_OWNER_DIED) != 0)首先判断是不是持有者已经在没有释放锁的情况下挂掉了
        {
          /* The previous owner died.  Try locking the mutex.  */
          int newval = id;
#ifdef NO_INCR
          newval |= FUTEX_WAITERS;
#else
          newval |= (oldval & FUTEX_WAITERS);
#endif

          newval
        = atomic_compare_and_exchange_val_acq (&mutex->__data.__lock,
                               newval, oldval);尝试原子性修改锁值,这个值内核和用户态共享各个bit的意义

          if (newval != oldval)
        {
          oldval = newval;
          goto again;原子操作失败,重试
        }

          /* We got the mutex.  */
          mutex->__data.__count = 1;
          /* But it is inconsistent unless marked otherwise.  */
          mutex->__data.__owner = PTHREAD_MUTEX_INCONSISTENT;持有者设置为不一致,因为这是从死亡者里拿到的东西,相当于是“腐尸”

          ENQUEUE_MUTEX (mutex);这个操作比较重要,它会把这个锁太添加到 robust_list list链表的开始,表明自己已经获得了这个锁,实现比较简单,这里省略
          THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);清空op_pending列表,这个列表在执行锁操作之前初始化

          /* Note that we deliberately exit here.  If we fall
         through to the end of the function __nusers would be
         incremented which is not correct because the old
         owner has to be discounted.  If we are not supposed
         to increment __nusers we actually have to decrement
         it here.  */
#ifdef NO_INCR
          --mutex->__data.__nusers;
#endif

          return EOWNERDEAD;这里虽然获得了锁,但是低调的返回EOWNERDEAD,反过来说,虽然返回值为EOWNERDEAD,但是锁还是已经获得了
        }
………………
      oldval = LLL_ROBUST_MUTEX_LOCK (mutex, id);这个操作本质上也是来修改__lock结构为当前线程的tid(其中的id就是线程的tid),只是它做了更多的功能,那就是判断当获得锁失败的时候贴心的将线程挂起。反过来说,当一个线程因为OWNERDEAD被唤醒之后,它同样会从这里返回,并执行这个while大循环,判断出ownerdead标志位

      if (__builtin_expect (mutex->__data.__owner
                == PTHREAD_MUTEX_NOTRECOVERABLE, 0))如果pthread_mutex_lock返回EOWNERDEAD而调用者没有执行pthread_mutex_consistent_np函数,那么将会满足该条件,这下可能真的是完蛋去了
        {
          /* This mutex is now not recoverable.  */
          mutex->__data.__count = 0;
          lll_unlock (mutex->__data.__lock,
              PTHREAD_ROBUST_MUTEX_PSHARED (mutex));
          THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
          return ENOTRECOVERABLE;
        }
    }
      while ((oldval & FUTEX_OWNER_DIED) != 0);

      mutex->__data.__count = 1;
      ENQUEUE_MUTEX (mutex);
      THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
      break;
2、list_op_pending的意义
前面说过,这个不是链表,而只是一个简单的指针,它一次只指向一个元素。为什么要这个结构呢?因为这个操作是极为特殊的,特殊之处在于它获得一个锁需要执行两个操作:
1、一个是设置好__lock为自己的tid
2、将这个mutex结构添加到内核识别的robust_list链表中去。
这两个操作明显不是原子性的,假设在第一步执行结束之后,任务被杀死,那么线程退出的时候这个锁是没有办法被唤醒的,虽然它事实上已经获得了这个锁,但是由于没有注册到内核识别的链表中而无法被唤醒,这样就有可能出现悲剧。通常解决原子性操作的方法就是加锁,但是这里是锁本身,所以没有办法依赖锁,只能添加一个可能会竞争的op_pending链表,从而让内核这个可以关中断并处理锁的机制来完成这个原子性。这和我们解决问题的模式是一样的,如果这层没有办法做,那么就向下推一层。
六、内核态唤醒
1、唤醒时机
do_exit--->>>exit_robust_list
if (pending) 如果pending链表非空,那么表示“可能”存在一个锁,所以尝试进行唤醒
        handle_futex_death((void __user *)pending + futex_offset, curr, pip);

    while (entry != &head->list) {这个循环是真正变脸head结构链表,其中的每一项都是货真价实的等待被唤醒的robust锁,当然里面还有一个比较特殊的entry!=pending判断,着同样是用户态院子操作的保证,有兴趣的同学可以分析一下在用户态哪个位置切换会出现entry==pending的情况(请大家不要怀疑我是不是知道)
        /*
         * A pending lock might already be on the list, so
         * don't process it twice:
         */
        if (entry != pending)
            if (handle_futex_death((void __user *)entry + futex_offset,
                        curr, pi))
                return;
        /*
         * Fetch the next entry in the list:
         */
        if (fetch_robust_entry(&entry, &entry->next, &pi))
            return;
        /*
         * Avoid excessively long or circular lists:
         */
        if (!--limit)这里最为高效的防止出现循环链表的方法,就是假设链表最多为 ROBUST_LIST_LIMIT    2048项
            break;

        cond_resched();
    }
2、唤醒函数

/*
 * Process a futex-list entry, check whether it's owned by the
 * dying task, and do notification if so:
 */
int handle_futex_death(u32 __user *uaddr, struct task_struct *curr, int pi)
{
    u32 uval, nval, mval;

retry:
    if (get_user(uval, uaddr))
        return -1;

    if ((uval & FUTEX_TID_MASK) == curr->pid) {如果退出线程持有该锁
        /*
         * Ok, this dying thread is truly holding a futex
         * of interest. Set the OWNER_DIED bit atomically
         * via cmpxchg, and if the value had FUTEX_WAITERS
         * set, wake up a waiter (if any). (We have to do a
         * futex_wake() even if OWNER_DIED is already set -
         * to handle the rare but possible case of recursive
         * thread-death.) The rest of the cleanup is done in
         * userspace.
         */
        mval = (uval & FUTEX_WAITERS) | FUTEX_OWNER_DIED;设置用户态最为关心的OWNER_DIED标志
        nval = futex_atomic_cmpxchg_inatomic(uaddr, uval, mval);

        if (nval == -EFAULT)
            return -1;

        if (nval != uval)
            goto retry;

        /*
         * Wake robust non-PI futexes here. The wakeup of
         * PI futexes happens in exit_pi_state():
         */
        if (!pi) {
            if (uval & FUTEX_WAITERS)如果该锁上有等待者
                futex_wake(uaddr, 1);这里注意的是此处只会唤醒一个,因为唤醒多个只会让用户态更乱,没有意义
        }
    }
    return 0;
}
七、pthread_mutex_consistent_np函数的意义
int
pthread_mutex_consistent_np (mutex)
     pthread_mutex_t *mutex;
{
  /* Test whether this is a robust mutex with a dead owner.  */
  if ((mutex->__data.__kind & PTHREAD_MUTEX_ROBUST_NORMAL_NP) == 0
      || mutex->__data.__owner != PTHREAD_MUTEX_INCONSISTENT)
    return EINVAL;

  mutex->__data.__owner = THREAD_GETMEM (THREAD_SELF, tid);

  return 0;
}
如果不执行,在__pthread_mutex_lock函数返回EOWNERDEAD之后,锁的__owner将被设置为PTHREAD_MUTEX_INCONSISTENT
 mutex->__data.__owner = PTHREAD_MUTEX_INCONSISTENT;
然后在持有者执行pthread_mutex_unlock的时候
      /* If the previous owner died and the caller did not succeed in
     making the state consistent, mark the mutex as unrecoverable
     and make all waiters.  */
      if (__builtin_expect (mutex->__data.__owner
                == PTHREAD_MUTEX_INCONSISTENT, 0))
      notrecoverable:
    newowner = PTHREAD_MUTEX_NOTRECOVERABLE;
而其它线程再次执行pthread_mutex_lock的时候,
 if (__builtin_expect (mutex->__data.__owner
                == PTHREAD_MUTEX_NOTRECOVERABLE, 0))
        {
          /* This mutex is now not recoverable.  */
          mutex->__data.__count = 0;
          lll_unlock (mutex->__data.__lock,
              PTHREAD_ROBUST_MUTEX_PSHARED (mutex));
          THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
          return ENOTRECOVERABLE;
        }
可见,从原始的INCONSISTENT转换为NOTRECOVERABLE还是一个比较严重的问题,开始只是“不一致”,最后就“无可挽回”了。
  评论这张
 
阅读(2196)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017