【并发编程】多线程程序同步策略

2019-08-13 08:31:29来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

目录

  • C++11线程使用初探
  • 采用条件变量等待某个事件或条件发生
  • 线程安全的队列适配器

C++11线程使用初探

std::thread

#include <thread>

只读的共享数据在多个线程间不存在Race condition的危险,而可读可写共享数据在线程间共享时则需做好线程同步,即数据保护,主要包括lock-based和lock-free策略。

常见的以互斥锁保护多线程间的共享数据,保证某一时刻仅有一个线程访问共享数据,导致线程间数据保护是串行,因此在多线程环境中,锁保护的区域越小,并发程度越高。

采用条件变量等待某个事件或条件发生

C++11提供了两种条件变量:std::condition_variable和std::condition_variable_any,均需要和互斥量一起使用来保证操作的同步性。前者仅能与std::mutex一起使用,后者可与所有mutex-like的锁一起使用,更加通用,但以牺牲空间、性能或操作系统资源为代价,因此std::condition_variable是首选;它们均在头文件中定义。

生产者-消费者模式在并发编程中应用广泛,有助于系统的解耦。队列是一种常见的在生产者和消费者线程间传递数据的容器,队列先入先出的特性满足应用对顺序性的要求。

人脸分析组件采用队列传递数据,通过在生产者线程中调用InputData函数将待分析数据包送入队列,并调用notify_one通知消费者线程,从而消费者线程进行人脸数据分析。伪代码如下:

std::mutex mut;
std::condition_variable cond;
std::queue<data_chunk> data_queue;

// 将待分析数据送入队列
int InputData(const data_chunk& data)
{
  if (invalid_data)
  {
    LOG_ERROR("invalid param!");
    return ERROR_CODE;
  }
  // 将数据送入队列
  // 并发出通知
  std::lock_guard<std::mutex> lk(mut);
  data_queue.push(data);
  cond.notify_one();
  return SUCCESS_CODE;
}

// InputData在生产者线程中被调用,将待分析数据送入队列
// 消费线程函数Process从队列取数据以进行分析
// 线程在条件未发生时因wait函数阻塞进入睡眠状态
void Process()
{
  while (not_exit_expression)
  {
    // 使用unique_lock而非lock_guard
    std::unique_lock<std::mutex> lk(mut);

    // wait函数在条件满足时返回,否则线程进入阻塞状态
    //
    // 若lambda表达式返回false(即不条件满足),则wait释放lk中锁资源,
    // 且线程进入阻塞状态,以便生线程可以继续获取锁并送数据到队里;否则,
    //
    // 当notify_one通知条件变量时,消费消除从睡眠状态苏醒,重新获取锁,且对条件再次检查,lambda表达式返回true
    // 此时,wait返回并继续持有锁资源,然后继续往下执行
    cond.wait(lk, [](){ return !data_queue.empty(); });

    data_chunk data = data_queue.front();
    data_queue.pop();

    // wait返回后持有锁,因此需要在此处解锁
    lk.unlock();

    // 处理数据
    process_the_data;
  }
}

?? 若将Process循环中阻塞逻辑(wait函数)换为非阻塞模式逻辑——即当队列为空时,循环continue,将如何影响Process线程?

以上代码,消费者线程使用std::unique_lock而非std::lock_guard是因为前者较后者灵活,std::lock_guard未实现lock/unlock成员函数。另外,队列在线程间传递数据应用广泛,将非线程安全的队列和条件变量封装到具体场景类,不仅编码重复,而且低效易出错。因此,实现线程安全的队列是十分必要的。

线程安全的队列适配器

上小节已介绍了“队列,条件变量和互斥量”的应用场景,现将它们封装为线程安全的队列适配器——CTSQueue。

线程安全的队列 - 需求分析:

  • 仍具备队列的先入先出特性;
  • 支持数据队尾入队列、队首出队列操作;
  • 提供阻塞和非阻塞版本出队列操作;
  • 泛型队列;
  • 使用C++11容器及线程同步原语,当然也可使用其他如posix提供的同步原语;

完整代码如下:

#include <queue>              // for queue
#include <memory>             // for std::shared_ptr
#include <mutex>              // for std::mutex
#include <condition_variable> // for std::condition_variable

// 线程安全的队列
template<typename T>
class CTSQueue
{
public:
  CTSQueue() = default;
  ~CTSQueue()
  { }

  // 不提供复制操作 - 禁用
  CTSQueue(const CTSQueue&) = delete;
  CTSQueue& operator=(const CTSQueue&) = delete;

  // 入队列操作
  void Push(T data)
  {
    std::lock_guard<std::mutex> lk(mut_);
    data_queue_.push(data);
    cond_.notify_one();
  }

  // 出队列操作 - 非阻塞版本
  bool TryPop(T& data)
  {
    std::lock_guard<std::mutex> lk(mut_);
    // 空队列立马返回出队列失败,否则
    // 数据出队列(拷贝版本)
    if (data_queue_.empty())
    {
      return false;
    }
    data = data_queue_.front();
    data_queue_.pop();
    return true;
  }
  std::shared_ptr<T> TryPop()
  {
    std::lock_guard<std::mutex> lk(mut_);
    if (data_queue_.empty())
    {
      return std::shared_ptr<T>();
    }
    // 出队列(非拷贝版本)
    std::shared_ptr<T> res(std::make_shared<T>(data_queue_.front()));
    data_queue_.pop();
    return res;
  }

  // 出队列操作 - 阻塞版本
  void WaitPop(T& data)
  {
    std::lock_guard<std::mutex> lk(mut_);
    // 条件未发生时,调用线程阻塞
    // 否则,数据出队列(拷贝版本)
    cond_.wait(lk, [this](){ return !data_queue_.empty() });
    data = data_queue_.front();
    data_queue_.pop();
  }
  std::shared_ptr<T> WaitPop()
  {
    std::lock_guard<std::mutex> lk(mut_);
    cond_.wait(lk, [this](){ return !data_queue_.empty() });
    // 出队列(非拷贝版本)
    std::shared_ptr<T> res(std::make_shared<T>(data_queue_.front()));
    data_queue_.pop();
    return res;
  }

  // 查询队列状态操作
  bool Empty() const
  {
    std::lock_guard<std::mutex> lk(mut_);
    return data_queue_.empty();
  }

private:
  mutable std::mutex      mut_;
  std::queue<T>           data_queue_;
  std::condition_variable cond_;
};

参考文献:
[1] 《C++ Concurrency In Action》, https://www.bogotobogo.com/cplusplus/files/CplusplusConcurrencyInAction_PracticalMultithreading.pdf


原文链接:https://www.cnblogs.com/bigosprite/p/11332389.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:Larave中CSRF攻击

下一篇:PHP中如何将文件缓存转内存缓存