《C++ 并发编程实战》1. 基础知识

并发的概念

并发(concurrency)与并行(parallel)的含义在很大程度上重叠,差别甚小。但是并行更强调性能,并发更关心的是分离关注点

多线程并发与多进程并发

为什么使用并发:

  • 为分离关注点(separation of concerns)而并发
  • 为性能而并发:
    • 任务并行:分解任务
    • 数据并行:线程分别对数据的不同部分执行相同操作

线程管控

发起线程

使用 std::thread,可以传入一个可调用对象和它的构造参数

注意避免 Most Vexing Parse,可以使用:

  • 多加一对括号
  • 使用列表初始化方法

在发起线程后,需要决定是等待(join)还是让它独自运行(detach

如果在 std::thread 析构的时候还没决定好,那么会直接调用 std::terminate 终止整个程序

C++ 的标准库是这样写的:

~thread() {
  if (joinable()) std::__terminate();
}

其中,joinable() 表示它能够进行汇合

传入参数

如果传入了构造参数,那么会使用这些参数来进行构造

注意,函数将会在新线程中执行,下面这样可能会引起某些问题:

void f(int i, const std::string& s);
void oops(int some_param) {
    char buffer[1024];
    std::thread t(f, 3, buffer);
    t.detach();
}

上面的代码中,有可能在函数退出后才执行 std::string 的构造函数,我们提前在主线程进行构造:

std::thread t(f, 3, std::string(buffer));

上面新构造的 std::string 是一个临时对象,它是个右值,而 const T& 既可以绑定左值也可以绑定右值

如果换成 void f(int i, std::string& s); 上面的代码就编译不过了,这里和普通的函数调用不太一样:

void f(int i, std::string& s);

std::string s;
f(3, s);                  // 可以编译成功
std::thread t(f, 3, s);   // 编译失败

这是因为在模板元编程中,大多数使用值传递,标准库也是如此(另一个例子是 std::bind),并且使用右值引用,源代码大概是这样:

template<typename _Callable, typename... _Args, typename = _Require<__not_same<_Callable>>>
explicit thread(_Callable&& __f, _Args&&... __args) {
    static_assert( __is_invocable<typename decay<_Callable>::type, typename decay<_Args>::type...>::value,
                  "std::thread arguments must be invocable after conversion to rvalues");
    ...
}

Tips:如果一个复合类型有多种类型的引用,如果有 & 那就是左值引用,否则就是右值引用,这被叫做引用折叠规则,如:

std::string && && & && & & & && rs1;  // 左值引用
std::string && & && && && & & rs2;    // 左值引用
std::string & & && && && && && rs2;   // 左值引用
std::string && && && && && && rs2;    // 右值引用

它认为 f(int&&, std::string&&) 并不可调用,这非常正确,因为非 const 左值引用不能接受右值引用实参

我们可以使用 std::ref 来传递引用,这个东西实际上就是里面存了一个指针,然后定义了一个到左值引用的转换函数,源码如下:

template<typename _Tp>
class reference_wrapper {
    _Tp* _M_data;
    
    ...
    
    _Tp& get() const noexcept { return *_M_data; }
    operator _Tp&() const noexcept { return this->get(); }
    
    ...
};

std::ref 可以隐式转换成所存储的对象的左值引用,如果将它作为参数,就会隐式转换成左值引用:

image-20250327153029701

代码改成下面这样:

void f(int i, std::string& s);

std::string s;
std::thread t(f, 3, std::ref(s));

但是这样的话传递的是 s 的引用,与我们的目标不同

我们可以自己写一个可调用的的类,或者使用 lambda 表达式来完成

使用智能指针

可以使用智能指针来方便地实现线程间的引用,例如 std::unique_ptr

void process_big_object(std::unique_ptr<big_object>);

std::unique_ptr<big_object> p(new big_object);
std::thread t(process_big_object, std::move(p));

std::unique_ptr 实现了移动构造函数,传给它右值引用它能够直接移动

等待线程完成

使用 join() 来阻塞本线程,等待子线程完成,非常简单粗暴

然而,如果想要更细粒度的线程等待,需要条件变量和 std::future

join() 只能调用一次,调用后线程就不再可汇合(joinable),joinable() 会返回 false

注意异常可能会跳过 join() 的调用,需要进行异常处理,如:

std::thread t(func);
try {
    // 可能抛出异常
} catch(...) {
    t.join();
    throw;
}
t.join();

但是上面的处理太复杂了,在现代 C++ 中最好使用 RAII(Resource Acquisition Is Initialization,资源获取即初始化)的方式来处理,如:

class thread_gaurd {
public:
    explicit thread_guard(std::thread& t) noexcept : t_(t) {
    }
    
    ~thread_guard() {
        if (t.joinable()) t.join();
    }
    
    thread_guard(const thread_guard&) = delete; // 其实 std::thread 根本不可拷贝
    thread_guard& operator=(const thread_guard&) = delete;
private:
    std::thread& t_;
};

...
std::thread t(func);
thread_gaurd g(t);
// 做一些事情

在后台运行线程

调用 detach 后,线程会在后台运行

它的归属权和控制权会转移给 C++ 运行时库(runtime library),由此保证资源的正确回收

常常被称为守护进程(daemon process)

移交线程归属权

std::thread 支持移动语义,这样它就可以转移所有权了

我们可以改进一下原来的 thread_gaurd,改成移动构造函数,这就是 std::jthread(C++ 20)做的事情

如果容器支持移动语义,那么它就可以装载 std::thread 对象,如:

void do_work(int);

void f() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 20; ++i) {
        threads.emplace_back(do_work, i);
    }
    for (auto& entry : threads) entry.join();
}

线程数量与线程 ID

std::thread::hardware_concurrency() 函数可以返回程序在运行中可真正并发的线程数量,如果无效返回 0

std::thread 中有个 get_id() 方法可以获取线程的 ID(std::thread::id

或者直接调用 std::this_thread::get_id() 返回当前线程的 ID,如果没有的话返回 0

在线程间共享数据

条件竞争(Race Condition)

不变量(invariant):针对某一特定数据的断言,如“这个变量的值是链表节点的前驱”

数据更新往往会破坏不变量,如在删除节点的过程中,上面举的不变量的例子失效

条件竞争指的是在并发运行时,结果取决于各个线程执行的相对次序

很多情况下条件竞争是良性的(例如插入列表的顺序通常不重要),我们要防止恶性的条件竞争

数据竞争(data race):并发改动单个对象导致的特定的条件竞争

防止恶性条件竞争的方法:

  • 采取保护措施包装数据结构
  • 修改数据结构的设计及其不变量,由一连串不可拆分的改动完成数据变更,每个改动都维持不变量不被破坏。即无锁编程,通常很难正确编写
  • 事务(transaction),这本书不会涵盖这个议题

最基本的方式是互斥(mutual exclusion,mutex)

用互斥保护共享数据

C++ 中 std::mutex 是互斥,使用 lock() 方法加锁,unlock() 解锁

但是现代 C++ 中通常使用 RAII 的方式,如 std::lock_guard<> 来使用互斥,它会自动加锁解锁

线程安全的容器

top()pop() 引起的很隐蔽的条件竞争:

const auto value = s.top();  // 1
s.pop();                     // 2

如果有两个线程,它们按照 1.1、2.1、1.2、2.2 的顺序执行,那么它们取到了同一个 top(),并且 pop() 了两次

要解决这个问题,需要从根本上更改接口设计,其中一种做法是把它们组成一个成员函数,再采取互斥保护。但是如果在拷贝时出现异常,会出现数据丢失的情况。还有一些其他方法:

  • 传入引用:pop(Item&),先进行拷贝然后再 pop()。有一些问题,例如 Item 不一定是可赋值的
  • 提供 noexcept 的拷贝构造函数或移动构造函数。容器的用途很受限
  • 返回指针,指向弹出的元素。可以使用智能指针如 std::shared_ptr,先把 top() 放到智能指针中,然后再 pop()

我们可以结合上面的方法

防范死锁

C++ 提供了 std::lock() 用来同时锁住多个互斥:

class X {
private:
    some_big_object some_detail;
    std::mutex m;
public:
    friend void swap(X& lhs, X& rhs) {
        if (&lhs == &rhs) return;                       // 防止对同一个对象锁两次
        std::lock(lhs.m, rhs.m);                        // 同时锁住两个互斥
        std::lock_guard lock_a(lhs.m, std::adopt_lock); // 接收锁的归属权
        std::lock_guard lock_b(rhs.m, std::adopt_lock);
        swap(lhs.some_detail, rhs.some_detail);
    }
};

其中,std::lock_guard 提供了参数 std::adopt_lock,这表示从一个已经上锁的互斥上接收锁的归属权

C++ 17 还提供了更简化的写法:std::scoped_lock,表示同时锁住多个(使用模板可变参数)互斥并且在析构时全部解锁,如:

friend void swap(X& lhs, X& rhs) {
    if (&lhs == &rhs) return;
    std::scoped_lock guard(lhs.m, rhs.m);
    swap(lhs.some_detail, rhs.some_detail);
}

即使没有锁,也可能发生死锁现象,如两个线程互相 join,会出现互相等待的情况

防范死锁的准则:

  • 避免嵌套锁:假如已经拥有锁,就不要试图获取第二个锁
  • 一旦持锁,就必须避免调用由用户提供的程序接口:用户提供的接口可能又上了个锁,导致嵌套锁
  • 按照固定顺序加锁:如果多个锁是固定的,就必须事先规定好锁的顺序
  • 使用层级锁:若已经对低层互斥加锁,那么就不准它再对高层互斥加锁。这是按照固定顺序加锁的一种特殊方式
  • 将准则推广到锁之外:引起死锁的原因不一定是加锁。例如可以给线程分层级

std::unique_lock<>

std::unique_lock<> 并不一定占用与之关联的互斥

它的构造函数接受第二个参数,我们可以传入:

  • std::adopt_lock,表示它管理互斥上的锁,也就是直接上锁
  • std::defer_lock,表示它在构造时不上锁,等待有需要时才会上锁(可以调用 lock() 方法或者交给 std::lock() 上锁)

std::unique_lock<> 虽然更灵活,但会占有更多空间,也比 std::lock_guard<> 略慢

它有一个 owns_lock() 方法返回是否占据互斥

它可以使用移动语义来实现互斥所有权的转移:

std::unique_lock<std::mutex> get_lock() {
    extern std::mutex some_mutex;
    std::unique_lock<std::mutex> lock(some_mutex);
    // ...
    return lock;
}

void process_data() {
    std::unique_lock<std::mutex> lock(get_lock());
    // do something
}

上面的这种方式可以称为通道(gate way)。如果想要访问数据,需要先取得通道类的实例(如上面的 get_lock),然后加锁,在释放时解锁

std::unique_lock 允许提前解锁

在初始化过程中保护共享数据

在单线程代码中,有时需要延迟初始化(lazy initialization),在必要时才进行初始化,如:

std::shared_ptr<some_resource> resource_ptr;
void foo() {
    if (!resource_ptr) {
        resource_ptr.reset(new some_resource);
    }
    resource_ptr->do_something();
}

假设共享数据本身就能安全地被并发访问,若将上面的代码转化成多线程形式,则仅有初始化过程需要保护

我们可以全程加锁,但是效率低,即使已经初始化了,也会加锁解锁并顺序执行

双重检验锁(double-checked locking):

std::mutex resource_mutex;
std::shared_ptr<some_resource> resource_ptr;
void foo() {
    if (!resource_ptr) {
        std::lock_guard lk(resource_mutex);
        if (!resource_ptr) {
            resource_ptr.reset(new some_resource);
        }
    }
    resource_ptr->do_something();
}

可能会对指令重排序:分配内存、构造、赋值指针变成分配内存、赋值指针、构造。此时就会发生问题

C++ 标准库提供了 std::once_flag 类和 std::call_once() 函数,以专门处理上面的情况:

std::once_flag resource_flag;
std::shared_ptr<some_resource> resource_ptr;
void init_resource() {
    resource_ptr.reset(new some_resource);
}
void foo() {
    std::call_once(resource_flag, init_resource);
    resource_ptr->do_something();
}

上面的代码可以保证:init_resource 函数只被一个线程执行一次

std::call_once() 的内部使用了更高效的原子操作,比 std::mutex 更轻量

std::mutex 类似,std::once_flag 不可移动和复制

静态变量与线程安全

静态变量初始化顺序:

  • 全局变量、文件域的 static 变量和类的 static 成员变量在 main 函数执行之前初始化
  • 局部静态变量在第一次被使用时初始化

静态变量线程安全性:

  • 非局部静态变量在 main 函数前初始化,所以它们是线程安全的
  • C++ 11 之前,局部静态变量不是线程安全的
  • C++ 11 标准中,要求局部静态变量初始化具有线程安全性

所以,在 C++ 11 之后,线程安全的单例模式很容易实现:

my_class& my_class::get_instance() {
    static my_class instance;
    return instance;
}

共享互斥

C++ 中提供了两种新的互斥:

  • std::shared_mutex(C++ 17)
  • std::shared_timed_mutex(C++ 14)

它们有两种加解锁方法,如 try_lock()try_lock_shared(),分别表示获取独占锁和共享锁

对于这两种互斥,也可以使用 std::lock_guard<>std::unique_lock<> 等获取独占锁

也可以通过 std::shared_lock<> 获取共享锁

共享互斥可以让多个线程都拥有锁,但是与独占锁不能共存

递归互斥

std::recursive_mutex 允许一个线程对同一实例多次加锁。我们必须先释放所有锁(加了多少次锁就得解多少次锁),才可以让另一个线程锁住该互斥

最好不要使用它

并发操作的同步

等待事件或等待其他条件

我们可以维护一个 flag 和对应的互斥,但是需要一直检查这个 flag,造成浪费。可以使用 std::this_thread::sleep_for() 来让当前线程睡几秒来缓解。这是忙等(busy-wait)

最好的方式是使用 C++ 标准库的一些工具来等待事件发生,最基本的方式是条件变量

C++ 提供了条件变量的两种实现:std::condition_variablestd::condition_variable_any,前者需要配合 std::mutex 使用,后者可以配合任意一种互斥使用,但是有些性能损失

std::mutex mutex;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;
void data_preparation_thread() {
    while (more_data_to_prepare()) {
        const data_chunk data = prepare_data();
        {
            std::lock_guard lock(mutex);
            data_queue.push(data);
        }
        
        // 只唤醒等待队列中的一个线程
        // 还有一个 notify_all,它会唤醒唤醒等待队列中所有线程
        // 这里由于只添加了一个元素进去,所以唤醒一个就行了
        data_cond.notify_one();
    }
}
void data_processing_thread() {
    while (true) {
        std::unique_lock lock(mutex);
        
        // 传入一个锁,先锁住它
        // 如果后面的 lambda 表达式返回真,那么结束
        // 否则,先解锁,然后阻塞线程
        // 当被 notify 时,结束阻塞,重新进行上面的操作
        data_cond.wait(lock, [] { return !data_queue.empty(); });
        data_chunk data = data_queue.front();
        data_queue.pop();
        lock.unlock();
        
        // process data
        if (is_last_chunk(data)) break;
    }
}

其中,如果唤醒后判断函数返回 false,称为伪唤醒(spurious wake),最好不要让判断函数带有副作用

使用 future 等待一次性任务完成

C++ 标准库有两种 future

  • std::future<>:独占 future
  • std::shared_future<>:共享 future

同一个事件只能关联唯一一个 std::future,但可以关联多个 std::shared_future

future 本身不提供同步访问,若多个线程需访问同一个 future,需要借助互斥等方式

使用 std::async() 异步执行

std::async() 可以按照异步方式启动任务,使用方式与 std::thread 相似,可以传入参数

它返回一个 std::future 对象

我们还可以给 std::async() 补充一个参数:

  • std::launch::deferred:在当前线程上延后调用任务函数,在 wait()get() 时执行
  • std::launch::async:启动新线程,在上面运行函数

或者两者或一下:defered | async,表示让 C++ 自行选择,这是默认参数

使用 std::packaged_tack<> 来抽象异步任务

std::packaged_tack<> 连结了 future 对象和可调用对象,它的定义大概如下:

template <typename Res, typename... ArgTypes>
class packaged_tack<Res(ArgTypes...)> {
public:
    template<typename _Fn, typename = __not_same<_Fn>> // 表示 _Fn 是否是 packaged_tack 类型
	explicit packaged_task(_Fn&& __fn);
    
    future<Res> get_future();
    void operator()(ArgTypes... args);
};

可以使用它来包装任务,然后传给调度器或者线程池,这就隐藏了细节,使任务抽象化

它是可调用对象,所以可以将其包装在 std::function 里面,然后传给 std::thread 对象

使用 std::promise

std::promise 是用来提供值的,std::future 是用来等待结果的

可以用来进行两个线程之间的通信:

在这里插入图片描述

将异常保存到 future

若经由 std::async 调用的函数抛出异常,那么这个异常会被保存到 future

std::packaged_task 也是这样

std::promise 也具有同样的功能,使用 set_exception() 方法:

extern std::promise<double> some_promise;
try {
    some_promise.set_value(calculate_value());
} catch (...) {
    some_promise.set_exception(std::current_exception());
}

如果 std::future 关联的对象(std::promise 或者 std::packaged_task)被直接销毁了,它没有返回任何值,那么 std::future 会抛出 std::future_error 异常

使用 std::shared_future 让多个线程一起等待

我们应该给每个线程都拷贝一份 std::shared_future,而不是直接使用原来的:

image-20250330145351292

异步任务相关数据(参数、返回值等)与对应的 futurepromise 等相关联,这些数据放在了某个堆数据段,用户无法访问它们。这些数据被称为共享状态(shared state)或者异步状态

futurepromise 都具备 valid() 函数,用来辨别共享状态是否有效

std::shared_future 的实例根据 std::future 实例构造而得,由于 std::future 独占状态,所以需要移动语义,在移动之后,std::future 的共享状态变成了空,如:

std::promise<int> p;
std::future<int> f(p.get_future());         // 一个临时变量被移动给了 f
assert(f.valid());
std::shared_future<int> sf(std::move(f));
assert(!f.valid());                         // f 被移动,不再有效
assert(sf.valid());

也可以使用 std::futureshare() 方法得到 td::shared_future

等待多个 future

在 C++ 17 之前,使用 std::experimental::when_all() 来等待多个 future,从而避免多余的上下文切换。然而,<experimental/future> 这个头文件在 C++ 20 被干掉了

除此之外,还有 std::experimental::when_any()

限时等待

有两种超时(timeout)机制可供选择:

  • 迟延超时(duration-based timeout):线程等待指定时长
  • 绝对超时(absolute timeout):在某个特定时间点来临之前一直等待

一般来说 _for 结尾的是迟延超时,_until 结尾的是绝对超时

接受超时时限的函数:

  • std::this_thread::sleep_{for/until}()
  • std::condition_variable 下的 wait_{for/until}(),还有 std::cv_status::{timeout/no_timeout}
  • std::timed_mutexstd::recursive_mutexstd::shared_timed_mutex 中,加锁可以等待
  • std::unique_lockstd::shared_lock 中加锁可以等待
  • std::future

时钟类

C++ 中,每种时钟都是一个类,包括下面的信息:

  • 当前时刻:now() 静态成员函数
  • 时间值的类型:time_point 成员类型
  • 该时钟的计时单元长度(tick period):period 成员类型,表示为秒的分数形式,使用 std::ratio<a, b> 表示每 a/b 秒计数一次。并不保证与运行期实际观测的值一致
  • 计时速率是否恒定,是否能够将其视为恒稳时钟(steady clock):is_steady 静态成员。std::chrono::system_lock 不是恒稳的,std::chrono::steady_lock

时钟类举例:

  • std::chrono::system_lock:表示系统的“真实时间”
  • std::chrono::high_resolution_clock:高精度时钟类,最短计时单元

时长类

std::chrono::duration<> 的模板参数是:

  • 第一个表示采用何种类型表示计时单元数量(如 intdouble 等)
  • 第二个表示每一个计时单元代表多少秒

例如 double 毫秒计时器:std::chrono::duration<double, std::ratio<1, 1000>>

有一些预设的时长类,如:std::chrono::nanoseconds

头文件 <ratio> 也有一些预设的分数

C++ 14 引入了 std::chrono_literals 预定义了后缀运算符,如:

using namespace std::chrono_literals;
auto one_day = 24h;
auto half_an_hour = 30min;
auto max_time_between_messages = 30ms;

使用 std::chrono::duration_cast<> 实现对时长值的显式转换,隐式转换只能从大到小(不能截断)

时长类支持算术运算

迟延超时举例:

std::future<int> f = std::async(some_task);
if (f.wait_for(std::chrono::milliseconds(35)) == std::future_status::ready) {
    do_something(f.get());
}

超时时返回 std::future_status::timeout,准备就绪返回 ready,任务被延后返回 deferred

时间点类

std::chrono::time_point<> 的模板参数是:

  • 第一个表示参考的时钟
  • 第二个表示计时单元,默认直接用第一个参数下的:_Clock::duration

时钟纪元:time_since_epoch() 成员函数

可以加减时长来计算新时间点,也可以两个时间点相减得到时长

线程闩和线程卡

线程闩(latch)是一个同步对象,内含一个计数器,当计数器减到 0 时,进入就绪状态。它用于等待一系列目标事件发生

C++ 20 中是 std::latch(C++ 17 之前是 std::experimental::latch),它是是线程安全的,包含:

  • count_down() 来减计数器
  • wait() 来等待就绪
  • is_ready() 来检查是否就绪
  • count_down_and_wait()

线程闩只能用一次,达到就绪状态后就保持不变

它的构造函数接受一个参数,表示计数器的初始值

线程卡(barrier)是可重复使用的同步构件,针对一组给定的线程,在它们之间进行同步。它的作用是让所有线程同步到一点。

在 C++ 17 之前有两种,std::experimental::{barrier/flex_barrier},在 C++ 20 中,合成了一个,即 std::barrier

线程卡构造函数接受计数器的初始值,它在完成后会自动重置,是可重复使用的

barrier(C++ 17 中的 flex_barrier)还接受第二个可选参数,表示补全函数(completion function),在线程全部运行到线程卡处时,会在其中唯一一个线程中运行该补全函数,它还提供了修改线程数目的方法

C++ 的内存模型和原子操作

标准原子类型

原子操作是不可分割的操作,意味着我们不会观察到操作处于半完成状态

C++ 中,所有的原子类型都有 is_lock_free() 成员函数,返回是否是由原子指令(atomic instruction)直接实现,还是要借助编译器和程序库的内部锁来实现

还有 is_always_lock_free 静态常量表达式成员变量,表示在编译期是否能确定是完全无锁结构(可能原子操作需要某些特定 CPU 指令,而某些设备支持另一些不支持,只有在运行期才能真正确定是否无锁)

std::atomic_flag 不提供 is_lock_free(),因为它肯定是无锁的。只有两个方法:

  • 使用 test_and_set() 查值并设置为真
  • 使用 clear() 清除

除了这个成员函数,一般还会提供:

  • load()store()
  • exchange()compare_exchange_weak()compare_exchange_strong()

这些成员函数可以传入内存次序语义(memory-ordering semantics),即枚举类 std::memory_order,它有 6 个取值,分别是:relaxedacquireconsumeacq_relreleaseseq_cst

对于每个成员函数,可以选用的内存次序不同:

  • store()relaxedseq_cst
  • load()relaxedacquireconsumeseq_cst
  • “读、改、写”操作:六个都可以

所有的原子操作都禁止被拷贝和移动

自 C++ 20 起,新增了 wait()notify_one()notify_all() 几个成员函数,与条件变量相似

std::atomic_flag

它可以用来实现自旋锁互斥(spin-lock mutex):

std::atomic_flag flag;
void lock() {
    while (flag.test_and_set(std::memory_order_acquire));
}
void unlock() {
    flag.clear(std::memory_order_release);
}

但是这是忙等,远非最佳选择

它不支持单纯的无修改查值操作

std::atomic<bool>

与正常不同的一点是赋值操作符,它按值返回,如:

std::atomic_bool b;
decltype(b = false) c;    // bool 类型

bool b2;
decltype(b2 = false) c2;  // 错误,因为引用必须赋初值

std::atomic<> 一般还有 exchange() 成员函数,与 test_and_set() 相似,它会获取原有的值,然后让我们选择新的值作为替换

还有比较交换(Compare And Swap,CAS)的操作,原子变量将传入的期望值与自身的值进行比较,如果相等就存入另一个值,否则使用原子变量自身的值更新期望值所属变量,该操作返回 bool 类型,表示是否完成了保存动作(前提是两个值相等)

有些 CPU 不支持单条指令执行该操作,此时负责的线程使用连续的指令来实现该操作,如果此线程被调度走了,那么操作就失败了,这被称为“佯败”,即尽管返回了 true 但仍然失败

C++ 中有两个成员函数:

  • compare_exchange_weak():有可能发生佯败,需要借助循环:

    bool expected = false;
    extern std::atomic<bool> b;
    while (!b.compare_exchange_weak(expected, true) && !expected);
    
  • compare_exchange_strong():内部有一个循环,不会发生佯败,它确保在与期望值相等时一定能够成功更新

比较交换操作可以接受两个内存次序语义,分别对应成功和失败两种情况

std::atomic<T*>、整数原子类型与浮点原子类型

std::atomic<T*> 是指向类型 T 的指针的原子化形式

它提供了对地址的算术运算,如 +=-=++

它还有交换相加(exchange and add)和交换相减操作:fetch_add()fetch_sub(),和交换操作相似,它返回原来的指针而不是更新后的指针

整数原子类型除了不支持乘除外,基本与普通的整数类型相同

浮点原子类型并不支持算术原子运算

泛化的 std::atomic<> 类模板

如果要使用自定义原子类型的话,它必须具备平实拷贝赋值运算符(trivial copy-assignment operator),它的意思是支持简单的原始内存复制及其等效操作,这要求:

  • 使用编译器默认提供的拷贝赋值运算符
  • 不能含有任何虚函数,也不能从虚基类派生得出
  • 非静态数据成员都必须具备平实拷贝赋值运算符

由于以上的限制,编译器可以使用 mempcy() 等行为来完成拷贝(如果是虚函数,那么就不能使用这个函数,会破坏虚表指针)

此外,比较交换操作采用逐位比较运算,所以并不能使用自定义的比较运算符

上述限制可以追溯到前面章节的内容,由于编译器没有能力为自定义原子类型实现无锁操作,所以它必须在内部使用锁,如果调用用户自定义的拷贝赋值、比较等函数,那么可能导致死锁

于是,编译器通过种种限制,能将自定义的类型视为原始字节,从而使得某些特定的原子类型按无锁方式具现化,增加为自定义类型直接执行原子指令的机会

在一般的平台上,如果自定义类型的大小不超过 int 或者 void*,那么基本上都支持原子指令。在某些平台上,不超过两倍也支持(被称为双字比较交换,Double Word Compare And Swap,DWCAS)。对于更大的结构,还是推荐使用互斥

总结一下各个原子类型上可执行的操作:

image-20250401143145331

智能指针原子类型

std::shared_ptr<>引用计数是线程安全的,但是这个变量并不是线程安全的

C++ 提供了对 std::shared_ptr<> 的特化:std::atomic<std::shared_ptr<>>

在 C++ 17 中有 std::atomic_shared_ptr<> 别名,但是 C++ 20 被干掉了

它满足上面的限制,所以有机会无锁实现

在多线程情景下,最好不要使用普通的 std::shared_ptr,使用原子化的版本可以让代码更清晰,避免数据竞争

原子操作的非成员函数

大部分非成员函数命名为 std::atomic_*std::atomic_*_explicit,后者可以传入指定的内存次序

与成员函数不同的是,为了与 C 兼容,它们传入的是指针而不是引用

C++ 标准库提供了按原子化形式访问 std::shared_ptr<> 的实例,不需要使用原子化的版本,例如:

std::shared_ptr<my_data> p;
void process_global_data() {
    std::shared_ptr<my_data> local = std::atomic_load(&p);
    process_data(local);
}
void update_global_data() {
    std::shared_ptr<my_data> local(new my_data);
    std::atomic_store(&p, local);
}

注意,这里使用局部变量接收 atomic_load 的返回值是没有问题的,因为局部变量(以及返回值)只在当前线程可见,不需要跨线程,我们只需要保证跨线程的变量读取是原子操作即可

同步操作和强制次序

同步关系和先行关系

同步关系只存在于原子类型的操作之间,它的基本思想是:

对变量 x 执行原子写操作 W 和原子读操作 R,且两者都有适当的标记。如果 R 读取的值是 W 或者 W 所属线程之后的某个原子写操作写入的值,就认为它们彼此同步

实际上就是:如果一个线程需要原子读操作,那么前面必须要有一个原子写操作

先行关系是程序中确立操作次序的基本要素

在单一线程中,这个关系非常直观,就是排在前面的语句先于后面的语句执行,这是控制流程的先后关系。但是用逗号分开的单个语句没有先行关系

在多线程中,分为两部分:

  • 线程间先行关系:相对简单,它依赖于同步关系。如果甲操作和乙操作分别由不同线程执行,且它们同步,则甲操作跨线程地先于乙操作发生,而且这是可传递的依赖关系
  • 线程内的控制流程的先后关系

两者结合起来,就是多线程的先行关系

原子操作的内存次序

内存次序共有 6 种,它们代表三种模式:

  • 先后一致次序(最严格,默认):seq_cst
  • 获取——释放次序:consumeacquirereleaseacq_rel
  • 宽松次序:relaxed

对第一种和第二种模式,可能会插入额外的同步指令,也可能不会,如 x86 或 x86-64 架构的 CPU 不需要额外的同步指令就能实现获取——释放次序操作的原子化

先后一致次序

  • 禁止 CPU 对本操作的指令重排序
  • 所有线程中的所有原子操作,必须按照一个全局一致的顺序执行(注意不会影响非原子操作,因为它们不带内存标记),形成一个全局总操作序列

对于下面的代码:

#include <assert.h>
std::atomic<bool> x, y;
std::atomic<int> z;

void write_x() {
    x.store(true, std::memory_order_seq_cst);
}
void write_y() {
    y.store(true, std::memory_order_seq_cst);
}
void read_x_then_y() {
    while (!x.load(std::memory_order_seq_cst));
    if (y.load(std::memory_order_seq_cst)) {
        ++z;
    }
}
void read_y_then_x() {
    while (!y.load(std::memory_order_seq_cst));
    if (x.load(std::memory_order_seq_cst)) {
        ++z;
    }
}
int main() {
    x = false;
    y = false;
    z = 0;
    std::thread a(write_x);
    std::thread b(write_y);
    std::thread c(read_x_then_y);
    std::thread d(read_y_then_x);
    a.join();
    b.join();
    c.join();
    d.join();
    assert(z.load() != 0);
}

那么执行序列应该是这样的:

image-20250401212158829

可以看成:每个线程都执行一个原子操作后,再进行下一步

在弱保序的多处理器(如 ARM)计算机上,先后一致次序会导致严重的性能损失

宽松次序

  • 原子操作不存在同步关系
  • 但是单一线程内同个变量上的操作仍然服从先行关系,但几乎不要求线程间存在任何次序关系

除非万不得已不要使用宽松原子操作

获取——释放次序

产生一定程度的同步效果,而不会形成全局总操作序列

  • acquire 表示在同一个线程中,后面的任何原子操作不能重排到它前面
  • release 表示在同一个线程中,前面的任何原子操作不能重排到它后面
  • acq_rel:上面两个都有

一些问题:

  • acquire 不会等待 release,但它保证如果 release 先执行,那么 acquire 一定能看到 release 之前的写入
  • 如果 acquire 先执行,它不会阻塞,而是继续运行,可能看到旧值

不要使用 consume 次序

获取——释放次序可以限制 relaxed 次序,并且可以用来实现同步

“读——改——写”操作采用 acq_rel 次序

栅栏

栅栏(fence)用于强制施加内存次序,却无需改动任何数据

C++ 中是 std::atomic_thread_fence(),它可以传入一个内存次序语义

例如:

#include <assert.h>
std::atomic<bool> x, y;
std::atomic<int> z;

void write_x_then_y() {
    x.store(true, std::memory_order_relaxed);                // 1
    std::atomic_thread_fence(std::memory_order_release);
    y.store(true, std::memory_order_relaxed);                // 2
}
void read_y_then_x() {
    while (!y.load(std::memory_order_relaxed));              // 3
    std::atomic_thread_fence(std::memory_order_acquire);
    if (x.load(std::memory_order_relaxed)) {                 // 4
        ++z;
    }
}
int main() {
    x = false;
    y = false;
    z = 0;
    std::thread a(write_x_then_y);
    std::thread b(read_y_then_x);
    a.join();
    b.join();
    assert(z.load() != 0);
}

上面的代码中,两个栅栏形成了同步关系,如果不加这个栅栏,可能会出现 2 先于 1 执行或者 4 先于 3 执行的情况

它保证了 b 线程如果看到 y == true 那么一定能看到 x == true

在上面的例子中,其实可以将 x 换成非原子操作,因为两个线程不会就这个变量发生数据竞争