In this chapter, we introduce the fundamentals of concurrent programming. We’ll start by writing a simple multi-threaded program in a C style and then rewrite it using modern C++ library facilities.
std::chrono to measure performance
We present the 18/pthread program which measures how long it takes to count to
two million. We will iterate on the implementation throughout this chapter. The
initial version below presents sum1(), which makes two calls to f1(), one
after another, counting a million each time.
struct Args {
    uint64_t* sum_ptr;
    uint64_t count;
};

void* f1(void* args) {
    Args* p = (Args*)args;
    uint64_t* sum_ptr = p->sum_ptr;
    uint64_t count = p->count;
    while (count--) {
        ++*sum_ptr;
    }
    return NULL;
}

uint64_t sum1() {
    constexpr uint64_t count = 1'000'000;
    uint64_t sum = 0;
    Args args1 { &sum, count };
    f1(&args1);
    Args args2 { &sum, count };
    f1(&args2);
    return sum;
}
...
int main() {
    auto time0 = high_resolution_clock::now();
    uint64_t result = sum1();
    // uint64_t result = sum2();
    // uint64_t result = sum3();
    // uint64_t result = sum4();
    // uint64_t result = sum5();
    auto time1 = high_resolution_clock::now();
    auto dt = duration_cast<microseconds>(time1 - time0);
    cout << "elapsed: " << dt.count() << " microsec\n";
    cout << "result: " << result << '\n';
}
The program produces the following output on our system:
elapsed: 4248 microsec
result: 2000000
f1() is written to take a single void* parameter to conform to the C
pthread API which we will use in the next section. sum1() uses the Args
structure to wrap a pointer to a shared sum variable to increment and a
count of how many times to increment it, and passes a pointer to the Args
structure to f1().
The main() function uses std::chrono::high_resolution_clock to take
timestamps before and after calling sum1(). The timestamps are
std::chrono::time_point objects that we can subtract to retrieve a
std::chrono::duration that represents the elapsed time between the two
timestamps. We use std::chrono::duration_cast to convert that duration into
microseconds.
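The timestamp arithmetic described above can be isolated in a small helper. The following is a minimal sketch, not part of the original program: the names elapsed_microseconds() and to_microseconds() are hypothetical and chosen only for illustration. It shows that subtracting two time_point objects yields a duration, and that duration_cast truncates when converting to a coarser unit.

```cpp
#include <chrono>
#include <cstdint>

// Hypothetical helper (not in the original program): runs `work` and
// returns the elapsed wall-clock time in whole microseconds.
template <typename Work>
int64_t elapsed_microseconds(Work work) {
    using namespace std::chrono;
    auto time0 = high_resolution_clock::now();  // time_point before the work
    work();
    auto time1 = high_resolution_clock::now();  // time_point after the work
    // Subtracting two time_points yields a duration in the clock's own unit.
    auto dt = time1 - time0;
    // duration_cast converts to microseconds, truncating toward zero.
    return duration_cast<microseconds>(dt).count();
}

// Hypothetical helper: converts a fixed nanosecond duration to microseconds,
// which makes the truncation of duration_cast easy to observe.
int64_t to_microseconds(std::chrono::nanoseconds ns) {
    return std::chrono::duration_cast<std::chrono::microseconds>(ns).count();
}
```

For instance, to_microseconds(std::chrono::nanoseconds(2500)) yields 2 rather than 2.5, because the fractional microsecond is dropped. Passing sum1 to elapsed_microseconds() would reproduce the measurement made in main().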
pthread API
We now replace the call to sum1() with sum2() in the main() function. The
sum2() function creates two threads, each of which will run f1() in
parallel:
uint64_t sum2() {
    constexpr uint64_t count = 1'000'000;
    uint64_t sum = 0;
    pthread_t t1, t2;
    Args args1 { &sum, count };
    pthread_create(&t1, NULL, &f1, &args1);
    Args args2 { &sum, count };
    pthread_create(&t2, NULL, &f1, &args2);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return sum;
}
The pthread_create() function takes a pointer to a pthread_t variable to
return a thread ID, an optional pointer to a thread attributes structure, a
function pointer for the thread to execute, and a pointer to pass to the
function as an argument. After creating the two threads, the main thread –
i.e., the thread executing sum2() – calls pthread_join() to wait for the
two threads to terminate. These two functions are analogous to creating a new
process with fork() and waiting for it to terminate using waitpid().
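The program above ignores the return values of pthread_create() and pthread_join() and passes NULL as the second argument of pthread_join(). As a hedged sketch of what those two pieces are for, the example below checks the error codes and retrieves the thread function's return value. The names add_one() and run_one_thread() are hypothetical and do not appear in the original program.

```cpp
#include <pthread.h>
#include <cstdint>

// Hypothetical thread function: increments the uint64_t it is given and
// returns the same pointer so that the joining thread can retrieve it.
void* add_one(void* arg) {
    uint64_t* value = static_cast<uint64_t*>(arg);
    ++*value;
    return value;
}

// Creates one thread, waits for it, and reports success only if both
// pthread calls succeeded and the thread returned the expected pointer.
bool run_one_thread(uint64_t* value) {
    pthread_t t;
    // pthread_create() returns 0 on success and an error number otherwise.
    if (pthread_create(&t, NULL, &add_one, value) != 0) {
        return false;
    }
    void* retval = NULL;
    // The second argument of pthread_join() receives the value returned
    // by the thread function.
    if (pthread_join(t, &retval) != 0) {
        return false;
    }
    return retval == value;
}
```

Because only one thread touches the value before the join completes, this sketch has no data race.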
Recall that fork() creates a new process with its own virtual memory address
space, distinct from the address space of its parent process. A thread created
by pthread_create() is similar to a process in that it runs in parallel with
the thread that created it, but the difference is that the two threads share a
virtual memory address space. However, each thread does get its own stack
because they execute independently and therefore have their own local variables
and function call stack.
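The difference between the two address-space models can be observed directly. The sketch below is an illustration under our own assumptions, with the hypothetical names shared_value, set_to_one(), value_after_thread(), and value_after_fork(): a thread's write to a global variable is visible to its creator, whereas a forked child's write affects only the child's private copy.

```cpp
#include <pthread.h>
#include <sys/wait.h>
#include <unistd.h>

int shared_value = 0;  // a global, visible to every thread of the process

void* set_to_one(void*) {
    shared_value = 1;  // writes into the address space shared with the creator
    return NULL;
}

// Returns what the caller observes after a thread writes to the global.
int value_after_thread() {
    shared_value = 0;
    pthread_t t;
    if (pthread_create(&t, NULL, &set_to_one, NULL) != 0) {
        return -1;
    }
    pthread_join(t, NULL);
    return shared_value;  // the thread's write is visible here
}

// Returns what the parent observes after a forked child writes to the global.
int value_after_fork() {
    shared_value = 0;
    pid_t pid = fork();
    if (pid < 0) {
        return -1;
    }
    if (pid == 0) {
        shared_value = 1;  // modifies only the child's copy of the variable
        _exit(0);
    }
    waitpid(pid, NULL, 0);
    return shared_value;  // the parent's copy is unchanged
}
```

Under these assumptions, value_after_thread() returns 1 and value_after_fork() returns 0.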
We compiled and linked the program as follows:
g++ -Wall -g -std=c++20 -pthread -c -o pthread.o pthread.cpp
g++ -pthread pthread.o -o pthread
We specify the -pthread flag to use the POSIX threads library. Note that we
use the -pthread flag for both compiling and linking. It defines required
macros when compiling and brings in the POSIX threads library when linking.
The program produces the following output:
elapsed: 6753 microsec
result: 1167685
The program produces an incorrect result – i.e., the count falls short of two
million. This is expected because there is no synchronization between the two
threads incrementing the shared sum variable. Each thread executes the
expression ++*sum_ptr in parallel, but it is not an atomic operation. The
compiled machine code will break it down into the following three steps:
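1. Load the value of *sum_ptr from memory into a register
2. Increment the value in the register
3. Store the value in the register back to *sum_ptr
This sequence of steps can be interleaved between the two threads. For example,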
let’s say *sum_ptr currently holds 6. Thread A loads the value 6 into a
register, increments it to 7, and is about to write it back to memory. At that
time, thread B loads the value of *sum_ptr, which is still 6, increments it to
7, and successfully writes it back to memory. Thread A also writes 7 to memory.
Even though two increments were performed, the end result is that the value only
went from 6 to 7. This explains the undercounting we observed above.
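The interleaving in this example can be replayed deterministically on a single thread by modeling each thread's register as a local variable. This is only a simulation of the schedule described above, using the hypothetical name replay_lost_update(); real threads may interleave in many other ways.

```cpp
#include <cstdint>

// Replays, step by step on one thread, the interleaving described above.
// `memory` stands in for *sum_ptr; reg_a and reg_b stand in for the
// registers used by thread A and thread B.
uint64_t replay_lost_update(uint64_t initial) {
    uint64_t memory = initial;

    uint64_t reg_a = memory;  // thread A: load
    reg_a = reg_a + 1;        // thread A: increment (not yet stored)

    uint64_t reg_b = memory;  // thread B: load, still sees the old value
    reg_b = reg_b + 1;        // thread B: increment
    memory = reg_b;           // thread B: store

    memory = reg_a;           // thread A: store, writing the same value again

    return memory;            // two increments ran, but memory grew by one
}
```

Calling replay_lost_update(6) returns 7, matching the walk-through: one of the two increments is lost.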
An astute reader may have noticed that the program running two threads in parallel somehow took longer to run than the previous version where we performed the counting serially. We’ll address this in the next section.
When two threads running on two CPUs keep updating the same memory location,
each write invalidates the other CPU’s cached copy of that location, forcing the
CPUs to keep reloading it into their caches and severely degrading
performance. This is why the parallel execution of f1() by two threads was
slower than the previous serial execution.
In an attempt to fix this problem, sum3() declares an array of two integers,
one for each thread to increment independently:
uint64_t sum3() {
    constexpr uint64_t count = 1'000'000;
    uint64_t sum[2] = {0};
    pthread_t t1, t2;
    Args args1 { &sum[0], count };
    pthread_create(&t1, NULL, &f1, &args1);
    Args args2 { &sum[1], count };
    pthread_create(&t2, NULL, &f1, &args2);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return sum[0] + sum[1];
}
Here is the output:
elapsed: 6778 microsec
result: 2000000
The computed result is correct now, but the program still takes roughly the same amount of time to run!
This program demonstrates a phenomenon known as false sharing. Caches are organized into fixed-size chunks of bytes called cache blocks or cache lines. If one byte in a line is invalidated, the whole line gets invalidated. So if two separate memory locations accessed by two threads are close enough to fall in a single cache line, the caches behave as if the two threads were accessing the same memory location.
We can remedy false sharing by ensuring the two memory locations aren’t on the
same cache line. The cache line size on our system, which has x86 CPUs, is 64
bytes. The sum4() function, shown below, declares a uint64_t array of size
9 and uses the first and last elements to count the sum. These two elements are
64 bytes away from each other, so they will land on separate cache lines.
uint64_t sum4() {
    constexpr uint64_t count = 1'000'000;
    uint64_t sum[9] = {0};
    pthread_t t1, t2;
    Args args1 { &sum[0], count };
    pthread_create(&t1, NULL, &f1, &args1);
    Args args2 { &sum[8], count };
    pthread_create(&t2, NULL, &f1, &args2);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return sum[0] + sum[8];
}
The program output is shown below:
elapsed: 2293 microsec
result: 2000000
We now see the full benefit of parallel execution being realized – it runs almost twice as fast as the initial serial version!
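An alternative to spacing the counters out by hand is to ask the compiler to align each counter to a cache line. The sketch below assumes a 64-byte cache line, as on the x86 system described above; the names kCacheLineSize, PaddedCounter, count_up(), counter_stride(), and sum_padded() are hypothetical and not part of the original program. Like the original code, it omits error checking on the pthread calls.

```cpp
#include <pthread.h>
#include <cstddef>
#include <cstdint>

// Assumed cache line size; 64 bytes matches the x86 system in the text.
constexpr std::size_t kCacheLineSize = 64;

// alignas rounds the struct's alignment, and therefore its size, up to a
// full cache line, so adjacent array elements never share a line.
struct alignas(kCacheLineSize) PaddedCounter {
    uint64_t value = 0;
};

static_assert(sizeof(PaddedCounter) == kCacheLineSize,
              "each counter should occupy exactly one cache line");

// Thread function: increments its own padded counter a million times.
void* count_up(void* arg) {
    PaddedCounter* c = static_cast<PaddedCounter*>(arg);
    for (uint64_t i = 0; i < 1'000'000; ++i) {
        ++c->value;
    }
    return NULL;
}

// Distance in bytes between two adjacent counters in an array.
std::size_t counter_stride() {
    PaddedCounter counters[2];
    auto* a = reinterpret_cast<unsigned char*>(&counters[0]);
    auto* b = reinterpret_cast<unsigned char*>(&counters[1]);
    return static_cast<std::size_t>(b - a);
}

// Variant of sum4() that uses padded counters instead of a 9-element array.
uint64_t sum_padded() {
    PaddedCounter counters[2];
    pthread_t t1, t2;
    pthread_create(&t1, NULL, &count_up, &counters[0]);
    pthread_create(&t2, NULL, &count_up, &counters[1]);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return counters[0].value + counters[1].value;
}
```

The design trade-off is memory: each counter now occupies 64 bytes instead of 8. The timing benefit depends on the actual cache line size of the machine, which this sketch does not detect.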
Let’s now go back to the single sum variable implementation and introduce
thread synchronization to obtain the correct result, as shown below:
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

void* f2(void* args) {
    Args* p = (Args*)args;
    uint64_t* sum_ptr = p->sum_ptr;
    uint64_t count = p->count;
    while (count--) {
        pthread_mutex_lock(&mtx);
        ++*sum_ptr;
        pthread_mutex_unlock(&mtx);
    }
    return NULL;
}

uint64_t sum5() {
    constexpr uint64_t count = 1'000'000;
    uint64_t sum = 0;
    pthread_t t1, t2;
    Args args1 { &sum, count };
    pthread_create(&t1, NULL, &f2, &args1);
    Args args2 { &sum, count };
    pthread_create(&t2, NULL, &f2, &args2);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return sum;
}
The sum5() function runs f2() in its threads instead of f1(). The two functions are
the same except that f2() calls pthread_mutex_lock() and pthread_mutex_unlock() around
++*sum_ptr. pthread_mutex_lock() takes a pointer to a mutex object and
attempts to take ownership of it, or “lock” it. If the mutex is already being
held by another thread, the calling thread blocks until the other thread calls
pthread_mutex_unlock() to release the mutex, at which point the calling thread
obtains the mutex and returns from pthread_mutex_lock(). We declared the
shared mutex object mtx as a global variable so that it’s accessible to both
threads.
Both threads use pthread_mutex_lock() before entering the critical section
(a region of code they wish to run atomically) and pthread_mutex_unlock()
after exiting the critical section. This guarantees mutual exclusion of the
threads in the critical section – i.e., at most one thread can execute the code
in the critical section at a time.
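A global variable is not the only way to make the mutex reachable from both threads. As a sketch of one alternative, the mutex can live on the creating thread's stack and be passed by pointer alongside the other arguments. The names LockedArgs, f_locked(), and sum_with_local_mutex() are hypothetical; the mutex is initialized with pthread_mutex_init() using default attributes, and error checking is omitted as in the original code.

```cpp
#include <pthread.h>
#include <cstdint>

// Hypothetical variant of Args that also carries a pointer to the mutex.
struct LockedArgs {
    uint64_t* sum_ptr;
    uint64_t count;
    pthread_mutex_t* mtx_ptr;
};

void* f_locked(void* args) {
    LockedArgs* p = static_cast<LockedArgs*>(args);
    uint64_t count = p->count;
    while (count--) {
        pthread_mutex_lock(p->mtx_ptr);    // enter the critical section
        ++*p->sum_ptr;
        pthread_mutex_unlock(p->mtx_ptr);  // leave the critical section
    }
    return NULL;
}

uint64_t sum_with_local_mutex(uint64_t count) {
    uint64_t sum = 0;
    pthread_mutex_t mtx;
    pthread_mutex_init(&mtx, NULL);  // NULL selects the default attributes

    // Both argument structures point at the same sum and the same mutex.
    LockedArgs args1 { &sum, count, &mtx };
    LockedArgs args2 { &sum, count, &mtx };

    pthread_t t1, t2;
    pthread_create(&t1, NULL, &f_locked, &args1);
    pthread_create(&t2, NULL, &f_locked, &args2);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    // Safe to destroy: both threads have terminated and the mutex is unlocked.
    pthread_mutex_destroy(&mtx);
    return sum;
}
```

This keeps the mutex's lifetime tied to the function that owns the shared data, at the cost of one more field in the argument structure.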
The program output is shown below:
elapsed: 147613 microsec
result: 2000000
As we can see, synchronizing the two threads using a mutex fixed the undercounting issue, but the program’s performance significantly degraded compared to the non-synchronized version – it’s about 20 times slower. Generally speaking, synchronization mechanisms come at a high cost. Therefore, it is preferable to partition the program into independently computable pieces to avoid synchronization if possible.
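One way to apply this advice to the counting example is to let each thread count into a private local variable and take the mutex only once, when adding its partial result to the shared sum. The following is a sketch under that assumption, using the hypothetical names f_local() and sum_partitioned(); it is not one of the chapter's sum functions, and we have not measured its running time.

```cpp
#include <pthread.h>
#include <cstdint>

struct Args {
    uint64_t* sum_ptr;
    uint64_t count;
};

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

// Hypothetical thread function: counts into a thread-private local, then
// publishes the partial result under the mutex exactly once.
void* f_local(void* args) {
    Args* p = static_cast<Args*>(args);
    uint64_t count = p->count;
    uint64_t local = 0;  // lives on this thread's own stack; not shared
    while (count--) {
        ++local;
    }
    pthread_mutex_lock(&mtx);    // one lock/unlock pair per thread
    *p->sum_ptr += local;
    pthread_mutex_unlock(&mtx);
    return NULL;
}

uint64_t sum_partitioned() {
    constexpr uint64_t count = 1'000'000;
    uint64_t sum = 0;
    pthread_t t1, t2;
    Args args1 { &sum, count };
    pthread_create(&t1, NULL, &f_local, &args1);
    Args args2 { &sum, count };
    pthread_create(&t2, NULL, &f_local, &args2);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return sum;
}
```

The mutex is now acquired twice in total rather than two million times, and the loop body touches no shared memory, so the threads do not contend for a lock or a cache line while counting.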