COMS W4995 C++ Deep Dive for C Programmers

Concurrency

In this chapter, we introduce the fundamentals of concurrent programming. We’ll start by writing a simple multi-threaded program in C style and then rewrite it using modern C++ library facilities.

Using std::chrono to measure performance

We present the 18/pthread program, which measures how long it takes to count to two million. We will iterate on its implementation throughout this chapter. The initial version below presents sum1(), which makes two consecutive calls to f1(), each incrementing the sum one million times.

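#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <pthread.h>

using namespace std;
using namespace std::chrono;

struct Args {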
    uint64_t* sum_ptr;
    uint64_t count;
};

void* f1(void* args) {
    Args* p = (Args*)args;
    uint64_t* sum_ptr = p->sum_ptr;
    uint64_t count = p->count;

    while (count--) {
        ++*sum_ptr;
    }
    return NULL;
}

uint64_t sum1() {
    constexpr uint64_t count = 1'000'000;
    uint64_t sum = 0;

    Args args1 { &sum, count };
    f1(&args1);

    Args args2 { &sum, count };
    f1(&args2);

    return sum;
}

...

int main() {
    auto time0 = high_resolution_clock::now();

    uint64_t result = sum1();
    // uint64_t result = sum2();
    // uint64_t result = sum3();
    // uint64_t result = sum4();
    // uint64_t result = sum5();

    auto time1 = high_resolution_clock::now();
    auto dt = duration_cast<microseconds>(time1 - time0);
    cout << "elapsed: " << dt.count() << " microsec\n";
    cout << "result: " << result << '\n';
}

The program produces the following output on our system:

elapsed: 4248 microsec
result: 2000000

f1() takes a single void* parameter and returns void* to conform to the C pthread API, which we will use in the next section. sum1() packs a pointer to the shared sum variable and the number of times to increment it into an Args structure, and passes a pointer to that structure to f1().

The main() function uses std::chrono::high_resolution_clock to take timestamps before and after calling sum1(). The timestamps are std::chrono::time_point objects, which we can subtract to obtain a std::chrono::duration representing the elapsed time between them. We then use std::chrono::duration_cast to convert that duration into microseconds.
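The same timing pattern can be factored into a small reusable function. Below is a minimal sketch using a hypothetical helper name, time_it(). It uses std::chrono::steady_clock rather than high_resolution_clock, because the standard allows high_resolution_clock to be an alias for system_clock, whose time can be adjusted while the program runs; steady_clock is monotonic and intended for measuring intervals.

```cpp
#include <chrono>
#include <cstdint>

// Hypothetical helper: runs f() once and returns the elapsed time in
// microseconds. steady_clock is monotonic, so t1 - t0 is never negative
// even if the system clock is adjusted during the measurement.
template <typename F>
std::chrono::microseconds time_it(F f) {
    using namespace std::chrono;
    steady_clock::time_point t0 = steady_clock::now();  // timestamp before
    f();
    steady_clock::time_point t1 = steady_clock::now();  // timestamp after
    // t1 - t0 is a duration in the clock's own tick period;
    // duration_cast converts (and truncates) it to whole microseconds.
    return duration_cast<microseconds>(t1 - t0);
}
```

For example, `uint64_t result; auto dt = time_it([&] { result = sum1(); });` would time sum1() while keeping its return value, and `dt.count()` yields the elapsed microseconds.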

C pthread API

We now replace the call to sum1() with sum2() in the main() function. The sum2() function creates two threads, each of which will run f1() in parallel:

uint64_t sum2() {
    constexpr uint64_t count = 1'000'000;
    uint64_t sum = 0;
    pthread_t t1, t2;

    Args args1 { &sum, count };
    pthread_create(&t1, NULL, &f1, &args1);

    Args args2 { &sum, count };
    pthread_create(&t2, NULL, &f1, &args2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    return sum;
}

The pthread_create() function takes a pointer to a pthread_t variable to return a thread ID, an optional pointer to a thread attributes structure, a function pointer for the thread to execute, and a pointer to pass to the function as an argument. After creating the two threads, the main thread – i.e., the thread executing sum2() – calls pthread_join() to wait for the two threads to terminate. These two functions are analogous to creating a new process with fork() and waiting for it to terminate using waitpid().
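sum2() ignores the values returned by the pthread calls, and it passes NULL as the second argument of pthread_join(), which discards the thread function’s return value. A minimal sketch of using both, with hypothetical names SquareArgs, square() and square_in_thread(): pthread_create() and pthread_join() return 0 on success and an error number on failure, and pthread_join() can store the thread function’s void* return value through its second argument.

```cpp
#include <pthread.h>
#include <cstdint>
#include <cstdio>
#include <cstring>

// Hypothetical argument/result structure for this sketch.
struct SquareArgs {
    uint64_t input;
    uint64_t output;
};

// Thread function with the signature pthread_create() expects. It stores
// its result in the structure and returns the same pointer, which the
// joining thread retrieves through pthread_join().
void* square(void* arg) {
    SquareArgs* p = static_cast<SquareArgs*>(arg);
    p->output = p->input * p->input;
    return p;
}

// Runs square() on a separate thread and stores its result in *result.
// Returns false if either pthread call fails.
bool square_in_thread(uint64_t x, uint64_t* result) {
    SquareArgs args { x, 0 };  // stays alive until the thread is joined
    pthread_t t;

    // Failure is reported by returning an error number, not through errno.
    int err = pthread_create(&t, NULL, &square, &args);
    if (err != 0) {
        fprintf(stderr, "pthread_create: %s\n", strerror(err));
        return false;
    }

    void* retval = NULL;
    err = pthread_join(t, &retval);  // retval receives square()'s return value
    if (err != 0) {
        fprintf(stderr, "pthread_join: %s\n", strerror(err));
        return false;
    }

    *result = static_cast<SquareArgs*>(retval)->output;
    return true;
}
```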

Recall that fork() creates a new process with its own virtual address space, separate from that of its parent process. A thread created by pthread_create() is similar to a process in that it runs in parallel with the thread that created it; the difference is that the two threads share a single virtual address space. Each thread does, however, get its own stack, because threads execute independently and therefore need their own local variables and function call frames.
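A minimal sketch that checks both properties, using hypothetical names probe() and shares_globals_not_stacks(). The child thread writes a global variable and records the address of one of its own local variables, writing that address into a variable on its creator’s stack through a pointer, which works only because the two threads share an address space. Since the creator’s local variable stays alive while the child runs, the two locals are distinct objects on distinct stacks and must have different addresses.

```cpp
#include <pthread.h>
#include <cstddef>
#include <cstdint>

int shared_value = 0;  // one copy, visible to every thread in the process

// Hypothetical thread function: records the address of one of its own
// local variables (as an integer) and writes to the global variable.
void* probe(void* arg) {
    int local = 0;  // lives on this thread's private stack
    // arg points into the creating thread's stack: shared address space.
    *static_cast<std::uintptr_t*>(arg) = reinterpret_cast<std::uintptr_t>(&local);
    shared_value = 42;
    return NULL;
}

// Hypothetical driver: returns true if the child's write to the global is
// visible here and the child's local variable had a different address
// from this thread's local variable.
bool shares_globals_not_stacks() {
    int my_local = 0;  // lives on the calling thread's stack
    std::uintptr_t child_local = 0;
    pthread_t t;
    if (pthread_create(&t, NULL, &probe, &child_local) != 0) return false;
    // Joining also guarantees the child's writes are visible afterwards.
    if (pthread_join(t, NULL) != 0) return false;

    bool saw_write = (shared_value == 42);
    bool distinct_stacks =
        (child_local != reinterpret_cast<std::uintptr_t>(&my_local));
    return saw_write && distinct_stacks;
}
```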

We compiled and linked the program as follows:

g++ -Wall -g -std=c++20 -pthread   -c -o pthread.o pthread.cpp
g++ -pthread  pthread.o   -o pthread

The -pthread flag enables POSIX threads support. We pass it both when compiling, where it defines the required preprocessor macros, and when linking, where it links in the POSIX threads library.

The program produces the following output:

elapsed: 6753 microsec
result: 1167685

The program produces an incorrect result: the count falls short of two million. This is expected, because the two threads increment the shared sum variable without any synchronization; in C++ terms, this is a data race, which is undefined behavior. Each thread executes the expression ++*sum_ptr in parallel, but the expression is not an atomic operation. The compiled machine code breaks it down into the following three steps:

  1. Load *sum_ptr from memory into a register
  2. Increment the value in the register
  3. Store the register’s value to *sum_ptr

This sequence of steps can be interleaved between the two threads. For example, suppose *sum_ptr currently holds 6. Thread A loads the value 6 into a register, increments it to 7, and is about to write it back to memory. At that moment, thread B loads *sum_ptr, which still holds 6, increments it to 7, and writes 7 back to memory. Thread A then writes its own 7 to memory. Even though two increments were performed, the value only went from 6 to 7. Lost updates like this explain the undercounting we observed above.
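The interleaving can be replayed deterministically in a single thread by spelling out the three steps, with local variables standing in for each thread’s register. A minimal sketch, where replay_lost_update() is a hypothetical name:

```cpp
#include <cstdint>

// Hypothetical single-threaded replay of the interleaving described above.
// reg_a and reg_b stand in for the registers of threads A and B.
uint64_t replay_lost_update(uint64_t initial) {
    uint64_t sum = initial;

    uint64_t reg_a = sum;  // A, step 1: load
    reg_a = reg_a + 1;     // A, step 2: increment (A is about to store)

    uint64_t reg_b = sum;  // B, step 1: load, still sees the old value
    reg_b = reg_b + 1;     // B, step 2: increment
    sum = reg_b;           // B, step 3: store

    sum = reg_a;           // A, step 3: store, overwriting B's result

    return sum;            // two increments, but the value rose by only one
}
```

Calling replay_lost_update(6) returns 7, matching the walkthrough.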

An astute reader may have noticed that the program running two threads in parallel somehow took longer to run than the previous version where we performed the counting serially. We’ll address this in the next section.

Cache-conscious programming

When two threads running on two CPUs keep updating the same memory location, every write by one CPU invalidates the copy of that location in the other CPU’s cache. Each CPU is then forced to reload the data into its cache before its next update, which severely degrades performance. This is why the parallel execution of f1() by two threads was slower than the previous serial execution.

In an attempt to fix this problem, sum3() declares an array of two counters, one for each thread to increment independently:

uint64_t sum3() {
    constexpr uint64_t count = 1'000'000;
    uint64_t sum[2] = {0};
    pthread_t t1, t2;

    Args args1 { &sum[0], count };
    pthread_create(&t1, NULL, &f1, &args1);

    Args args2 { &sum[1], count };
    pthread_create(&t2, NULL, &f1, &args2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    return sum[0] + sum[1];
}

Here is the output:

elapsed: 6778 microsec
result: 2000000

The computed result is correct now, but the program still takes roughly the same amount of time to run!

This program demonstrates a phenomenon known as false sharing. Caches hold memory in fixed-size chunks of bytes called cache blocks or cache lines, and invalidation works on whole lines: if one byte in a line is invalidated, the whole line is invalidated. So if two distinct memory locations accessed by two threads are close enough to fall in the same cache line, the program behaves, as far as the caches are concerned, as if the two threads were accessing the same memory location.
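To make the cache-line arithmetic concrete, here is a minimal sketch of a hypothetical same_cache_line() helper. It assumes cache lines of a fixed size (defaulting to the 64 bytes of the x86 system used in this chapter) that start at addresses divisible by that size; two addresses share a line exactly when integer division by the line size gives the same result.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical helper: returns true if addresses a and b fall in the same
// cache line, assuming line_size-byte lines aligned to multiples of
// line_size (64 is an assumption matching the chapter's x86 system).
bool same_cache_line(const void* a, const void* b, std::size_t line_size = 64) {
    std::uintptr_t x = reinterpret_cast<std::uintptr_t>(a);
    std::uintptr_t y = reinterpret_cast<std::uintptr_t>(b);
    // Dividing by the line size maps every byte of a line to the same index.
    return x / line_size == y / line_size;
}
```

Whether sum[0] and sum[1] of sum3() share a line therefore depends on where the array starts; with the array aligned to 64 bytes they always do. C++17 also declares std::hardware_destructive_interference_size in <new> as a portable compile-time hint for this value, though not every standard library implementation provides it.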

We can remedy false sharing by ensuring that the two memory locations are not on the same cache line. The cache line size on our system, which has x86 CPUs, is 64 bytes. The sum4() function, shown below, declares a uint64_t array of size 9 and uses the first and last elements as the two counters. Since each element is 8 bytes, sum[8] lies 8 × 8 = 64 bytes past sum[0], so the two elements always land on separate cache lines.

uint64_t sum4() {
    constexpr uint64_t count = 1'000'000;
    uint64_t sum[9] = {0};
    pthread_t t1, t2;

    Args args1 { &sum[0], count };
    pthread_create(&t1, NULL, &f1, &args1);

    Args args2 { &sum[8], count };
    pthread_create(&t2, NULL, &f1, &args2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    return sum[0] + sum[8];
}

The program output is shown below:

elapsed: 2293 microsec
result: 2000000

We now see the full benefit of parallel execution being realized – it runs almost twice as fast as the initial serial version!
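The padding in sum4() can also be expressed in the type itself. A minimal sketch, assuming the 64-byte line size above and a hypothetical PaddedCounter type and sum4_aligned() function: alignas(64) makes each counter start on a line boundary and rounds sizeof(PaddedCounter) up to 64, so adjacent array elements never share a line. Args and f1() are repeated so the sketch compiles on its own.

```cpp
#include <pthread.h>
#include <cstddef>
#include <cstdint>

// Args and f1() repeated from earlier in the chapter.
struct Args {
    uint64_t* sum_ptr;
    uint64_t count;
};

void* f1(void* args) {
    Args* p = (Args*)args;
    uint64_t* sum_ptr = p->sum_ptr;
    uint64_t count = p->count;
    while (count--) {
        ++*sum_ptr;
    }
    return NULL;
}

// Hypothetical counter type: each object is aligned to, and padded out to,
// a whole 64-byte cache line (64 is an assumption for x86).
struct alignas(64) PaddedCounter {
    uint64_t value = 0;
};
static_assert(sizeof(PaddedCounter) == 64, "one counter per cache line");

// Hypothetical variant of sum4(): the padding comes from the type
// rather than from unused array elements.
uint64_t sum4_aligned() {
    constexpr uint64_t count = 1'000'000;
    PaddedCounter sum[2];
    pthread_t t1, t2;

    Args args1 { &sum[0].value, count };
    pthread_create(&t1, NULL, &f1, &args1);

    Args args2 { &sum[1].value, count };
    pthread_create(&t2, NULL, &f1, &args2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    return sum[0].value + sum[1].value;
}
```

Compared with sum[9], this states the intent directly and keeps the layout correct if the counter type changes size.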

Mutual exclusion

Let’s now go back to the single sum variable implementation and introduce thread synchronization to obtain the correct result, as shown below:

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

void* f2(void* args) {
    Args* p = (Args*)args;
    uint64_t* sum_ptr = p->sum_ptr;
    uint64_t count = p->count;

    while (count--) {
        pthread_mutex_lock(&mtx);
        ++*sum_ptr;
        pthread_mutex_unlock(&mtx);
    }
    return NULL;
}

uint64_t sum5() {
    constexpr uint64_t count = 1'000'000;
    uint64_t sum = 0;
    pthread_t t1, t2;

    Args args1 { &sum, count };
    pthread_create(&t1, NULL, &f2, &args1);

    Args args2 { &sum, count };
    pthread_create(&t2, NULL, &f2, &args2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    return sum;
}

The sum5() function runs f2() instead of f1(). The two functions are identical except that f2() calls pthread_mutex_lock() and pthread_mutex_unlock() around ++*sum_ptr. pthread_mutex_lock() takes a pointer to a mutex object and attempts to take ownership of it, or “lock” it. If the mutex is already held by another thread, the calling thread blocks until the other thread calls pthread_mutex_unlock() to release the mutex, at which point the calling thread obtains the mutex and returns from pthread_mutex_lock(). We declared the shared mutex object mtx as a global variable so that it’s accessible to both threads.

Both threads use pthread_mutex_lock() before entering the critical section (a region of code they wish to run atomically) and pthread_mutex_unlock() after exiting the critical section. This guarantees mutual exclusion of the threads in the critical section – i.e., at most one thread can execute the code in the critical section at a time.
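For comparison, a minimal sketch of the same critical section written with the C++ standard library instead of pthreads, assuming a hypothetical function name sum5_cpp(). std::lock_guard acquires the mutex in its constructor and releases it in its destructor, so the unlock happens automatically on every path out of the block.

```cpp
#include <cstdint>
#include <mutex>
#include <thread>

// Hypothetical C++ counterpart of f2()/sum5(), using std::thread and
// std::mutex instead of the pthread calls.
uint64_t sum5_cpp() {
    constexpr uint64_t count = 1'000'000;
    uint64_t sum = 0;
    std::mutex mtx;  // protects sum

    auto f = [&sum, &mtx](uint64_t n) {
        while (n--) {
            // lock_guard locks mtx here and unlocks it when it is destroyed
            // at the end of each iteration, so the critical section is
            // exactly the increment below.
            std::lock_guard<std::mutex> lock(mtx);
            ++sum;
        }
    };

    std::thread t1(f, count);  // each thread starts running f(count) at once
    std::thread t2(f, count);
    t1.join();  // wait for both threads, like pthread_join()
    t2.join();
    return sum;
}
```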

The output of the sum5() version is shown below:

elapsed: 147613 microsec
result: 2000000

As we can see, synchronizing the two threads with a mutex fixed the undercounting, but the program’s performance degraded significantly compared to the unsynchronized version: it is more than 20 times slower. Generally speaking, synchronization mechanisms come at a high cost, so it is preferable to partition the program into independently computable pieces and avoid synchronization where possible.
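A minimal sketch of that advice applied to the counting program, using hypothetical names count_then_add() and sum_partitioned(): each thread counts into a local variable without any synchronization and locks the mutex only once, at the end, to add its partial result to the shared sum. The program keeps a single shared sum variable and still produces the correct result, while acquiring the mutex twice in total rather than two million times.

```cpp
#include <pthread.h>
#include <cstddef>
#include <cstdint>

// Args repeated from earlier in the chapter so the sketch compiles on its own.
struct Args {
    uint64_t* sum_ptr;
    uint64_t count;
};

// Hypothetical mutex guarding the shared sum in this sketch.
pthread_mutex_t sum_mtx = PTHREAD_MUTEX_INITIALIZER;

// Hypothetical thread function: counts into a local variable with no
// synchronization, then locks the mutex once to add the partial result.
void* count_then_add(void* args) {
    Args* p = (Args*)args;
    uint64_t local = 0;  // private to this thread: no sharing, no lock needed
    for (uint64_t i = 0; i < p->count; ++i) {
        ++local;
    }

    pthread_mutex_lock(&sum_mtx);
    *p->sum_ptr += local;  // one short critical section per thread
    pthread_mutex_unlock(&sum_mtx);
    return NULL;
}

// Hypothetical variant of sum5(): same structure, but each thread
// acquires the mutex once instead of once per increment.
uint64_t sum_partitioned() {
    constexpr uint64_t count = 1'000'000;
    uint64_t sum = 0;
    pthread_t t1, t2;

    Args args1 { &sum, count };
    pthread_create(&t1, NULL, &count_then_add, &args1);

    Args args2 { &sum, count };
    pthread_create(&t2, NULL, &count_then_add, &args2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    return sum;
}
```

With optimization enabled, the compiler may even reduce each thread’s counting loop to a single addition, which is safe here because local is never shared.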

More to come