歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> C++並發編程 等待與喚醒

C++並發編程 等待與喚醒

日期:2017/3/1 9:08:33   编辑:Linux編程

C++並發編程 等待與喚醒

條件變量

條件變量, 包括(std::condition_variable 和 std::condition_variable_any)
  定義在 condition_variable 頭文件中, 它們都需要與互斥量(作為同步工具)一起才能工作.

  std::condition_variable 允許阻塞一個線程, 直到條件達成.

成員函數

void wait(std::unique_lock<std::mutex>& lock);
等待, 通過 notify_one(), notify_all()或偽喚醒結束等待
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
等待, 通過 notify_one(), notify_all()被調用, 並且謂詞為 true 時結束等待.
pred 謂詞必須是合法的, 並且需要返回一個值, 這個值可以和bool互相轉化
cv_status wait_until(std::unique_lock<std::mutex>& lock, const std::chrono::time_point<Clock, Duration>& absolute_time);
調用 notify_one(), notify_all(), 超時或線程偽喚醒時, 結束等待.
返回值標識了是否超時.
bool wait_until(std::unique_lock<std::mutex>& lock, const std::chrono::time_point<Clock, Duration>& absolute_time, Predicate pred);
等待, 通過 notify_one(), notify_all(), 超時, 線程偽喚醒, 並且謂詞為 true 時結束等待.
cv_status wait_for(std::unique_lock<std::mutex>& lock, const std::chrono::duration<Rep, Period>& relative_time);
調用 notify_one(), notify_all(), 指定時間內達成條件或線程偽喚醒時,結束等待
bool wait_for(std::unique_lock<std::mutex>& lock, const std::chrono::duration<Rep, Period>& relative_time, Predicate pred);
調用 notify_one(), notify_all(), 指定時間內達成條件或線程偽喚醒時,並且謂詞為 true 時結束等待.

void notify_one() noexcept; 喚醒一個等待當前 std::condition_variable 實例的線程
void notify_all() noexcept; 喚醒所有等待當前 std::condition_variable 實例的線程

一個線程安全的隊列設計:

    #ifndef _THREAD_SAFE_QUEUE_
    #define _THREAD_SAFE_QUEUE_

    #include <condition_variable>
    #include <mutex>
    #include <queue>
    #include <memory>

    template<typename Ty, typename ConditionVar = std::condition_variable, typename Mutex = std::mutex>
    class ThreadSafeQueue
    {
        typedef std::queue<Ty>                Queue;
        typedef std::shared_ptr<Ty>           SharedPtr;
        typedef std::lock_guard<Mutex>        MutexLockGuard;
        typedef std::unique_lock<Mutex>       MutexUniqueLock;

    public:
        explicit ThreadSafeQueue() {}
        ~ThreadSafeQueue() {}

        ThreadSafeQueue(const ThreadSafeQueue&) = delete;
        ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;

        bool IsEmpty() const
        {
            MutexLockGuard lk(mMutex);
            return mQueue.empty();
        }

        void WaitAndPop(Ty& value)
        {
            MutexUniqueLock lk(mMutex);
            mConVar.wait(lk, [this] {
                return !mQueue.empty(); 
            });
            value = mQueue.front();
            mQueue.pop();
        }

        SharedPtr WaitAndPop()
        {
            MutexUniqueLock lk(mMutex);
            mConVar.wait(lk, [this] {
                return !mQueue.empty();
            });
            SharedPtr sp = std::make_shared<Ty>(mQueue.front());
            mQueue.pop();
            return sp;
        }

        bool TryPop(Ty& value)
        {
            MutexLockGuard lk(mMutex);
            if (mQueue.empty())
                return false;
            value = mQueue.front();
            mQueue.pop();
            return true;
        }

        SharedPtr TryPop()
        {
            MutexLockGuard lk(mMutex);
            if (mQueue.empty())
                return false;
            SharedPtr sp = std::make_shared<Ty>(mQueue.front());
            mQueue.pop();
            return sp;
        }

        void Push(const Ty& value)
        {
            MutexLockGuard lk(mMutex);
            mQueue.push(value);
            mConVar.notify_all();
        }

    private:
        mutable Mutex             mMutex;
        ConditionVar              mConVar;
        Queue                     mQueue;
    };

    #endif // _THREAD_SAFE_QUEUE_

另一個版本, 使用 shared_ptr 作為成員對隊列的性能有很大的提升, 其在push時減少了互斥量持有的時間, 允許其它線程在分配內存的同時,對隊列進行其它操作.

template<typename Ty, typename ConditionVar = std::condition_variable, typename Mutex = std::mutex>
class ThreadSafeQueue
{
    typedef std::shared_ptr<Ty>               SharedPtr;
    typedef std::queue<SharedPtr>             Queue;
    typedef std::shared_ptr<Ty>               SharedPtr;
    typedef std::lock_guard<Mutex>            MutexLockGuard;
    typedef std::unique_lock<Mutex>           MutexUniqueLock;

public:
    explicit ThreadSafeQueue() {}
    ~ThreadSafeQueue() {}

    ThreadSafeQueue(const ThreadSafeQueue&) = delete;
    ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;

    bool IsEmpty() const
    {
        MutexLockGuard lk(mMutex);
        return mQueue.empty();
    }

    void WaitAndPop(Ty& value)
    {
        MutexUniqueLock lk(mMutex);
        mConVar.wait(lk, [this] {
            return !mQueue.empty();
        });
        value = std::move(*mQueue.front());
        mQueue.pop();
    }

    SharedPtr WaitAndPop()
    {
        MutexUniqueLock lk(mMutex);
        mConVar.wait(lk, [this] {
            return !mQueue.empty();
        });
        SharedPtr sp = mQueue.front();
        mQueue.pop();
        return sp;
    }

    bool TryPop(Ty& value)
    {
        MutexLockGuard lk(mMutex);
        if (mQueue.empty())
            return false;
        value = std::move(*mQueue.front());
        mQueue.pop();
        return true;
    }

    SharedPtr TryPop()
    {
        MutexLockGuard lk(mMutex);
        if (mQueue.empty())
            return false;
        SharedPtr sp = mQueue.front();
        mQueue.pop();
        return sp;
    }

    void Push(const Ty& value)
    {
        SharedPtr p = std::make_shared<Ty>(value);
        MutexLockGuard lk(mMutex);
        mQueue.push(p);
        mConVar.notify_all();
    }

private:
    mutable Mutex             mMutex;
    ConditionVar              mConVar;
    Queue                     mQueue;
}; 

std::future

期望(std::future)可以用來等待其他線程上的異步結果, 其實例可以在任意時間引用異步結果.
C++包括兩種期望, std::future(唯一期望) 和 std::shared_future(共享期望)
std::future 的實例只能與一個指定事件相關聯.
std::shared_future 的實例能關聯多個事件, 它們同時變為就緒狀態, 並且可以訪問與事件相關的任何數據.
在與數據無關的地方,可以使用 std::future<void> 與 std::shared_future<void> 的特化模板.
期望對象本身並不提供同步訪問, 如果多個線程要訪問一個獨立的期望對象, 需要使用互斥體進行保護.

std::packaged_task

std::packaged_task 可包裝一個函數或可調用的對象, 並且允許異步獲取該可調用對象產生的結果, 返回值通過 get_future 返回的 std::future 對象取得, 其返回的 std::future 的模板類型為 std::packaged_task 模板函數簽名中的返回值類型.
std::packaged_task 對象被調用時, 就會調用相應的函數或可調用對象, 將期望置為就緒, 並存儲返回值.
std::packaged_task 的模板參數是一個函數簽名, 如 int(std::string&, double*), 構造 std::packaged_task 實例時必須傳入一個可以匹配的函數或可調用對象, 也可以是隱藏轉換能匹配的.

    std::packaged_task<std::string(const std::string&)> task([](std::string str) {
        std::stringstream stm;
        stm << "tid:" << std::this_thread::get_id() << ", str:" << str << std::endl;
        std::cout << stm.str();
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return std::string("MSG:Hello");
    });
    std::future<std::string> f = task.get_future();
    std::thread t(std::move(task), std::string("package task test"));
    t.detach();
    // 調用 f.get 返回結果, 但是須阻塞等到任務執行完成
    std::cout << "main tid:" << std::this_thread::get_id() << ", result: " << f.get() << std::endl; 

std::promise

std::promise 類型模板提供設置異步結果的方法, 這樣其他線程就可以通過 std::future 實例來索引該結果.

    class SquareRoot
    {
        std::promise<double>& prom;
    public: 
        SquareRoot(std::promise<double>& p) : prom(p) {}
        ~SquareRoot() {}
        void operator()(double x)
        {
            if (x < 0)
            {
                prom.set_exception(std::make_exception_ptr(std::out_of_range("x<0")));
            }
            else
            {
                double result = std::sqrt(x);
                prom.set_value(result);
            }
        }
    };

    std::promise<double> prom;
    SquareRoot p(prom);
    std::thread t(std::bind(&SquareRoot::operator(), &p, 1));
    //std::thread t(std::bind(&SquareRoot::operator(), &p, -1));
    std::future<double> f = prom.get_future();
    try
    {
        double v = f.get();
        std::cout << "value:" << v << std::endl;
    }
    catch (std::exception& e)
    {
        std::cout << "exception:" << e.what() << std::endl;
    }
    t.join();

Copyright © Linux教程網 All Rights Reserved