@ltlovezh
2020-02-15T17:29:43.000000Z
字数 16568
阅读 1256
C++
C++ 11增加了标准线程库:std::thread
,在语言级别上提供了线程支持,并且是跨平台的。在不同操作系统上,依赖于平台本身的线程库,例如Linux上,底层实现是pthread
库。
std::thread禁止了拷贝构造函数和拷贝赋值运算符,所以std::thread对象不能拷贝,但是可以移动。
一个最简单的实例:
void func(int arg1, int arg2) {
cout << "arg1: " << arg1 << ", arg2: " << arg2 << endl;
cout << "child thread id: " << std::this_thread::get_id() << endl;
}
int main() {
// 创建并启动子线程
std::thread thread(func, 10, 100);
cout << "child thread id: " << thread.get_id() << endl;
thread.join();
cout << "main thread id: " << std::this_thread::get_id() << endl;
cout << "main thread exit" << endl;
return 0;
}
// 输出
child thread id: 0x70000e841000
arg1: 10, arg2: 100
child thread id: 0x70000e841000
main thread id: 0x10ef98dc0
main thread exit
std::thread线程对象支持的操作如下所示:
std::thread
实现和操作系统相关,因此该函数返回std::thread
底层实现的线程句柄,例如:在Posix标准平台下,就表示Pthread句柄pthread_t
。std::this_thread表示当前线程,this_thread
实际是一个namespace
,支持如下操作:
下面通过sleep_for
和sleep_until
分别实现休眠10秒:
// 当前时间戳
std::time_t timestamp = std::chrono::system_clock::to_time_t(system_clock::now());
// sleep_until实现休眠10秒
std::this_thread::sleep_until(system_clock::from_time_t(timestamp + 10));
// sleep_for实现休眠10秒
std::this_thread::sleep_for(std::chrono::seconds(10));
关于时间的操作可参考time.h文件
// 当前时间戳(秒)
std::time_t now_timestamp = system_clock::to_time_t(system_clock::now());
// struct tm结构体,包含了时、分、秒
struct std::tm *now_tm = std::localtime(&now_timestamp);
std::cout << "Current time: " << now_tm->tm_hour << ":" << now_tm->tm_min << ":"<< now_tm->tm_sec << '\n';
// 未来一分钟
++now_tm->tm_min;
// 未来时间戳
std::time_t future_timestamp = std::mktime(now_tm);
std::cout << "Future time: " << now_tm->tm_hour << ":" << now_tm->tm_min << ":"<< now_tm->tm_sec << '\n';
// time_point格式,可用于std::this_thread::sleep_until
std::chrono::time_point<system_clock> x = system_clock::from_time_t(future_timestamp);
// 输出
Current time: 19:3:22
Future time: 19:4:22
通过线程锁,可以实现临界区访问,同一个时刻只有一个线程可以访问临界区。
std::mutex表示互斥锁,不可拷贝(删除了拷贝构造函数和拷贝赋值操作符)。不支持递归锁定,若有此需求,可使用std::recursive_mutex
代替。
支持的操作:
recursive_mutex
代替,它允许同一个线程多次锁定同一个recursive_mutex
。recursive_mutex
代替,它允许同一个线程多次锁定同一个recursive_mutex
。lock和try_lock的差异主要是不能锁定mutex时表现不同,lock函数会一直阻塞调用线程,直到可以锁定mutex为止;而try_lock则不会阻塞调用线程,而是直接返回,并且返回值为false。
recursive_mutex
在mutex
的基础上,允许同一个线程对同一个recursive_mutex
多次加锁,表示获得recursive_mutex
的多层所有权,同时对recursive_mutex
解锁时,也要调用相同次数的unlock
,这样调用线程才能彻底释放对recursive_mutex
的所有权,其他线程才能锁定recursive_mutex
。
timed_mutex
在mutex
基础上,增加了两个成员函数try_lock_for
和try_lock_until
,表示等待一段时间,尝试获得锁。
recursive_timed_mutex
代替,它允许同一个线程多次锁定同一个recursive_timed_mutex
。try_lock_for
一致,只不过参数是绝对时间。
std::timed_mutex timed_mutex;
// 尝试2秒内获得锁
if (timed_mutex.try_lock_for(std::chrono::seconds(2))) {
// do some thing
// 释放锁
timed_mutex.unlock();
}
// 当前绝对时间戳
std::time_t timestamp = std::chrono::system_clock::to_time_t(system_clock::now());
// 尝试阻塞到未来的绝对时间获得锁
if (timed_mutex.try_lock_until(system_clock::from_time_t(timestamp + 10))) {
// do some thing
// 释放锁
timed_mutex.unlock();
}
可重入的时间锁,同时具备timed_mutex
和recursive_mutex
的能力,不再赘述。
针对具备不同能力的锁,C++划分了三种层级:
lock_guard
是一个管理mutex
的对象,不可拷贝(删除了拷贝构造函数和拷贝赋值操作符),除了缺省合成的函数外,没有其他成员函数。构造lock_guard
时,mutex
被调用线程锁定,销毁lock_guard
时,mutex
被调用线程解锁。通过这种方式,可以确保程序抛出异常时,也能正确地解锁mutex
。这里的mutex
可以是四种锁中的任意一种。
lock_guard
不会介入mutex
生命周期,程序必须保证mutex
的生命周期至少延长到持有它的lock_guard
销毁为止。
简单来说,构造
lock_guard
时,获得锁;析构lock_guard
时,释放锁。
除了单参数构造函数,lock_guard
还有一个包含两个参数的构造函数:
lock_guard(mutex_type& __m, adopt_lock_t)
,表示创建lock_guard
时,构造函数不会对mutex
加锁,而是由外部程序保证mutex
已经被加锁了。
下面是一个典型案例:
// 互斥锁
std::mutex mtx;
void print_even(int x) {
if (x % 2 == 0) std::cout << x << " is even\n";
else throw (std::logic_error("not even"));
}
void print_thread_id(int id) {
try {
// 使用lock_guard锁定mtx,保证即使是异常逻辑,lock_guard也可以在析构时解锁mtx
std::lock_guard<std::mutex> lck(mtx);
print_even(id);
}
catch (std::logic_error &) {
std::cout << "[exception caught]\n";
}
}
int main() {
std::thread threads[10];
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(print_thread_id, i + 1);
for (auto &th : threads) th.join();
return 0;
}
// 可能的输出
[exception caught]
6 is even
4 is even
[exception caught]
[exception caught]
2 is even
[exception caught]
8 is even
[exception caught]
10 is even
unique_lock
是一个管理mutex
的对象,不可拷贝(删除了拷贝构造函数和拷贝赋值操作符),在lock_guard
基础上,增加了lock
、try_lock
、try_lock_for
、try_lock_until
和unlock
等成员函数(这些函数的作用在上面👆已经介绍过了),更加灵活,但相应的性能会受一些影响。
下面是源码中的类定义:
template <class _Mutex>
class unique_lock
{
public:
// 模板参数,各类锁
typedef _Mutex mutex_type;
private:
// 锁
mutex_type* __m_;
// 是否已经获得锁
bool __owns_;
public:
// 无参构造函数
unique_lock() : __m_(nullptr), __owns_(false) {}
// 单参数构造函数,与lock_guard一样,构造函数中加锁
explicit unique_lock(mutex_type& __m)
: __m_(_VSTD::addressof(__m)), __owns_(true) {__m_->lock();}
// 带defer_lock_t的构造函数(第二个参数可使用编译时常量std::defer_lock),构造函数中不加锁
unique_lock(mutex_type& __m, defer_lock_t) _NOEXCEPT
: __m_(_VSTD::addressof(__m)), __owns_(false) {}
// 带try_to_lock_t的构造函数(第二个参数可使用编译时常量std::try_to_lock),构造函数中尝试加锁
unique_lock(mutex_type& __m, try_to_lock_t)
: __m_(_VSTD::addressof(__m)), __owns_(__m.try_lock()) {}
// 带adopt_lock_t的构造函数(第二个参数可使用编译时常量std::adopt_lock),与lock_guard一样,构造函数中不加锁,而是默认外部程序已经加锁了
unique_lock(mutex_type& __m, adopt_lock_t)
: __m_(_VSTD::addressof(__m)), __owns_(true) {}
template <class _Clock, class _Duration>
// 构造函数中通过try_lock_until尝试加锁
unique_lock(mutex_type& __m, const chrono::time_point<_Clock, _Duration>& __t)
: __m_(_VSTD::addressof(__m)), __owns_(__m.try_lock_until(__t)) {}
template <class _Rep, class _Period>
// 构造函数中通过try_lock_for尝试加锁
unique_lock(mutex_type& __m, const chrono::duration<_Rep, _Period>& __d)
: __m_(_VSTD::addressof(__m)), __owns_(__m.try_lock_for(__d)) {}
// 析构时,若加锁了,则释放锁
~unique_lock()
{
if (__owns_)
__m_->unlock();
}
private:
// 相当于删除了拷贝构造函数和拷贝赋值操作符
unique_lock(unique_lock const&); // = delete;
unique_lock& operator=(unique_lock const&); // = delete;
public:
#ifndef _LIBCPP_CXX03_LANG
// 移动构造函数
unique_lock(unique_lock&& __u) _NOEXCEPT
: __m_(__u.__m_), __owns_(__u.__owns_)
{__u.__m_ = nullptr; __u.__owns_ = false;}
// 移动赋值操作符
unique_lock& operator=(unique_lock&& __u) _NOEXCEPT
{
if (__owns_)
__m_->unlock();
__m_ = __u.__m_;
__owns_ = __u.__owns_;
__u.__m_ = nullptr;
__u.__owns_ = false;
return *this;
}
#endif // _LIBCPP_CXX03_LANG
// 主动加锁
void lock();
// 尝试加锁
bool try_lock();
template <class _Rep, class _Period>
bool try_lock_for(const chrono::duration<_Rep, _Period>& __d);
template <class _Clock, class _Duration>
bool try_lock_until(const chrono::time_point<_Clock, _Duration>& __t);
// 释放锁
void unlock();
// swap实现
void swap(unique_lock& __u) _NOEXCEPT
{
_VSTD::swap(__m_, __u.__m_);
_VSTD::swap(__owns_, __u.__owns_);
}
// 返回持有的mutex,但是该函数不会解锁mutex
mutex_type* release() _NOEXCEPT
{
mutex_type* __m = __m_;
__m_ = nullptr;
__owns_ = false;
return __m;
}
// 判断是否已经加锁了
bool owns_lock() const _NOEXCEPT {return __owns_;}
// 重载了函数调用符, 判断是否已经加锁了
operator bool () const _NOEXCEPT {return __owns_;}
// 获得持有的mutex
mutex_type* mutex() const _NOEXCEPT {return __m_;}
};
unique_lock
的构造函数,实现了不同的加锁策略,具体可见上面的源码和注释。
除非必要,优先使用更高效的lock_guard。
call_once
是全局函数模板,原型如下所示:
template <class Fn, class... Args>
void call_once (once_flag& flag, Fn&& fn, Args&&... args);
call_once
使用参数args调用fn函数,除非另一个线程已经(或者正在)使用相同的once_flag
调用call_once
。
第一个使用相同once_flag
调用call_once
的线程,会执行fn函数,同时使其他使用相同once_flag
调用call_once
的线程进入被动执行状态,即:其他线程不执行fn函数,但是会阻塞到第一个执行fn函数的线程结束。
如果通过call_once
执行fn函数的线程抛出了异常,并且存在被动执行的线程,则会从其中选择一个线程使其执行fn函数。
如果已经有线程执行完了
call_once
,即fn函数返回了,那么当前所有被动执行的线程和将来对call_once的调用(使用相同once_flag
)也会立即返回,不会再次执行fn函数。
即多线程环境下,相同once_flag
的call_once
调用,只会执行一次fn函数。
下面看一个实际案例:
int winner;
void set_winner(int x) {
winner = x;
std::cout << "set_winner, x = " << x << std::endl;
}
// 多个线程共用一个once_flag
std::once_flag winner_flag;
// 线程函数
void wait_1000ms(int id) {
// 循环1000次,每次休眠1ms
for (int i = 0; i < 1000; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
// 不同线程通过同一个once_flag,调用call_once,最终只会有一个线程执行对应的set_winner函数
std::call_once(winner_flag, set_winner, id);
}
int main() {
std::thread threads[10];
// 10个线程
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(wait_1000ms, i + 1);
std::cout << "waiting for the first among 10 threads to count 1000 ms...\n";
for (auto &th : threads) th.join();
std::cout << "winner thread: " << winner << '\n';
return 0;
}
// 可能的输出
waiting for the first among 10 threads to count 1000 ms...
set_winner, x = 8
winner thread: 8
可见,虽然运行了10个线程,但是最终只有一个线程执行了set_winner函数。
lock
是全局函数模板,原型如下所示:
template <class Mutex1, class Mutex2, class... Mutexes>
void lock (Mutex1& a, Mutex2& b, Mutexes&... cde);
函数会锁定所有的mutex
,必要时阻塞调用线程。
lock
函数以不确定的顺序调用所有mutex
的成员函数:lock、try_lock和unlock,以确保函数返回时,所有mutex
都被锁定了(而不会产生任何死锁)。
如果lock
函数不能锁定所有mutex
(例如:其中一个调用抛出了异常),那么在函数失败之前,首先会解锁它成功锁定的所有mutex
。
下面👇是同时锁定foo和bar的案例:
std::mutex foo, bar;
void task_a() {
// 若使用foo.lock(); bar.lock(); 而不是std::lock,那么有可能task_a锁定了foo,而task_b锁定了bar,从而导致死锁
// foo.lock(); bar.lock();
std::lock(foo, bar);
std::cout << "task a\n";
foo.unlock();
bar.unlock();
}
void task_b() {
// 若使用bar.lock(); foo.lock(); 而不是std::lock,那么有可能task_a锁定了foo,而task_b锁定了bar,从而导致死锁
// bar.lock(); foo.lock();
std::lock(bar, foo);
std::cout << "task b\n";
bar.unlock();
foo.unlock();
}
int main() {
std::thread th1(task_a);
std::thread th2(task_b);
th1.join();
th2.join();
return 0;
}
try_lock
是全局函数模板,原型如下所示:
// int返回值:若成功锁定了所有mutex,则返回-1;否则返回加锁失败的mutex的索引值
template <class Mutex1, class Mutex2, class... Mutexes>
int try_lock (Mutex1& a, Mutex2& b, Mutexes&... cde);
函数尝试通过mutex.try_lock
锁定所有的mutex
。
try_lock
函数通过mutex.try_lock
成员函数依次为参数中的mutex
加锁(首先是a,然后是b,最后是cde),直到所有调用都成功,或者任意一个调用失败(即mutex.try_lock
返回false或抛出异常)。
如果try_lock
函数由于某个mutex
加锁失败而返回,则会解锁所有先前加锁成功的mutex
,并且返回那个加锁失败的mutex
的索引值。
std::mutex foo, bar;
void task_a() {
foo.lock();
std::cout << "task a\n";
bar.lock();
// ...
foo.unlock();
bar.unlock();
}
void task_b() {
int x = try_lock(bar, foo);
if (x == -1) {
std::cout << "task b\n";
// ...
bar.unlock();
foo.unlock();
} else {
std::cout << "[task b failed: mutex " << (x ? "foo" : "bar") << " locked]\n";
}
}
int main() {
std::thread th1(task_a);
std::thread th2(task_b);
th1.join();
th2.join();
return 0;
}
通过线程锁只能实现临界区访问,要实现线程之间的同步,需要借助条件变量。
condition_variable
是一个同步原语,能够阻塞调用线程,直到其他线程通知恢复为止,不可拷贝(删除了拷贝构造函数和拷贝赋值操作符)。
当调用condition_variable
任意wait函数时,将使用unique_lock(通过mutex
)锁定线程。该线程将一直处于阻塞状态,直到另一个线程调用同一个condition_variable
任意notify函数唤醒为止。
condition_variable
对象总是使用std::unique_lock<mutex>
实现线程阻塞。
condition_variable
可用的wait函数:
wait有两个重载版本,如下所示:
void wait (unique_lock<mutex>& lck);
template <class Predicate>
void wait (unique_lock<mutex>& lck, Predicate pred);
表示阻塞调用线程(调用线程调用wait之前必须已经锁定了unique_lock持有的mutex),直到被其他线程唤醒。
该函数阻塞调用线程的时刻,会自动调用unique_lock.unlock
解锁mutex
,以允许其他线程加锁同一个mutex
继续运行。
一旦被其他线程唤醒,该函数会解除阻塞状态,并且调用unique_lock.lock
重新加锁(可能会再次阻塞调用线程),让unique_lock
恢复到wait函数被调用时的状态。
通常情况下,其他线程调用condition_variable
的notify_one或者notify_all成员函数,来唤醒被阻塞的线程。但是,某些实现可能会在不调用任何notify函数的情况下产生虚假唤醒。因此,使用该函数的程序应该确保其恢复条件得到了满足,所以一般在循环结构中调用wait函数,如下所示,这样即使被虚假唤醒,也会因为条件不满足,再次进入阻塞状态。
while(条件不满足){
condition_variable.wait
}
包含_Predicate __pred
参数的重载版本中,_Predicate是函数模板的参数,表示返回布尔值的函数。如果__pred返回false,则函数会一直阻塞,只有当它返回true时,notify才能唤醒线程。__pred会一直被调用,直到它返回true,非常适合处理虚假唤醒问题,如下所示:
template <class _Predicate>
void
condition_variable::wait(unique_lock<mutex>& __lk, _Predicate __pred)
{
// __pred()返回false,则一直调用wait阻塞
while (!__pred())
wait(__lk);
}
整体来看,wait类函数有三个注意点:
这样,wait类函数在被调用前和被唤醒返回后,可以确保是同一个线程状态。
下面👇看一个案例:
std::mutex mtx;
std::condition_variable cv;
int cargo = 0;
bool shipment_available() { return cargo != 0; }
void consume(int n) {
for (int i = 0; i < n; ++i) {
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, shipment_available);
// consume:
std::cout << cargo << '\n';
cargo = 0;
}
}
int main() {
std::thread consumer_thread(consume, 10);
// produce 10 items when needed:
for (int i = 0; i < 10; ++i) {
while (shipment_available()) std::this_thread::yield();
std::unique_lock<std::mutex> lck(mtx);
cargo = i + 1;
cv.notify_one();
}
consumer_thread.join();
return 0;
}
// 输出
1
2
3
4
5
6
7
8
9
10
子线程因为cargo等于0,会进入阻塞状态,然后主线程修改cargo,并且唤醒子线程。
wait_for有两个重载版本,如下所示:
enum class cv_status{
no_timeout,
timeout
}
// 如果因为时间到了停止阻塞,则返回timeout,否则返回no_timeout
cv_status wait_for(unique_lock<mutex>& __lk, const chrono::duration<_Rep, _Period>& __d);
// 返回pred(),而不管是否触发了超时(尽管只有触发超时时,才能为false)。
bool wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time, Predicate pred);
wait_for函数在wait基础上,增加了阻塞持续时间的能力,所以有两种方式被唤醒:
👇下面看一个实际案例:
std::condition_variable cv;
int value;
void read_value() {
std::cin >> value;
cv.notify_one();
}
int main() {
std::cout << "Please, enter an integer (I'll be printing dots): \n";
std::thread th(read_value);
std::mutex mtx;
std::unique_lock<std::mutex> lck(mtx);
while (cv.wait_for(lck, std::chrono::seconds(1)) == std::cv_status::timeout){
std::cout << '.' << std::endl;
}
std::cout << "You entered: " << value << '\n';
th.join();
return 0;
}
主线程每次阻塞1秒,若wait_for
是以超时结束,则打印.
,并且再次进入阻塞状态,直到被子线程主动唤醒退出while循环,结束主线程。
wait_until有两个重载版本,如下所示:
enum class cv_status{
no_timeout,
timeout
}
// 如果因为时间到了停止阻塞,则返回timeout,否则返回no_timeout
cv_status wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time);
// 返回pred(),而不管是否触发了超时(尽管只有触发超时时,才能为false)。
bool wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time, Predicate pred);
wait_until函数在wait基础上,增加了阻塞到绝对时间的能力,所以有两种方式被唤醒:
解除当前正在等待指定condition_variable
的所有线程的阻塞状态。如果没有线程在等待,函数将什么也不做。
唤醒所有等待的线程
解除当前正在等待指定condition_variable
的所有线程中任意一个线程的阻塞状态。如果没有线程在等待,函数将什么也不做。
随机唤醒一个等待的线程
并不强制,可以根据具体情况,决定是否需要加锁
通知线程调用notify_all或者notify_one时,不需要提前加锁(与等待线程锁定的同一个mutex)。实际上,先加锁后通知是一种悲观做法,因为被通知的等待线程会立即再次阻塞,等待通知线程释放锁。
然而,一些实现(尤其是pthreads)认识到这种情况,并通过在notify中将等待线程从condition_variable
队列直接转移到mutex
队列,而不唤醒它,从而避免这种“匆忙等待”的场景。
但是若需要精确的事件调度,那么先加锁后通知是有必要的,例如:等待线程将在满足条件后直接退出程序,这将导致通知线程的condition_variable被销毁,为了不让等待线程立即获得锁,那么在加锁状态下进行通知可能是有必要的。
condition_variable
的wait/wait_for/wait_until函数只能以unique_lock作为参数,但是condition_variable_any
的wait/wait_for/wait_until函数可以以任何Lockable类型的锁作为参数。除此之外,两者的能力完全相同。看下源码就知道:condition_variable_any
内部也是通过condition_variable
和unique_lock<mutex>
实现的。
下面使用condition_variable_any
改造上面condition_variable.wait处的案例。
std::mutex mtx;
std::condition_variable_any cv;
int cargo = 0;
bool shipment_available() { return cargo != 0; }
void consume(int n) {
for (int i = 0; i < n; ++i) {
mtx.lock();
cv.wait(mtx, shipment_available);
// consume:
std::cout << cargo << '\n';
cargo = 0;
mtx.unlock();
}
}
int main() {
std::thread consumer_thread(consume, 10);
// produce 10 items when needed:
for (int i = 0; i < 10; ++i) {
while (shipment_available()) std::this_thread::yield();
mtx.lock();
cargo = i + 1;
cv.notify_one();
mtx.unlock();
}
consumer_thread.join();
return 0;
}
// 输出
1
2
3
4
5
6
7
8
9
10
子线程因为cargo等于0,会进入阻塞状态,然后主线程修改cargo,并且唤醒子线程。
只不过这里wait函数的参数由unique_lock<mutex>
变为了mutex
。
函数原型:
void notify_all_at_thread_exit (condition_variable& cond, unique_lock<mutex> lck);
当调用线程退出时,所有等待condition_variable
的线程都会被唤醒。
使用该函数前,调用线程必须已经锁定了unique_lock的mutex。然后,该函数会先解锁unique_lock的mutex,然后唤醒其他线程,类似于下面的流程:
unique_lock.unlock();
condition_variable.notify_all();
下面👇看一个案例:
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void print_id(int id) {
std::unique_lock<std::mutex> lck(mtx);
while (!ready) cv.wait(lck);
// ...
std::cout << "thread " << id << '\n';
}
void go() {
std::unique_lock<std::mutex> lck(mtx);
// unique_lock禁止了拷贝构造函数和拷贝赋值操作符,所以只能使用移动构造函数和移动赋值操作符,这里借助移动语义,使用移动构造函数。
std::notify_all_at_thread_exit(cv, std::move(lck));
ready = true;
}
int main() {
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(print_id, i);
std::cout << "10 threads ready to race...\n";
std::thread(go).detach(); // go!
for (auto &th : threads) th.join();
return 0;
}
10个子线程,阻塞(wait)在同一个mutex
,go线程退出时,唤醒了所有子线程,然后10个子线程竞争锁,获得执行权。
因为unique_lock
禁止了拷贝构造函数和拷贝赋值操作符,所以只能使用移动构造函数和移动赋值操作符,所以上面借助移动语义,使用了unique_lock
的移动构造函数。