COMS W4995 C++ for C Programmers

Index of 2025-1/code/14

Parent directory
Makefile
concur0.cpp
concur1.cpp
pthread0.cpp
pthread1.cpp

Makefile

# Builds the four demo programs listed in $(executables).
# No compile or link recipes are written out below: every program target
# only names its object file, so the actual commands come from make's
# built-in rules, steered by the variables set here.

# In GNU make the built-in link rule runs $(CC), not $(CXX); pointing CC at
# g++ makes the link step use the C++ driver as well.
CC  = g++
CXX = g++

# -pthread is passed when compiling (CXXFLAGS) and again when linking
# (LDFLAGS).
CFLAGS   = -Wall -g
CXXFLAGS = -Wall -g -std=c++17 -pthread

LDFLAGS  = -pthread
LDLIBS   =

executables = concur0 pthread0 pthread1 concur1

# First target in the file, so a bare `make` builds every executable.
.PHONY: default
default: $(executables)

# One rule per program: each executable depends only on its own object file.
concur0: concur0.o

pthread0: pthread0.o

pthread1: pthread1.o

concur1: concur1.o

# Removes build products: stray a.out/core files, objects, and the programs.
.PHONY: clean
clean:
	rm -f a.out core *.o $(executables)

# Full rebuild: clean, then build everything.
# NOTE(review): clean and default are listed as independent prerequisites,
# so a parallel run (make -j all) may not order them -- confirm whether
# that matters for how this Makefile is used.
.PHONY: all
all: clean default

concur0.cpp

#include <new>
#include <bitset>
#include <vector>
#include <iostream>
#include <algorithm>
#include <iomanip>
#include <locale>
#include <mutex>
#include <atomic>
#include <future>

namespace c2cpp {
    // User-defined literal suffix: `N_bil` evaluates to N * 10^9 as an
    // unsigned long long, usable in constant expressions.
    constexpr unsigned long long operator""_bil(unsigned long long billions) {
        constexpr unsigned long long per_billion = 1'000'000'000ull;
        return billions * per_billion;
    }
}

using namespace c2cpp;
using namespace std;
using namespace std::chrono;

// Adds one to `sum`, `count` times.  The increment is deliberately done one
// step at a time on the caller's variable (through the reference) rather
// than as a single addition.
void f1(uint64_t& sum, uint64_t count) {
    for (uint64_t remaining = count; remaining != 0; --remaining) {
        sum += 1;
    }
}

// Serial baseline: runs f1() twice in a row, one billion increments each,
// on a single counter, then prints the elapsed time in seconds followed by
// the final counter value.
int main() {
    uint64_t sum = 0;

    // steady_clock is monotonic, so the measured interval cannot be skewed
    // by a wall-clock adjustment; high_resolution_clock may be an alias of
    // system_clock, which gives no such guarantee.
    const auto time0 = steady_clock::now();

    f1(sum, 1_bil);
    f1(sum, 1_bil);

    const auto time1 = steady_clock::now();
    const auto dt = duration_cast<milliseconds>(time1 - time0);
    // dt.count() is whole milliseconds; dividing by 1'000.0 prints seconds.
    cout << dt.count() / 1'000.0 << " sec\n";
    cout << sum << '\n';
}

/*  Lecture outline:

1.  Serial execution (and some odds & ends for setup)

    - Code walk-thru

    - std::chrono
    
    - Literal operators
    
    - Subtle differences between unsigned long long, unsigned long, uint64_t

        - (signed or unsigned) long long: 64-bit in most systems
        - (signed or unsigned) long:      64-bit in Linux & macOS, but
                                          32-bit in Windows
        - uint64_t: typedef'd to be unsigned long in Linux

        - unsigned long long is the only integer type allowed for literal
          operators

2.  Creating multiple threads of execution using std::thread

    - Code change:

        //f1(sum, 1_bil);
        //f1(sum, 1_bil);

        thread t1 { f1, ref(sum), 1_bil };
        thread t2 { f1, ref(sum), 1_bil };

        t1.join();
        t2.join();

    - Two invocations of f1() run at the same time on two different CPUs

        - Use htop command to monitor CPU usage
        - Use taskset command to assign specific CPUs to a process

        - Why incorrect result?
            - See diagram here:
              https://cs4157.github.io/www/2025-1/lect/07-thread.html

        - Why slower than serial execution? - We'll revisit this later

3.  Creating multiple threads using pthread_create()

    - pthread0.cpp code walk-thru

    - Processes vs. threads:
        - Processes DO NOT share virtual memory address space
        - Threads DO share virtual memory address space, but 
          each thread has its own stack

4.  Synchronizing multiple threads using mutex lock

    - Compare pthread0.cpp with pthread1.cpp to see how to use mutex

    - Mutex lock incurs significant overhead

5.  Building multi-threaded applications

    - Use -pthread flag for both compilation and linking

        - Defines required macros when compiling
        - Brings in the POSIX threads library when linking

6.  False sharing

    - Why was the parallel execution of f1() slower than the serial execution?

        - When two threads running on two CPUs keep updating the same memory
          location, they keep invalidating each other's cache for that 
          memory location, severely degrading the performance.

    - Try the following code changes:

        //uint64_t sum = 0;
        uint64_t sum[2] = {0};

        ...

        //thread t1 { f1, ref(sum), 1_bil };
        //thread t2 { f1, ref(sum), 1_bil };
        thread t1 { f1, ref(sum[0]), 1_bil };
        thread t2 { f1, ref(sum[1]), 1_bil };

        ...

        //cout << sum << '\n';
        cout << sum[0] << '\n';
        cout << sum[1] << '\n';

      Why is it still slow?!

    - Now try the following:

        //uint64_t sum = 0;
        uint64_t sum[9] = {0};

        ...

        //thread t1 { f1, ref(sum), 1_bil };
        //thread t2 { f1, ref(sum), 1_bil };
        thread t1 { f1, ref(sum[0]), 1_bil };
        thread t2 { f1, ref(sum[8]), 1_bil };

        ...

        //cout << sum << '\n';
        cout << sum[0] << '\n';
        cout << sum[8] << '\n';

    - Cache is grouped into a chunk of bytes called cache block or cache
      line.  If one byte in that chunk is invalidated, the whole chunk
      gets invalidated.  So if the two separate variables are close enough
      to be in a single cache line, the program would behave as if the two
      threads are accessing a single variable. This is called False
      sharing.

      The cache line size of the x86 CPUs on CLAC is 64 bytes.

7.  std::ref

    - Returns std::reference_wrapper, which is copyable & assignable
    - Lets you pass references to std::thread or std::bind
    - Lets you store references in standard containers

*/

concur1.cpp

#include <algorithm>
#include <atomic>
#include <bitset>
#include <chrono>
#include <cstdint>
#include <future>
#include <iomanip>
#include <iostream>
#include <locale>
#include <mutex>
#include <new>
#include <thread>
#include <vector>

namespace c2cpp {
    // User-defined literal suffix: `N_bil` evaluates to N * 10^9 as an
    // unsigned long long, usable in constant expressions.
    constexpr unsigned long long operator""_bil(unsigned long long billions) {
        constexpr unsigned long long per_billion = 1'000'000'000ull;
        return billions * per_billion;
    }
}

using namespace c2cpp;
using namespace std;
using namespace std::chrono;

// void f1(uint64_t& sum, uint64_t count) { while (count--) { ++sum; } }

// Counter that can be bumped with prefix ++ and read back as a plain
// unsigned long long.  The balance is held in an atomic, so each increment
// is a single atomic read-modify-write.
struct wallet {
    atomic<unsigned long long> money{0};

    // Implicit read access to the current balance (lets a wallet be
    // streamed or compared like an integer).
    operator unsigned long long() const {
        return money;
    }

    // Adds one to the balance and returns this wallet.
    wallet& operator++() {
        money += 1;
        return *this;
    }
};

int main() {
    static_assert(atomic<unsigned long long>::is_always_lock_free);

    wallet sum;
    auto f2 = [&](uint64_t count) { while (count--) ++sum; };

    auto time0 = high_resolution_clock::now();

    thread t1 { f2, 1_bil };
    thread t2 { f2, 1_bil };

    t1.join();
    t2.join();

    auto time1 = high_resolution_clock::now();
    auto dt = duration_cast<milliseconds>(time1 - time0);
    cout << dt.count() / 1'000.0 << " sec\n";
    cout << sum << '\n';
}

/*  Lecture outline:

1.  Atomic types

    - Code walk-thru

        - atomic<T>
        - atomic<T>::is_always_lock_free
        - x86 implementation: "lock" prefix

2.  std::mutex

    - Code change in struct wallet:

        //atomic<unsigned long long> money = 0;
        mutex m;
        unsigned long long money = 0;

    - Code change in wallet::operator++():

        m.lock();
        ++money;
        m.unlock();

    - This is a straight translation of pthread_mutex_lock/unlock() in C

3.  std::scoped_lock

    - We can and should do better in C++! Use std::scoped_lock mutex
      ownership wrapper to make locking exception-safe:

        {
            scoped_lock lck{m};
            ++money;
        }

4.  STL provides 3 different mutex ownership wrappers:

    scoped_lock

        - Wrapper for zero or more mutexes
            - note that scoped_lock can be default-constructed without a mutex
        
        - Uses deadlock avoidance algorithm so it is safe to pass multiple
          mutexes in any order
            - when locking multiple mutexes, they are required to support
              non-blocking try_lock() function

    lock_guard

        - Older version that wraps only one mutex - no default constructor

    unique_lock

        - Also for only one mutex, but has more features

        - In particular, std::condition_variable works only with
          std::unique_lock<std::mutex>

5.  Reader-writer lock

    std::shared_mutex provides two levels of locking:
    
        - lock_shared(): multiple threads can lock it in shared/reader mode
        - lock(): only one thread can lock it in exclusive/writer mode

        - When there is a writer, there can be no other reader or writer
        - When there are one or more readers, there can be no writer

    Wrappers for shared_mutex:

        - Use shared_lock wrapper to lock shared_mutex in reader mode
        - Use unique_lock wrapper to lock shared_mutex in writer mode

6.  Future

    std::future provides a mechanism to retrieve a return value (or an
    exception) from an asynchronous task (i.e., a function running in a
    different thread)

        - An asynchronous task, created by std::promise, std::packaged_task,
          or std::async, provides a future object to the creator

        - The creator retrieves the result from the future object,
          possibly blocking until the asynchronous task sends the result.

    Code rewrite using std::async:

        int main() {
            auto f3 = [](uint64_t count) {
                uint64_t sum = 0;
                while (count--) ++sum;
                return sum;
            };

            auto time0 = high_resolution_clock::now();

            future<uint64_t> future1 = async(f3, 1_bil);
            future<uint64_t> future2 = async(f3, 1_bil);
            cout << "async tasks started\n";
            uint64_t sum = future1.get() + future2.get();

            auto time1 = high_resolution_clock::now();
            auto dt = duration_cast<milliseconds>(time1 - time0);
            cout << dt.count() / 1'000.0 << " sec\n";
            cout << sum << '\n';
        }

*/

pthread0.cpp

#include <iostream>
#include <chrono>
#include <pthread.h>
using namespace std;
using namespace std::chrono;

// Argument bundle for f1: pthread_create() hands a start routine a single
// void*, so the counter address and the iteration count travel together.
struct Args {
    uint64_t *sum_ptr;  // counter to increment
    uint64_t count;     // number of increments to perform
};

// Thread start routine.  `args` must point to an Args.  Increments
// *sum_ptr by one, `count` times, with no synchronization of any kind
// (pthread1.cpp is the variant that guards the increment with a mutex).
// Always returns a null pointer.
void* f1(void *args) {
    const Args *request = static_cast<const Args *>(args);
    uint64_t *const counter = request->sum_ptr;

    for (uint64_t remaining = request->count; remaining != 0; --remaining) {
        *counter += 1;
    }
    return nullptr;
}

// Runs two f1 workers concurrently against one shared counter, then prints
// the elapsed time in seconds followed by the final counter value.
// Returns 0 on success, 1 if a thread could not be created or joined.
int main() {
    uint64_t sum = 0;
    // Written as one literal so the value is never computed in int.
    constexpr uint64_t count = 1'000'000'000;
    pthread_t t1, t2;

    // steady_clock is monotonic, so the measured interval cannot be skewed
    // by a wall-clock adjustment; high_resolution_clock may be an alias of
    // system_clock, which gives no such guarantee.
    const auto time0 = steady_clock::now();

    // pthread_create() reports failure through its return value (an error
    // number); on failure the pthread_t is not usable, so it must not be
    // joined.
    Args args1 { &sum, count };
    int err = pthread_create(&t1, nullptr, &f1, &args1);
    if (err != 0) {
        cerr << "pthread_create failed: error " << err << '\n';
        return 1;
    }

    Args args2 { &sum, count };
    err = pthread_create(&t2, nullptr, &f1, &args2);
    if (err != 0) {
        cerr << "pthread_create failed: error " << err << '\n';
        // t1 is already running and holds pointers into this stack frame
        // (sum, args1), so wait for it before returning.
        pthread_join(t1, nullptr);
        return 1;
    }

    const int join1 = pthread_join(t1, nullptr);
    const int join2 = pthread_join(t2, nullptr);
    if (join1 != 0 || join2 != 0) {
        cerr << "pthread_join failed\n";
        return 1;
    }

    const auto time1 = steady_clock::now();
    const auto dt = duration_cast<milliseconds>(time1 - time0);
    // dt.count() is whole milliseconds; dividing by 1'000.0 prints seconds.
    cout << dt.count() / 1'000.0 << " sec\n";
    cout << sum << '\n';
}

pthread1.cpp

#include <iostream>
#include <chrono>
#include <pthread.h>
using namespace std;
using namespace std::chrono;

// Argument bundle for f1: pthread_create() hands a start routine a single
// void*, so the counter address and the iteration count travel together.
struct Args {
    uint64_t *sum_ptr;  // counter to increment
    uint64_t count;     // number of increments to perform
};

// Single process-wide mutex; f1 holds it around every increment.
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

// Thread start routine.  `args` must point to an Args.  Increments
// *sum_ptr by one, `count` times, locking mtx before each increment and
// unlocking it right after.  Always returns a null pointer.
void* f1(void *args) {
    const Args *request = static_cast<const Args *>(args);
    uint64_t *const counter = request->sum_ptr;

    for (uint64_t remaining = request->count; remaining != 0; --remaining) {
        pthread_mutex_lock(&mtx);
        *counter += 1;
        pthread_mutex_unlock(&mtx);
    }
    return nullptr;
}

// Runs two f1 workers concurrently against one shared counter, then prints
// the elapsed time in seconds followed by the final counter value.
// Returns 0 on success, 1 if a thread could not be created or joined.
int main() {
    uint64_t sum = 0;
    // Written as one literal so the value is never computed in int.
    constexpr uint64_t count = 1'000'000'000;
    pthread_t t1, t2;

    // steady_clock is monotonic, so the measured interval cannot be skewed
    // by a wall-clock adjustment; high_resolution_clock may be an alias of
    // system_clock, which gives no such guarantee.
    const auto time0 = steady_clock::now();

    // pthread_create() reports failure through its return value (an error
    // number); on failure the pthread_t is not usable, so it must not be
    // joined.
    Args args1 { &sum, count };
    int err = pthread_create(&t1, nullptr, &f1, &args1);
    if (err != 0) {
        cerr << "pthread_create failed: error " << err << '\n';
        return 1;
    }

    Args args2 { &sum, count };
    err = pthread_create(&t2, nullptr, &f1, &args2);
    if (err != 0) {
        cerr << "pthread_create failed: error " << err << '\n';
        // t1 is already running and holds pointers into this stack frame
        // (sum, args1), so wait for it before returning.
        pthread_join(t1, nullptr);
        return 1;
    }

    const int join1 = pthread_join(t1, nullptr);
    const int join2 = pthread_join(t2, nullptr);
    if (join1 != 0 || join2 != 0) {
        cerr << "pthread_join failed\n";
        return 1;
    }

    const auto time1 = steady_clock::now();
    const auto dt = duration_cast<milliseconds>(time1 - time0);
    // dt.count() is whole milliseconds; dividing by 1'000.0 prints seconds.
    cout << dt.count() / 1'000.0 << " sec\n";
    cout << sum << '\n';
}