文章目录
  1. 1. condition variables
  2. 2. std::future
  3. 3. 参考

前面介绍了线程间如何安全的共享数据,主要是通过互斥量和锁来实现。除了共享数据以外,线程间还需要进行同步的操作,比如说有某个事件需要通知。C++11引入了两个基本的机制来实现线程间的通信:condition variablesfutures

condition variables

condition variables主要是用来等待某个事件的通知,比如说线程1专门负责处理数据,线程2则是从IO读取数据放到队列里。这个时候线程1就需要等队列空闲的时候等待新的数据,而线程2则需要在数据放入队列以后通知线程1。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Thread 1

std::mutex mtx;
std::queue<data> data_queue;
std::condition_variable cond;

void process_data()
{
while (true)
{
std::unique_lock<mtx> lk;
cond.wait(lk, []{return !data_queue.empty();}); // 1
auto data = data_queue.front();
data_queue.pop();
lk.unlock(); // 2
process(data);
if (is_last_chunk(data))
break;
}
}

void prepare_data()
{
while(more_data_to_prepare())
{
auto data = prepare_data();
std::unique_lock<mtx> lk;
data_queue.push(data);
cond.notify_one(); // 3
}
}

这段代码有几个关键的知识点:

  • 为什么#1处condition_variable wait的时候需要传一个unique_lock
  • 为什么#1处,需要判断data_queue.empty
  • #1,除了wait以外还有没有其他方式?
  • 为什么#2处要显式unlock
  • #3 处是否还有其他notify的方式,并且有何区别?

为什么#1处condition_variable wait的时候需要传一个unique_lock

第一次看到condition_variable会比较奇怪为什么需要传入一个锁。这个是因为,condition_variable需要运行pred来检查是否满足等待条件,而这个往往是需要访问共享数据的,这个锁是用来保护这段共享数据的。

这个是其中一个原因,但是你会发现有一个wait函数是不需要传入predicate的。这个时候这个lock的意义则在于保护condition_variable本身内部的信号。因为多个线程会同时访问这个condition_variable,所以需要某种机制来对其内部的数据结构进行保护。[1]

为什么#1处,需要判断data_queue.empty

有一种wake up叫做spurious wakeup。简单地说就是,被notify了但是其实有可能它需要等待的那个条件已经不满足了,在这个例子中就是data_queue可能在它被唤醒的时候是empty。那么为什么会出现这种情况呢?

我们可以想一下condition_variable内部是怎么实现的,下面这段伪代码

1
2
3
4
5
6
7
void
condition_variable::wait(unique_lock<mutex> &lock)
{
lock.unlock();
// some system call to wait for event
lock.lock();
}

可以看到,wait的开始的时候应该会先把把mutex给释放掉,然后等待系统事件。当收到事件以后则会再次尝试去获取mutex。可以看到在收到系统事件和获取mutex这期间是没有任何mutex做为保护的。在这个时候很有可能外部的条件已经被另外的线程给修改掉了,所以之后再次获取到mutex并不能保证一定满足所需要等待的条件。

看了一下libcxx内部的实现,它其实是直接调用了pthread的pthread_cond_wait。而在pthread_cond_wait的内部实现则不是我上面说的那么简单,还有很多其他的操作。这个具体代码我也没有完全看明白,就不在这里详述,有机会另外再研究。(我看的是glibc里面pthread的实现)。

除了wait以外还有没有其他方式?

除了wait以外还提供了wait_untilwait_for两个api,主要是用来避免等待太久。内部实现的话wait_until其实是转换成了wait_for,所以一个比较有意思的事情是如果你在等待期间去修改系统的时间的话,对于最终等待的时间是不会产生影响的,它内部开始的时候已经换算成时间差了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::condition_variable cv;
std::mutex mtx;
std::unique_lock<std::mutex> lk(mtx);
auto now = std::chrono::system_clock::now();
std::time_t now_c = std::chrono::system_clock::to_time_t(now);
std::cout << std::put_time(std::localtime(&now_c), "%T") << std::endl;
cv.wait_until(lk, now + std::chrono::seconds(10));
now = std::chrono::system_clock::now();
now_c = std::chrono::system_clock::to_time_t(now);
std::cout << std::put_time(std::localtime(&now_c), "%T") << std::endl;

// output:
20:57:19
20:52:27

为什么#2处要显式unlock

拿到数据以后,data_queue已经不需要保护,而且处理数据可能会比较耗时。没有必要在这个地方去一直拿着mutex。

#3 处是否还有其他notify的方式,并且有何区别?

可以选择notify_onenotify_all,顾名思义。两个的区别是唤醒的等待线程数量不一样。notify方法调用的时候是需要去获取mutex的。

std::future

一个异步方法如何返回一个结果,C/C++程序员的第一反应应该是用回调函数。回调函数确实可以解决这个问题,但是回调函数本身也有着诸多的问题。C++11中引入lambda和闭包,能够让这个问题有所缓解。但是使用回调的方法并非一个优雅的解决方法。

举个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void CalcSomething()
{
std::mutex mtx;
std::condition_variable cv;

int result = 0;
doSomethingAsync(11, [&result, &cv](int r){
result = r;
cv.notify_one();
});
// Do other things
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk);
// use result to do more calculation
}

看着就蛋疼。那么future就是可以帮我们解决这个问题。有了它我们可以这么写代码。

1
2
3
4
5
6
7
8
void CalcSomething()
{
auto f = doSomethingAsync(11);
// Do other things
f.wait();
int result = f.get();
// use result to do more calculation
}

确实可以大大简化我们原来的代码,其实隐约看到另外一种编程的方法。但是在很多时候还是不够用,比如说CalcSomething也是一个异步方法,我想让结果等待以后继续做事情但不想同步的等。那么我们可以起一个线程去执行上述代码,比如

1
2
3
4
5
6
7
8
9
10
11
void CalcSomethingUseThread()
{
std::thread t([]{
auto f = doSomethingAsync(11);
// Do other things
f.wait();
int result = f.get();
// use result to do more calculation
});
t.detach();
}

但是很显然随随便便起一个线程只为做这么简单的事情未免开销太大了,而且很多项目里是有类似线程池来统一管理线程的。目前的future还不能够很好地处理这种需求,要么用独立线程运行,要么通过注册回调来做。

不过Folly::Futurestd::future做了极大的扩充,提供了类似的解决方案。有兴趣的可以去了解一下。

实际运用中C++11中的std::future有点鸡肋的感觉,除非在C++20以后能够加入更加多的功能,否则并没有很大的吸引力去用这套机制。

参考

  1. Stackoverflow - Why do pthreads’ condition variable functions require a mutex?
  2. Jon Skeet - Does C# Monitor.Wait() suffer from spurious wakeups?
文章目录
  1. 1. condition variables
  2. 2. std::future
  3. 3. 参考