生产者与消费者模型-有界缓冲区


 问题描述

   生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多进程同步问题的经典案例。  该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据 放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

 代码要求

 

  • 三个线程,两个缓冲区 第一个线程往缓冲区a中put,第二个线程从缓冲区a中get,然后put到缓冲区b中;第三个线程从缓冲区b中get。第一个线程相当于纯生产者,第三个线程相当于纯消费者,第二个线程相当于既是生产者又是消费者。
  • 用C++类封装信号量相关的API函数,实现一个名为Semaphore的类,提供两个成员 函数:p()和v(); 

    Semaphore s(8);

    s.p();

    s.v();

  • 用两个锁分别保护head和tail

 实现临界区互斥访问的方法之一  信号量法

  概念上信号量是表示无力资源数量的实体,它是一个与队列有关的整型变量,实现上,信号量是一种记录型数据结构,有两个分量,一个是信号量的值,一个是等待该信号量的进程队列的头指针。

 实验代码

 

  1 #include 
  2 #include 
  3 
  4 
  5 using namespace std;
  6 
  7 
  8 class Semaphore {
  9 private:
 10     HANDLE SSemaphore;
 11 public:
 12     Semaphore(int m, int n) {        
 13         SSemaphore = CreateSemaphore(NULL, m, n, NULL);         //创建信号量
 14     }
 15     ~Semaphore() {                                               //销毁信号量
 16         CloseHandle(SSemaphore);
 17     }
 18     void P() {                                                     //P操作 
 19         WaitForSingleObject(SSemaphore, INFINITE);
 20     }
 21         
 22     void V() {                                                    //V操作 
 23         ReleaseSemaphore(SSemaphore, 1, NULL);
 24     }
 25 
 26 };
 27 
 28 
 29 class Buffer {
 30     static const int SIZE = 100;
 31     private:
 32         int cells[SIZE];
 33         int tail;
 34         int head;
 35         int num;
 36 
 37         Semaphore semaphore_full_cell;        //空格子
 38         Semaphore semaphore_empty_cell;        //满格子
 39         Semaphore mutex;                    
 40         Semaphore mutex_tail;//保护尾部信号量
 41     public:
 42         Buffer()
 43         : num(0), head(0), tail(0), semaphore_full_cell(0, SIZE),
 44             semaphore_empty_cell(SIZE, SIZE), mutex(1, 1), mutex_tail(1, 1){}
 45         ~Buffer(){}
 46 
 47         bool put(int x)
 48         {
 49             semaphore_empty_cell.P();
 50             mutex_tail.P();
 51             if (num == SIZE) {
 52             return false;
 53         }
 54             cells[tail] = x;
 55             tail = (tail + 1) % SIZE;
 56             num++;
 57             mutex_tail.V();
 58             semaphore_full_cell.V();
 59             return true;
 60         }
 61 
 62         bool get(int& x)
 63         {
 64             semaphore_full_cell.P();
 65             mutex.P();
 66             if (num == 0) {
 67                 return false;
 68             }
 69             x = cells[head];
 70             head = (head + 1) % SIZE;
 71             num--;
 72             mutex.V();
 73             semaphore_empty_cell.V();
 74             return true;
 75         }
 76 };
 77 
 78 
 79 Buffer a;        //a缓存区
 80 Buffer b;        //b缓存区
 81 
 82 
 83 DWORD WINAPI producer(LPVOID)            //生产者线程
 84 {
 85     for (int i = 0; i < 200; i++) {
 86         bool ok = a.put(i);
 87         if (!ok) {
 88             cout << GetCurrentThreadId() << " put: " << i << endl;
 89         }
 90     }
 91     return 0;
 92 }
 93 
 94 
 95 DWORD WINAPI consumer(LPVOID)            //消费者线程
 96 {
 97     for (int i = 0; i < 200; i++) {
 98         int x;
 99         bool ok = b.get(x);
100         if (!ok) {
101             cout << GetCurrentThreadId() << " get: " << endl;
102         }
103     }
104     return 0;
105 }
106 
107 
108 DWORD WINAPI midder(LPVOID)                //即是生产者也是消费者线程
109 {
110     for (int i = 0; i < 200; i++) {
111         int x;
112         bool ok1 = a.get(x);
113         if (!ok1) {
114             cout << GetCurrentThreadId() << " get: " << endl;
115         }
116         bool ok = b.put(i);
117         if (!ok) {
118             cout << GetCurrentThreadId() << " put: " << i << endl;
119         }
120 
121     }
122     return 0;
123 }
124 
125 
126 int main()
127 {
128     HANDLE thread1 = CreateThread(NULL, 0, producer, 0, 0, NULL);
129     HANDLE thread2 = CreateThread(NULL, 0, midder, 0, 0, NULL);
130     HANDLE thread3 = CreateThread(NULL, 0, consumer, 0, 0, NULL);
131 
132     WaitForSingleObject(thread1, INFINITE);
133     WaitForSingleObject(thread2, INFINITE);
134     WaitForSingleObject(thread3, INFINITE);
135 
136     return 0;
137 }