注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

Tsecer的回音岛

Tsecer的博客

 
 
 

日志

 
 

futex的robust锁实现原理  

2011-02-13 21:40:23|  分类: Linux内核 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

一、基本支持

用户态核心操作

cmpxch mem,old,new

原子操作,这个操作是将内存中的值和指定的old值进行比较,如果相同则将内存赋值为new,不同则不赋值,并且始终返回内存中操作前的原值。关于这个始终返回原值的原因说明:根据返回值我们可以判断操作是否完成——返回值与old相等是赋值发生的充分必要条件。也就是说,通过返回的当前值可以推断出此次赋值是否真正发生,而指令执行后内存中的新值对我们没有意义。

二、C库的初始化

glibc-2.7\nptl\sysdeps\pthread\pt-initfini.c

/* Invoke the C library's minimal pthread initialization.  The callee
   is declared locally with hidden visibility so the call binds inside
   libc itself rather than through the PLT.  */
static void
call_initialize_minimal (void)
{
  extern void __pthread_initialize_minimal_internal (void)
    __attribute ((visibility ("hidden")));

  __pthread_initialize_minimal_internal ();
}

SECTION (".init"); 由于这个函数被放置在.init节中,其中的代码将会在main函数执行之前被执行;也就是说,对call_initialize_minimal函数的调用是由链接器和C库共同安排完成的。再次强调,这个函数的执行是在main函数之前。
extern void __attribute__ ((section (".init"))) _init (void);
void
_init (void)
{
  /* The very first thing we must do is to set up the registers.  */
  call_initialize_minimal ();

然后是这个函数的实现

void
__pthread_initialize_minimal_internal (void)
{
#ifndef SHARED
  /* Unlike in the dynamically linked case the dynamic linker has not
     taken care of initializing the TLS data structures.  */
  __libc_setup_tls (TLS_TCB_SIZE, TLS_TCB_ALIGN);

  /* We must prevent gcc from being clever and move any of the
     following code ahead of the __libc_setup_tls call.  This function
     will initialize the thread register which is subsequently
     used.  */
  __asm __volatile ("");
#endif

  /* Minimal initialization of the thread descriptor.  */
  struct pthread *pd = THREAD_SELF;
  INTERNAL_SYSCALL_DECL (err);
  pd->pid = pd->tid = INTERNAL_SYSCALL (set_tid_address, err, 1, &pd->tid); /* point the kernel's tid-clear address at a known location, pd->tid */
  THREAD_SETMEM (pd, specific[0], &pd->specific_1stblock[0]);
  THREAD_SETMEM (pd, user_stack, true);
  if (LLL_LOCK_INITIALIZER != 0)
    THREAD_SETMEM (pd, lock, LLL_LOCK_INITIALIZER);
#if HP_TIMING_AVAIL
  THREAD_SETMEM (pd, cpuclock_offset, GL(dl_cpuclock_offset));
#endif

对于i386,这个TCB的大小为

/* This is the size of the TCB (thread control block): on i386 the
   TCB is simply the whole struct pthread.  */
# define TLS_TCB_SIZE sizeof (struct pthread)

/* Alignment requirements for the TCB.  */
# define TLS_TCB_ALIGN __alignof__ (struct pthread)

在__libc_setup_tls (size_t tcbsize, size_t tcbalign)函数中还完成了处理器相关的线程结构指针的设置

# define TLS_INIT_TP(thrdescr, secondcall) \
………………

  INSTALL_DTV (tlsblock, static_dtv);
  const char *lossage = TLS_INIT_TP (tlsblock, 0);

     /* Install the TLS.  */            \
     asm volatile (TLS_LOAD_EBX            \
     "int $0x80\n\t"           \
     TLS_LOAD_EBX            \
     : "=a" (_result), "=m" (_segdescr.desc.entry_number)       \
     : "0" (__NR_set_thread_area),         \
       TLS_EBX_ARG (&_segdescr.desc), "m" (_segdescr.desc));    \

其中对于robust列表的设置代码同样位于__pthread_initialize_minimal_internal (void)函数中

#ifdef __NR_set_robust_list
  pd->robust_head.futex_offset = (offsetof (pthread_mutex_t, __data.__lock)
      - offsetof (pthread_mutex_t,
           __data.__list.__next));
  int res = INTERNAL_SYSCALL (set_robust_list, err, 2, &pd->robust_head,
         sizeof (struct robust_list_head));
  if (INTERNAL_SYSCALL_ERROR_P (res, err))
#endif

 

三、robust锁的用户态操作

在__pthread_mutex_lock (mutex)函数中,其执行了对链表的操作

  pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
…………

 case PTHREAD_MUTEX_ROBUST_RECURSIVE_NP:
    case PTHREAD_MUTEX_ROBUST_ERRORCHECK_NP:
    case PTHREAD_MUTEX_ROBUST_NORMAL_NP:
    case PTHREAD_MUTEX_ROBUST_ADAPTIVE_NP:
      THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending,
       &mutex->__data.__list.__next);

       oldval = LLL_ROBUST_MUTEX_LOCK (mutex, id); /* store the new owner thread's TID into the 4-byte futex word */
…………

      mutex->__data.__count = 1;
      ENQUEUE_MUTEX (mutex);
      THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);

对于

/* Link a newly acquired mutex into this thread's robust list so the
   kernel can find it if the thread dies while holding it.  */
#define ENQUEUE_MUTEX(mutex) ENQUEUE_MUTEX_BOTH (mutex, 0)
/* Push the mutex's list node onto the front of the thread's
   singly-linked robust list; VAL may tag the pointer's low bit.  */
# define ENQUEUE_MUTEX_BOTH(mutex, val)           \
  do {               \
    mutex->__data.__list.__next            \
      = THREAD_GETMEM (THREAD_SELF, robust_list.__next);        \
    THREAD_SETMEM (THREAD_SELF, robust_list.__next,         \
     (void *) (((uintptr_t) &mutex->__data.__list) | val));     \
  } while (0)
/* Unlink the mutex's node from the robust list; the low tag bit is
   masked off (& ~1ul) while walking the chain.  */
# define DEQUEUE_MUTEX(mutex) \
  do {               \
    __pthread_slist_t *runp = (__pthread_slist_t *)         \
      (((uintptr_t) THREAD_GETMEM (THREAD_SELF, robust_list.__next)) & ~1ul); \
    if (runp == &mutex->__data.__list)           \
      THREAD_SETMEM (THREAD_SELF, robust_list.__next, runp->__next);       \
    else              \
      {               \
 __pthread_slist_t *next = (__pthread_slist_t *)        \
   (((uintptr_t) runp->__next) & ~1ul);          \
 while (next != &mutex->__data.__list)          \
   {              \
     runp = next;            \
     next = (__pthread_slist_t *) (((uintptr_t) runp->__next) & ~1ul); \
   }              \
               \
 runp->__next = next->__next;           \
 mutex->__data.__list.__next = NULL;          \
      }               \
  } while (0)

也就是将新持有的futex挂入线程自己占有的信号量链表中,从而让内核知道该线程已经占有了这个futex

四、内核支持

void mm_release(struct task_struct *tsk, struct mm_struct *mm)

/*
 * Walk curr->robust_list (very carefully, it's a userspace list!)
 * and mark any locks found there dead, and notify any waiters.
 *
 * We silently return on any sign of list-walking problem.
 */
void exit_robust_list(struct task_struct *curr)
{
 /* The list head was registered from user space via the
    set_robust_list() syscall during C-library thread init.  */
 struct robust_list_head __user *head = curr->robust_list;
 struct robust_list __user *entry, *next_entry, *pending;
 unsigned int limit = ROBUST_LIST_LIMIT, pi, next_pi, pip;
 unsigned long futex_offset;
 int rc;

 if (!futex_cmpxchg_enabled)
  return;

 /*
  * Fetch the list head (which was registered earlier, via
  * sys_set_robust_list()):
  */
 if (fetch_robust_entry(&entry, &head->list.next, &pi))
  return;
 /*
  * Fetch the relative futex offset:
  */
 if (get_user(futex_offset, &head->futex_offset))
  return;
 /*
  * Fetch any possibly pending lock-add first, and handle it
  * if it exists:
  */
 if (fetch_robust_entry(&pending, &head->list_op_pending, &pip))
  return;

 next_entry = NULL; /* avoid warning with gcc */
 while (entry != &head->list) {
  /*
   * Fetch the next entry in the list before calling
   * handle_futex_death:
   */
  rc = fetch_robust_entry(&next_entry, &entry->next, &next_pi);
  /*
   * A pending lock might already be on the list, so
   * don't process it twice:
   */
  if (entry != pending)
   if (handle_futex_death((void __user *)entry + futex_offset,
      curr, pi))
    return;
  if (rc)
   return;
  entry = next_entry;
  pi = next_pi;
  /*
   * Avoid excessively long or circular lists:
   */
  if (!--limit)
   break;

  cond_resched();
 }

 if (pending)
  handle_futex_death((void __user *)pending + futex_offset,
       curr, pip);
}
对于链表中的每一个元素


/*
 * Process a futex-list entry, check whether it's owned by the
 * dying task, and do notification if so:
 */
int handle_futex_death(u32 __user *uaddr, struct task_struct *curr, int pi)
{
 u32 uval, nval, mval;

retry:
 if (get_user(uval, uaddr))
  return -1;

 /* The crucial check: only act when the owner TID stored in the
    futex word matches the exiting task; if so, perform the
    OWNER_DIED handover and wake-up.  */
 if ((uval & FUTEX_TID_MASK) == task_pid_vnr(curr)) {
  /*
   * Ok, this dying thread is truly holding a futex
   * of interest. Set the OWNER_DIED bit atomically
   * via cmpxchg, and if the value had FUTEX_WAITERS
   * set, wake up a waiter (if any). (We have to do a
   * futex_wake() even if OWNER_DIED is already set -
   * to handle the rare but possible case of recursive
   * thread-death.) The rest of the cleanup is done in
   * userspace.
   */
  mval = (uval & FUTEX_WAITERS) | FUTEX_OWNER_DIED;
  nval = futex_atomic_cmpxchg_inatomic(uaddr, uval, mval);

  if (nval == -EFAULT)
   return -1;

  /* Raced with a concurrent update of the futex word: retry.  */
  if (nval != uval)
   goto retry;

  /*
   * Wake robust non-PI futexes here. The wakeup of
   * PI futexes happens in exit_pi_state():
   */
  if (!pi && (uval & FUTEX_WAITERS))
   futex_wake(uaddr, 1, 1, FUTEX_BITSET_MATCH_ANY);
 }
 return 0;
}

上面一些标志也说明了futex内存储的高地址bit可能用来做一些另外的特殊用处

/*
 * Are there any waiters for this robust futex:
 */
#define FUTEX_WAITERS  0x80000000

/*
 * The kernel signals via this bit that a thread holding a futex
 * has exited without unlocking the futex. The kernel also does
 * a FUTEX_WAKE on such futexes, after setting the bit, to wake
 * up any possible waiters:
 */
#define FUTEX_OWNER_DIED 0x40000000

这个是否有等待者的操作

# define LLL_ROBUST_MUTEX_LOCK(mutex, id) \
------->>>

/* Fast path of a robust lock: LOCK cmpxchg tries to swing the futex
   word from 0 to the caller's TID; if the word was nonzero, control
   jumps to the out-of-line stub, which calls __lll_robust_lock_wait
   for the contended slow path.  */
#define lll_robust_lock(futex, id, private) \
  ({ int result, ignore1, ignore2;           \
     __asm __volatile (LOCK_INSTR "cmpxchgl %1, %2\n\t"         \
         "jnz _L_robust_lock_%=\n\t"         \
         ".subsection 1\n\t"          \
         ".type _L_robust_lock_%=,@function\n"        \
         "_L_robust_lock_%=:\n"          \
         "1:\tleal %2, %%edx\n"          \
         "0:\tmovl %7, %%ecx\n"          \
         "2:\tcall __lll_robust_lock_wait\n"        \
         "3:\tjmp 18f\n"           \
         "4:\t.size _L_robust_lock_%=, 4b-1b\n\t"        \
         ".previous\n"           \
         LLL_STUB_UNWIND_INFO_4          \
         "18:"            \
         : "=a" (result), "=c" (ignore1), "=m" (futex),       \
    "=&d" (ignore2)          \
         : "0" (0), "1" (id), "m" (futex), "g" (private)       \
         : "memory");           \
     result; })

int
__lll_robust_lock_wait (int *futex, int private)
{
  int oldval = *futex;
  int tid = THREAD_GETMEM (THREAD_SELF, tid);

  /* If the futex changed meanwhile try locking again.  */
  if (oldval == 0)
    goto try;

  do
    {
      if (__builtin_expect (oldval & FUTEX_OWNER_DIED, 0))
 return oldval;

      int newval = oldval | FUTEX_WAITERS; /* mark that this futex has waiting threads */
      if (oldval != newval
   && atomic_compare_and_exchange_bool_acq (futex, newval, oldval))
 continue;

      lll_futex_wait (futex, newval, private);

五、部分实现细节

1、robust_list和futex的关系

内核使用的robust_list和futex本身是不同的,内核使用用户注册的robust_list_head结构来查找出每个list节点对应的futex的位置,这为用户提供了一些自由度,从而可以将每个futex嵌入在任意的数据结构中。这一点在上面的

exit_robust_list

   if (handle_futex_death((void __user *)entry + futex_offset,
      curr, pi))

可以得到验证。
/*
 * Per-thread list head (registered once per thread via the
 * set_robust_list() syscall):
 *
 * NOTE: this structure is part of the syscall ABI, and must only be
 * changed if the change is first communicated with the glibc folks.
 * (When an incompatible change is done, we'll increase the structure
 *  size, which glibc will detect)
 */
struct robust_list_head {
 /*
  * The head of the list. Points back to itself if empty:
  */
 struct robust_list list;

 /*
  * This relative offset is set by user-space, it gives the kernel
  * the relative position of the futex field to examine. This way
  * we keep userspace flexible, to freely shape its data-structure,
  * without hardcoding any particular offset into the kernel:
  */
 long futex_offset;

 /*
  * The death of the thread may race with userspace setting
  * up a lock's links. So to handle this race, userspace first
  * sets this field to the address of the to-be-taken lock,
  * then does the lock acquire, and then adds itself to the
  * list, and then clears this field. Hence the kernel will
  * always have full knowledge of all locks that the thread
  * _might_ have taken. We check the owner TID in any case,
  * so only truly owned locks will be handled.
  */
 struct robust_list __user *list_op_pending;
};
2、futex地址必须4字节对齐

static inline int fetch_robust_entry(struct robust_list __user **entry,
         struct robust_list __user * __user *head,
         int *pi)
{

 *pi = uentry & 1;

/* Take a reference on the object backing a futex key: the inode for
   inode-backed keys, the mm for MM-shared keys.  Private keys (no
   offset flag set) fall through the switch and take no reference.  */
static void get_futex_key_refs(union futex_key *key)
{
 if (!key->both.ptr)
  return;

 switch (key->both.offset & (FUT_OFF_INODE|FUT_OFF_MMSHARED)) {
 case FUT_OFF_INODE:
  atomic_inc(&key->shared.inode->i_count);
  break;
 case FUT_OFF_MMSHARED:
  atomic_inc(&key->private.mm->mm_count);
  break;
 }
}

对于进程间共享的futex,使用的是物理页面,而对于进程内的线程共享,使用真正的地址就可以了,因为这个结构是进程内唯一的

  评论这张
 
阅读(2042)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017