COMS W4995 C++ for C Programmers

Index of 2025-1/code/15

Parent directory
Makefile
cond-var.cpp
producer-consumer.cpp

Makefile

# Builds the two threading examples, cond-var and producer-consumer.
#
# CC is set to g++ (not gcc) on purpose: the executables below have no recipe,
# so make's built-in "link a single object file" rule is used, and that rule
# invokes $(CC).  The objects are C++, so the link step has to go through g++.
CC  = g++
CXX = g++

CFLAGS   = -Wall -g
CXXFLAGS = -Wall -g -std=c++17 -pthread

LDFLAGS  = -pthread
LDLIBS   =

executables = cond-var producer-consumer

.PHONY: default
default: $(executables)

# No recipes here: make's built-in rules compile each .cpp into a .o with
# $(CXX) $(CXXFLAGS) and link each .o into its executable.
cond-var: cond-var.o

producer-consumer: producer-consumer.o

.PHONY: clean
clean:
	rm -f a.out core *.o $(executables)

# Rebuild from scratch.  The two steps run as sequential sub-makes rather than
# as prerequisites ("all: clean default"), because prerequisites have no
# ordering guarantee under "make -j" and clean could then delete object files
# while default is still producing them.
.PHONY: all
all:
	$(MAKE) clean
	$(MAKE) default

cond-var.cpp

#include <iostream>
#include <sstream>
#include <algorithm>
#include <cstdlib>
#include <cassert>
#include <random>
#include <thread>
#include <deque>
#include <mutex>
#include <condition_variable>
using namespace std;

// Reads lines from standard input and hands the whitespace-separated words of
// each line to a printer thread through a vector protected by a mutex and a
// condition variable.  The printer thread writes every batch of words on one
// line of standard output.
//
// When input ends, the printer is told to finish and is joined before main
// returns; destroying a std::thread that is still joinable would call
// std::terminate.
int main() {
    std::vector<std::string> vec;  // words waiting to be printed (guarded by mtx)
    bool done = false;             // true once input is exhausted (guarded by mtx)

    std::mutex mtx;
    std::condition_variable got_something;

    std::thread t {
        [&]() {
            while (true) {
                std::unique_lock lck(mtx);  // lck's constructor will lock mtx

                // Re-check the condition after every wakeup: the reader also
                // notifies for lines that contain no words, which leaves vec
                // empty.
                while (vec.empty() && !done) {
                    got_something.wait(lck);  // wait will release lck
                }
                // lck is reacquired when wait() returns

                if (vec.empty()) {
                    // Only reachable with done set: nothing is left to print.
                    return;
                }

                for (auto&& x : vec) { std::cout << x << ' '; }
                std::cout << '\n';
                vec.clear();

                // lck's destructor will unlock mtx
            }
        }
    };

    std::string str, line;
    while (std::getline(std::cin, line)) {
        std::istringstream iss(line);

        std::unique_lock lck(mtx);  // lck's constructor will lock mtx

        while (iss >> str) { vec.push_back(str); }

        got_something.notify_one();  // unblock one waiting thread

        // lck's destructor will unlock mtx
    }

    // End of input: publish the flag under the lock so the printer cannot miss
    // it between checking the condition and going to sleep.
    {
        std::unique_lock lck(mtx);
        done = true;
        got_something.notify_one();
    }

    // Wait for the printer to flush whatever is still queued and exit.
    t.join();
}

producer-consumer.cpp

#include <iostream>
#include <algorithm>
#include <cstdlib>
#include <cassert>
#include <random>
#include <thread>
#include <mutex>
#include <deque>
#include <condition_variable>
using namespace std;
using namespace std::chrono;

// A FIFO queue holding at most N elements of type T, shared between threads.
//
// enqueue() blocks while the buffer is full and dequeue() blocks while it is
// empty.  Every operation runs with mtx held, including the logging helpers
// that print the buffer contents to std::cout, so T must be streamable with
// operator<<.
//
// Elements enter at the front of dq and leave from the back.
template <typename T, std::size_t N>
struct bounded_buffer {

    // With N == 0 the first enqueue() would wait forever and the size checks
    // below would wrap around, so reject it at compile time.
    static_assert(N > 0, "bounded_buffer needs a capacity of at least 1");

    std::deque<T> dq;
    std::mutex mtx;
    std::condition_variable has_element;  // signalled after an element is added
    std::condition_variable has_space;    // signalled after an element is removed

    // Adds x to the buffer, blocking until there is room.
    // x is perfectly forwarded: lvalues are copied, rvalues are moved.
    template <typename U>
    void enqueue(U&& x) {

        std::unique_lock lck(mtx);

        // wait() releases lck while blocked, reacquires it before returning,
        // and keeps waiting until the predicate holds.
        has_space.wait(lck, [this] { return dq.size() < N; });

        dq.push_front(std::forward<U>(x));
        has_element.notify_one();

        log_enq();

        // lck's destructor will unlock mtx
    }

    // Removes and returns the oldest element, blocking until one is available.
    T dequeue() {

        std::unique_lock lck(mtx);

        has_element.wait(lck, [this] { return !dq.empty(); });

        T x = std::move(dq.back());
        dq.pop_back();
        has_space.notify_one();

        log_deq(x);

        return x;
    }

    // Prints the element just enqueued and the buffer contents, padding the
    // unused slots with '.'.  Called by enqueue() with mtx held.
    void log_enq() const {
        assert(1 <= dq.size() && dq.size() <= N);
        std::cout << "(" << std::this_thread::get_id() << ") " << dq.front() << " -->\t[ ";
        for (const auto& elem : dq) { std::cout << elem << ' '; }
        for (std::size_t i = dq.size(); i < N; ++i) { std::cout << ". "; }
        std::cout << "]\n";
    }

    // Prints the buffer contents, padded with '.', followed by the element x
    // that was just dequeued.  Called by dequeue() with mtx held.
    void log_deq(const T& x) const {
        assert(dq.size() < N);
        std::cout << "\t\t\t[ ";
        for (const auto& elem : dq) { std::cout << elem << ' '; }
        for (std::size_t i = dq.size(); i < N; ++i) { std::cout << ". "; }
        std::cout << "] --> " << x << " (" << std::this_thread::get_id() << ")\n";
    }
};

// Starts num_producers producer threads and num_consumers consumer threads
// that share one bounded_buffer of strings.  Every worker sleeps for a random
// number of milliseconds between operations.  The program runs until ENTER is
// pressed on standard input.
int main() {
    constexpr std::size_t buf_size = 8;
    constexpr std::size_t num_producers = 5;
    constexpr std::size_t num_consumers = 5;
    constexpr std::size_t producer_max_delay = 1000; // max delay in milliseconds
    constexpr std::size_t consumer_max_delay = 1000;

    const auto seed = static_cast<std::random_device::result_type>(std::time(nullptr));
    bounded_buffer<std::string,buf_size> bb;

    // Each worker builds its own engine from the common seed and its worker
    // id.  A random engine is not safe to call from several threads at once,
    // so sharing a single engine between the workers would be a data race.
    auto producer = [&, seed](std::string str, unsigned worker_id) {
        std::seed_seq seq { seed, worker_id };
        std::mt19937 engine { seq };
        std::uniform_int_distribution<> distro { 1, producer_max_delay };
        while (true) {
            std::this_thread::sleep_for(std::chrono::milliseconds(distro(engine)));
            bb.enqueue(str);
        }
    };

    // Producers are labelled "A", "B", ... and get worker ids 0, 1, ...
    for (std::size_t i = 0; i < num_producers; ++i) {
        const char c = static_cast<char>('A' + i);
        std::thread t { producer, std::string(1, c), static_cast<unsigned>(i) };
        // detach the thread of execution from the thread object
        // so that the destructor will not terminate the program
        t.detach();
    }

    auto consumer = [&, seed](unsigned worker_id) {
        std::seed_seq seq { seed, worker_id };
        std::mt19937 engine { seq };
        std::uniform_int_distribution<> distro { 1, consumer_max_delay };
        while (true) {
            std::this_thread::sleep_for(std::chrono::milliseconds(distro(engine)));
            bb.dequeue();
        }
    };

    // Consumer ids continue after the producer ids so that every worker gets
    // a distinct random sequence.
    for (std::size_t i = 0; i < num_consumers; ++i) {
        std::thread t { consumer, static_cast<unsigned>(num_producers + i) };
        t.detach();
    }

    std::cin.get(); // pause the main thread until ENTER is pressed

    // NOTE(review): the detached workers never stop and still hold a reference
    // to bb when main returns and bb is destroyed.  The process is exiting at
    // that point, but a clean shutdown would need a way to stop and join the
    // workers -- TODO confirm whether that matters for this demo.
}