C++并发编程

C++并发编程是指在C++程序中使用多线程和同步机制来实现并发执行的功能。并发编程可以提高程序的性能和响应速度,同时也可以简化程序的设计和实现。

C++中,可以使用多个线程来并发执行不同的任务。为了避免竞态条件和数据不一致等问题,需要使用同步机制来协调多个线程之间的操作。常用的同步机制包括互斥锁、条件变量、原子操作等。

hello world开始

  • join() 函数:阻塞当前线程,直到关联的线程执行完毕。

  • detach() 函数:让一个线程在后台独立运行,不受主线程的控制,从而允许主线程提前退出而不必等待后台线程完成。

void hello() {
cout << "hello world" << endl;
}

int main() {
thread t(hello);
t.join(); // must add this line otherwise will failed!
// 需要注意的是线程对象执行了join后就不再joinable了,所以只能调用join一次。
return 0;
}

Basic

悬空引用

i 的生命周期结束时,func 对象中的引用 i 就变成了悬空引用,其指向的内存可能被其他程序占用或被释放,从而导致不可预测的错误。

void do_something(int &i) {
cout << "do_something" << endl;
}
struct func {
int &i;

func(int &i_) : i(i_) {}

void operator()() {
for (unsigned j = 0; j < 1000000; ++j) {
do_something(i); // 1. 潜在访问隐患:悬空引用
}
}
};

简洁机制

保线程在函数结束之前安全退出

// 特殊情况下的等待
void f() {
int some_local_state = 0;
func my_func(some_local_state);
std::thread t(my_func);
try {
// do_something_in_current_thread();
}
catch (...) {
t.join(); // 1
throw;
}
t.join(); // 2
}

// try catch 只能捕获轻量级错误,所以如需确保线程在函数之前结束——查看是否因为线程函数使用了局部变量的引用,
// 以及其他原因——而后再确定一下程序可能会退出的途径,无论正常与否,可以提供一个简洁的机制,来做解决这个问题。

RAII

一种方式是使用“资源获取即初始化方式 (RAII,Resource Acquisition Is Initialization),并且提供一个类,在析构函数中使用join(),

// std::thread支持移动的好处是可以创建thread_guard类的实例,并且拥有其线程的所有权。
class thread_guard {
std::thread &t;
public:
explicit thread_guard(std::thread &t_) :
t(t_) {}

~thread_guard() {
if (t.joinable()) // 1
{
t.join(); // 2
}
}

thread_guard(thread_guard const &) = delete; // 3
thread_guard &operator=(thread_guard const &) = delete;
};
void f1()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread t(my_func);
thread_guard g(t);
// do_something_in_current_thread();
} // 4
// 线程所关联的局部对象(即 my_func 的参数)的生命周期将在 thread_guard 对象之后结束,而不是在线程结束之后。当线程执行到4处时,局部对象就要被逆序销毁了。因此,thread_guard对象g是第一个被销毁的,
// 这时线程在析构函数中被加入2到原始线程中。
// 即使do_something_in_current_thread抛出一个异常,这个销毁依旧会发生。

由于线程的管理得到了线程保护对象的管理,线程在程序结束时一定会被正确的处理,并且不会产生悬空的线程引用,保证了线程的安全退出.

Most Vexing Parse

Most Vexing Parse:声明的对象被解析为一个函数声明而不是对象定义。

class background_task {
public:
void operator()() const {
cout << "ok" << endl;
}
};
// error
thread my_thread1(background_task())

解决如下:

  1. 使用多组括号:这种方法可以告诉编译器我们正在声明一个对象,而不是一个函数。

  2. 使用新的初始化语法:C11引入了C的新初始化语法,即使用花括号{}来初始化对象。

thread my_thread1((background_task())); // 使用多组括号
my_thread1.join();

thread my_thread2{background_task()}; // 使用新的初始化语法
my_thread2.join();

transfer

这段代码演示了如何使用std::thread来创建新线程,并将类成员函数作为线程函数传递,同时也演示了如何使用std::unique_ptr来管理资源,并将这个资源传递给一个新线程。

  • std::move(p): 左值转化为右值

class X {
public:
void do_length_work() {};
};

void process_big_object(std::unique_ptr<X>);

int main() {
X my_x;
thread t(&X::do_length_work, &my_x); // 1


std::unique_ptr<X> p(new X);
p->do_length_work();
std::thread tt(process_big_object,std::move(p));
//std::thread实例的可移动且不可复制性。不可复制保性证了在同一时间点,
// 一个std::thread实例只能关联一个执行线程;可移动性使得程序员可以自己决定,哪个实例拥有实际执行线程的所有权。
return 0;
}

ownership

线程的创建、移动和管理

void some_function() {}
void some_other_function() {}

std::thread t1(some_function); // 构造一个thread对象t1
std::thread t2 = std::move(t1); // 把t1 move给另外一个thread对象t2,t1不再管理之前的线程了。
// 这句不需要std::move(),从临时变量进行移动是自动和隐式的。调用的是operator=(std::thread&&)
t1 = std::thread(some_other_function);
std::thread t3;
t3 = std::move(t2); // 把t2 move给t3
// 把t3 move给t1,非法。因为`t1`已经有了一个相关的线程,会调用`std::terminate()`来终止程序。
t1 = std::move(t3);

一组线程

这段代码使用了std::vectorstd::thread,它展示了如何使用C++11标准库来创建一组线程并等待它们完成工作。

void do_work(unsigned id) {}

void f() {
std::vector<std::thread> threads;
for (unsigned i = 0; i < 20; ++i) {
threads.push_back(std::thread(do_work, i)); // 产生线程
}
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join)); // 对每个线程调用join()
// &std::thread::join获取的是std::thread::join成员函数的地址,将地址传递给std::mem_fn进行包装,从而将这个函数变成一个可调用对象。
}

RAII类scoped_thread

这是一个RAII类scoped_thread的定义,它使用C++11标准库中的std::thread类来封装线程对象。具体来说,scoped_thread类通过重载构造函数和析构函数来控制线程的生命周期,从而确保线程在适当的时候被销毁。

在这个类的定义中,几个关键的实现细节:

  1. 使用explicit防止隐式类型转换。

  2. 使用std::logic_error来抛出异常,以表示线程没有被启动的错误情况。

  3. 在析构函数中调用join()函数,以确保线程在scoped_thread对象被销毁时能够正常结束。

除此之外,由于线程对象的复制和复制赋值操作不可重载,因此需要通过delete关键字来禁止这些操作。

class scoped_thread {
std::thread t;
public:
explicit scoped_thread(std::thread t_) : // 1
t(std::move(t_)) {
if (!t.joinable()) // 2
throw std::logic_error("No thread");
}

~scoped_thread() {
t.join(); // 3
}

scoped_thread(scoped_thread const &) = delete;

scoped_thread &operator=(scoped_thread const &) = delete;
};

runtime

简单的线程池:将迭代器范围分割成多个子范围,每个子范围由一个线程进行处理,最后合并所有子范围的累加结果,从而实现了并行的累加操作。

在实现上,函数首先根据范围长度和硬件线程数,计算出要使用的线程数和任务块的大小。然后,函数将序列分块,并为每个块创建一个线程来执行累加操作。最后,使用std::accumulate将所有结果相加以得到最终结果。

#include <iostream>
#include <algorithm>
#include <thread>
#include <vector>
#include <numeric>
#include <functional>

using namespace std;

//使得每个线程具有最小数目的元素以避免过多的线程开销
template<typename Iterator, typename T>
struct accumulate_block {
void operator()(Iterator first, Iterator last, T &result) {
result = std::accumulate(first, last, result);
}
};

template<typename Iterator, typename T>
T parallel_accumlate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);

if (!length)
return init;

unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
cout<<max_threads<<endl;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
cout<<hardware_threads<<endl;
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
cout<<num_threads<<endl;
unsigned long const block_size = length / num_threads;
cout<<block_size<<endl;

std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1);

Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
threads[i] = std::thread(accumulate_block<Iterator, T>(), block_start, block_end, std::ref(results[i]));
// std::ref用于将一个对象转化为一个引用
block_start = block_end;
}
accumulate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);
// block_start到last无法划分为一个完整的块时,需要单独对这个子范围执行累加操作
std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
// std::mem_fn将一个成员函数指针转换为一个函数对象

return std::accumulate(results.begin(), results.end(), init);
}

int main() {

vector<int> v{3,4,5,6};
int res=0;
cout<<parallel_accumlate(v.begin(),v.end(),res);
return 0;
}

id

代码定义了两个函数do_master_thread_workdo_common_work,分别用于处理主线程和其他线程(即非主线程)的工作。在some_core_part_of_algorithm函数中,代码首先判断当前线程是否为主线程,如果是,则调用do_master_thread_work函数,否则调用do_common_work函数。

#include <iostream>
#include <thread>

using namespace std;


// 线程的通用标识符
std::thread::id master_thread;

void do_master_thread_work() {
cout << "master" << endl;
}

void do_common_work() {
cout << "common" << endl;
}

void some_core_part_of_algorithm() {
if (std::this_thread::get_id() == master_thread) {
do_master_thread_work();
}
do_common_work();
}

int main() {
master_thread = std::this_thread::get_id();
std::cout << "master_thread: " << master_thread << endl;
cout << "master_thread 中运行:" << endl;
some_core_part_of_algorithm();
cout << "thread 中运行:" << endl;
thread t(some_core_part_of_algorithm);
t.join();
return 0;
}

Reference

[1] C++ 那些事: https://github.com/Light-City/CPlusPlusThings

------------------------------- 本文结束啦❤感谢您阅读-------------------------------
赞赏一杯咖啡