// ProducerConsumer.cpp (forked from selfboot/CS_Offer)
/*
* @Author: [email protected]
* @Last Modified time: 2016-09-10 11:12:19
* Worked on Mac OS X
*/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <dispatch/dispatch.h> // OS X
// Shared state for the bounded-buffer producer/consumer demo.
const int BufLen = 10; // buffer length (number of slots)
int buf[BufLen] = {0}; // circular buffer; print_items() treats a slot holding 0 as empty
int head = 0, tail = 0; // buffer indices: head = next slot to remove, tail = next slot to fill
int product_NO = 1; // serial number handed to the next product
const int MProducer=4, NConsumer=1; // number of producer threads, number of consumer threads
// `empty` starts at BufLen: producers wait on it before inserting, consumers signal it after removing.
dispatch_semaphore_t empty = dispatch_semaphore_create(BufLen);
// `full` starts at 0: consumers wait on it before removing, producers signal it after inserting.
dispatch_semaphore_t full = dispatch_semaphore_create(0);
// `mutex` starts at 1: both thread kinds wait on it around every buffer access and signal it afterwards.
dispatch_semaphore_t mutex = dispatch_semaphore_create(1);
// Put a product into the buffer: announce it, store it in the slot at
// `tail`, then advance `tail` by one position, wrapping at BufLen.
// Performs no locking itself; producerFunc calls it while holding `mutex`.
void insert_item(int item){
    printf("+++ Create a product: %d \t", item);
    const int slot = tail;
    buf[slot] = item;
    tail = (slot + 1) % BufLen;
}
// Take a product out of the buffer: announce the value at `head`, reset
// that slot to 0 (the "consumed" marker), then advance `head` by one
// position, wrapping at BufLen.
// Performs no locking itself; consumerFunc calls it while holding `mutex`.
void remove_item(){
    const int slot = head;
    printf("--- Delete a product: %d \t", buf[slot]);
    buf[slot] = 0;  // consume
    head = (slot + 1) % BufLen;
}
// Print the buffer contents in order, starting at `head`.
// Walks the ring until it meets a slot holding 0 or has printed BufLen
// entries (a completely full buffer has no 0 slot to stop on).
void print_items(){
    printf("Current Buf looks like: ");
    for (int shown = 0, pos = head;
         shown < BufLen && buf[pos] != 0;
         ++shown, pos = (pos + 1) % BufLen){
        printf("%d, ", buf[pos]);
    }
    printf("\n");
}
// Producer thread entry point. `arg` is not used.
// Loops forever: waits on `empty`, then on `mutex`, inserts a product
// numbered from `product_NO` (post-incremented), prints the buffer,
// signals `mutex` and then `full`, and sleeps 3 seconds.
void *producerFunc(void *arg)
{
    for (;;)
    {
        dispatch_semaphore_wait(empty, DISPATCH_TIME_FOREVER);
        dispatch_semaphore_wait(mutex, DISPATCH_TIME_FOREVER);

        // product_NO is read and bumped while `mutex` is held.
        insert_item(product_NO++);
        print_items();

        dispatch_semaphore_signal(mutex);
        dispatch_semaphore_signal(full);

        sleep(3);
    }
}
// Consumer thread entry point. `arg` is not used.
// Loops forever: waits on `full`, then on `mutex`, removes one product,
// prints the buffer, signals `mutex` and then `empty`, and sleeps
// 5 seconds.
void *consumerFunc(void * arg)
{
    for (;;)
    {
        dispatch_semaphore_wait(full, DISPATCH_TIME_FOREVER);
        dispatch_semaphore_wait(mutex, DISPATCH_TIME_FOREVER);

        remove_item();
        print_items();

        dispatch_semaphore_signal(mutex);
        dispatch_semaphore_signal(empty);

        sleep(5);
    }
}
// Start MProducer producer threads and NConsumer consumer threads, then
// join all of them. If any pthread_create call fails, a message naming
// the thread index is printed and the process exits with status 1.
// The thread functions in this file loop without returning, so the joins
// are not expected to complete.
int main()
{
    pthread_t producer[MProducer];
    pthread_t consumer[NConsumer];

    for (int p = 0; p < MProducer; ++p)
    {
        const int rc = pthread_create(&producer[p], NULL, producerFunc, NULL);
        if (rc == 0)
            continue;
        printf("Creating producer %d error.\n", p);
        exit(1);
    }

    for (int c = 0; c < NConsumer; ++c)
    {
        const int rc = pthread_create(&consumer[c], NULL, consumerFunc, NULL);
        if (rc == 0)
            continue;
        printf("Creating consumer %d error.\n", c);
        exit(1);
    }

    for (int p = 0; p < MProducer; ++p)
    {
        pthread_join(producer[p], NULL);
    }
    for (int c = 0; c < NConsumer; ++c)
    {
        pthread_join(consumer[c], NULL);
    }

    return 0;
}