0%

C++:多线程开发

[TOC]

1. C++11 Thead线程库的基本使用

进程与线程

  • 进程:运行中的程序
  • 线程:进程中的进程

本文详细介绍C++11 Thead线程库的基本使用,包括如何创建线程、启动线程、等待线程完成以及如何分离线程等。

1. 创建线程

要创建线程,我们需要一个可调用的函数或函数对象,作为线程的入口点。在C++11中,我们可以使用函数指针、函数对象或lambda表达式来实现。创建线程的基本语法如下:

1
2
#include <thread>
thread t(function_name, args...);

function_name是线程入口点的函数或可调用对象

args...是传递给函数的参数

测试代码:

1
2
3
4
5
6
7
8
9
10
11
#include <iostream>
#include <thread>
using namespace std;
void printHello() {
cout << "hello" << endl;
}
int main() {
//创建线程
thread thread1(printHello);
return 0;
}

报错:

image-20240605121651420

原因:还没等线程执行完就return了


2. 主程序等待线程执行完毕

创建线程后,我们可以使用t.join()等待线程完成,或者使用t.detach()分离线程,让它在后台运行。

1
thread1.join();

加上这一句,程序会检查线程是否执行结束,就不会报错了


3.传递参数

我们可以使用多种方式向线程传递参数,例如使用函数参数、全局变量、引用等。如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>
using namespace std;

void printHello(string msg) {
cout << msg << endl;
}

void increment(int& x){
++x;
}
int main() {
//值传递
thread thread1(printHello,"hello");
thread1.join();
int x = 0;
//引用传递
thread t2(increment, ref(x));
t2.join();
cout << x << endl;
return 0;
}

在第一个例子中,我们使用了一个字符串作为函数参数,传递给线程。在第二个例子中,我们使用了一个引用来传递一个整数变量。需要注意的是,当我们使用引用传递参数时,我们需要使用std::ref来包装引用,否则编译器会报错。


4.分离线程

detach():用的不多,用这个即使线程没有执行完,主程序结束也不会报错

有时候我们可能不需要等待线程完成,而是希望它在后台运行。这时候我们可以使用t.detach()方法来分离线程。

需要注意的是,一旦线程被分离,就不能再使用t.join()方法等待它完成。而且,我们需要确保线程不会在主线程结束前退出,否则可能会导致未定义行为。


5. joinable()

joinable()方法返回一个布尔值,如果线程可以被join()或detach(),则返回true,否则返回false。如果我们试图对一个不可加入的线程调用join()detach(),则会抛出一个std::system_error异常。

所以严谨的做法,在使用join前面要使用joinable()进行判断

下面是一个使用joinable()方法的例子:

1
2
3
if (t.joinable()) {
t.join();
}

2. 线程函数中的数据未定义错误

1. 传递引用变量

1
2
3
4
5
6
7
8
void increment(int& x){ 
++x;
}
int main() {
thread thread1(increment,1);
thread1.join();
return 0;
}

报错:image-20240605123357864

因为函数参数是引用类型,所以使用ref来传递引用变量

1
2
int a=1;
thread t1(increment,ref(a));

2. 传递指针或引用指向局部变量的问题

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <thread>
using namespace std;
thread thread1;
void increment(int& x){
++x;
}
void test() {
int a = 1;
//a是局部变量
thread1 = thread(increment, ref(a));
}
int main() {
test();
thread1.join();
return 0;
}

a是局部变量,这样会导致在线程函数执行时,指向局部变量a的指针已经被销毁,从而导致未定义行为。


3. 传递指针或引用指向已释放的内存的问题

1
2
3
4
5
6
7
8
9
10
void print(int *x){ 
cout << *x << endl;
}
int main() {
int* ptr = new int(1);
thread thread1 = thread(print, ptr);
delete ptr;
thread1.join();
return 0;
}

线程还没有执行完,就把ptr释放了,会报错

将类对象传入函数,线程还没执行完就释放对象也会有同样的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <thread>
using namespace std;
class A {
public:
void f() {
cout << "hello" << endl;
}
};
int main() {
A a;
thread t(&A::f, &a);//注意这个的写法
t.join();
return 0;
}

解决:通过智能指针

1
2
shared_ptr<A> a = make_shared<A>();
thread t(&A::f, a);

4. 入口函数为类的私有成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
class A {
private:
void f() {
cout << "hello" << endl;
}
};
int main() {
//指向类A的一个指针
shared_ptr<A> a = make_shared<A>();
thread t(&A::f, a);
t.join();
return 0;
}

f()是类A的私有函数,不可访问

解决方法:使用友元

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class A {
private:
friend void thread_f();
void f() {
cout << "hello" << endl;
}
};
//要使用类A的私有函数,需要在类A里生命声明它是友元
void thread_f() {
shared_ptr<A> a = make_shared<A>();
thread t(&A::f, a);
t.join();
}
int main() {
thread_f();
return 0;
}

3. 互斥量解决多数据共享的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int a = 0;
void func() {
for (int i = 0; i < 10000; i++) {
a++;
}
}
int main() {
thread thread1(func);
thread thread2(func);
thread1.join();
thread2.join();
cout << a << endl;
return 0;
}

两个线程同时对a进行操作,最后结果可能不等于20000,而是任何小于等于20000的可能值

解决方法:上锁

1
2
3
4
5
6
7
8
9
#include <mutex>
mutex mtx;
void func() {
for (int i = 0; i < 10000; i++) {
mtx.lock();//当访问a的时候对a进行加锁,其他线程无法访问a
a++;
mtx.unlock();//访问结束进行解锁,其他线程又可以访问a
}
}

线程安全:如果多线程程序每一次运行的结果和单线程程序的结果始终是一样的,那么这个线程就是安全的

4. 互斥量死锁

假设有两个线程 T1 和 T2,它们需要对两个互斥量 mtx1 和 mtx2 进行访问,而且需要按照以下顺序获取互斥量的所有权:

  • T1 先获取 mtx1 的所有权,再获取 mtx2 的所有权。

  • T2 先获取 mtx2 的所有权,再获取 mtx1 的所有权。

如果两个线程同时执行,就会出现死锁问题。因为 T1 获取了 mtx1 的所有权,但是无法获取 mtx2 的所有权,而 T2 获取了 mtx2 的所有权,但是无法获取 mtx1 的所有权,两个线程互相等待对方释放互斥量,导致死锁。

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
mutex mtx1;
mutex mtx2;
void func1() {
for (int i = 0; i < 10000; i++) {
mtx1.lock();
mtx2.lock();
mtx1.unlock();
mtx2.unlock();
}
}

void func2() {
for (int i = 0; i < 10000; i++) {
mtx2.lock();
mtx1.lock();
mtx2.unlock();
mtx1.unlock();
}
}
int main() {
thread thread1(func1);
thread thread2(func2);
thread1.join();
thread2.join();
return 0;
}

结果:程序卡死了

解决方法:改变死锁顺序,保证获取死锁的顺序是一样的,比如说都先获取mtx1,再获取mtx2,就不会卡住了

5. lock_guard 与 unique_lock


lock_guard

lock_guard 是 C++ 标准库中的一种互斥量封装类,用于保护共享数据,防止多个线程同时访问同一资源而导致的数据竞争问题。

lock_guard 的特点如下:

  • 当构造函数被调用时,该互斥量会被自动锁定。
  • 当析构函数被调用时,该互斥量会被自动解锁。
  • lock_guard 对象不能复制或移动,因此它只能在局部作用域中使用。
1
2
3
4
5
6
7
mutex mtx;
void func() {
for (int i = 0; i < 10000; i++) {
lock_guard<mutex> lg(mtx);//离开作用域的时候析构,自动解锁
a++;
}
}

lock_guard 的实现

  1. 构造函数内进行加锁,析构函数内进行解锁

  2. 禁用了拷贝构造和赋值构造,所以不能复制和移动

    image-20240605131332588


unique_lock

unique_lock 是 C++ 标准库中提供的一个互斥量封装类,用于在多线程程序中对互斥量进行加锁和解锁操作。它的主要特点是可以对互斥量进行更加灵活的管理,包括延迟加锁、条件变量、超时等。

unique_lock 提供了以下几个成员函数:

  • lock():尝试对互斥量进行加锁操作,如果当前互斥量已经被其他线程持有,则当前线程会被阻塞,直到互斥量被成功加锁。
  • try_lock():尝试对互斥量进行加锁操作,如果当前互斥量已经被其他线程持有,则函数立即返回 false,否则返回 true
  • try_lock_for(const std::chrono::duration<Rep, Period>& rel_time):尝试对互斥量进行加锁操作,如果当前互斥量已经被其他线程持有,则当前线程会被阻塞,直到互斥量被成功加锁,或者超过了指定的时间。
  • try_lock_until(const std::chrono::time_point<Clock, Duration>& abs_time):尝试对互斥量进行加锁操作,如果当前互斥量已经被其他线程持有,则当前线程会被阻塞,直到互斥量被成功加锁,或者超过了指定的时间点。
  • unlock():对互斥量进行解锁操作。
1
2
3
4
5
unique_lock<mutex> lg(mtx);//自动完成加锁
============================================================
//延迟加锁,传入一个参数,此时解锁还是自动完成
unique_lock<mutex> lg(mtx,defer_lock);//defer_lock
lg.lock();//后面自己手动加速

不自动加锁的原因:自己加锁可以有更多的加锁方案

比如:try_lock_for(const std::chrono::duration<Rep, Period>& rel_time)。其他互斥锁获取不到锁就阻塞在这里,而try_lock_for是只会等待一段时间,还没有获取锁就跳过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
int a = 0;
timed_mutex mtx;//mutex不支持对时间的操作,所以这里使用时间锁
void func() {
for (int i = 0; i < 2; i++) {
unique_lock<timed_mutex> lg(mtx,defer_lock);
if (lg.try_lock_for(chrono::seconds(2))) {
this_thread::sleep_for(chrono::seconds(1));
a++;
}
}
}
int main() {
thread thread1(func);
thread thread2(func);
thread1.join();
thread2.join();
cout << a << endl;
return 0;
}

运行结果:3

原因解释(一种可能情况):

thread1先拿到锁(a++),thread2等待1s;thread1释放锁,下一个又是thread1拿到锁(a++),则thread2等待2s就不等了,进入下一次循环拿到锁了,执行a++。所以结果:a=3

6. call_once及其使用场景

单例设计模式是一种常见的设计模式,用于确保某个类只能创建一个实例。由于单例实例是全局唯一的,因此在多线程环境中使用单例模式时,需要考虑线程安全的问题。

下面是一个简单的单例模式日志类的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
class Log {
private:
Log(){}
public:
Log(const Log& log) = delete;
Log& operator=(const Log& log) = delete;
static Log& GetInstance() {
static Log* log = NULL;
if (!log) log = new Log();
return *log;
}
void print(string msg) {
cout << msg << endl;
}
};
int main() {
Log::GetInstance().print("hello");
return 0;
}

在这个单例类中,我们使用了一个静态成员函数 getInstance() 来获取单例实例,该函数使用了一个静态局部变量 log 来存储单例实例。由于静态局部变量只会被初始化一次,因此该实现可以确保单例实例只会被创建一次。

但是,该实现并不是线程安全的。如果多个线程同时调用 getInstance() 函数,可能会导致多个对象被创建,从而违反了单例模式的要求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Log {
private:
Log(){
cout<<"I am here"<<endl;
}
public:
Log(const Log& log) = delete;
Log& operator=(const Log& log) = delete;
static Log& GetInstance() {
static Log* log = NULL;
//例如t1先进来log为空,创建完log之后,还没来得及把值赋给log
//导致log已创建,但是log仍等于null
//此时t2进来了,发现log为空又创建一次log
//不符合单例模式的要求
if (!log) log = new Log();
return *log;
}
void print(string msg) {
cout << msg << endl;
}
};
void printError() {
Log::GetInstance().print("error");
}
int main() {
thread t1(printError);
thread t2(printError);
t1.join();
t2.join();
return 0;
}

运行结果:image-20240605135337140

显然是有错误的

为了解决这些问题,我们可以使用 std::call_once 来实现一次性初始化,从而确保单例实例只会被创建一次。下面是一个使用 std::call_once 的单例实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
static once_flag once;
class Log {
private:
Log(){
cout << "I am here" << endl;
}
static Log* log;
public:
Log(const Log& log) = delete;
Log& operator=(const Log& log) = delete;

static void init() {
if (!log) log = new Log();
}

static Log& GetInstance() {
call_once(once, init);
return *log;
}

void print(string msg) {
cout << msg << endl;
}
};
Log *Log::log = NULL;

void printError() {
Log::GetInstance().print("error");
}
int main() {
thread t1(printError);
thread t2(printError);
t1.join();
t2.join();
return 0;
}

运行结果:image-20240605135435435

使用 call_once 可以确保单例实例只会被创建一次,从而避免了多个对象被创建的问题。

call_once 是 C++11 标准库中的一个函数,用于确保某个函数只会被调用一次。其函数原型如下:

使用 call_once 可以在多线程环境中实现一次性初始化,避免了多个线程同时初始化的问题。例如,在单例模式中,可以使用 call_once 来保证单例实例只会被创建一次。

7. condition_variable 与其使用场景

std::condition_variable 的使用:

  1. 创建一个 condition_variable 对象。

  2. 创建一个互斥锁 mutex 对象,用来保护共享资源的访问。

  3. 在需要等待条件变量的地方

    使用 unique_lock<mutex> 对象锁定互斥锁

    并调用 condition_variable::wait()condition_variable::wait_for()condition_variable::wait_until() 函数等待条件变量。

  4. 在其他线程中需要通知等待的线程时,调用 condition_variable::notify_one()condition_variable::notify_all() 函数通知等待的线程。

生产者消费者模型

  • 任务队列
  • 生产者:负责往任务队列加任务
  • 消费者:负责往任务队列取任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
using namespace std;
queue<int> que;
condition_variable cv;
//负责往队列加任务
void Producer() {
for (int i = 0; i < 10; i++) {
que.push(i);
}
}
//负责从队列里取任务
void Consumer() {
while (1) {
int value = que.front();
que.pop();
}
}

问题:consumer取任务的同时,producer有可能正在放任务,所以要加速

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
using namespace std;
queue<int> que;
condition_variable cv;//条件变量
mutex mtx;
//负责往队列加任务
void Producer() {
for (int i = 0; i < 10; i++) {
{
unique_lock<mutex> lock(mtx);
que.push(i);
//通知消费者来取任务
cv.notify_one();//通知一个线程来取任务
cout << "Producer : " << i << endl;
}
this_thread::sleep_for(chrono::microseconds(100));
}
}
//负责从队列里取任务
void Consumer() {
while (1) {
unique_lock<mutex> lock(mtx);
//如果队列为空,就要等待
cv.wait(lock, []() {
return !que.empty();
});//如果第二个参数为false,阻塞在这里
int value = que.front();
que.pop();
cout << "Consumer : " << value << endl;
}
}
int main() {
thread t1(Producer);
thread t2(Consumer);
t1.join();
t2.join();
return 0;
}