《C++ 并发编程实战》基于锁的四个线程安全数据结构

线程安全的栈

#ifndef THREADSAFE_STACK_HPP
#define THREADSAFE_STACK_HPP

#include <exception>
#include <memory>
#include <mutex>
#include <stack>
#include <vector>

struct empty_stack final : std::exception {
    [[nodiscard]] const char* what() const noexcept override {
        return "empty stack";
    }
};

template <typename T>
class threadsafe_stack {
public:
    threadsafe_stack() = default;

    threadsafe_stack(const threadsafe_stack& other) {
        std::lock_guard lock(other.m_);
        data_ = other.data_;
    }

    threadsafe_stack& operator=(const threadsafe_stack&) = delete;

    // 这里使用值传递,调用者可以使用 std::move 来避免拷贝构造函数
    void push(T new_value) {
        // 加锁可能抛出异常,但是即使抛出异常,也不会有问题,因为并没有修改数据
        std::lock_guard lock(m_);

        // push 操作也有可能抛出异常,例如在拷贝、移动过程中,或者在扩展容器的过程中
        // 但是标准库容器可以处理这种异常,所以不需要额外处理
        data_.push(std::move(new_value));
    }

    std::shared_ptr<T> pop() {
        std::lock_guard lock(m_);
        if (data_.empty()) throw empty_stack();

        // 智能指针的创建也可能抛出异常,如内存不足
        // 标准库可以处理这种异常,所以不需要额外处理

        // 这里是移动了栈顶元素
        // 如果发生异常的话,那么状态的恢复责任在移动构造函数上
        // 所以不需要额外处理
        const std::shared_ptr res = std::make_shared<T>(std::move(data_.top()));

        // 底层使用 std::vector 实现,所以 pop 不会抛出异常
        data_.pop();
        return res;
    }

    void pop(T& value) {
        std::lock_guard lock(m_);
        if (data_.empty()) throw empty_stack();

        // std::move 用于将左值转换为右值引用,这样可以避免拷贝
        value = std::move(data_.top());

        data_.pop();
    }

    bool empty() const {
        std::lock_guard lock(m_);
        return data_.empty();
    }

private:
    std::stack<T> data_;

    // mutable 修饰的成员变量可以在 const 成员函数中修改
    mutable std::mutex m_;
};

#endif // THREADSAFE_STACK_HPP

线程安全的队列

#ifndef THREADSAFE_QUEUE_HPP
#define THREADSAFE_QUEUE_HPP

#include <condition_variable>
#include <queue>


template <typename T>
class threadsafe_queue {
public:
    threadsafe_queue() = default;

    void wait_and_pop(T& value) {
        std::unique_lock lock(m_);

        // 等待队列不为空
        cv_.wait(lock, [this] { return !queue_.empty(); });
        value = std::move(*queue_.front());
        queue_.pop();
    }

    bool try_pop(T& value) {
        std::lock_guard lock(m_);
        if (queue_.empty()) return false;
        value = std::move(*queue_.front());
        queue_.pop();
        return true;
    }

    std::shared_ptr<T> wait_and_pop() {
        std::unique_lock lock(m_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        std::shared_ptr<T> res = queue_.front();
        queue_.pop();
        return res;
    }

    void push(T new_value) {
        // 由于 shared_ptr 的构造函数并不需要跨线程,所以可以在加锁之前构造 shared_ptr
        // 内存分配是非常耗时的,所以尽量减少加锁期间的内存分配
        std::shared_ptr<T> data = std::make_shared<T>(std::move(new_value));
        std::lock_guard lock(m_);
        queue_.push(data);
        cv_.notify_one();
    }

    bool empty() const {
        std::lock_guard lock(m_);
        return queue_.empty();
    }

private:
    // 使用 std::shared_ptr<T> 作为队列元素,可以避免拷贝构造函数
    std::queue<std::shared_ptr<T>> queue_;
    mutable std::condition_variable cv_;
    mutable std::mutex m_;
};

#endif // THREADSAFE_QUEUE_HPP

使用更小粒度互斥的线程安全队列

#ifndef THREADSAFE_RAW_QUEUE_HPP
#define THREADSAFE_RAW_QUEUE_HPP

#include <condition_variable>
#include <memory>
#include <mutex>

template <typename T>
class threadsafe_raw_queue {
private:
    struct node {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };

public:
    // 这里创建了一个虚位节点作为头结点,并且 tail_ 指向这个虚位节点
    // 这样可以避免空队列的特殊处理,并且可以支持更细粒度的锁
    //
    // 如果不使用虚位节点,那么在 push 的时候,如果队列为空那么头尾指针都应该修改成新节点
    // 但是如果使用虚位节点,那么直接修改尾指针就可以了
    // 这样做到了数据的分离
    threadsafe_raw_queue() :
        head_(std::make_unique<node>()),
        tail_(head_.get()) {
    }

    void push(T new_value) {
        // 这里创建了一个新的虚位节点
        std::shared_ptr<T> new_data = std::make_shared<T>(std::move(new_value));
        std::unique_ptr<node> p = std::make_unique<node>(new node);
        const node* new_tail = p.get();

        {
            std::lock_guard lock(tail_m_);
            tail_->data = new_data;
            tail_->next = std::move(p);
            tail_ = new_tail;
        }

        data_cond_.notify_one();
    }

    std::shared_ptr<T> wait_and_pop() {
        const std::unique_ptr<node> old_head = wait_pop_head();
        return old_head->data;
    }

    void wait_and_pop(T& value) {
        const std::unique_ptr<node> old_head = wait_pop_head(value);
    }

    std::shared_ptr<T> try_pop() {
        const std::unique_ptr<node> old_head = try_pop_head();
        return old_head ? old_head->data : nullptr;
    }

    bool try_pop(T& value) {
        const std::unique_ptr<node> old_head = try_pop_head(value);
        return old_head != nullptr;
    }

    [[nodiscard]] bool empty() const {
        std::lock_guard lock(head_m_);
        return head_.get() == get_tail();
    }

private:
    node* get_tail() {
        std::lock_guard lock(tail_m_);
        return tail_;
    }

    std::unique_ptr<node> pop_head_no_threadsafe() {
        std::unique_ptr<node> old_head = std::move(head_);
        head_ = std::move(old_head->next);
        return old_head;
    }

    std::unique_lock<std::mutex> wait_for_data() {
        // 等待队列不为空
        std::unique_lock lock(head_m_);
        data_cond_.wait(lock, [this] {
            // 其实和下面的做法是等价的:
            // 先对 head_ 和 tail_ 加锁,
            // 如果 head_ 和 tail_ 指向同一个节点,那么就说明队列为空,释放这两个锁并等待
            // 否则说明队列不为空,继续执行
            return head_.get() != get_tail();
        });
        return lock;
    }

    std::unique_ptr<node> wait_pop_head() {
        // 最大的问题是:如果队列为空,那么 head_ 和 tail_ 会指向同一个节点,可能会出现问题
        // 但是这里巧妙地规避了这个问题

        // 当队列为空时,这个函数是可能访问 dummy 节点的函数,所以:
        // - 它与修改 head_ 的函数是互斥的
        // - 它与修改 tail_ 的函数是互斥的
        // - 它与修改 head_ 和 tail_ 的函数是互斥的
        // 所以函数内部对 head_ 和 tail_ 都加了锁
        std::unique_lock lock = wait_for_data();

        // 这里是没有问题的,因为这里保证了队列不为空,此时 head_ 和 tail_ 不会指向同一个节点
        return pop_head_no_threadsafe();
    }

    std::unique_ptr<node> wait_pop_head(T& value) {
        std::unique_lock lock = wait_for_data();

        // 这里先拿到数据,然后再 pop,pop 得到的是一个空节点
        value = std::move(*head_->data);
        return pop_head_no_threadsafe();
    }

    std::unique_ptr<node> try_pop_head() {
        std::lock_guard lock(head_m_);

        // 这里 get_tail 会对 tail_m_ 加锁
        // 先对 head_ 加锁,然后再对 tail_ 加锁
        // 没有反着加锁的方法,所以不会发生死锁
        if (head_.get() == get_tail()) return nullptr;
        return pop_head_no_threadsafe();
    }

    std::unique_ptr<node> try_pop_head(T& value) {
        std::lock_guard lock(head_m_);

        // 这里和上面的 wait_for_data 一样,也是可能访问 dummy 节点,我们需要对 head_ 和 tail_ 都加锁
        if (head_.get() == get_tail()) return nullptr;

        value = std::move(*head_->data);
        return pop_head_no_threadsafe();
    }

private:
    // 更细粒度的锁,可以减少锁的竞争
    std::mutex head_m_;
    std::mutex tail_m_;

    std::unique_ptr<node> head_;

    // 注意 tail_ 不能是智能指针
    node* tail_;

    std::condition_variable data_cond_;
};

#endif // THREADSAFE_RAW_QUEUE_HPP

线程安全的查找表

#ifndef THREADSAFE_LOOKUP_TABLE_HPP
#define THREADSAFE_LOOKUP_TABLE_HPP
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>

template <typename Key, typename Value, typename Hash = std::hash<Key>>
class threadsafe_lookup_table {
private:
    class bucket_type {
    public:
        using bucket_value = std::pair<Key, Value>;
        using bucket_data = std::list<bucket_value>;
        using bucket_iterator = typename bucket_data::iterator;

        Value value_for(const Key& key, const Value& default_value) const {
            // 获取共享锁,因为只需要读
            std::shared_lock lock(mutex_);
            const bucket_iterator found_entry = find_entry_for(key);
            return found_entry == data_.end() ? default_value : found_entry->second;
        }

        void add_or_update_mapping(const Key& key, const Value& value) {
            // 获取独占锁,因为需要写
            std::unique_lock lock(mutex_);
            const bucket_iterator found_entry = find_entry_for(key);
            if (found_entry == data_.end()) {
                data_.push_back(bucket_value(key, value));
            } else {
                found_entry->second = value;
            }
        }

        void remove_mapping(const Key& key) {
            // 获取独占锁,因为需要写
            std::unique_lock lock(mutex_);
            const bucket_iterator found_entry = find_entry_for(key);
            if (found_entry != data_.end()) {
                data_.erase(found_entry);
            }
        }

    private:
        bucket_iterator find_entry_for(const Key& key) {
            // 不加锁
            return std::find_if(data_.begin(), data_.end(), [&](const bucket_value& item) {
                return item.first == key;
            });
        }

    private:
        bucket_data data_;

        //
        mutable std::shared_mutex mutex_;
    };

public:
    using key_type = Key;
    using mapped_type = Value;
    using hash_type = Hash;

    // 由于在构造函数中初始化了 buckets_,并且这个 vector 不会变,所以不需要加锁

    explicit threadsafe_lookup_table(unsigned num_buckets = 19, const Hash& hasher = Hash()) :
        buckets_(num_buckets), hasher_(hasher) {
        for (unsigned i = 0; i < num_buckets; ++i) {
            buckets_[i].reset(new bucket_type);
        }
    }

    threadsafe_lookup_table(const threadsafe_lookup_table&) = delete;
    threadsafe_lookup_table& operator=(const threadsafe_lookup_table&) = delete;

    Value value_for(const Key& key, const Value& default_value) const {
        return get_bucket(key).value_for(key, default_value);
    }

    void add_or_update_mapping(const Key& key, const Value& value) {
        get_bucket(key).add_or_update_mapping(key, value);
    }

    void remove_mapping(const Key& key) {
        get_bucket(key).remove_mapping(key);
    }

    std::map<Key, Value> get_map() const {
        // 对所有的互斥加锁
        // 每次按照相同的顺序(从小到大)加锁,可以避免死锁

        // 这里加的是独占锁,应该可以改成共享锁
        std::vector<std::unique_lock<std::shared_mutex>> locks;
        for (auto& bucket : buckets_) {
            locks.push_back(std::unique_lock<std::shared_mutex>(bucket->mutex_));
        }

        std::map<Key, Value> res;
        for (auto& bucket : buckets_) {
            for (const auto& item : bucket->data_) {
                res.insert(item);
            }
        }
        return res;
    }

private:
    bucket_type& get_bucket(const Key& key) {
        const std::size_t bucket_index = hasher_(key) % buckets_.size();
        return *buckets_[bucket_index];
    }

private:
    std::vector<std::unique_ptr<bucket_type>> buckets_;
    Hash hasher_;
};

#endif // THREADSAFE_LOOKUP_TABLE_HPP

基于多种锁的线程安全链表

#ifndef THREADSAFE_LIST_HPP
#define THREADSAFE_LIST_HPP

#include <functional>
#include <memory>
#include <mutex>

template <typename T>
class threadsafe_list {
private:
    struct node {
        std::mutex m;
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;

        node() : next() {
        }

        explicit node(const T& value) : data(std::make_shared<T>(value)) {
        }
    };

public:
    threadsafe_list() = default;
    threadsafe_list(const threadsafe_list&) = delete;
    threadsafe_list& operator=(const threadsafe_list&) = delete;

    void push_front(const T& value) {
        std::unique_ptr<node> new_node = std::make_unique<node>(value);

        // 由于只需要修改 head_,所以只需要对 head_ 加锁
        std::lock_guard lock(head_.m);
        new_node->next = std::move(head_.next);
        head_.next = std::move(new_node);
    }

    void for_each(std::function<void(T&)> f) {
        node* current = &head_;

        // 先对 current(head) 加锁
        std::unique_lock lock(head_.m);
        while (node* const next = current->next.get()) {
            std::unique_lock next_lock(next->m);

            // 由于已经从 current 拿到了 next,所以可以释放 current 的锁
            lock.unlock();

            f(*next->data);
            current = next;

            // 对 next 加锁,这时不持有锁,所以不会发生死锁
            lock = std::move(next_lock);
        }
    }

    std::shared_ptr<T> find_first_if(std::function<bool(const T&)> f) {
        node* current = &head_;
        std::unique_lock lock(head_.m);
        while (node* const next = current->next.get()) {
            std::unique_lock next_lock(next->m);
            lock.unlock();

            if (f(*next->data)) {
                return next->data;
            }

            current = next;
            lock = std::move(next_lock);
        }

        return nullptr;
    }

    void remove_if(std::function<bool(const T&)> f) {
        node* current = &head_;
        std::unique_lock lock(head_.m);
        while (node* const next = current->next.get()) {
            std::unique_lock next_lock(next->m);

            if (f(*next->data)) {
                std::unique_ptr<node> old_next = std::move(current->next);
                current->next = std::move(next->next);
            } else {
                lock.unlock();
                current = next;
                lock = std::move(next_lock);
            }
        }
    }

private:
    node head_;
};

#endif // THREADSAFE_LIST_HPP