Parent directory Makefile c2cpp_util.h concur.cpp
Download
CC = g++ CXX = g++ CFLAGS = -Wall -g CXXFLAGS = -Wall -g -std=c++17 -pthread LDFLAGS = -pthread LDLIBS = executables = concur .PHONY: default default: $(executables) concur: concur.o .PHONY: clean clean: rm -f a.out core *.o $(executables) .PHONY: all all: clean default
#include <cxxabi.h> #include <typeinfo> #include <cassert> #include <cstdlib> #ifdef __GNUC__ // use abi::__cxa_demangle to demangle type name in g++ & clang++ #define PRINT_TYPE(os, x) do { \ int status; \ char *s = abi::__cxa_demangle(typeid(x).name(), 0, 0, &status); \ assert(status == 0); \ os << s << '\n'; \ std::free(s); \ } while (0) #else // MSVC returns human-readable type name, according to cppreference.com #define PRINT_TYPE(os, x) do { \ os << typeid(x).name() << '\n'; \ } while (0) #endif
#include <new> #include <bitset> #include <vector> #include <iostream> #include <algorithm> #include <iomanip> #include <locale> #include <mutex> #include <atomic> #include <future> #include "c2cpp_util.h" namespace c2cpp { constexpr unsigned long long operator""_bil(unsigned long long x) { return x * 1'000'000'000ull; } template<typename T> std::string fmt_usd(T&& x) { std::ostringstream oss; oss.imbue(std::locale("en_US.UTF8")); oss << std::fixed << std::setprecision(2) << x; return "$" + oss.str(); } } using namespace c2cpp; using namespace std; using namespace std::chrono; void f1(uint64_t& sum, uint64_t count) { while (count--) ++sum; } int main() { uint64_t sum = 0; auto time0 = high_resolution_clock::now(); f1(sum, 1_bil); f1(sum, 1_bil); auto time1 = high_resolution_clock::now(); auto dt = duration_cast<milliseconds>(time1 - time0); cout << dt.count() / 1'000.0 << "sec\n"; cout << fmt_usd(sum) << '\n'; } /* Lecture outline: 1. Serial execution (and some odds & ends for setup) - uint64_t, unsigned long, unsigned long long - literal operator - locale - std::chrono 2. thread code change: //f1(sum, 1_bil); //f1(sum, 1_bil); thread t1 { f1, ref(sum), 1_bil }; thread t2 { f1, ref(sum), 1_bil }; t1.join(); t2.join(); 3. False sharing code change 1: //uint64_t sum = 0; uint64_t sum[100] = {0}; constexpr int cache_line_size = 64; cout << "cache_line_size == " << cache_line_size << '\n'; uint64_t sum_addr = (uint64_t)sum; cout << "&sum[0] == " << bitset<cache_line_size>{sum_addr} << '\n'; cout << "&sum[0] is " << (sum_addr % cache_line_size ? "NOT " : "") << "cache line aligned" << '\n'; code change 2: //thread t1 { f1, ref(sum), 1_bil }; //thread t2 { f1, ref(sum), 1_bil }; thread t1 { f1, ref(sum[0]), 1_bil }; thread t2 { f1, ref(sum[7]), 1_bil }; code change 3: //cout << fmt_usd(sum) << '\n'; cout << fmt_usd(sum[0]) << '\n'; cout << fmt_usd(sum[7]) << '\n'; Explanation: When two threads running on two CPUs keep updating the same memory location, they keep invalidating each other's cache for that memory location, severely degrading the performance. This makes sense. But why does the same thing happen when the threads update their own separate variables? That's because cache is grouped into a chunk of bytes called cache block or cache line. If one byte in that chunk is invalidated, the whole chuck gets invalidated. So if the two separate variables are close enough to be in a single cache line, the program would behave as if the two threads are accessing a single variable. This is called False sharing. The cache line size of the x86 CPUs on CLAC seems to be 64 bytes. 4. linking multithreaded application From gcc documentation: When you link a multithreaded application, you will probably need to add a library or flag to g++. This is a very non-standardized area of GCC across ports. Some ports support a special flag (the spelling isn't even standardized yet) to add all required macros to a compilation (if any such flags are required then you must provide the flag for all compilations not just linking) and link-library additions and/or replacements at link time. The documentation is weak. On several targets (including GNU/Linux, Solaris and various BSDs) -pthread is honored. Some other ports use other switches. This is not well documented anywhere other than in "gcc -dumpspecs" (look at the 'lib' and 'cpp' entries). 5. std::ref - returns std::reference_wrapper, which is copyable & assignable - lets you pass references to std::thread or std::bind - lets you store references in standard containers 6. atomic types code change 1: //void f1(uint64_t& sum, uint64_t count) { while (count--) ++sum; } struct wallet { atomic<unsigned long long> money = 0; operator unsigned long long() const { return money; } wallet& operator++() { ++money; return *this; } }; code change 2: //uint64_t sum = 0; static_assert(atomic<unsigned long long>::is_always_lock_free); wallet sum; auto f2 = [&](uint64_t count) { while (count--) ++sum; }; code change 3: //thread t1 { f1, ref(sum), 1_bil }; //thread t2 { f1, ref(sum), 1_bil }; thread t1 { f2, 1_bil }; thread t2 { f2, 1_bil }; 7. mutex code change 1 (in struct wallet): //atomic<unsigned long long> money = 0; mutex m; unsigned long long money = 0; code change 2 (in wallet::operator++): m.lock(); ++money; m.unlock(); 8. scoped_lock m.lock() & m.unlock() is NOT exception-safe! Use scoped_lock wrapper like this: { scoped_lock lck{m}; ++money; } 9. scoped_lock vs. lock_guard vs. unique_lock scoped_lock - wrapper for one or more mutexes - uses deadlock avoidance algorithm so it is safe to concurrently lock any set of mutexes in any order - in order to lock multiple mutexes, the mutex types must meet the Lockable requirements (in addition to BasicLockable), which means they need to support try_lock() in addition to lock() & unlock() lock_guard - older version that only works with one mutex unique_lock - also for only one mutex, but has more features - condition_variable requires the use of unique_lock 10. shared_mutex - reader-writer mutex - shared_lock wrapper to read-lock - unique_lock wrapper to write-lock 11. future, promise, packaged_task, async future & promise - thread 1: future = promise.get_future(); ... future.get(); - thread 2: promise.set_value(something); packaged_task - promise + functor async - promise + thread 12. concur.cpp using async code chabge: int main() { auto f3 = [](uint64_t count) { uint64_t sum = 0; while (count--) ++sum; return sum; }; auto time0 = high_resolution_clock::now(); future<uint64_t> future1 = async(f3, 1_bil); future<uint64_t> future2 = async(f3, 1_bil); cout << "async tasks started\n"; uint64_t sum = future1.get() + future2.get(); auto time1 = high_resolution_clock::now(); */