Makefile
c2cpp_util.h
concur.cpp
CC = g++
CXX = g++
CFLAGS = -Wall -g
CXXFLAGS = -Wall -g -std=c++17 -pthread
LDFLAGS = -pthread
LDLIBS =
executables = concur
.PHONY: default
default: $(executables)
concur: concur.o
.PHONY: clean
clean:
	rm -f a.out core *.o $(executables)
.PHONY: all
all: clean default
#include <typeinfo>
#include <cassert>
#include <cstdlib>
#ifdef __GNUC__
// cxxabi.h exists only with g++ & clang++, so include it inside the guard
#include <cxxabi.h>
// use abi::__cxa_demangle to demangle type name in g++ & clang++
#define PRINT_TYPE(os, x) do { \
int status; \
char *s = abi::__cxa_demangle(typeid(x).name(), 0, 0, &status); \
assert(status == 0); \
os << s << '\n'; \
std::free(s); \
} while (0)
#else
// MSVC returns human-readable type name, according to cppreference.com
#define PRINT_TYPE(os, x) do { \
os << typeid(x).name() << '\n'; \
} while (0)
#endif
#include <new>
#include <bitset>
#include <vector>
#include <iostream>
#include <algorithm>
#include <iomanip>
#include <locale>
#include <mutex>
#include <atomic>
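#include <future>
#include <thread>
#include <chrono>
#include <sstream>
#include <string>
#include <functional>
#include <cstdint>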
#include "c2cpp_util.h"
namespace c2cpp {
constexpr unsigned long long operator""_bil(unsigned long long x) {
return x * 1'000'000'000ull;
}
template<typename T> std::string fmt_usd(T&& x) {
std::ostringstream oss;
oss.imbue(std::locale("en_US.UTF8"));
oss << std::fixed << std::setprecision(2) << x;
return "$" + oss.str();
}
}
using namespace c2cpp;
using namespace std;
using namespace std::chrono;
void f1(uint64_t& sum, uint64_t count) { while (count--) ++sum; }
int main()
{
uint64_t sum = 0;
auto time0 = high_resolution_clock::now();
f1(sum, 1_bil);
f1(sum, 1_bil);
auto time1 = high_resolution_clock::now();
auto dt = duration_cast<milliseconds>(time1 - time0);
cout << dt.count() / 1'000.0 << "sec\n";
cout << fmt_usd(sum) << '\n';
}
/* Lecture outline:
1. Serial execution (and some odds & ends for setup)
- uint64_t, unsigned long, unsigned long long
- literal operator
- locale
- std::chrono
2. thread
code change:
//f1(sum, 1_bil);
//f1(sum, 1_bil);
thread t1 { f1, ref(sum), 1_bil };
thread t2 { f1, ref(sum), 1_bil };
t1.join();
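t2.join();
note: both threads update sum without synchronization, which is a data
race, so the printed total is unreliable.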
3. False sharing
code change 1:
//uint64_t sum = 0;
uint64_t sum[100] = {0};
constexpr int cache_line_size = 64;
cout << "cache_line_size == " << cache_line_size << '\n';
uint64_t sum_addr = (uint64_t)sum;
cout << "&sum[0] == " << bitset<cache_line_size>{sum_addr} << '\n';
cout << "&sum[0] is " << (sum_addr % cache_line_size ? "NOT " : "")
<< "cache line aligned" << '\n';
code change 2:
//thread t1 { f1, ref(sum), 1_bil };
//thread t2 { f1, ref(sum), 1_bil };
thread t1 { f1, ref(sum[0]), 1_bil };
thread t2 { f1, ref(sum[7]), 1_bil };
code change 3:
//cout << fmt_usd(sum) << '\n';
cout << fmt_usd(sum[0]) << '\n';
cout << fmt_usd(sum[7]) << '\n';
Explanation:
When two threads running on two CPUs keep updating the same memory
location, each update invalidates the other CPU's cached copy of that
location, which severely degrades performance. That much makes sense. But
why does the same thing happen when each thread updates its own separate
variable? Because caches are managed in fixed-size chunks of bytes called
cache blocks or cache lines. If one byte in a chunk is invalidated, the
whole chunk is invalidated. So if two separate variables are close enough
to sit in the same cache line, the program behaves as if the two threads
were accessing a single variable. This is called false sharing. Here
sum[0] and sum[7] are 56 bytes apart, so they share a line whenever sum
is 64-byte aligned.
The cache line size of the x86 CPUs on CLAC seems to be 64 bytes.
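One common way to avoid false sharing is to give each counter its own
cache line. A minimal sketch, assuming the 64-byte line size observed
above; padded_counter and count_in_parallel are names made up for this
example, not part of concur.cpp:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <thread>

// Assumption: 64-byte cache lines, as observed above.
constexpr std::size_t cache_line_size = 64;

// Hypothetical helper type: alignas rounds each counter up to a full line,
// so two counters in an array can never share one.
struct alignas(cache_line_size) padded_counter {
    std::uint64_t value = 0;
};
static_assert(sizeof(padded_counter) == cache_line_size);

void f1(std::uint64_t& sum, std::uint64_t count) { while (count--) ++sum; }

// Runs two threads, each on its own cache-line-sized counter.
std::uint64_t count_in_parallel(std::uint64_t count) {
    padded_counter sum[2];
    std::thread t1 { f1, std::ref(sum[0].value), count };
    std::thread t2 { f1, std::ref(sum[1].value), count };
    t1.join();
    t2.join();
    return sum[0].value + sum[1].value;
}
```

Calling count_in_parallel(1_bil) from main and timing it the same way as
before should show whether the padding helps on a given machine.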
4. linking multithreaded application
From gcc documentation:
When you link a multithreaded application, you will probably need to add a
library or flag to g++. This is a very non-standardized area of GCC across
ports. Some ports support a special flag (the spelling isn't even
standardized yet) to add all required macros to a compilation (if any such
flags are required then you must provide the flag for all compilations not
just linking) and link-library additions and/or replacements at link time.
The documentation is weak. On several targets (including GNU/Linux, Solaris
and various BSDs) -pthread is honored. Some other ports use other switches.
This is not well documented anywhere other than in "gcc -dumpspecs" (look
at the 'lib' and 'cpp' entries).
5. std::ref
- returns std::reference_wrapper, which is copyable & assignable
- lets you pass references to std::thread or std::bind
- lets you store references in standard containers
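The three bullets above can be sketched in one small function. add_one
and ref_demo are made-up names for illustration:

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

void add_one(int& x) { ++x; }

int ref_demo() {
    int a = 1, b = 2;

    // std::thread copies its arguments; std::ref passes a reference instead.
    std::thread t { add_one, std::ref(a) };
    t.join();                           // a is now 2

    // std::bind also copies bound arguments unless they are wrapped.
    auto bound = std::bind(add_one, std::ref(b));
    bound();                            // b is now 3

    // vector<int&> is ill-formed, but reference_wrapper<int> is copyable.
    std::vector<std::reference_wrapper<int>> v { a, b };
    for (int& x : v) x *= 10;           // writes through to a and b

    return a + b;                       // 20 + 30
}
```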
6. atomic types
code change 1:
//void f1(uint64_t& sum, uint64_t count) { while (count--) ++sum; }
struct wallet {
atomic<unsigned long long> money = 0;
operator unsigned long long() const { return money; }
wallet& operator++() {
++money;
return *this;
}
};
code change 2:
//uint64_t sum = 0;
static_assert(atomic<unsigned long long>::is_always_lock_free);
wallet sum;
auto f2 = [&](uint64_t count) { while (count--) ++sum; };
code change 3:
//thread t1 { f1, ref(sum), 1_bil };
//thread t2 { f1, ref(sum), 1_bil };
thread t1 { f2, 1_bil };
thread t2 { f2, 1_bil };
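Putting the three code changes together gives roughly the following.
This is a sketch, not the lecture's exact file: atomic_demo is a made-up
wrapper that takes the count as a parameter so it can be run quickly.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

struct wallet {
    std::atomic<unsigned long long> money {0};
    operator unsigned long long() const { return money; }
    wallet& operator++() {
        ++money;            // atomic read-modify-write, no lost updates
        return *this;
    }
};

// Two threads increment the same wallet through the captured reference.
unsigned long long atomic_demo(std::uint64_t count) {
    static_assert(std::atomic<unsigned long long>::is_always_lock_free);
    wallet sum;
    auto f2 = [&](std::uint64_t n) { while (n--) ++sum; };
    std::thread t1 { f2, count };
    std::thread t2 { f2, count };
    t1.join();
    t2.join();
    return sum;             // uses wallet's conversion operator
}
```

Unlike the plain uint64_t version in section 2, the total here is always
exactly 2 * count.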
7. mutex
code change 1 (in struct wallet):
//atomic<unsigned long long> money = 0;
mutex m;
unsigned long long money = 0;
code change 2 (in wallet::operator++):
m.lock();
++money;
m.unlock();
8. scoped_lock
Calling m.lock() & m.unlock() by hand is NOT exception-safe: if the code
in between throws, m stays locked!
Use scoped_lock wrapper like this:
{
scoped_lock lck{m};
++money;
}
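A small sketch of why the wrapper matters: the function below throws
while holding the lock, and the mutex is still released during stack
unwinding. risky_increment and mutex_released_after_throw are made-up
names for this example.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

struct wallet {
    std::mutex m;
    unsigned long long money = 0;

    // Hypothetical member that throws while the lock is held.
    void risky_increment() {
        std::scoped_lock lck{m};
        ++money;
        throw std::runtime_error("oops");   // lck's destructor unlocks m
    }
};

// Returns true if w.m could be locked again after the exception.
// Caveat: the standard allows try_lock to fail spuriously, so this
// check is for illustration only.
bool mutex_released_after_throw(wallet& w) {
    try {
        w.risky_increment();
    } catch (const std::runtime_error&) {
        // swallowed on purpose; only the lock state matters here
    }
    bool was_free = w.m.try_lock();
    if (was_free) w.m.unlock();
    return was_free;
}
```

With manual m.lock() and m.unlock() around the throw, the mutex would
stay locked and the next lock() on it would block forever.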
9. scoped_lock vs. lock_guard vs. unique_lock
scoped_lock
- wrapper for one or more mutexes
- uses deadlock avoidance algorithm so it is safe to concurrently
lock any set of mutexes in any order
- in order to lock multiple mutexes, the mutex types must meet the
Lockable requirements (in addition to BasicLockable), which means
they need to support try_lock() in addition to lock() & unlock()
lock_guard
- older (C++11) wrapper that works with only one mutex
unique_lock
- also for only one mutex, but has more features
- condition_variable requires the use of unique_lock
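A sketch of two of the cases above, with made-up names (account,
transfer, wait_for_value): scoped_lock taking two mutexes at once, and
unique_lock used with condition_variable.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

struct account {
    std::mutex m;
    long long balance = 0;
};

// scoped_lock locks both mutexes using a deadlock-avoidance algorithm,
// so callers may name the accounts in any order.
void transfer(account& from, account& to, long long amount) {
    std::scoped_lock lck{from.m, to.m};
    from.balance -= amount;
    to.balance += amount;
}

// Two threads transfer in opposite directions on purpose.
bool transfer_demo() {
    account a, b;
    a.balance = 1000;
    b.balance = 1000;
    std::thread t1 { [&] { for (int i = 0; i < 10000; ++i) transfer(a, b, 1); } };
    std::thread t2 { [&] { for (int i = 0; i < 10000; ++i) transfer(b, a, 1); } };
    t1.join();
    t2.join();
    return a.balance == 1000 && b.balance == 1000;
}

// condition_variable::wait takes a unique_lock because it has to unlock
// the mutex while sleeping and relock it before returning.
int wait_for_value() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;
    int value = 0;
    std::thread producer { [&] {
        {
            std::lock_guard lck{m};     // one mutex, no extra features needed
            value = 42;
            ready = true;
        }
        cv.notify_one();
    } };
    std::unique_lock lck{m};
    cv.wait(lck, [&] { return ready; });
    int result = value;
    lck.unlock();
    producer.join();
    return result;
}
```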
10. shared_mutex
- reader-writer mutex
- shared_lock wrapper to read-lock
- unique_lock wrapper to write-lock
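A minimal reader-writer sketch. shared_wallet and shared_mutex_demo are
made-up names; the thread and iteration counts are arbitrary.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

class shared_wallet {
    mutable std::shared_mutex m;
    unsigned long long money = 0;
public:
    unsigned long long read() const {
        std::shared_lock lck{m};    // read-lock: many readers at once
        return money;
    }
    void deposit(unsigned long long x) {
        std::unique_lock lck{m};    // write-lock: exclusive access
        money += x;
    }
};

// Four writers and four readers share one wallet.
unsigned long long shared_mutex_demo() {
    shared_wallet w;
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back([&w] { for (int j = 0; j < 1000; ++j) w.deposit(1); });
        threads.emplace_back([&w] { for (int j = 0; j < 1000; ++j) (void)w.read(); });
    }
    for (auto& t : threads) t.join();
    return w.read();
}
```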
11. future, promise, packaged_task, async
future & promise
- thread 1: future = promise.get_future(); ... future.get();
- thread 2: promise.set_value(something);
packaged_task
- promise + functor
async
- promise + thread
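The same computation done three ways, as a sketch of how the pieces
relate. square and the via_* functions are made-up names for this
example.

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <utility>

int square(int x) { return x * x; }

// promise: the worker thread sets the value, the caller reads the future.
int via_promise() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    std::thread t { [&p] { p.set_value(square(3)); } };
    int result = f.get();       // blocks until set_value has been called
    t.join();
    return result;
}

// packaged_task: wraps a callable and fulfills its own promise when run.
int via_packaged_task() {
    std::packaged_task<int(int)> task { square };
    std::future<int> f = task.get_future();
    std::thread t { std::move(task), 4 };   // packaged_task is move-only
    int result = f.get();
    t.join();
    return result;
}

// async: creates the promise and, with launch::async, the thread as well.
int via_async() {
    std::future<int> f = std::async(std::launch::async, square, 5);
    return f.get();
}
```

Without an explicit launch policy, async is allowed to defer the call
until get() instead of starting a thread.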
12. concur.cpp using async
code change:
int main()
{
auto f3 = [](uint64_t count) {
uint64_t sum = 0;
while (count--) ++sum;
return sum;
};
auto time0 = high_resolution_clock::now();
future<uint64_t> future1 = async(f3, 1_bil);
future<uint64_t> future2 = async(f3, 1_bil);
cout << "async tasks started\n";
uint64_t sum = future1.get() + future2.get();
auto time1 = high_resolution_clock::now();
*/