Futex 简述

简介:futex 全称为Fast User-space Mutex,是Linux 2.5.7 内核引入的锁原语,不同于其他进程间通信IPC原语(如信号量Semaphore、信号Signal和各种锁pthread_mutex_lock),futex更轻量级、快速,一般应用开发人员可能很少用到,但可基于futex实现各类读写锁、屏障(barriers)和信号机制等。

相关背景

在Linux的早期版本(内核Linux 2.5.7 版本以前),进程间通信(Inter-Process Communication,IPC)沿用的是传统Unix系统和System V 的IPC,如信号量(Semaphores)和Socket 等,这些IPC 均基于系统调用(System Call)。这类方法的缺点是当系统竞争度较低时,每次都进行系统调用,会造成较大系统开销。

原理和做法

用户程序每次调用IPC机制都会产生系统调用,程序发生用户态和内核态的切换,futex 的基本思想是竞争态总是很少发生的,只有在竞争态才需要进入内核,否则在用户态即可完成。futex的两个目标是:1)尽量避免系统调用;2)避免不必要的上下文切换(导致的TLB失效等)。

具体而言,任务获取一个futex 将发起带锁的减指令,并验证数值结果值是否为0(加上了锁),如果成功则可继续执行程序,失败(为已经占用的锁继续加锁)则任务在内核被阻塞。为相同futex 变量的加锁的任务被阻塞后放在同一个队列,解锁任务通过减少变量(只有一个加锁且锁队列为空)或进入内核从锁队列唤醒任务。

注意:futex 在Linux 的内核实现为一个系统调用(SYS_futex),用户程序如果直接调用它肯定会进入内核态,它还需要和其他语句(如原子操作)配合使用,新手在未理解其futex 原理和并发控制机制时极易犯错,这也是为什么不推荐直接使用它的原因。

Futex 简化版(基于Linux2.5.7 版本)

本节介绍的futex 基于Linux 2.5.7内核版本,和最新的futex 在实现上原理基本相同,但实现更为简单(代码更少)。

Futex 本质上是希望实现一个用户态的高性能锁。锁(Lock)是实现进程同步的最基本方法,锁可以实现为多个任务共享的一块内存(由mmap() 和shmat() 分配 )。如果用一个ulock_t 类型代表用户锁,那么它至少包含一个状态字:

typedef struct ulock_t { 
    long status; 
} ulock_t;

锁至少包含两个状态(status),可以用整数表示:

status == 1    // 无锁
status != 1   // 有锁

这样,就可以定义两个基本操作usema_up() 和usema_down() ,实现解锁和加锁(注意下面代码是用户态的)。

static inline int usema_down(ulock_t *ulock) 
{ 
    if (!__ulock_down(ulock)) 
        return 0; 
    return futex_down(ulock);                 // system call
}

static inline int usema_up(ulock_t *ulock) 
{ 
    if (!__ulock_up(ulock)) 
        return 0; 
    return futex_up(ulock);                  // system call
}

其中,__ulock_down(ulock) 和__ulock_up(ulock) 是用户态的原子性操作(类似于atomic_compare_exchange_strong() 、__sync_fetch_and_add() 和__sync_add_and_fetch() 等),实现对ulock 的值的修改,如果不成功则通过系统调用futex_down() 或futex_up() 进入内核。内核中futex_down() 和futex_up() 基本功能如下

  • futex_down() :将status 减1(status 由1 变为0 ,代表程序其占有锁;如果status 由0 变为-1,代表程序等待)。
  • futex_up() :将status 设置为1(即解锁)。

注意,这里锁的状态只有1、0,-1 。在Linux 2.5.7 内核中,futex_donw() 和futex_up() 相关代码如下:

/* Try to decrement the user count to zero. */
static int decrement_to_zero(struct page *page, unsigned int offset)
{
	atomic_t *count;
	int ret = 0;

	count = kmap(page) + offset;
	/* If we take the semaphore from 1 to 0, it's ours.  If it's
           zero, decrement anyway, to indicate we are waiting.  If
           it's negative, don't decrement so we don't wrap... */
	if (atomic_read(count) >= 0 && atomic_dec_and_test(count))
		ret = 1;
	kunmap(page);
	return ret;
}

/* Simplified from arch/ppc/kernel/semaphore.c: Paul M. is a genius. */
static int futex_down(struct list_head *head, struct page *page, int offset)
{
	int retval = 0;
	struct futex_q q;

	current->state = TASK_INTERRUPTIBLE;
	queue_me(head, &q, page, offset);

	while (!decrement_to_zero(page, offset)) {
		if (signal_pending(current)) {
			retval = -EINTR;
			break;
		}
		schedule();
		current->state = TASK_INTERRUPTIBLE;
	}
	current->state = TASK_RUNNING;
	unqueue_me(&q);
	/* If we were signalled, we might have just been woken: we
	   must wake another one.  Otherwise we need to wake someone
	   else (if they are waiting) so they drop the count below 0,
	   and when we "up" in userspace, we know there is a
	   waiter. */
	wake_one_waiter(head, page, offset);
	return retval;
}

static int futex_up(struct list_head *head, struct page *page, int offset)
{
	atomic_t *count;

	count = kmap(page) + offset;
	atomic_set(count, 1);
	smp_wmb();
	kunmap(page);
	wake_one_waiter(head, page, offset);
	return 0;
}

注意,加锁(futex_down)时,如果是无锁状态(decrement_to_zero(page, offset) 返回 1),则成功加锁并继续运行,否则,程序进入调度点(即schedule(),进入调度点前将状态变更为TASK_INTERRUPTIBLE 表示该任务可被中断)。注意,一个锁被其他程序占有是不可预期的,因此试图占有该锁的任务不能一直轮询,而应该尽量被调度出。该任务即使被调度,再次进入时也会继续判断是否可占用该锁。

当然,你也可以重新定义另外一套规则,用数字0 表示未上锁,1 表示上锁,2 表示一个或多个锁的等待者(Ulrich Drepper 在Futexes Are Tricky 的文中正是这样定义的),这具体要看系统架构和CPU 支持的原子操作。

锁队列

所有对相同内存地址(page,offset)加锁的程序,进入内核后(futex_down)都被挂在同一个队列上。队列头是一个2^FUTEX_HASHBITS 大小的数组(futex_queues),内存地址(page,offset)作为key 哈希结果将映射至该数组上(因此也有可能会将不同内存地址映射到同一个队列上)。

锁队列(futex_queues)中元素是一个hash_bucket,每个hash_bucket 是某类(即哈希结果相同的)加锁队列的列表头,加锁队列每个元素由一个futex_q 数据结构表示,如下(很显然包含了任务的数据结构task_struct):

struct futex_q {
	struct list_head list;
	struct task_struct *task;
	/* Page struct and offset within it. */
	struct page *page;
	unsigned int offset;
};

以上基于Linux 2.5.7 的futex 实现还是略显简陋,且与最新内核中futex 的实现已经有非常大的区别了。以2.6.39 的内核为例,函数futex 定义如下

long futex(uint32_t *uaddr, int futex_op, uint32_t val,
                 const struct timespec *timeout,   /* or: uint32_t val2 */
                 uint32_t *uaddr2, uint32_t val3);

下面将逐步过渡至新内核中futex() 的用法。

futex_wait 和futex_wake

总地来说,futex 包含两种基本操作(futex_op):futex_wait futex_wake

futex_wait 判断保存在地址addr 的值是否等于val,如果等于,则将当前线程休眠,不等于则返回错误码EWOULDBLOCK。

futex_wake 则是唤醒在地址addr上数量为val个线程。

uaddr 和 val 是futex 最重要的两个变量,uaddr 是一个4 字节大小值(futex word)的地址,这个地址一般通过共享内存由多个进(线)程共享,每个进(线)程在进入futex()后将判断该地址的值和自己期望的值val (expected value)是否相同。

futex() 还有timeout,uaddr2 和val3 这三个变量用于配合不同futex_op 一起使用。

timeout 设置操作的超时时间,如果达到了超时时间futex_wait 还没有等到它的futex 则返回ETIMEOUT,uaddr2和val3 可以组成另一对futex word 和expected value。

futex_wake 比较简单,基本步骤为:

  1. 通过key 获取hash_bucket(即futex_queue 的每个元素);
  2. spin_lock该hash_bucket;
  3. 唤醒指定个数的futex_q;
  4. spin_unlock hash_bucket。

代码(Linux 2.6.39)如下:

/*
 * Wake up waiters matching bitset queued on this futex (uaddr).
 */
static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
	struct futex_hash_bucket *hb;
	struct futex_q *this, *next;
	struct plist_head *head;
	union futex_key key = FUTEX_KEY_INIT;
	int ret;

	if (!bitset)
		return -EINVAL;

	ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key);
	if (unlikely(ret != 0))
		goto out;

	hb = hash_futex(&key);
	spin_lock(&hb->lock);
	head = &hb->chain;

	plist_for_each_entry_safe(this, next, head, list) {
		if (match_futex (&this->key, &key)) {
			if (this->pi_state || this->rt_waiter) {
				ret = -EINVAL;
				break;
			}

			/* Check if one of the bits is set in both bitsets */
			if (!(this->bitset & bitset))
				continue;

			wake_futex(this);
			if (++ret >= nr_wake)
				break;
		}
	}

	spin_unlock(&hb->lock);
	put_futex_key(&key);
out:
	return ret;
}

futex_wait 稍微复杂点,代码如下

static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
		      ktime_t *abs_time, u32 bitset)
{
	struct hrtimer_sleeper timeout, *to = NULL;
	struct restart_block *restart;
	struct futex_hash_bucket *hb;
	struct futex_q q = futex_q_init;
	int ret;

	if (!bitset)
		return -EINVAL;
	q.bitset = bitset;

	if (abs_time) {
		to = &timeout;

		hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ?
				      CLOCK_REALTIME : CLOCK_MONOTONIC,
				      HRTIMER_MODE_ABS);
		hrtimer_init_sleeper(to, current);
		hrtimer_set_expires_range_ns(&to->timer, *abs_time,
					     current->timer_slack_ns);
	}

retry:
	/*
	 * Prepare to wait on uaddr. On success, holds hb lock and increments
	 * q.key refs.
	 */
	ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
	if (ret)
		goto out;

	/* queue_me and wait for wakeup, timeout, or a signal. */
	futex_wait_queue_me(hb, &q, to);

	/* If we were woken (and unqueued), we succeeded, whatever. */
	ret = 0;
	/* unqueue_me() drops q.key ref */
	if (!unqueue_me(&q))
		goto out;
	ret = -ETIMEDOUT;
	if (to && !to->task)
		goto out;

	/*
	 * We expect signal_pending(current), but we might be the
	 * victim of a spurious wakeup as well.
	 */
	if (!signal_pending(current))
		goto retry;

	ret = -ERESTARTSYS;
	if (!abs_time)
		goto out;

	restart = &current_thread_info()->restart_block;
	restart->fn = futex_wait_restart;
	restart->futex.uaddr = uaddr;
	restart->futex.val = val;
	restart->futex.time = abs_time->tv64;
	restart->futex.bitset = bitset;
	restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;

	ret = -ERESTART_RESTARTBLOCK;

out:
	if (to) {
		hrtimer_cancel(&to->timer);
		destroy_hrtimer_on_stack(&to->timer);
	}
	return ret;
}

bitset 是一个位掩码,用于筛选需要唤醒(加锁)队列的任务;

hrtimer 是高精度时钟相关的,包括初始化定时器,设置时钟失效时间等。

最重要的语句莫过于futex_wait_setup() 和futex_wait_queue_me()

	/*
	 * Prepare to wait on uaddr. On success, holds hb lock and increments
	 * q.key refs.
	 */
	ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
	if (ret)
		goto out;
	/* queue_me and wait for wakeup, timeout, or a signal. */
	futex_wait_queue_me(hb, &q, to);
	/* If we were woken (and unqueued), we succeeded, whatever. */
	ret = 0;
	/* unqueue_me() drops q.key ref */
	if (!unqueue_me(&q))
		goto out;
    /* blah blah blah, other codes */
out:
	return ret;

futex_wait_setup() 做futex_wait 的准备工作,包括

  • 获取futex key(get_futex_key() 针对进程内和进程间共享内存采用不同生成futex key方法:进程内由uaddr 和current->mm计算得到,进程间由age->index, vma->vm_file->f_path.dentry->d_inode 和offset_within_page 计算得到);
	ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key);
	if (unlikely(ret != 0))
		return ret;
  • 根据futex key 计算得到hash_bucket(*hb = queue_lock(q);)并对hash_bucket 加自旋锁;
static inline struct futex_hash_bucket *queue_lock(struct futex_q *q)
	__acquires(&hb->lock)
{
	struct futex_hash_bucket *hb;

	hb = hash_futex(&q->key);
	q->lock_ptr = &hb->lock;

	spin_lock(&hb->lock);
	return hb;
}
  • 如果从uaddr 获取的值和参数val 不一致,则解自旋锁
ret = get_futex_value_locked(&uval, uaddr);
if (uval != val) {
		queue_unlock(q, *hb);
		ret = -EWOULDBLOCK;
}

futex_wait_setup() 中对内存块引用的修改相关函数命名为get_futex_key_refs() 和drop_futex_key_refs()。

futex_wait_queue_me() 将任务放入队列等待唤醒,代码如下:

/**
 * futex_wait_queue_me() - queue_me() and wait for wakeup, timeout, or signal
 * @hb:		the futex hash bucket, must be locked by the caller
 * @q:		the futex_q to queue up on
 * @timeout:	the prepared hrtimer_sleeper, or null for no timeout
 */
static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
				struct hrtimer_sleeper *timeout)
{
	/*
	 * The task state is guaranteed to be set before another task can
	 * wake it. set_current_state() is implemented using set_mb() and
	 * queue_me() calls spin_unlock() upon completion, both serializing
	 * access to the hash list and forcing another memory barrier.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	queue_me(q, hb);

	/* Arm the timer */
	if (timeout) {
		hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
		if (!hrtimer_active(&timeout->timer))
			timeout->task = NULL;
	}

	/*
	 * If we have been removed from the hash list, then another task
	 * has tried to wake us, and we can skip the call to schedule().
	 */
	if (likely(!plist_node_empty(&q->list))) {
		/*
		 * If the timer has already expired, current will already be
		 * flagged for rescheduling. Only call schedule if there
		 * is no timeout, or if it has yet to expire.
		 */
		if (!timeout || timeout->task)
			schedule();
	}
	__set_current_state(TASK_RUNNING);
}

一旦将该任务对应的futex_q 放入队列中(queue_me(q, hb);),将解相应hash_bucket 的自旋锁。

如果用户设置了超时时间(if (timeout)),则启动定时时钟。

接下来,如果任务没有从锁队列中移除(!plist_node_empty(&q->list)),或无超时(!timeout || timeout->task),则调度该任务(schedule();)。

从futex_wait_queue_me() 后,任务将被调度出CPU,(理想情况下)在下面两种情况再次进入:1)被futex_wake 任务唤醒;2)超时唤醒。

Futex 基本方法

本节介绍用户态如何使用futex 系统调用,在这之前需要重复几点:

  1. futex_wake 唤醒任务数量一般为1 或INT_MAX,即每次唤醒一个或所有指定任务;
  2. futex 本身就是系统调用,需要配合用户态的原子指令(原子操作)一起使用;
  3. 用于标识是否上锁和futex 变量锁长度的变量大小应该是有限长度,防止变量溢出。(本文默认1 为未上锁,0 为上锁,-1为有一个或多个任务在锁队列等待。)

futex 文档中介绍了一种基本用法

#define errExit(msg)    do { perror(msg); exit(EXIT_FAILURE); } while (0)

static uint32_t *futex1, *futex2, *iaddr;

static int
futex(uint32_t *uaddr, int futex_op, uint32_t val,
	 const struct timespec *timeout, uint32_t *uaddr2, uint32_t val3)
{
   return syscall(SYS_futex, uaddr, futex_op, val,
				  timeout, uaddr2, val3);
}

/* Acquire the futex pointed to by 'futexp': wait for its value to
  become 1, and then set the value to 0. */
static void
fwait(uint32_t *futexp)
{
   long s;

   /* atomic_compare_exchange_strong(ptr, oldval, newval)
	  atomically performs the equivalent of:

    if (*ptr == *oldval)
	    *ptr = newval;

	  It returns true if the test yielded true and *ptr was updated. */

   while (1) {
	   /* Is the futex available? */
	   const uint32_t one = 1;
	   if (atomic_compare_exchange_strong(futexp, &one, 0))
		   break;      /* Yes */

	   /* Futex is not available; wait. */
	   s = futex(futexp, FUTEX_WAIT, 0, NULL, NULL, 0);
	   if (s == -1 && errno != EAGAIN)
		   errExit("futex-FUTEX_WAIT");
   }
}

/* Release the futex pointed to by 'futexp': if the futex currently
  has the value 0, set its value to 1 and the wake any futex waiters,
  so that if the peer is blocked in fwait(), it can proceed. */
static void
fpost(uint32_t *futexp)
{
   long s;

   const uint32_t zero = 0;
   if (atomic_compare_exchange_strong(futexp, &zero, 1)) {
	   s = futex(futexp, FUTEX_WAKE, 1, NULL, NULL, 0);
	   if (s  == -1)
		   errExit("futex-FUTEX_WAKE");
   }
}

fwait 尝试在用户态将futex 变量(*futexp)由1 变为0,如果futex 变量已经是0,则进入调用futex_wait内核态。

fpost尝试在用户态将fuex 变量(*futexp)由0 变为1,如果futex 变量已经是0,则进入调用futex_wake内核态,唤醒内核中的futex waiters。

atomic_compare_exchange_strong 是C11 标准定义的原子指令:

_Bool atomic_compare_exchange_strong(volatile A *object, C *expected, C desired);

类型A 是原子类型数据,类型C 是非原子类型,该指令将object 对应内存的数据和expected 值比较,如果相等,则替换为desired;如果不等,将object 对应内存的数值放在expected 中。执行成功与否的结果作为返回值。

类似于atomic_compare_exchange_strong 的指令还有atomic_compare_exchange_weak,前者可视为后者在一个while 循环中,只有返回为真才跳出while。

desired = ...;
expected = current;
while (current.atomic_compare_exchange_weak(expected, desired))
  ;

atomic_compare_exchange_* 系列指令的行为类似于下面语句:

if (memcmp(obj, expected, sizeof *obj) == 0) {
    memcpy(obj, &desired, sizeof *obj);
    return true;
} else {
    memcpy(expected, obj, sizeof *obj);
    return false;
}

上面关于fpost 和fwait 的实现在最好情况下(无竞争)仅有一个原子操作,在最差情况下也只有一次原子操作加一次系统调用。

参考文献

https://man7.org/linux/man-pages/man2/futex.2.html Futex(2) Documentation

Fuss, Futexes and Furwocks: Fast Userlevel Locking in Linux by Hubertus Franke , Rusty Russell , Matthew Kirkwood

https://lwn.net/Articles/685769/ In pursuit of faster futexes

https://lwn.net/Articles/360699/ A futex overview and update

Futex Are Tricky by Ulrich Drepper (注意该文需辩证地看,1 该文假设问题的前提可能已不存在,2 文章所使用的原子操作和锁变量大小的定义和本文不同)

附件1 Linux 2.5.7 中kernel/futex.c 源码

/*
 *  Fast Userspace Mutexes (which I call "Futexes!").
 *  (C) Rusty Russell, IBM 2002
 *
 *  Thanks to Ben LaHaise for yelling "hashed waitqueues" loudly
 *  enough at me, Linus for the original (flawed) idea, Matthew
 *  Kirkwood for proof-of-concept implementation.
 *
 *  "The futexes are also cursed."
 *  "But they come in a choice of three flavours!"
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
#include <linux/kernel.h>
#include <linux/spinlock.h>
#include <linux/sched.h>
#include <linux/mm.h>
#include <linux/hash.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <linux/futex.h>
#include <linux/highmem.h>
#include <asm/atomic.h>

/* These mutexes are a very simple counter: the winner is the one who
   decrements from 1 to 0.  The counter starts at 1 when the lock is
   free.  A value other than 0 or 1 means someone may be sleeping.
   This is simple enough to work on all architectures, but has the
   problem that if we never "up" the semaphore it could eventually
   wrap around. */

/* FIXME: This may be way too small. --RR */
#define FUTEX_HASHBITS 6

/* We use this instead of a normal wait_queue_t, so we can wake only
   the relevent ones (hashed queues may be shared) */
struct futex_q {
	struct list_head list;
	struct task_struct *task;
	/* Page struct and offset within it. */
	struct page *page;
	unsigned int offset;
};

/* The key for the hash is the address + index + offset within page */
static struct list_head futex_queues[1<<FUTEX_HASHBITS];
static spinlock_t futex_lock = SPIN_LOCK_UNLOCKED;

static inline struct list_head *hash_futex(struct page *page,
					   unsigned long offset)
{
	unsigned long h;

	/* struct page is shared, so we can hash on its address */
	h = (unsigned long)page + offset;
	return &futex_queues[hash_long(h, FUTEX_HASHBITS)];
}

static inline void wake_one_waiter(struct list_head *head,
				   struct page *page,
				   unsigned int offset)
{
	struct list_head *i;

	spin_lock(&futex_lock);
	list_for_each(i, head) {
		struct futex_q *this = list_entry(i, struct futex_q, list);

		if (this->page == page && this->offset == offset) {
			wake_up_process(this->task);
			break;
		}
	}
	spin_unlock(&futex_lock);
}

/* Add at end to avoid starvation */
static inline void queue_me(struct list_head *head,
			    struct futex_q *q,
			    struct page *page,
			    unsigned int offset)
{
	q->task = current;
	q->page = page;
	q->offset = offset;

	spin_lock(&futex_lock);
	list_add_tail(&q->list, head);
	spin_unlock(&futex_lock);
}

static inline void unqueue_me(struct futex_q *q)
{
	spin_lock(&futex_lock);
	list_del(&q->list);
	spin_unlock(&futex_lock);
}

/* Get kernel address of the user page and pin it. */
static struct page *pin_page(unsigned long page_start)
{
	struct mm_struct *mm = current->mm;
	struct page *page;
	int err;

	down_read(&mm->mmap_sem);
	err = get_user_pages(current, current->mm, page_start,
			     1 /* one page */,
			     1 /* writable */,
			     0 /* don't force */,
			     &page,
			     NULL /* don't return vmas */);
	up_read(&mm->mmap_sem);

	if (err < 0)
		return ERR_PTR(err);
	return page;
}

/* Try to decrement the user count to zero. */
static int decrement_to_zero(struct page *page, unsigned int offset)
{
	atomic_t *count;
	int ret = 0;

	count = kmap(page) + offset;
	/* If we take the semaphore from 1 to 0, it's ours.  If it's
           zero, decrement anyway, to indicate we are waiting.  If
           it's negative, don't decrement so we don't wrap... */
	if (atomic_read(count) >= 0 && atomic_dec_and_test(count))
		ret = 1;
	kunmap(page);
	return ret;
}

/* Simplified from arch/ppc/kernel/semaphore.c: Paul M. is a genius. */
static int futex_down(struct list_head *head, struct page *page, int offset)
{
	int retval = 0;
	struct futex_q q;

	current->state = TASK_INTERRUPTIBLE;
	queue_me(head, &q, page, offset);

	while (!decrement_to_zero(page, offset)) {
		if (signal_pending(current)) {
			retval = -EINTR;
			break;
		}
		schedule();
		current->state = TASK_INTERRUPTIBLE;
	}
	current->state = TASK_RUNNING;
	unqueue_me(&q);
	/* If we were signalled, we might have just been woken: we
	   must wake another one.  Otherwise we need to wake someone
	   else (if they are waiting) so they drop the count below 0,
	   and when we "up" in userspace, we know there is a
	   waiter. */
	wake_one_waiter(head, page, offset);
	return retval;
}

static int futex_up(struct list_head *head, struct page *page, int offset)
{
	atomic_t *count;

	count = kmap(page) + offset;
	atomic_set(count, 1);
	smp_wmb();
	kunmap(page);
	wake_one_waiter(head, page, offset);
	return 0;
}

asmlinkage int sys_futex(void *uaddr, int op)
{
	int ret;
	unsigned long pos_in_page;
	struct list_head *head;
	struct page *page;

	pos_in_page = ((unsigned long)uaddr) % PAGE_SIZE;

	/* Must be "naturally" aligned, and not on page boundary. */
	if ((pos_in_page % __alignof__(atomic_t)) != 0
	    || pos_in_page + sizeof(atomic_t) > PAGE_SIZE)
		return -EINVAL;

	/* Simpler if it doesn't vanish underneath us. */
	page = pin_page((unsigned long)uaddr - pos_in_page);
	if (IS_ERR(page))
		return PTR_ERR(page);

	head = hash_futex(page, pos_in_page);
	switch (op) {
	case FUTEX_UP:
		ret = futex_up(head, page, pos_in_page);
		break;
	case FUTEX_DOWN:
		ret = futex_down(head, page, pos_in_page);
		break;
	/* Add other lock types here... */
	default:
		ret = -EINVAL;
	}
	put_page(page);

	return ret;
}

static int __init init(void)
{
	unsigned int i;

	for (i = 0; i < ARRAY_SIZE(futex_queues); i++)
		INIT_LIST_HEAD(&futex_queues[i]);
	return 0;
}
__initcall(init);

附件2 为什么要使用使用原子操作

下面代码给出了多线程中需要采用原子操作的例子:多个线程(periodic_task)同时修改一个变量val,如果采用普通加一的方法

val = val + 1;

由于多线程对数值的修改可能被覆盖,最终结果一般小于期望值,只有使用了原子操作(__sync_add_and_fetch)才能保证变量val 的一致性。值得注意的是,原子操作带来的开销也是不可忽视的。

#define _GNU_SOURCE
#include <stddef.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <pthread.h>                   /* for threads */
#include <semaphore.h>                 /* sem_post sem_wait */
#include <assert.h>                    /* assert() */
#include <errno.h>
#include <sys/time.h>
#include <sys/types.h>

#define NUM_THREAD_MAX 3200

sem_t start_sem[NUM_THREAD_MAX];

void * periodic_task(void* arg);

volatile int val;

void * periodic_task(void* arg)
{
    int tid = *((int *)arg);

    while (1) {
        assert(sem_wait(&start_sem[tid]) == 0);
        if (tid%4 != 0) {
            val = val + 1;
        } else {
            __sync_add_and_fetch(&val, 1);        
        }
    }
}

void timer_addus(struct timespec *tm, int us) 
{
    tm->tv_nsec += us*1000;
    if (tm->tv_nsec > 1000000000L) {
        tm->tv_nsec -= 1000000000L;
        tm->tv_sec += 1;
    }
}

int main(int argc, const char *argv[])
{
    struct timespec timer1;
    struct sched_param sp;
    pthread_t periodic_task_thread[NUM_THREAD_MAX];
    pthread_attr_t attr;
    int ii, jj;
    int threadID[NUM_THREAD_MAX];

    /*  input (1) num_threads  (2) step_size(us)  */
    if (argc != 3) {
        printf("%s  num_threads  step_size(us)\n", argv[0]);
        exit(0);
    }
    int num_thread = atoi(argv[1]);
    if (num_thread >= NUM_THREAD_MAX) {
        printf("num_thread is greater than NUM_THREAD_MAX(%d)\n", NUM_THREAD_MAX);
        exit(0);
    }
    int step_ms = atoi(argv[2]);

    val = 0;

    /* init semaphores */
    for(ii=0; ii<num_thread; ++ii) 
        assert(sem_init(&start_sem[ii],0,0) == 0);
    pthread_attr_init(&attr);
    
    /*  create threads */
    for (ii=0; ii<num_thread; ++ii) {
        threadID[ii] = ii;
        assert(pthread_create(&periodic_task_thread[ii], &attr, periodic_task, \
                    (void *)(&threadID[ii])) == 0);
    }
    
    clock_gettime(CLOCK_MONOTONIC, &timer1);
    timer_addus(&timer1, 100000000);    // delay 100ms to start

    printf("val=%d\n", val);
    int loops = 10000;
    for (ii=0; ii<loops; ++ii) {
        for (jj=0; jj<num_thread; ++jj) 
            assert(sem_post(&start_sem[jj]) == 0);
        timer_addus(&timer1, step_ms);
        clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &timer1, NULL);
    }

    printf("Initial 0, After %d loops\n", loops);
    printf("Expected %d\n", num_thread*loops);
    printf("Real value: %d\n", val);

    return 0;
}

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据