1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
// beetje rommelig om <stdlib.h> te gebruiken inplaats van <cstdlib> in c++ :(
// verder worden ze ook helemaal niet gebruikt dus ik weet niet waarom het hier staat
// #include <cstdlib>
// #include <cstdio>
// #include <ctime>
#include <iostream>
#include <random>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;
const int TICK = 1000;
const int NBOXES = 10;
const int BUFLEN = 4;
// https://stackoverflow.com/questions/21237905/how-do-i-generate-thread-safe-uniform-random-numbers
int random(int n) {
static thread_local std::mt19937 generator;
std::uniform_int_distribution<int> distribution(0, n);
return distribution(generator);
}
class Box {
private:
const char *name;
int number;
public:
Box(const char *name, int number) {
this->name = name;
this->number = number;
}
friend ostream &operator << (ostream &out, Box *b) {
return out << b->name << "-" << b->number;
}
};
class Queue {
private:
Box *buffer[BUFLEN] = { nullptr };
int getpos = 0, putpos = 0;
int count = 0;
mutex _busy;
// deze mutex zorgt ervoor dat alle variabelen binnen de critical section
// (`buffer`, `getpos`, `putpos`, `count`, `_can_get`, en `_can_put`)
// allemaal door maar één thread tegelijkertijd aangepast kunnen worden,
// waardoor deze klasse een thread-safe queue implementeert
condition_variable _can_get;
condition_variable _can_put;
public:
Queue() { }
// initialisatiewaarden van private variabelen bij de declaratie neergezet,
// de std library mutex en unique_lock hebben geen expliciete initialisatie
// nodig
Box *get(const char *consumername) {
unique_lock<mutex> lock(_busy);
// deze mutex komt overeen met de windows api code die aan het begin van
// een functie EnterCriticalSection doet, en aan het einde
// LeaveCriticalSection (maar dit gebeurt impliciet omdat de variabele
// `lock` buiten de scope valt bij de return van deze functie)
while (count == 0) // wacht tot er minstens een doos is om te pakken
_can_get.wait(lock); // wacht tot een melding op _can_get (vermijd CPU kachel)
// de bovenstaande while loop met wait gedoe is gelijk aan WaitForSingleObject
Box *box = buffer[getpos];
getpos = (getpos + 1) % BUFLEN;
count--;
cout << consumername << ": gets " << box << endl;
_can_put.notify_one(); // meld aan threads die wachten op _can_put dat
// count anders is geworden
return box;
}
// dezelfde structuur wordt gebruikt voor thread safety als de bovenstaande
// functie, maar dan met _can_put en _can_get omgedraaid
void put(const char *producername, Box *box) {
unique_lock<mutex> lock(_busy);
while (count == BUFLEN) _can_put.wait(lock);
cout << producername << ": puts " << box << endl;
buffer[putpos] = box;
putpos = (putpos + 1) % BUFLEN;
count++;
_can_get.notify_one();
}
};
Queue q;
// thread functie hoeft verder niks te returnen (waarom DWORD WINAPI ??)
void produce(void *arg) {
char *name = (char*) arg;
for(int i = 0; i < NBOXES; i++) {
// c++ stdlib sleep :tada:
std::this_thread::sleep_for(std::chrono::milliseconds(random(TICK)));
Box *box = new Box(name, i);
q.put(name, box);
}
}
void consume(void *arg) {
char *name = (char*) arg;
for(int i = 0; i < NBOXES; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(random(TICK)));
Box *box = q.get(name);
delete box;
}
}
int main(int argc, char* argv[]) {
// c++ stdlib threads :tada:
vector<thread> pool;
pool.push_back(thread(produce, (void*)"P1"));
pool.push_back(thread(produce, (void*)"P2"));
pool.push_back(thread(produce, (void*)"P3"));
pool.push_back(thread(produce, (void*)"P4"));
pool.push_back(thread(consume, (void*)"C1"));
pool.push_back(thread(consume, (void*)"C2"));
pool.push_back(thread(consume, (void*)"C3"));
pool.push_back(thread(consume, (void*)"C4"));
for (thread& t : pool) t.join();
return 0;
}
|