《C++ 并发编程实战》基于锁的四个线程安全数据结构
线程安全的栈
#ifndef THREADSAFE_STACK_HPP
#define THREADSAFE_STACK_HPP
#include <exception>
#include <memory>
#include <mutex>
#include <stack>
#include <vector>
struct empty_stack final : std::exception {
[[nodiscard]] const char* what() const noexcept override {
return "empty stack";
}
};
template <typename T>
class threadsafe_stack {
public:
threadsafe_stack() = default;
threadsafe_stack(const threadsafe_stack& other) {
std::lock_guard lock(other.m_);
data_ = other.data_;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
// 这里使用值传递,调用者可以使用 std::move 来避免拷贝构造函数
void push(T new_value) {
// 加锁可能抛出异常,但是即使抛出异常,也不会有问题,因为并没有修改数据
std::lock_guard lock(m_);
// push 操作也有可能抛出异常,例如在拷贝、移动过程中,或者在扩展容器的过程中
// 但是标准库容器可以处理这种异常,所以不需要额外处理
data_.push(std::move(new_value));
}
std::shared_ptr<T> pop() {
std::lock_guard lock(m_);
if (data_.empty()) throw empty_stack();
// 智能指针的创建也可能抛出异常,如内存不足
// 标准库可以处理这种异常,所以不需要额外处理
// 这里是移动了栈顶元素
// 如果发生异常的话,那么状态的恢复责任在移动构造函数上
// 所以不需要额外处理
const std::shared_ptr res = std::make_shared<T>(std::move(data_.top()));
// 底层使用 std::vector 实现,所以 pop 不会抛出异常
data_.pop();
return res;
}
void pop(T& value) {
std::lock_guard lock(m_);
if (data_.empty()) throw empty_stack();
// std::move 用于将左值转换为右值引用,这样可以避免拷贝
value = std::move(data_.top());
data_.pop();
}
bool empty() const {
std::lock_guard lock(m_);
return data_.empty();
}
private:
std::stack<T> data_;
// mutable 修饰的成员变量可以在 const 成员函数中修改
mutable std::mutex m_;
};
#endif // THREADSAFE_STACK_HPP
线程安全的队列
#ifndef THREADSAFE_QUEUE_HPP
#define THREADSAFE_QUEUE_HPP
#include <condition_variable>
#include <queue>
template <typename T>
class threadsafe_queue {
public:
threadsafe_queue() = default;
void wait_and_pop(T& value) {
std::unique_lock lock(m_);
// 等待队列不为空
cv_.wait(lock, [this] { return !queue_.empty(); });
value = std::move(*queue_.front());
queue_.pop();
}
bool try_pop(T& value) {
std::lock_guard lock(m_);
if (queue_.empty()) return false;
value = std::move(*queue_.front());
queue_.pop();
return true;
}
std::shared_ptr<T> wait_and_pop() {
std::unique_lock lock(m_);
cv_.wait(lock, [this] { return !queue_.empty(); });
std::shared_ptr<T> res = queue_.front();
queue_.pop();
return res;
}
void push(T new_value) {
// 由于 shared_ptr 的构造函数并不需要跨线程,所以可以在加锁之前构造 shared_ptr
// 内存分配是非常耗时的,所以尽量减少加锁期间的内存分配
std::shared_ptr<T> data = std::make_shared<T>(std::move(new_value));
std::lock_guard lock(m_);
queue_.push(data);
cv_.notify_one();
}
bool empty() const {
std::lock_guard lock(m_);
return queue_.empty();
}
private:
// 使用 std::shared_ptr<T> 作为队列元素,可以避免拷贝构造函数
std::queue<std::shared_ptr<T>> queue_;
mutable std::condition_variable cv_;
mutable std::mutex m_;
};
#endif // THREADSAFE_QUEUE_HPP
使用更小粒度互斥的线程安全队列
#ifndef THREADSAFE_RAW_QUEUE_HPP
#define THREADSAFE_RAW_QUEUE_HPP
#include <condition_variable>
#include <memory>
#include <mutex>
template <typename T>
class threadsafe_raw_queue {
private:
struct node {
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};
public:
// 这里创建了一个虚位节点作为头结点,并且 tail_ 指向这个虚位节点
// 这样可以避免空队列的特殊处理,并且可以支持更细粒度的锁
//
// 如果不使用虚位节点,那么在 push 的时候,如果队列为空那么头尾指针都应该修改成新节点
// 但是如果使用虚位节点,那么直接修改尾指针就可以了
// 这样做到了数据的分离
threadsafe_raw_queue() :
head_(std::make_unique<node>()),
tail_(head_.get()) {
}
void push(T new_value) {
// 这里创建了一个新的虚位节点
std::shared_ptr<T> new_data = std::make_shared<T>(std::move(new_value));
std::unique_ptr<node> p = std::make_unique<node>(new node);
const node* new_tail = p.get();
{
std::lock_guard lock(tail_m_);
tail_->data = new_data;
tail_->next = std::move(p);
tail_ = new_tail;
}
data_cond_.notify_one();
}
std::shared_ptr<T> wait_and_pop() {
const std::unique_ptr<node> old_head = wait_pop_head();
return old_head->data;
}
void wait_and_pop(T& value) {
const std::unique_ptr<node> old_head = wait_pop_head(value);
}
std::shared_ptr<T> try_pop() {
const std::unique_ptr<node> old_head = try_pop_head();
return old_head ? old_head->data : nullptr;
}
bool try_pop(T& value) {
const std::unique_ptr<node> old_head = try_pop_head(value);
return old_head != nullptr;
}
[[nodiscard]] bool empty() const {
std::lock_guard lock(head_m_);
return head_.get() == get_tail();
}
private:
node* get_tail() {
std::lock_guard lock(tail_m_);
return tail_;
}
std::unique_ptr<node> pop_head_no_threadsafe() {
std::unique_ptr<node> old_head = std::move(head_);
head_ = std::move(old_head->next);
return old_head;
}
std::unique_lock<std::mutex> wait_for_data() {
// 等待队列不为空
std::unique_lock lock(head_m_);
data_cond_.wait(lock, [this] {
// 其实和下面的做法是等价的:
// 先对 head_ 和 tail_ 加锁,
// 如果 head_ 和 tail_ 指向同一个节点,那么就说明队列为空,释放这两个锁并等待
// 否则说明队列不为空,继续执行
return head_.get() != get_tail();
});
return lock;
}
std::unique_ptr<node> wait_pop_head() {
// 最大的问题是:如果队列为空,那么 head_ 和 tail_ 会指向同一个节点,可能会出现问题
// 但是这里巧妙地规避了这个问题
// 当队列为空时,这个函数是可能访问 dummy 节点的函数,所以:
// - 它与修改 head_ 的函数是互斥的
// - 它与修改 tail_ 的函数是互斥的
// - 它与修改 head_ 和 tail_ 的函数是互斥的
// 所以函数内部对 head_ 和 tail_ 都加了锁
std::unique_lock lock = wait_for_data();
// 这里是没有问题的,因为这里保证了队列不为空,此时 head_ 和 tail_ 不会指向同一个节点
return pop_head_no_threadsafe();
}
std::unique_ptr<node> wait_pop_head(T& value) {
std::unique_lock lock = wait_for_data();
// 这里先拿到数据,然后再 pop,pop 得到的是一个空节点
value = std::move(*head_->data);
return pop_head_no_threadsafe();
}
std::unique_ptr<node> try_pop_head() {
std::lock_guard lock(head_m_);
// 这里 get_tail 会对 tail_m_ 加锁
// 先对 head_ 加锁,然后再对 tail_ 加锁
// 没有反着加锁的方法,所以不会发生死锁
if (head_.get() == get_tail()) return nullptr;
return pop_head_no_threadsafe();
}
std::unique_ptr<node> try_pop_head(T& value) {
std::lock_guard lock(head_m_);
// 这里和上面的 wait_for_data 一样,也是可能访问 dummy 节点,我们需要对 head_ 和 tail_ 都加锁
if (head_.get() == get_tail()) return nullptr;
value = std::move(*head_->data);
return pop_head_no_threadsafe();
}
private:
// 更细粒度的锁,可以减少锁的竞争
std::mutex head_m_;
std::mutex tail_m_;
std::unique_ptr<node> head_;
// 注意 tail_ 不能是智能指针
node* tail_;
std::condition_variable data_cond_;
};
#endif // THREADSAFE_RAW_QUEUE_HPP
线程安全的查找表
#ifndef THREADSAFE_LOOKUP_TABLE_HPP
#define THREADSAFE_LOOKUP_TABLE_HPP
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
template <typename Key, typename Value, typename Hash = std::hash<Key>>
class threadsafe_lookup_table {
private:
class bucket_type {
public:
using bucket_value = std::pair<Key, Value>;
using bucket_data = std::list<bucket_value>;
using bucket_iterator = typename bucket_data::iterator;
Value value_for(const Key& key, const Value& default_value) const {
// 获取共享锁,因为只需要读
std::shared_lock lock(mutex_);
const bucket_iterator found_entry = find_entry_for(key);
return found_entry == data_.end() ? default_value : found_entry->second;
}
void add_or_update_mapping(const Key& key, const Value& value) {
// 获取独占锁,因为需要写
std::unique_lock lock(mutex_);
const bucket_iterator found_entry = find_entry_for(key);
if (found_entry == data_.end()) {
data_.push_back(bucket_value(key, value));
} else {
found_entry->second = value;
}
}
void remove_mapping(const Key& key) {
// 获取独占锁,因为需要写
std::unique_lock lock(mutex_);
const bucket_iterator found_entry = find_entry_for(key);
if (found_entry != data_.end()) {
data_.erase(found_entry);
}
}
private:
bucket_iterator find_entry_for(const Key& key) {
// 不加锁
return std::find_if(data_.begin(), data_.end(), [&](const bucket_value& item) {
return item.first == key;
});
}
private:
bucket_data data_;
//
mutable std::shared_mutex mutex_;
};
public:
using key_type = Key;
using mapped_type = Value;
using hash_type = Hash;
// 由于在构造函数中初始化了 buckets_,并且这个 vector 不会变,所以不需要加锁
explicit threadsafe_lookup_table(unsigned num_buckets = 19, const Hash& hasher = Hash()) :
buckets_(num_buckets), hasher_(hasher) {
for (unsigned i = 0; i < num_buckets; ++i) {
buckets_[i].reset(new bucket_type);
}
}
threadsafe_lookup_table(const threadsafe_lookup_table&) = delete;
threadsafe_lookup_table& operator=(const threadsafe_lookup_table&) = delete;
Value value_for(const Key& key, const Value& default_value) const {
return get_bucket(key).value_for(key, default_value);
}
void add_or_update_mapping(const Key& key, const Value& value) {
get_bucket(key).add_or_update_mapping(key, value);
}
void remove_mapping(const Key& key) {
get_bucket(key).remove_mapping(key);
}
std::map<Key, Value> get_map() const {
// 对所有的互斥加锁
// 每次按照相同的顺序(从小到大)加锁,可以避免死锁
// 这里加的是独占锁,应该可以改成共享锁
std::vector<std::unique_lock<std::shared_mutex>> locks;
for (auto& bucket : buckets_) {
locks.push_back(std::unique_lock<std::shared_mutex>(bucket->mutex_));
}
std::map<Key, Value> res;
for (auto& bucket : buckets_) {
for (const auto& item : bucket->data_) {
res.insert(item);
}
}
return res;
}
private:
bucket_type& get_bucket(const Key& key) {
const std::size_t bucket_index = hasher_(key) % buckets_.size();
return *buckets_[bucket_index];
}
private:
std::vector<std::unique_ptr<bucket_type>> buckets_;
Hash hasher_;
};
#endif // THREADSAFE_LOOKUP_TABLE_HPP
基于多种锁的线程安全链表
#ifndef THREADSAFE_LIST_HPP
#define THREADSAFE_LIST_HPP
#include <functional>
#include <memory>
#include <mutex>
template <typename T>
class threadsafe_list {
private:
struct node {
std::mutex m;
std::shared_ptr<T> data;
std::unique_ptr<node> next;
node() : next() {
}
explicit node(const T& value) : data(std::make_shared<T>(value)) {
}
};
public:
threadsafe_list() = default;
threadsafe_list(const threadsafe_list&) = delete;
threadsafe_list& operator=(const threadsafe_list&) = delete;
void push_front(const T& value) {
std::unique_ptr<node> new_node = std::make_unique<node>(value);
// 由于只需要修改 head_,所以只需要对 head_ 加锁
std::lock_guard lock(head_.m);
new_node->next = std::move(head_.next);
head_.next = std::move(new_node);
}
void for_each(std::function<void(T&)> f) {
node* current = &head_;
// 先对 current(head) 加锁
std::unique_lock lock(head_.m);
while (node* const next = current->next.get()) {
std::unique_lock next_lock(next->m);
// 由于已经从 current 拿到了 next,所以可以释放 current 的锁
lock.unlock();
f(*next->data);
current = next;
// 对 next 加锁,这时不持有锁,所以不会发生死锁
lock = std::move(next_lock);
}
}
std::shared_ptr<T> find_first_if(std::function<bool(const T&)> f) {
node* current = &head_;
std::unique_lock lock(head_.m);
while (node* const next = current->next.get()) {
std::unique_lock next_lock(next->m);
lock.unlock();
if (f(*next->data)) {
return next->data;
}
current = next;
lock = std::move(next_lock);
}
return nullptr;
}
void remove_if(std::function<bool(const T&)> f) {
node* current = &head_;
std::unique_lock lock(head_.m);
while (node* const next = current->next.get()) {
std::unique_lock next_lock(next->m);
if (f(*next->data)) {
std::unique_ptr<node> old_next = std::move(current->next);
current->next = std::move(next->next);
} else {
lock.unlock();
current = next;
lock = std::move(next_lock);
}
}
}
private:
node head_;
};
#endif // THREADSAFE_LIST_HPP