目录
⛳️推荐
一、生产消费者模型
生产消费者模型(consumer producter
)简称 cp
,是多线程多进程下同步互斥的一种场景。
超市的本质就是一个大号的缓存,超市的存在提高了生产者和消费者的效率,超市支持忙闲不均,即生产者可能一次生产很多,但是消费的很慢。对于生产者来说,它最关心超市里还剩多少空位置,而消费者最关心超市的商品数量。超市的存在,让生产和消费行为进行一定成都的解藕。
将这个现实生活中的例子映射到计算机中:生产者和消费者都由不同的线程来承担,超市就是一个特定结构的内存空间(可以是队列、二叉树、链表、数组…),商品就是数据。所以生产消费者模型本质是执行流在做通信,但是现如今我们的关注点不再是如何进行通信,而是如何进行安全且高效的通信。超市需要被多个线程访问到,因此超市就属于一种共享资源。既然是共享资源,那么在多执行流进行并发访问的时候可能会出现线程安全问题。主要有以下三组并发问题:
-
生产者 VS 生产者:互斥(各个生产者之间是一种竞争关系)
-
消费者 VS 消费者:互斥(只剩一个商品了)
-
生产者 VS 消费者:互斥(生产者要么不生产,要已经生产好放在超市了,消费者要么拿到商品,要么没拿到商品,不存在生产者正在生产,消费者就来拿,也就是说在超市的商品不允许生产者和消费者同时去访问,生产者正在往超市放置商品的时候,消费者不能来拿,为了保证数据的安全,生产者和消费者之间一定需要是互斥关系);同步(如果生产者频繁的访问超市,给超市打电话,问超市需不需要货物,会导致消费者访问不到超市的数据,造成消费者饥饿。正确做法是,生产者生产一部分,然后让消费者来消费,生产和消费要有一定的顺序性,因此需要同步)
生产消费者模型总结:3 种关系;2 种角色;1 个交易场所。该模型的优点是支持忙闲不均,将生产和消费进行解藕。
生产者的数据,一般是从用户、网络中获取的,所以生产者生产的数据也是要花时间获取的,然后才是生产数据到队列。消费者,也不只是获取数据这么简单,消费者在拿到数据后大概率是要对数据做加工处理的,这也需要花时间。生产消费者模型就高效在这里,生产者在向仓库生产数据的时候,消费者虽然不能从仓库获取数据,但是消费者可以在这个时候去加工处理数据,同理,消费者在从队列里获取数据的时候,生产者虽然不能向仓库中生产数据,但是它可以在这个时候获取数据。因此,生产线程和消费线程可以并发的去执行,这就是生产消费者模型高效的点。如果只看上图中的中间部分,那么生产消费者模型其实并不高效,因为生产者放数据和消费者取数据并不能同时进行。
1.1 生产消费者模型的解藕特性
举个栗子,以前我们在 main
函数中调用 add
函数的时候,因为是单执行流,所以main
函数只能等待。假如采用生产消费者模型,让 main
函数是一个线程充当生产者, add
函数是另一个线程充当消费者,此时 main
函数这个线程在 add
线程进行计算的时候就不需要等待了,可以继续生产数据往超市里面放,add
线程现在也不用等 main
线程生产出一组数据再计算一组数据,而是直接去超市里面取数据进行计算。对 main
函数和 add
函数来说,此时就解藕了。
二、基于BlockingQueue的生产消费者模型
BlockQueue:在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通队列区别在于,当队列为空时,从队列获取元素的操作会被阻塞,直到队列中被放入的元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。
2.1 单生产单消费模型
// BlockQueue.hpp
#pragma
#include <iostream>
#include <pthread.h>
#include <queue>
template <class T>
class BlockQueue
{
static const int defaultmaximum = 20;
public:
BlockQueue(int maximum = defaultmaximum)
: maximum_(maximum)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&c_cond_, nullptr);
pthread_cond_init(&p_cond_, nullptr);
low_water_ = maximum_ / 3; // 低水位线是队列最大容量的 1/3
high_water_ = (maximum_*2)/3; // 高水位线是队列最大容量的 2/3
}
T pop()
{
pthread_mutex_lock(&mutex_);
if (q_.size() == 0)
{ // 消费条件不满足,去等待
pthread_cond_wait(&c_cond_, &mutex_);
}
// 1. 队列没空 2. 被唤醒
T out = q_.front();
q_.pop();
if(q_.size() == low_water_) pthread_cond_signal(&p_cond_); // 消费者消费到低水位线,此时就可以去唤醒生产者进行生产了
pthread_mutex_unlock(&mutex_);
return out;
}
void push(const T &data)
{
pthread_mutex_lock(&mutex_);
if (q_.size() == maximum_) // 判断本身就是访问临界资源,所以判断一定要在加锁之后
{
// 生产条件不满足,需要去等待。1、调用的时候,自动释放锁 2、
pthread_cond_wait(&p_cond_, &mutex_);
}
// 1、队列没满 2、被唤醒
q_.push(data); // 得先确保生产条件满足(当前队列中的数据个数小于 maximum_),才能生产,
pthread_mutex_unlock(&mutex_);
if(q_.size() == high_water_) pthread_cond_signal(&c_cond_); // 生产数据到高水位线,说明队列里一定有数据了,此时去唤醒消费者
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
private:
std::queue<T> q_; // 共享资源
int maximum_; // 队列的最大容量
pthread_mutex_t mutex_; // 定义一个互斥量(锁)
pthread_cond_t c_cond_; // 定义一个消费者条件变量,消费者在这个条件变量下进行等待
pthread_cond_t p_cond_; // 定义一个生产者条件变量,生产者在这个条件变量下进行等待
int low_water_; // 队列的低水位线
int high_water_; // 队列的高水位线
};
// main.cc
#include "BlockQueue.hpp"
#include <unistd.h>
void *Consumer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
while(true)
{
// 消费——从队列里面拿数据
int data = bq->pop();
std::cout << "消费了一个数据: " << data << std::endl;
usleep(1000000);
}
}
void *Productor(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
int data = 1;
while(true)
{
// 生产——往队列里面放数据
// int data = rand() % 10 + 1;
bq->push(data);
std::cout << "生产了一个数据:" << data << std::endl;
data++;
usleep(100000);
}
}
int main()
{
srand((unsigned int)time(nullptr));
BlockQueue<int> *bq = new BlockQueue<int>();
pthread_t c, p;
pthread_create(&c, nullptr, Consumer, bq);
pthread_create(&p, nullptr, Productor, bq);
// 主线程等待两个子线程退出
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete bq; // 释放资源
return 0;
}
注意事项:申请到锁之后,首先要检查资源是否就绪,就绪了才能进行后续操作,而资源是否就绪是通过判断得来的,判断本身就属于访问临界资源,所以判断应该在加锁之后,所以如果资源没有就绪就需要调用 pthread_cond_wait
让该线程去休眠,所以调用pthread_cond_wait
的时候,该线程一定是持有锁的,不能持有锁去等待,这样会造成死锁问题,因此在调用该函数时,该函数内部会释放锁,然后让进程去休眠,在进程被重新唤醒,也就是函数返回的时候,需要重新持有锁。
为什么结果中夹杂了消费一个数据?不应该是一批数据嘛?而且,队列的高水位线似乎没有起作用?因为,最初一定是生产者先生产13个数据,然后到了高水位线,此时去 c_cond_
中唤醒消费者线程,刚被唤醒的消费者线程先要去竞争锁,在生产者线程释放锁之前,这个被唤醒的消费者线程其实是处于挂起等待状态的,只不过等待的地方变了,本来是在 c_cond_
这个条件变量中进行等待条件满足,进入 c_cond_
是因为资源没有就绪,此时被唤醒,说明资源就绪了,所以 c_cond_
就给这个在其中等待的消费者线程说:“好了,资源已经就绪了,你出去等吧,别在我这里等了”。此时虽然资源就绪了,但是访问资源需要锁,但是该线程刚“出来”,它并没有锁,此时锁还在生产者线程手里,所以该消费者线程还是只能等待,紧接着生产者线程释放锁,此时注意了!!!生产者线程释放完锁之后,并没有去 p_cond_
中休眠,因为此时代码中去 p_cond_
中休眠的前提只有一个,就是资源不就绪,资源不就绪的前提是,你得先去申请锁,有了锁才能去判断资源是否就绪,所以,此时生产者线程和消费者线程,都在等着竞争这个锁,这一次,消费者线程运气比较好,竞争到了锁,它去消费了一个数据,然后把锁释放了,这个过程中,生产者线程并没有去 p_cond_
中进行休眠,而是一直在等待锁被释放,然后去竞争。这一次,生产者线程运气比较好,它竞争到了锁,它就又去生产数据了,这就是为什么在生产一批数据后,本应该消费一批数据的,但是却只消费了一个数据就继续去生产了,并且后面生成数据的个数明显已经超过了高水位线,但是还一直在生产,本质是因为,这段时间对生产者和消费者来说,资源始终都是就绪的,他们并没有去对应的条件变量下进行休眠,而是都一直处于竞争锁的状态,在仔细观察可以发现,在生产了13个数据之后,消费了1个,紧接着又生产了8个,此时队列已经被生产满了,然后对生产者线程来说,资源处于不就绪状态,所以此时生产者线程去 p_cond_
下休眠了,然后消费者线程就能安心来进行消费了。我们的高水位线和低水位线貌似并没有起作用,我们希望的是,当生产达到高水位线的时候,让生产者停止,然后让消费者去消费一批线程,达到一种同步的效果,但是我们的希望并没有实现。问题就出在,我们在唤醒一个生产者线程(消费者线程)的同时,消费者线程(生产者线程)也是处于唤醒状态,此时就会存在消费者线程和生产者线程同时去竞争锁,此时他们俩竞争锁的能力可能会不同,因此就达不到我们所希望的同步状态,怎么解决呢?其实也很简单,在唤醒一个线程的时候,让当前线程去休眠。此时就不会出现两个线程都处于被唤醒的状态,去竞争锁的情况,全程只会有一个线程处于被唤醒的状态。将代码修改如下:
T pop()
{
pthread_mutex_lock(&mutex_);
while (q_.size() == 0)
{ // 消费条件不满足,去等待
pthread_cond_wait(&c_cond_, &mutex_);
}
// 1. 队列没空 2. 被唤醒
T out = q_.front();
q_.pop();
if (q_.size() <= low_water_)
{
pthread_cond_signal(&p_cond_); // 消费者消费到低水位线,此时就可以去唤醒生产者进行生产了
pthread_cond_wait(&c_cond_, &mutex_); // 让当前进程去休眠
std::cout << "c is sleep..." << std::endl;
}
pthread_mutex_unlock(&mutex_);
return out;
}
void push(const T &data)
{
pthread_mutex_lock(&mutex_);
while (q_.size() == maximum_) // 判断本身就是访问临界资源,所以判断一定要在加锁之后
{
// 生产条件不满足,需要去等待。1、调用的时候,自动释放锁 2、
pthread_cond_wait(&p_cond_, &mutex_);
}
// 1、队列没满 2、被唤醒
q_.push(data); // 得先确保生产条件满足(当前队列中的数据个数小于 maximum_),才能生产,
if (q_.size() >= high_water_)
{
int ret = pthread_cond_signal(&c_cond_); // 生产数据到高水位线,说明队列里一定有数据了,此时去唤醒消费者
pthread_cond_wait(&p_cond_, &mutex_);
std::cout << "p is sleep.." << std::endl;// 让当前进程去休眠
}
pthread_mutex_unlock(&mutex_);
// pthread_cond_wait(&p_cond_, &mutex_);
}
2.2 伪唤醒、误唤醒造成的问题
其次是唤醒操作,生产者生产一个数据后,它自己是最清楚的,此时队列里有数据了,可以唤醒一个消费者来取数据。消费者在队列中取数据时,如果没有了,它就可以唤醒一个生产者来进行生产。
T pop()
{
pthread_mutex_lock(&mutex_);
if (q_.size() == 0)
{ // 消费条件不满足,去等待
pthread_cond_wait(&c_cond_, &mutex_);
}
// 1. 队列没空 2. 被唤醒
T out = q_.front();
q_.pop();
if(q_.size() == low_water_) pthread_cond_signal(&p_cond_); // 消费者消费到低水位线,此时就可以去唤醒生产者进行生产了
pthread_mutex_unlock(&mutex_);
return out;
}
void push(const T &data)
{
pthread_mutex_lock(&mutex_);
if (q_.size() == maximum_) // 判断本身就是访问临界资源,所以判断一定要在加锁之后
{
// 生产条件不满足,需要去等待。1、调用的时候,自动释放锁 2、
pthread_cond_wait(&p_cond_, &mutex_);
}
// 1、队列没满 2、被唤醒
q_.push(data); // 得先确保生产条件满足(当前队列中的数据个数小于 maximum_),才能生产,
pthread_mutex_unlock(&mutex_);
if(q_.size() == high_water_) pthread_cond_signal(&c_cond_); // 生产数据到高水位线,说明队列里一定有数据了,此时去唤醒消费者
}
伪唤醒和误唤醒是在多生产,多消费的前提下,假设此时队列里已经满了,然后消费者消费了一个数据,此时队列空出来一个位置,紧接着,这个消费线程唤醒了一批正在 p_cond_
条件变量下进行等待的生产线程,因为重新被唤醒的线程需要重新去竞争锁,所以被唤醒的一批生产线程之间处于一种互斥关系,在前面的消费线程释放锁之后,只有一个生产线程能够抢到锁,剩下被唤醒的生产线程此时就只能在锁那里等待,不能在 p_cond_
下继续等待,因为他们已经被唤醒了,唤醒就相当于 pthread_cond_wait(&c_cond_, &mutex_);
调用结束返回,该函数是在 if
下的,函数返回后,那些被唤醒却没抢到锁的生产线程此时就只能干巴巴的等着了,不能继续向后执行,只有抢到锁的那个线程可以往后执行。当这个申请锁成功的线程生产完数据,此时队列里数据又满了,会去唤醒一个消费者线程,然后该线程释放锁,此时,被唤醒的线程不止有前一秒才被唤醒的消费线程,还有之前被唤醒但是没抢到锁的那一批生产者线程,此时就会出现,生产线程和消费线程在同时竞争锁,如果锁被一个生产线程抢到了,那么它会从 pthread_cond_wait(&c_cond_,
&mutex_);
继续向后执行,也就是继续向队列里生产数据,但是此时队列已经满了,再向队列里生产数据就会出问题。所以,这里应该将 if
换成 while
,一个被唤醒的进程在抢到锁之后,不应该直接进行队列操作(不只是上面说的生产会出问题,消费也可能会出同样的问题,总之都是队列操作),而是再进行一次判断,看资源是否就绪,如果就绪再往后进行队列操作,没就绪就继续去调用 pthread_cond_wait(&c_cond_, &mutex_);
将自己休眠。
2.3 基于任务的多生产多消费模型
// task.hpp
#include <iostream>
#include <string>
enum
{
DIVERROR = 1,
MODERROR,
UNKNOWERRROR
};
class Task
{
public:
Task(int a, int b, char op)
:data1_(a), data2_(b), op_(op), result_(0), exitcode_(0)
{}
void run()
{
switch(op_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
if(data2_ == 0) exitcode_ = DIVERROR;
else result_ = data1_ / data2_;
break;
case '%':
if(data2_ == 0) exitcode_ = MODERROR;
else result_ = data1_ % data2_;
break;
default:
exitcode_ = UNKNOWERRROR;
break;
}
}
std::string result_to_string()
{
std::string ret = std::to_string(data1_);
ret += ' ';
ret += op_;
ret += ' ';
ret += std::to_string(data2_);
ret += ' ';
ret += '=';
ret += ' ';
ret += std::to_string(result_);
ret += "[exitcode: ";
ret += std::to_string(exitcode_);
ret += ']';
return ret;
}
std::string get_task()
{
std::string ret = std::to_string(data1_);
ret += ' ';
ret += op_;
ret += ' ';
ret += std::to_string(data2_);
ret += ' ';
ret += '=';
ret += ' ';
ret += '?';
return ret;
}
private:
int data1_;
int data2_;
char op_;
int result_;
int exitcode_;
};
// BlockQueue.hpp
#pragma
#include <iostream>
#include <pthread.h>
#include <queue>
template <class T>
class BlockQueue
{
static const int defaultmaximum = 20;
public:
BlockQueue(int maximum = defaultmaximum)
: maximum_(maximum)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&c_cond_, nullptr);
pthread_cond_init(&p_cond_, nullptr);
low_water_ = maximum_ / 3; // 低水位线是队列最大容量的 1/3
high_water_ = (maximum_*2)/3; // 高水位线是队列最大容量的 2/3
}
T pop()
{
pthread_mutex_lock(&mutex_);
while (q_.size() == 0)
{ // 消费条件不满足,去等待
pthread_cond_wait(&c_cond_, &mutex_);
}
// 1. 队列没空 2. 被唤醒
T out = q_.front();
q_.pop();
if (q_.size() <= low_water_)
{
pthread_cond_signal(&p_cond_); // 消费者消费到低水位线,此时就可以去唤醒生产者进行生产了
pthread_cond_wait(&c_cond_, &mutex_);
}
pthread_mutex_unlock(&mutex_);
return out;
}
void push(const T &data)
{
pthread_mutex_lock(&mutex_);
while (q_.size() == maximum_) // 判断本身就是访问临界资源,所以判断一定要在加锁之后
{
// 生产条件不满足,需要去等待。1、调用的时候,自动释放锁 2、
pthread_cond_wait(&p_cond_, &mutex_);
}
// 1、队列没满 2、被唤醒
q_.push(data); // 得先确保生产条件满足(当前队列中的数据个数小于 maximum_),才能生产,
if (q_.size() >= high_water_)
{
int ret = pthread_cond_signal(&c_cond_); // 生产数据到高水位线,说明队列里一定有数据了,此时去唤醒消费者
pthread_cond_wait(&p_cond_, &mutex_);
}
pthread_mutex_unlock(&mutex_);
// pthread_cond_wait(&p_cond_, &mutex_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
private:
std::queue<T> q_; // 共享资源
int maximum_; // 队列的最大容量
pthread_mutex_t mutex_; // 定义一个互斥量(锁)
pthread_cond_t c_cond_; // 定义一个消费者条件变量,消费者在这个条件变量下进行等待
pthread_cond_t p_cond_; // 定义一个生产者条件变量,生产者在这个条件变量下进行等待
int low_water_; // 队列的低水位线
int high_water_; // 队列的高水位线
};
//main.cc
#include "BlockQueue.hpp"
#include <unistd.h>
#include "Task.h"
const std::string opers = "+-*/%";
// 消费者
void *Consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 消费——从队列里面拿数据
Task task = bq->pop();
// 模拟数据处理的过程
task.run();
std::cout << pthread_self() << "# 处理任务: " << task.get_task().c_str() << ", 运算结果是: " << task.result_to_string().c_str() << std::endl;
usleep(1000000);
}
}
// 生产者
void *Productor(void *args)
{
int len = opers.size();
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
int data = 1;
while (true)
{
// 模拟生产者生产数据
int data1 = rand() % 10 + 1; // [1, 10]
usleep(10);
int data2 = rand() % 13; // [0, 13]
usleep(10);
char op = opers[rand() % len];
Task task(data1, data2, op);
// 生产——往队列里面放数据
bq->push(task);
std::cout << pthread_self() << "@ 生产了一个任务: " << task.get_task().c_str() << std::endl;
// data++;
usleep(1000000);
}
}
int main()
{
srand((unsigned int)time(nullptr));
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c[3], p[5];
for (int i = 0; i < 3; i++)
{
pthread_create(c + i, nullptr, Consumer, bq);
}
for (int i = 0; i < 5; i++)
{
pthread_create(p+i, nullptr, Productor, bq);
}
// 主线程等待两个子线程退出
for(int i = 0; i < 3; i++)
{
pthread_join(c[i], nullptr);
}
for(int i = 0; i < 5; i++)
{
pthread_join(p[i], nullptr);
}
delete bq; // 释放资源
return 0;
}
标签:mutex,修行,队列,Linux,cond,pthread,线程,唤醒
From: https://blog.csdn.net/m0_68662723/article/details/141995926