Parent directory
Makefile
cond-var.cpp
producer-consumer.cpp
CC = g++
CXX = g++
CFLAGS = -Wall -g
CXXFLAGS = -Wall -g -std=c++17 -pthread
LDFLAGS = -pthread
LDLIBS =
executables = cond-var producer-consumer
.PHONY: default
default: $(executables)
cond-var: cond-var.o
producer-consumer: producer-consumer.o
.PHONY: clean
clean:
rm -f a.out core *.o $(executables)
.PHONY: all
all: clean default
#include <iostream>
#include <sstream>
#include <algorithm>
#include <cstdlib>
#include <cassert>
#include <random>
#include <thread>
#include <deque>
#include <mutex>
#include <condition_variable>
using namespace std;
int main() {
vector<string> vec;
mutex mtx;
condition_variable got_something;
thread t {
[&]() {
while (1) {
unique_lock lck(mtx); // lck's contructor will lock mtx
while (vec.empty()) {
got_something.wait(lck); // wait will release lck
}
// lck is reacquired when wait() returns
for (auto&& x : vec) { cout << x << ' '; }
cout << '\n';
vec.clear();
// lck's destructor will unlock mtx
}
}
};
string str, line;
while (getline(cin, line)) {
istringstream iss(line);
unique_lock lck(mtx); // lck's contructor will lock mtx
while (iss >> str) { vec.push_back(str); }
got_something.notify_one(); // unblock one waiting thread
// lck's destructor will unlock mtx
}
}
#include <iostream>
#include <algorithm>
#include <cstdlib>
#include <cassert>
#include <random>
#include <thread>
#include <mutex>
#include <deque>
#include <condition_variable>
using namespace std;
using namespace std::chrono;
template <typename T, size_t N>
struct bounded_buffer {
deque<T> dq;
mutex mtx;
condition_variable has_element;
condition_variable has_space;
template <typename U>
void enqueue(U&& x) {
unique_lock lck(mtx);
while (dq.size() == N) {
has_space.wait(lck); // wait() will release lck
}
// lck is reacquired when wait() returns
dq.push_front(std::forward<U>(x));
has_element.notify_one();
log_enq();
// lck's destructor will unlock mtx
}
T dequeue() {
unique_lock lck(mtx);
while (dq.size() == 0) {
has_element.wait(lck);
}
T x = std::move(dq.back());
dq.pop_back();
has_space.notify_one();
log_deq(x);
return x;
}
void log_enq() {
assert(1 <= dq.size() && dq.size() <= N);
cout << "(" << this_thread::get_id() << ") " << dq[0] << " -->\t[ ";
for (size_t i = 0; i < dq.size(); ++i) { cout << dq[i] << ' '; }
for (size_t i = dq.size(); i < N; ++i) { cout << ". "; }
cout << "]\n";
}
void log_deq(const T& x) {
assert(0 <= dq.size() && dq.size() <= N - 1);
cout << "\t\t\t[ ";
for (size_t i = 0; i < dq.size(); ++i) { cout << dq[i] << ' '; }
for (size_t i = dq.size(); i < N; ++i) { cout << ". "; }
cout << "] --> " << x << " (" << this_thread::get_id() << ")\n";
}
};
int main() {
constexpr size_t buf_size = 8;
constexpr size_t num_producers = 5;
constexpr size_t num_consumers = 5;
constexpr size_t producer_max_delay = 1000; // max delay in milliseconds
constexpr size_t consumer_max_delay = 1000;
random_device::result_type seed = time(nullptr);
mt19937 engine { seed };
bounded_buffer<string,buf_size> bb;
auto producer = [&](string str) {
uniform_int_distribution<> distro { 1, producer_max_delay };
while (1) {
this_thread::sleep_for(milliseconds(distro(engine)));
bb.enqueue(str);
}
};
for (size_t i = 0; i < num_producers; ++i) {
char c = 'A' + i;
thread t { producer, string{c} };
// detach the thread of execution from the thread object
// so that the destructor will not terminate the program
t.detach();
}
auto consumer = [&]() {
uniform_int_distribution<> distro { 1, consumer_max_delay };
while (1) {
this_thread::sleep_for(milliseconds(distro(engine)));
bb.dequeue();
}
};
for (size_t i = 0; i < num_consumers; ++i) {
thread t { consumer };
t.detach();
}
cin.get(); // pause the main thread until ENTER is pressed
}