[关闭]
@ltlovezh 2020-02-13T10:10:22.000000Z 字数 14424 阅读 975

C++11 thread

C++


C++ 11增加了标准线程库:std::thread,在语言级别上提供了线程支持,并且是跨平台的。在不同操作系统上,依赖于平台本身的线程库,例如Linux上,底层实现是pthread库。
std::thread禁止了拷贝构造函数和拷贝赋值运算符,所以std::thread对象不能拷贝,但是可以移动。

基础知识

一个最简单的实例:

  1. void func(int arg1, int arg2) {
  2. cout << "arg1: " << arg1 << ", arg2: " << arg2 << endl;
  3. cout << "child thread id: " << std::this_thread::get_id() << endl;
  4. }
  5. int main() {
  6. // 创建并启动子线程
  7. std::thread thread(func, 10, 100);
  8. cout << "child thread id: " << thread.get_id() << endl;
  9. thread.join();
  10. cout << "main thread id: " << std::this_thread::get_id() << endl;
  11. cout << "main thread exit" << endl;
  12. return 0;
  13. }
  14. // 输出
  15. child thread id: 0x70000e841000
  16. arg1: 10, arg2: 100
  17. child thread id: 0x70000e841000
  18. main thread id: 0x10ef98dc0
  19. main thread exit

std::thread线程对象支持的操作如下所示:

std::this_thread表示当前线程,this_thread实际是一个namespace,支持如下操作:

下面通过sleep_forsleep_until分别实现休眠10秒:

  1. // 当前时间戳
  2. std::time_t timestamp = std::chrono::system_clock::to_time_t(system_clock::now());
  3. // sleep_until实现休眠10秒
  4. std::this_thread::sleep_until(system_clock::from_time_t(timestamp + 10));
  5. // sleep_for实现休眠10秒
  6. std::this_thread::sleep_for(std::chrono::seconds(10));

关于时间的操作可参考time.h文件

  1. // 当前时间戳(秒)
  2. std::time_t now_timestamp = system_clock::to_time_t(system_clock::now());
  3. // struct tm结构体,包含了时、分、秒
  4. struct std::tm *now_tm = std::localtime(&now_timestamp);
  5. std::cout << "Current time: " << now_tm->tm_hour << ":" << now_tm->tm_min << ":"<< now_tm->tm_sec << '\n';
  6. // 未来一分钟
  7. ++now_tm->tm_min;
  8. // 未来时间戳
  9. std::time_t future_timestamp = std::mktime(now_tm);
  10. std::cout << "Future time: " << now_tm->tm_hour << ":" << now_tm->tm_min << ":"<< now_tm->tm_sec << '\n';
  11. // time_point格式,可用于std::this_thread::sleep_until
  12. std::chrono::time_point<system_clock> x = system_clock::from_time_t(future_timestamp);
  13. // 输出
  14. Current time: 19:3:22
  15. Future time: 19:4:22

线程同步

互斥锁

mutex(锁)

std::mutex表示互斥锁,不可拷贝(删除了拷贝构造函数和拷贝赋值操作符)。不支持递归锁定,若有此需求,可使用std::recursive_mutex代替。

支持的操作:

lock和try_lock的差异主要是不能锁定mutex时表现不同,lock函数会一直阻塞调用线程,直到可以锁定mutex为止;而try_lock则不会阻塞调用线程,而是直接返回,并且返回值为false。

recursive_mutex(可重入锁)

recursive_mutexmutex的基础上,允许同一个线程对同一个recursive_mutex多次加锁,表示获得recursive_mutex的多层所有权,同时对recursive_mutex解锁时,也要调用相同次数的unlock,这样调用线程才能彻底释放对recursive_mutex的所有权,其他线程才能锁定recursive_mutex

timed_mutex(时间锁)

timed_mutexmutex基础上,增加了两个成员函数try_lock_fortry_lock_until,表示等待一段时间,尝试获得锁。

  1. std::timed_mutex timed_mutex;
  2. // 尝试2秒内获得锁
  3. if (timed_mutex.try_lock_for(std::chrono::seconds(2))) {
  4. // do some thing
  5. // 释放锁
  6. timed_mutex.unlock();
  7. }
  8. // 当前绝对时间戳
  9. std::time_t timestamp = std::chrono::system_clock::to_time_t(system_clock::now());
  10. // 尝试阻塞到未来的绝对时间获得锁
  11. if (timed_mutex.try_lock_until(system_clock::from_time_t(timestamp + 10))) {
  12. // do some thing
  13. // 释放锁
  14. timed_mutex.unlock();
  15. }

recursive_timed_mutex(可重入时间锁)

可重入的时间锁,同时具备timed_mutexrecursive_mutex的能力,不再赘述。

Lock模板类

lock_guard

lock_guard是一个管理mutex的对象,不可拷贝(删除了拷贝构造函数和拷贝赋值操作符),除了缺省合成的函数外,没有其他成员函数。构造lock_guard时,mutex被调用线程锁定,销毁lock_guard时,mutex被调用线程解锁。通过这种方式,可以确保程序抛出异常时,也能正确地解锁mutex。这里的mutex可以是四种锁中的任意一种。
lock_guard不会介入mutex生命周期,程序必须保证mutex的生命周期至少延长到持有它的lock_guard销毁为止。

简单来说,构造lock_guard时,获得锁;析构lock_guard时,释放锁。

除了单参数构造函数,lock_guard还有一个包含两个参数的构造函数:
lock_guard(mutex_type& __m, adopt_lock_t),表示创建lock_guard时,构造函数不会对mutex加锁,而是由外部程序保证mutex已经被加锁了。

下面是一个典型案例:

  1. // 互斥锁
  2. std::mutex mtx;
  3. void print_even(int x) {
  4. if (x % 2 == 0) std::cout << x << " is even\n";
  5. else throw (std::logic_error("not even"));
  6. }
  7. void print_thread_id(int id) {
  8. try {
  9. // 使用lock_guard锁定mtx,保证即使是异常逻辑,lock_guard也可以在析构时解锁mtx
  10. std::lock_guard<std::mutex> lck(mtx);
  11. print_even(id);
  12. }
  13. catch (std::logic_error &) {
  14. std::cout << "[exception caught]\n";
  15. }
  16. }
  17. int main() {
  18. std::thread threads[10];
  19. for (int i = 0; i < 10; ++i)
  20. threads[i] = std::thread(print_thread_id, i + 1);
  21. for (auto &th : threads) th.join();
  22. return 0;
  23. }
  24. // 可能的输出
  25. [exception caught]
  26. 6 is even
  27. 4 is even
  28. [exception caught]
  29. [exception caught]
  30. 2 is even
  31. [exception caught]
  32. 8 is even
  33. [exception caught]
  34. 10 is even

unique_lock

unique_lock是一个管理mutex的对象,不可拷贝(删除了拷贝构造函数和拷贝赋值操作符),在lock_guard基础上,增加了locktry_locktry_lock_fortry_lock_untilunlock等成员函数(这些函数的作用在上面👆已经介绍过了),更加灵活,但相应的性能会受一些影响。
下面是源码中的类定义:

  1. template <class _Mutex>
  2. class unique_lock
  3. {
  4. public:
  5. // 模板参数,各类锁
  6. typedef _Mutex mutex_type;
  7. private:
  8. // 锁
  9. mutex_type* __m_;
  10. // 是否已经获得锁
  11. bool __owns_;
  12. public:
  13. // 无参构造函数
  14. unique_lock() : __m_(nullptr), __owns_(false) {}
  15. // 单参数构造函数,与lock_guard一样,构造函数中加锁
  16. explicit unique_lock(mutex_type& __m)
  17. : __m_(_VSTD::addressof(__m)), __owns_(true) {__m_->lock();}
  18. // 带defer_lock_t的构造函数(第二个参数可使用编译时常量std::defer_lock),构造函数中不加锁
  19. unique_lock(mutex_type& __m, defer_lock_t) _NOEXCEPT
  20. : __m_(_VSTD::addressof(__m)), __owns_(false) {}
  21. // 带try_to_lock_t的构造函数(第二个参数可使用编译时常量std::try_to_lock),构造函数中尝试加锁
  22. unique_lock(mutex_type& __m, try_to_lock_t)
  23. : __m_(_VSTD::addressof(__m)), __owns_(__m.try_lock()) {}
  24. // 带adopt_lock_t的构造函数(第二个参数可使用编译时常量std::adopt_lock),与lock_guard一样,构造函数中不加锁,而是默认外部程序已经加锁了
  25. unique_lock(mutex_type& __m, adopt_lock_t)
  26. : __m_(_VSTD::addressof(__m)), __owns_(true) {}
  27. template <class _Clock, class _Duration>
  28. // 构造函数中通过try_lock_until尝试加锁
  29. unique_lock(mutex_type& __m, const chrono::time_point<_Clock, _Duration>& __t)
  30. : __m_(_VSTD::addressof(__m)), __owns_(__m.try_lock_until(__t)) {}
  31. template <class _Rep, class _Period>
  32. // 构造函数中通过try_lock_for尝试加锁
  33. unique_lock(mutex_type& __m, const chrono::duration<_Rep, _Period>& __d)
  34. : __m_(_VSTD::addressof(__m)), __owns_(__m.try_lock_for(__d)) {}
  35. // 析构时,若加锁了,则释放锁
  36. ~unique_lock()
  37. {
  38. if (__owns_)
  39. __m_->unlock();
  40. }
  41. private:
  42. // 相当于删除了拷贝构造函数和拷贝赋值操作符
  43. unique_lock(unique_lock const&); // = delete;
  44. unique_lock& operator=(unique_lock const&); // = delete;
  45. public:
  46. #ifndef _LIBCPP_CXX03_LANG
  47. // 移动构造函数
  48. unique_lock(unique_lock&& __u) _NOEXCEPT
  49. : __m_(__u.__m_), __owns_(__u.__owns_)
  50. {__u.__m_ = nullptr; __u.__owns_ = false;}
  51. // 移动赋值操作符
  52. unique_lock& operator=(unique_lock&& __u) _NOEXCEPT
  53. {
  54. if (__owns_)
  55. __m_->unlock();
  56. __m_ = __u.__m_;
  57. __owns_ = __u.__owns_;
  58. __u.__m_ = nullptr;
  59. __u.__owns_ = false;
  60. return *this;
  61. }
  62. #endif // _LIBCPP_CXX03_LANG
  63. // 主动加锁
  64. void lock();
  65. // 尝试加锁
  66. bool try_lock();
  67. template <class _Rep, class _Period>
  68. bool try_lock_for(const chrono::duration<_Rep, _Period>& __d);
  69. template <class _Clock, class _Duration>
  70. bool try_lock_until(const chrono::time_point<_Clock, _Duration>& __t);
  71. // 释放锁
  72. void unlock();
  73. // swap实现
  74. void swap(unique_lock& __u) _NOEXCEPT
  75. {
  76. _VSTD::swap(__m_, __u.__m_);
  77. _VSTD::swap(__owns_, __u.__owns_);
  78. }
  79. // 返回持有的mutex,但是该函数不会解锁mutex
  80. mutex_type* release() _NOEXCEPT
  81. {
  82. mutex_type* __m = __m_;
  83. __m_ = nullptr;
  84. __owns_ = false;
  85. return __m;
  86. }
  87. // 判断是否已经加锁了
  88. bool owns_lock() const _NOEXCEPT {return __owns_;}
  89. // 重载了函数调用符, 判断是否已经加锁了
  90. operator bool () const _NOEXCEPT {return __owns_;}
  91. // 获得持有的mutex
  92. mutex_type* mutex() const _NOEXCEPT {return __m_;}
  93. };

unique_lock的构造函数,实现了不同的加锁策略,具体可见上面的源码和注释。

除非必要,优先使用更高效的lock_guard。

全局函数

std::call_once

call_once是全局函数模板,原型如下所示:

  1. template <class Fn, class... Args>
  2. void call_once (once_flag& flag, Fn&& fn, Args&&... args);

call_once使用参数args调用fn函数,除非另一个线程已经(或者正在)使用相同的once_flag调用call_once

第一个使用相同once_flag调用call_once的线程,会执行fn函数,同时使其他使用相同once_flag调用call_once的线程进入被动执行状态,即:其他线程不执行fn函数,但是会阻塞到第一个执行fn函数的线程结束。

如果通过call_once执行fn函数的线程抛出了异常,并且存在被动执行的线程,则会从其中选择一个线程使其执行fn函数。

如果已经有线程执行完了call_once,即fn函数返回了,那么当前所有被动执行的线程和将来对call_once的调用(使用相同once_flag)也会立即返回,不会再次执行fn函数。
即多线程环境下,相同once_flagcall_once调用,只会执行一次fn函数

下面看一个实际案例:

  1. int winner;
  2. void set_winner(int x) {
  3. winner = x;
  4. std::cout << "set_winner, x = " << x << std::endl;
  5. }
  6. // 多个线程共用一个once_flag
  7. std::once_flag winner_flag;
  8. // 线程函数
  9. void wait_1000ms(int id) {
  10. // 循环1000次,每次休眠1ms
  11. for (int i = 0; i < 1000; ++i)
  12. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  13. // 不同线程通过同一个once_flag,调用call_once,最终只会有一个线程执行对应的set_winner函数
  14. std::call_once(winner_flag, set_winner, id);
  15. }
  16. int main() {
  17. std::thread threads[10];
  18. // 10个线程
  19. for (int i = 0; i < 10; ++i)
  20. threads[i] = std::thread(wait_1000ms, i + 1);
  21. std::cout << "waiting for the first among 10 threads to count 1000 ms...\n";
  22. for (auto &th : threads) th.join();
  23. std::cout << "winner thread: " << winner << '\n';
  24. return 0;
  25. }
  26. // 可能的输出
  27. waiting for the first among 10 threads to count 1000 ms...
  28. set_winner, x = 8
  29. winner thread: 8

可见,虽然运行了10个线程,但是最终只有一个线程执行了set_winner函数。

std::lock

lock是全局函数模板,原型如下所示:

  1. template <class Mutex1, class Mutex2, class... Mutexes>
  2. void lock (Mutex1& a, Mutex2& b, Mutexes&... cde);

函数会锁定所有的mutex,必要时阻塞调用线程。

lock函数以不确定的顺序调用所有mutex的成员函数:lock、try_lock和unlock,以确保函数返回时,所有mutex都被锁定了(而不会产生任何死锁)。

如果lock函数不能锁定所有mutex(例如:其中一个调用抛出了异常),那么在函数失败之前,首先会解锁它成功锁定的所有mutex

下面👇是同时锁定foo和bar的案例:

  1. std::mutex foo, bar;
  2. void task_a() {
  3. // 若使用foo.lock(); bar.lock(); 而不是std::lock,那么有可能task_a锁定了foo,而task_b锁定了bar,从而导致死锁
  4. // foo.lock(); bar.lock();
  5. std::lock(foo, bar);
  6. std::cout << "task a\n";
  7. foo.unlock();
  8. bar.unlock();
  9. }
  10. void task_b() {
  11. // 若使用bar.lock(); foo.lock(); 而不是std::lock,那么有可能task_a锁定了foo,而task_b锁定了bar,从而导致死锁
  12. // bar.lock(); foo.lock();
  13. std::lock(bar, foo);
  14. std::cout << "task b\n";
  15. bar.unlock();
  16. foo.unlock();
  17. }
  18. int main() {
  19. std::thread th1(task_a);
  20. std::thread th2(task_b);
  21. th1.join();
  22. th2.join();
  23. return 0;
  24. }

std::try_lock

try_lock是全局函数模板,原型如下所示:

  1. // int返回值:若成功锁定了所有mutex,则返回-1;否则返回加锁失败的mutex的索引值
  2. template <class Mutex1, class Mutex2, class... Mutexes>
  3. int try_lock (Mutex1& a, Mutex2& b, Mutexes&... cde);

函数尝试通过mutex.try_lock锁定所有的mutex

try_lock函数通过mutex.try_lock成员函数依次为参数中的mutex加锁(首先是a,然后是b,最后是cde),直到所有调用都成功,或者任意一个调用失败(即mutex.try_lock返回false或抛出异常)。

如果try_lock函数由于某个mutex加锁失败而返回,则会解锁所有先前加锁成功的mutex,并且返回那个加锁失败的mutex的索引值。

  1. std::mutex foo, bar;
  2. void task_a() {
  3. foo.lock();
  4. std::cout << "task a\n";
  5. bar.lock();
  6. // ...
  7. foo.unlock();
  8. bar.unlock();
  9. }
  10. void task_b() {
  11. int x = try_lock(bar, foo);
  12. if (x == -1) {
  13. std::cout << "task b\n";
  14. // ...
  15. bar.unlock();
  16. foo.unlock();
  17. } else {
  18. std::cout << "[task b failed: mutex " << (x ? "foo" : "bar") << " locked]\n";
  19. }
  20. }
  21. int main() {
  22. std::thread th1(task_a);
  23. std::thread th2(task_b);
  24. th1.join();
  25. th2.join();
  26. return 0;
  27. }

condition_varible

condition_variable

condition_variable是一个同步原语,能够阻塞调用线程,直到其他线程通知恢复为止,不可拷贝(删除了拷贝构造函数和拷贝赋值操作符)。
当调用condition_variable任意wait函数时,将使用unique_lock(通过mutex)锁定线程。该线程将一直处于阻塞状态,直到另一个线程调用同一个condition_variable任意notify函数唤醒为止。

condition_variable对象总是使用std::unique_lock<mutex>实现线程阻塞。

condition_variable可用的wait函数:

wait

wait有两个重载版本,如下所示:

  1. void wait (unique_lock<mutex>& lck);
  2. template <class Predicate>
  3. void wait (unique_lock<mutex>& lck, Predicate pred);

表示阻塞调用线程(调用线程调用wait之前必须已经锁定了unique_lock持有的mutex),直到被其他线程唤醒。
该函数阻塞调用线程的时刻,会自动调用unique_lock.unlock解锁mutex,以允许其他线程加锁同一个mutex继续运行。
一旦被其他线程唤醒,该函数会解除阻塞状态,并且调用unique_lock.lock重新加锁(可能会再次阻塞调用线程),让unique_lock恢复到wait函数被调用时的状态。

通常情况下,其他线程调用condition_variable的notify_one或者notify_all成员函数,来唤醒被阻塞的线程。但是,某些实现可能会在不调用任何notify函数的情况下产生虚假唤醒。因此,使用该函数的程序应该确保其恢复条件得到了满足,所以一般在循环结构中调用wait函数,如下所示,这样即使被虚假唤醒,也会因为条件不满足,再次进入阻塞状态。

  1. while(条件不满足){
  2. condition_variable.wait
  3. }

包含_Predicate __pred参数的重载版本中,_Predicate是函数模板的参数,表示返回布尔值的函数。如果__pred返回false,则函数会一直阻塞,只有当它返回true时,notify才能唤醒线程。__pred会一直被调用,直到它返回true,非常适合处理虚假唤醒问题,如下所示:

  1. template <class _Predicate>
  2. void
  3. condition_variable::wait(unique_lock<mutex>& __lk, _Predicate __pred)
  4. {
  5. // __pred()返回false,则一直调用wait阻塞
  6. while (!__pred())
  7. wait(__lk);
  8. }

整体来看,wait类函数有三个注意点:

  1. 调用wait类函数时,有一个前提,就是调用线程必须已经锁定了mutex,
  2. wait类函数被调用时,会做两件事:
    • 阻塞调用线程。
    • 调用unique_lock.unlock,解锁调用线程已经锁定的mutex。
  3. wait类函数被唤醒时,也会做两件事:
    • 解除调用线程的阻塞状态。
    • 调用unique_lock.lock,重新为调用线程加锁。

这样,wait类函数在被调用前和被唤醒返回后,可以确保是同一个线程状态。

下面👇看一个案例:

  1. std::mutex mtx;
  2. std::condition_variable cv;
  3. int cargo = 0;
  4. bool shipment_available() { return cargo != 0; }
  5. void consume(int n) {
  6. for (int i = 0; i < n; ++i) {
  7. std::unique_lock<std::mutex> lck(mtx);
  8. cv.wait(lck, shipment_available);
  9. // consume:
  10. std::cout << cargo << '\n';
  11. cargo = 0;
  12. }
  13. }
  14. int main() {
  15. std::thread consumer_thread(consume, 10);
  16. // produce 10 items when needed:
  17. for (int i = 0; i < 10; ++i) {
  18. while (shipment_available()) std::this_thread::yield();
  19. std::unique_lock<std::mutex> lck(mtx);
  20. cargo = i + 1;
  21. cv.notify_one();
  22. }
  23. consumer_thread.join();
  24. return 0;
  25. }
  26. // 输出
  27. 1
  28. 2
  29. 3
  30. 4
  31. 5
  32. 6
  33. 7
  34. 8
  35. 9
  36. 10

子线程因为cargo等于0,会进入阻塞状态,然后主线程修改cargo,并且唤醒子线程。

wait_for

wait_for有两个重载版本,如下所示:

  1. enum class cv_status{
  2. no_timeout,
  3. timeout
  4. }
  5. // 如果因为时间到了停止阻塞,则返回timeout,否则返回no_timeout
  6. cv_status wait_for(unique_lock<mutex>& __lk, const chrono::duration<_Rep, _Period>& __d);
  7. // 返回pred(),而不管是否触发了超时(尽管只有触发超时时,才能为false)。
  8. bool wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time, Predicate pred);

wait_for函数在wait基础上,增加了阻塞持续时间的能力,所以有两种方式被唤醒:

👇下面看一个实际案例:

  1. std::condition_variable cv;
  2. int value;
  3. void read_value() {
  4. std::cin >> value;
  5. cv.notify_one();
  6. }
  7. int main() {
  8. std::cout << "Please, enter an integer (I'll be printing dots): \n";
  9. std::thread th(read_value);
  10. std::mutex mtx;
  11. std::unique_lock<std::mutex> lck(mtx);
  12. while (cv.wait_for(lck, std::chrono::seconds(1)) == std::cv_status::timeout){
  13. std::cout << '.' << std::endl;
  14. }
  15. std::cout << "You entered: " << value << '\n';
  16. th.join();
  17. return 0;
  18. }

主线程每次阻塞1秒,若wait_for是以超时结束,则打印.,并且再次进入阻塞状态,直到被子线程主动唤醒退出while循环,结束主线程。

wait_until

wait_until有两个重载版本,如下所示:

  1. enum class cv_status{
  2. no_timeout,
  3. timeout
  4. }
  5. // 如果因为时间到了停止阻塞,则返回timeout,否则返回no_timeout
  6. cv_status wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time);
  7. // 返回pred(),而不管是否触发了超时(尽管只有触发超时时,才能为false)。
  8. bool wait_until (unique_lock<mutex>& lck, const chrono::time_point<Clock,Duration>& abs_time, Predicate pred);

wait_until函数在wait基础上,增加了阻塞到绝对时间的能力,所以有两种方式被唤醒:

notify_all

解除当前正在等待指定condition_variable的所有线程的阻塞状态。如果没有线程在等待,函数将什么也不做。

唤醒所有等待的线程

notify_one

解除当前正在等待指定condition_variable的所有线程中任意一个线程的阻塞状态。如果没有线程在等待,函数将什么也不做。

随机唤醒一个等待的线程

notify_all和notify_one必须在加锁情况下调用吗

并不强制,可以根据具体情况,决定是否需要加锁

通知线程调用notify_all或者notify_one时,不需要提前加锁(与等待线程锁定的同一个mutex)。实际上,先加锁后通知是一种悲观做法,因为被通知的等待线程会立即再次阻塞,等待通知线程释放锁。
然而,一些实现(尤其是pthreads)认识到这种情况,并通过在notify中将等待线程从condition_variable队列直接转移到mutex队列,而不唤醒它,从而避免这种“匆忙等待”的场景。

但是若需要精确的事件调度,那么先加锁后通知是有必要的,例如:等待线程将在满足条件后直接退出程序,这将导致通知线程的condition_variable被销毁,为了不让等待线程立即获得锁,那么在加锁状态下进行通知可能是有必要的。

condition_variable_any

atomic

参考文章

  1. c++ multithreading
  2. C++ std::thread
  3. c++11多线程(2)Mutex总结
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注