 Parent directory
Parent directory
 Makefile
Makefile
 cond-var.cpp
cond-var.cpp
 producer-consumer.cpp
producer-consumer.cpp
# Builds the two condition-variable example programs.
#
# CC is deliberately set to g++.  The executables below have no recipe of
# their own, so make's built-in "link one .o into a program" rule is used,
# and that rule drives the link through $(CC).  C++ objects have to be linked
# by the C++ driver so that the C++ standard library is pulled in.
CC  = g++
CXX = g++

# The sources use class template argument deduction (e.g. `unique_lock lck(mtx)`),
# hence -std=c++17.  -pthread is passed at both compile and link time.
CFLAGS   = -Wall -g
CXXFLAGS = -Wall -g -std=c++17 -pthread
LDFLAGS  = -pthread
LDLIBS   =

executables = cond-var producer-consumer

.PHONY: default
default: $(executables)

cond-var: cond-var.o

producer-consumer: producer-consumer.o

.PHONY: clean
clean:
	rm -f a.out core *.o $(executables)

# Rebuild everything from scratch.  The two steps are run one after the other
# through sub-makes rather than being listed as prerequisites: prerequisites
# of a single target are unordered under `make -j`, which would let `clean`
# delete object files while `default` is still producing them.
.PHONY: all
all:
	$(MAKE) clean
	$(MAKE) default
#include <iostream>
#include <sstream>
#include <algorithm>
#include <cstdlib>
#include <cassert>
#include <random>
#include <thread>
#include <deque>
#include <mutex>
#include <condition_variable>
using namespace std;
// Reads lines from standard input and hands the whitespace-separated words of
// each line to a printer thread through a mutex-protected vector; a condition
// variable wakes the printer whenever a line has been processed.
//
// When input ends, the main thread sets `done`, wakes the printer and joins
// it.  Without the join, `t` would still be joinable when main() returns and
// its destructor would call std::terminate().
int main() {
    vector<string> vec;  // words waiting to be printed; guarded by mtx
    bool done = false;   // true once stdin is exhausted; guarded by mtx
    mutex mtx;
    condition_variable got_something;

    thread t {
        [&]() {
            while (true) {
                unique_lock lck(mtx);  // lck's constructor will lock mtx
                // wait() releases lck while blocked and reacquires it before
                // returning; the predicate re-checks the state on every
                // wakeup, so spurious wakeups are harmless.
                got_something.wait(lck, [&] { return !vec.empty() || done; });
                if (vec.empty()) {
                    // Woken only because input ended and nothing is pending.
                    return;
                }
                for (auto&& x : vec) { cout << x << ' '; }
                cout << '\n';
                vec.clear();
                // lck's destructor will unlock mtx
            }
        }
    };

    string str, line;
    while (getline(cin, line)) {
        istringstream iss(line);
        unique_lock lck(mtx);  // lck's constructor will lock mtx
        while (iss >> str) { vec.push_back(str); }
        got_something.notify_one();  // unblock one waiting thread
        // lck's destructor will unlock mtx
    }

    // End of input: publish the shutdown flag under the lock so the printer
    // cannot miss it between evaluating its predicate and blocking.
    {
        unique_lock lck(mtx);
        done = true;
    }
    got_something.notify_one();
    t.join();  // lets the printer flush any pending words before exiting
}
#include <iostream>
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <deque>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <thread>
using namespace std;
using namespace std::chrono;
// Fixed-capacity FIFO queue shared between producer and consumer threads.
//
// enqueue() blocks while the buffer holds N elements and dequeue() blocks
// while it is empty.  Both take `mtx` for their whole duration, so the log
// line each one prints is emitted under the lock as well.
//
// T must be streamable to std::cout because every operation logs the buffer.
template <typename T, size_t N>
struct bounded_buffer {
    // With N == 0 enqueue() could never proceed and `N - 1` below would wrap.
    static_assert(N > 0, "bounded_buffer capacity N must be at least 1");

    deque<T> dq;                     // newest element at the front, oldest at the back
    mutex mtx;                       // guards dq
    condition_variable has_element;  // signalled by enqueue()
    condition_variable has_space;    // signalled by dequeue()

    // Adds `x` to the buffer, blocking until there is room for it.
    // Forwards `x`, so lvalues are copied and rvalues are moved in.
    template <typename U>
    void enqueue(U&& x) {
        unique_lock lck(mtx);
        // wait() releases lck while blocked and reacquires it before
        // returning; the predicate is re-checked on every wakeup.
        has_space.wait(lck, [this] { return dq.size() < N; });
        dq.push_front(std::forward<U>(x));
        has_element.notify_one();
        log_enq();
        // lck's destructor will unlock mtx
    }

    // Removes and returns the oldest element, blocking until one exists.
    T dequeue() {
        unique_lock lck(mtx);
        has_element.wait(lck, [this] { return !dq.empty(); });
        T x = std::move(dq.back());
        dq.pop_back();
        has_space.notify_one();
        log_deq(x);
        return x;
    }

    // Prints the element just enqueued (dq[0]) followed by the buffer
    // contents, padding unused slots with ". ".  Reads dq without locking;
    // enqueue() calls it while holding mtx.
    void log_enq() {
        assert(1 <= dq.size() && dq.size() <= N);
        cout << "(" << this_thread::get_id() << ") " << dq[0] << " -->\t[ ";
        for (size_t i = 0; i < dq.size(); ++i) { cout << dq[i] << ' '; }
        for (size_t i = dq.size(); i < N; ++i) { cout << ". "; }
        cout << "]\n";
    }

    // Prints the buffer contents followed by `x`, the element just removed.
    // Reads dq without locking; dequeue() calls it while holding mtx.
    void log_deq(const T& x) {
        // One element was just removed, so at most N - 1 remain.
        assert(dq.size() <= N - 1);
        cout << "\t\t\t[ ";
        for (size_t i = 0; i < dq.size(); ++i) { cout << dq[i] << ' '; }
        for (size_t i = dq.size(); i < N; ++i) { cout << ". "; }
        cout << "] --> " << x << " (" << this_thread::get_id() << ")\n";
    }
};
int main() {
    constexpr size_t buf_size = 8;
    constexpr size_t num_producers = 5;
    constexpr size_t num_consumers = 5;
    constexpr size_t producer_max_delay = 1000; // max delay in milliseconds
    constexpr size_t consumer_max_delay = 1000;
    random_device::result_type seed = time(nullptr);
    mt19937 engine { seed };
    bounded_buffer<string,buf_size> bb;
    auto producer = [&](string str) {
        uniform_int_distribution<> distro { 1, producer_max_delay };
        while (1) {
            this_thread::sleep_for(milliseconds(distro(engine)));
            bb.enqueue(str);
        }
    };
    for (size_t i = 0; i < num_producers; ++i) {
        char c = 'A' + i;
        thread t { producer, string{c} };
        // detach the thread of execution from the thread object
        // so that the destructor will not terminate the program
        t.detach();
    }
    auto consumer = [&]() {
        uniform_int_distribution<> distro { 1, consumer_max_delay };
        while (1) {
            this_thread::sleep_for(milliseconds(distro(engine)));
            bb.dequeue();
        }
    };
    for (size_t i = 0; i < num_consumers; ++i) {
        thread t { consumer };
        t.detach();
    }
    cin.get(); // pause the main thread until ENTER is pressed
}