生产者消费者模型

什么是生产者消费者模型

生产者-消费者模型($Producer-consumer problem$)是一个非常经典的多线程并发协作的模型,在分布式系统里非常常见。
523ef8e8b8be4722b1ba88c49bbd671d.png

这个模型一般由两类线程和一个缓冲区组成来组成:

1、生产者线程:生产数据,并把数据放入缓冲区中。
2、缓冲区:存放生产者生产的数据的地方。
3、消费者线程:从缓冲区里取数据,消费数据。

运行流程:

1、生产者和消费者在同一时间段内共用同一个存储空间
2、生产者往存储空间中添加产品
3、消费者从存储空间中取走产品
4、当存储空间为空时,消费者阻塞等待;当存储空间满时,生产者阻塞等待。

为什么要用生产者消费者模型

在多线程的世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中:

1、如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。
2、如果消费者的处理能力大于生产者,那么消费者就必须等待生产者生产数据,才能继续消费数据。

为了解决这个问题于是引入了生产者消费者模式

代码实现

版本一:使用互斥锁和条件变量实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// 本程序演示用互斥锁和条件变量实现生产者消费者模型。
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <vector>

using namespace std;

// 缓存队列消息的结构体。
struct st_message
{
int mesgid; // 消息的id。
char message[1024]; // 消息的内容。
}stmesg;

vector<struct st_message> vcache; // 用vector容器做缓存。

pthread_cond_t cond=PTHREAD_COND_INITIALIZER; // 声明并初始化条件变量。
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER; // 声明并初始化互斥锁。

void incache(int sig); // 生产者、数据入队。
void *outcache(void *arg); // 消费者、数据出队线程的主函数。

int main()
{
// pthread_mutex_init(&mutex, NULL);
// pthread_cond_init(&cond, NULL);
signal(15,incache); // 接收15的信号,调用生产者函数。

// 创建三个消费者线程。
pthread_t thid1,thid2,thid3;
pthread_create(&thid1,NULL,outcache,NULL);
pthread_create(&thid2,NULL,outcache,NULL);
pthread_create(&thid3,NULL,outcache,NULL);

pthread_join(thid1,NULL);
pthread_join(thid2,NULL);
pthread_join(thid3,NULL);

pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);

return 0;
}

void incache(int sig) // 生产者、数据入队。
{
static int mesgid=1; // 消息的计数器。

struct st_message stmesg; // 消息内容。
memset(&stmesg,0,sizeof(struct st_message));

pthread_mutex_lock(&mutex); // 给缓存队列加锁。

// 生产数据,放入缓存队列。
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);

pthread_mutex_unlock(&mutex); // 给缓存队列解锁。

//pthread_cond_signal(&cond); // 发送条件信号,激活一个线程。
pthread_cond_broadcast(&cond); // 发送条件信号,激活全部的线程。
}

void thcleanup(void *arg)
{
// 在这里释放关闭文件、断开网络连接、回滚数据库事务、释放锁等等。
printf("cleanup ok.\n");

// 线程清理函数中需要释放互斥锁!!!!!!!
pthread_mutex_unlock(&mutex);

/*
在pthread_cond_wait时执行pthread_cancel后,
要先在线程清理函数中要先解锁已与相应条件变量绑定的mutex,
这样是为了保证pthread_cond_wait可以返回到调用线程。
*/
};

void *outcache(void *arg) // 消费者、数据出队线程的主函数。
{
pthread_cleanup_push(thcleanup,NULL); // 把线程清理函数入栈。

struct st_message stmesg; // 用于存放出队的消息。

while (true)
{
pthread_mutex_lock(&mutex); // 给缓存队列加锁。

// 如果缓存队列为空,等待,用while防止条件变量虚假唤醒。
while (vcache.size()==0)
{
pthread_cond_wait(&cond,&mutex);
}

// 从缓存队列中获取第一条记录,然后删除该记录。
memcpy(&stmesg,&vcache[0],sizeof(struct st_message)); // 内存拷贝。
vcache.erase(vcache.begin());

pthread_mutex_unlock(&mutex); // 给缓存队列解锁。

// 以下是处理业务的代码。
printf("phid=%ld,mesgid=%d\n",pthread_self(),stmesg.mesgid);
usleep(100);
}

pthread_cleanup_pop(1); // 把线程清理函数出栈。
}

注意

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 使用 while 而不用 if 是为了防止线程被虚假唤醒。
// 线程被唤醒时,缓冲区里不一定有数据!!
while (vcache.size()==0)
{
pthread_cond_wait(&cond,&mutex);
}

// 调用 pthread_cond_wait(&cond, &mutex);时,系统会先对mutex解锁。
// 当cond被唤醒时,系统会同时对mutex上锁,这一步是原子操作。

// 在线程清理函数中:
void thcleanup(void *arg)
{
// 在这里释放关闭文件、断开网络连接、回滚数据库事务、释放锁等等。
printf("cleanup ok.\n");

// 线程清理函数中需要释放互斥锁!!!!!!!
pthread_mutex_unlock(&mutex);

// 在pthread_cond_wait时执行pthread_cancel后,
// 要先在线程清理函数中要先解锁已与相应条件变量绑定的mutex,
// 这样是为了保证pthread_cond_wait可以返回到调用线程。
// 否则mutex会一直被锁死
};

版本二:使用信号量实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// 本程序演示只用信号量实现高速缓存。
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <semaphore.h>
#include <vector>

using namespace std;

// 缓存队列消息的结构体。
struct st_message
{
int mesgid; // 消息的id。
char message[1024]; // 消息的内容。
}stmesg;

vector<struct st_message> vcache; // 用vector容器做缓存。

// pthread_cond_t cond=PTHREAD_COND_INITIALIZER; // 声明并初始化条件变量。
// pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER; // 声明并初始化互斥锁。
sem_t notify,lock; // 声明信号量。

void incache(int sig); // 生产者、数据入队。
void *outcache(void *arg); // 消费者、数据出队线程的主函数。

int main()
{
signal(15,incache); // 接收15的信号,调用生产者函数。

sem_init(&notify,0,0); // 初始化通知的信号量,第3个参数为0。
sem_init(&lock,0,1); // 初始化加锁的信号量,第3个参数为1。

// 创建三个消费者线程。
pthread_t thid1,thid2,thid3;
pthread_create(&thid1,NULL,outcache,NULL);
pthread_create(&thid2,NULL,outcache,NULL);
pthread_create(&thid3,NULL,outcache,NULL);

pthread_join(thid1,NULL);
pthread_join(thid2,NULL);
pthread_join(thid3,NULL);

sem_destroy(&notify);
sem_destroy(&lock);

return 0;
}

void incache(int sig) // 生产者、数据入队。
{
static int mesgid=1; // 消息的计数器。

struct st_message stmesg; // 消息内容。
memset(&stmesg,0,sizeof(struct st_message));

// pthread_mutex_lock(&mutex); // 给缓存队列加锁。
sem_wait(&lock);

// 生产数据,放入缓存队列。
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
// stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
// stmesg.mesgid=mesgid++; vcache.push_back(stmesg);

// pthread_mutex_unlock(&mutex); // 给缓存队列解锁。
sem_post(&lock);

// pthread_cond_signal(&cond); // 发送条件信号,激活一个线程。
// pthread_cond_broadcast(&cond); // 发送条件信号,激活全部的线程。

sem_post(&notify); // 把信号量的值加1,将唤醒消费者线程。
sem_post(&notify); // 把信号量的值加1,将唤醒消费者线程。
sem_post(&notify); // 把信号量的值加1,将唤醒消费者线程。
sem_post(&notify); // 把信号量的值加1,将唤醒消费者线程。
}

void *outcache(void *arg) // 消费者、数据出队线程的主函数。
{
struct st_message stmesg; // 用于存放出队的消息。

while (true)
{
sem_wait(&lock); // 给缓存队列加锁。

// 如果缓存队列为空,等待,用while防止虚假唤醒。
while (vcache.size()==0)
{
// pthread_cond_wait(&cond,&mutex);

sem_post(&lock); // 给缓存队列解锁。
sem_wait(&notify); // 等待信号量的值大于0。
sem_wait(&lock); // 给缓存队列加锁。
}

// 从缓存队列中获取第一条记录,然后删除该记录。
memcpy(&stmesg,&vcache[0],sizeof(struct st_message)); // 内存拷贝。
vcache.erase(vcache.begin());

sem_post(&lock); // 给缓存队列解锁。

// 以下是处理业务的代码。
printf("phid=%ld,mesgid=%d\n",pthread_self(),stmesg.mesgid);
usleep(100);
}
}