C++ Concurrency

1.Hello World

C++ Threads

To create a thread in C++:

#include <thread>

// Forward declaration of a plain function to run on a thread.
void Foo(int a, int b);

// A std::thread starts running as soon as it is constructed; extra
// constructor arguments are forwarded to the callable.
std::thread t1(Foo, 1, 2);
std::thread t2([x, &y]() -> void {
    // Lambda function, x value-captured, y reference-captured
});

t1.join(); // Wait for thread to finish
#include <thread>
#include <iostream>

// Static gives these helpers internal linkage (local to this file).
static void sayHello() {
    std::cout << "Hello" << std::endl;
}

static void sayWorld() {
    std::cout << "World" << std::endl;
}

int main() {
    // A std::thread begins executing immediately on construction;
    // there is no separate "start" step.
    std::thread hello(sayHello);
    std::thread world(sayWorld);

    // join() blocks the calling thread until the target thread has
    // finished; without it, main could return while the workers are
    // still running.
    hello.join();
    world.join();

    return 0;
}

We may see World\nHello\n, Hello\nWorld\n, or even interleaved output such as HelloWorld\n\n.

1.2 Hello Vector

#include <thread>
#include <iostream>
#include <vector>

// Each worker prints its own (opaque) thread identifier.
static void printId() {
    std::cout << "Hello, my name is " << std::this_thread::get_id() << std::endl;
}

int main() {
    constexpr size_t kNumThreads = 8;

    std::vector<std::thread> workers;
    workers.reserve(kNumThreads);
    for (size_t i = 0; i < kNumThreads; ++i) {
        workers.emplace_back(printId);
    }

    // Wait for every worker before main returns.
    for (auto& w : workers) {
        w.join();
    }
    return 0;
}

The thread ID looks random, so instead, we can parameterize the thread function:

#include <thread>
#include <iostream>
#include <vector>
#include <string>
#include <functional>

// names is taken by reference so every thread shares one vector
// instead of each receiving its own copy.
static void threadBody(int id, std::vector<std::string>& names) {
    std::cout
        << "Hello, my name is "
        << std::this_thread::get_id()
        << " but you can call me "
        << id
        << " or "
        << names[id]
        << std::endl;
}

int main() {
    constexpr size_t kNumThreads = 8;
    std::vector<std::string> names = {
        "Alice", "Bob", "Charlie", "David",
        "Eve", "Frank", "Grace", "Heidi"
    };

    std::vector<std::thread> workers;
    workers.reserve(kNumThreads);
    for (size_t i = 0; i < kNumThreads; ++i) {
        // std::ref wraps the vector so the thread constructor passes a
        // reference rather than copying the value.
        workers.emplace_back(threadBody, i, std::ref(names));
    }
    for (auto& w : workers) {
        w.join();
    }
    return 0;
}

2. Threads and Locks

2.1 Mutex

We can use a mutex for synchronisation:

#include <thread>
#include <iostream>
#include <vector>
#include <string>
#include <sstream>
#include <functional>
#include <mutex>

// Collects messages from many threads; appends are serialised with a
// mutex so concurrent calls do not race on the shared string.
class Logger {
 public:
    void logMessage(const std::string& message) {
        mtx_.lock();
        log_.append(message);
        mtx_.unlock();
    }

    // NOTE: only safe to call once all writer threads have joined.
    const std::string& getLog() const {
        return log_;
    }

 private:
    std::string log_;
    std::mutex mtx_;
};

// Builds the whole greeting locally first, then hands it to the
// logger in a single call so it is appended as one unit.
static void threadBody(int id, std::vector<std::string>& names, Logger& logger) {
    std::stringstream ss;
    ss
        << "Hello, my name is "
        << std::this_thread::get_id()
        << " but you can call me "
        << id
        << " or "
        << names[id]
        << std::endl;
    logger.logMessage(ss.str());
}

int main() {
    constexpr size_t kNumThreads = 8;
    std::vector<std::thread> workers;
    std::vector<std::string> names = {
        "Alice", "Bob", "Charlie", "David",
        "Eve", "Frank", "Grace", "Heidi"
    };
    Logger logger;
    for (size_t i = 0; i < kNumThreads; ++i) {
        workers.emplace_back(threadBody, i, std::ref(names), std::ref(logger));
    }
    for (auto& w : workers) {
        w.join();
    }
    std::cout << logger.getLog() << std::endl;
    return 0;
}

We want to be careful with ifs inside critical sections:

#include <thread>
#include <iostream>
#include <vector>
#include <string>
#include <sstream>
#include <functional>
#include <mutex>

// Logger with a maximum log size; messages that would overflow the
// limit are dropped.
class Logger {
 public:
    explicit Logger(size_t max_size) : max_size_(max_size) {}

    void logMessage(const std::string& message) {
        mtx_.lock();
        // Make sure to check IN THE CRITICAL SECTION: the log size
        // could otherwise change between the check and the append.
        if (log_.size() + message.size() > max_size_) {
            mtx_.unlock(); // Make sure to unlock even on failures
            return;
        }
        log_.append(message);
        mtx_.unlock();
    }

    const std::string& getLog() const {
        return log_;
    }

 private:
    std::string log_;
    std::mutex mtx_;
    size_t max_size_;
};
// Fixed: stray prose ("make sure to unlock") had leaked into the class
// body above, and a stray backtick ended the unlock comment.

static void threadBody(int id, std::vector<std::string>& names, Logger& logger) {
    // [...]
}

int main() {
    std::vector<std::thread> threads;
    std::vector<std::string> names = {
        "Alice", "Bob", "Charlie", "David",
        "Eve", "Frank", "Grace", "Heidi"
    };
    Logger logger(100);
    for (size_t i = 0; i < 8; i++) {
        threads.push_back(std::thread(threadBody, i, std::ref(names), std::ref(logger)));
    }
    for (auto& t : threads) {
        t.join();
    }
    std::cout << logger.getLog() << std::endl;
    return 0;
}

2.2 Scoped Locks

We can make use of destructors to implement a scoped lock:

// RAII wrapper around a mutex: acquires it on construction and
// guarantees release when the object leaves scope.
class ScopedLock {
 public:
    explicit ScopedLock(std::mutex& mtx) : mtx_(mtx) {
        mtx_.lock();
    }

    // Destructors run deterministically at end of lifetime (C++ has
    // no garbage collection), so the mutex is released on every exit
    // path, including early returns and exceptions.
    ~ScopedLock() {
        mtx_.unlock();
    }

 private:
    std::mutex& mtx_;
};

Now, we can do:

1void logMessage(const std::string& message) { 2 ScopedLock namedoesntmatter(mtx_); 3 if (log_.size() + message.size() > max_size_) { 4 return; 5 } 6 log_.append(message); 7}

This is called RAII (Resource Acquisition Is Initialization), also known as constructor acquires, destructor releases. It is a common resource management pattern in C++. In reality, we should use the standard library's std::scoped_lock:

1void logMessage(const std::string& message) { 2 std::scoped_lock<std::mutex> namedoesntmatter(mtx_); 3 if (log_.size() + message.size() > max_size_) { 4 return; 5 } 6 log_.append(message); 7}

2.3 Race Conditions

Race conditions — where the outcome depends on the timing or interleaving of threads — lead to non-deterministic behaviour. This is a problem, but can be used intentionally.

2.4 Data Races

A data race occurs when two or more threads access the same memory location concurrently, at least one of the accesses is a write, and the accesses are not ordered by synchronisation.

This is an undefined behaviour - the program has no semantics and is allowed to do anything!

This means a compiler will optimise on the assumption that no data races occur:

#include <thread>
#include <functional>
#include <chrono>
#include <iostream>

// DEMO of a data race: both threads access x without synchronisation,
// which is undefined behaviour. The compiler may assume x never
// changes inside bar's loop and hoist the load, so bar can spin
// forever even after foo writes 1.
//
// Fixed from the original: foo must take int& to match the
// std::ref(x) argument; the old `int* x` version assigned 1 to the
// local pointer (`x = 1`) instead of the shared variable, and
// std::ref(int) cannot bind to int* at all.
static void foo(int& x) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Waking up thread 2" << std::endl;
    x = 1;
}

static void bar(int& x) {
    std::cout << "Thread 2 is waiting" << std::endl;
    while (x == 0); // Spin wait

    std::cout << "Thread 2 has awoken" << std::endl;
    return;
}

int main() {
    // The two threads share a variable x
    int x = 0;
    std::thread t1(foo, std::ref(x));
    std::thread t2(bar, std::ref(x));
    t1.join();
    t2.join();
    return 0;
}

Thread Sanitizer

We can detect data races in g++ and clang++ with the -fsanitize=thread flag, and use -g to get line numbers in race reports.

Race conditions are usually intentional, and can be controlled with mutexes and atomics. Data races are bugs, which can be race conditions. Data races are value oblivious - they occur regardless of the values being read or written. Storing the same value to a location is a data race but not a race condition.

2.5 Locks

A std::scoped_lock is constructed with one or more mutexes. Locks all mutexes on construction. Unlocks all mutexes on destruction. However, it does not allow manual unlocking and relocking, nor transferring ownership of the lock.

A std::unique_lock constructed with one mutex. By default it locks it on construction, but can be disabled. Allows unlocking and relocking, unlocking on destruction. Allows ownership transfer with std::move (hence a unique lock).

Scoped locks are more efficient, so should be preferred unless the extra features of unique locks are needed.

2.6 Condition Variables

A condition variable allows threads to wait for some condition to become true.

C++ Condition Variables

To use condition variables in C++:

#include <condition_variable>

// A condition variable lets threads sleep until another thread
// signals that some condition may now hold.
std::condition_variable cond_;

cond_.notify_one(); // Wake up one waiting thread
cond_.notify_all(); // Wake up all waiting threads

To wait on a condition variable:

The wait function repeatedly:

  1. Returns if the predicate is true.
  2. Releases the lock.
  3. Blocks until another thread calls notify_one or notify_all on the condition variable.
  4. Acquires the lock again.

We can demonstrate this with a bounded FIFO queue. The queue has a fixed length, with a count, head and tail index. We have two condition variables: not_empty_ and not_full_. Let's start with an unsafe version:

#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <iostream>

// DELIBERATELY UNSAFE bounded FIFO queue: enq/deq busy-wait on count_
// and mutate shared state without synchronisation, so concurrent use
// is a data race (demonstrated by the producer/consumer main below).
class LockedQueue {
public:
    explicit LockedQueue(size_t capacity) {
        contents_.resize(capacity);
    }

    void enq(int element) {
        while (count_ == contents_.size()); // Busy wait until not full
        contents_[tail_] = element;
        tail_ = (tail_ + 1) % contents_.size();
        count_++;
    }

    int deq() {
        while (count_ == 0); // Busy wait until not empty
        int result = contents_[head_];
        head_ = (head_ + 1) % contents_.size();
        count_--;
        return result;
    }

private:
    std::vector<int> contents_;
    size_t count_ = 0;
    size_t head_ = 0;
    size_t tail_ = 0;
}; // Fixed: the class definition was missing its closing semicolon

int main() {
    const size_t NUM_CONSUMERS = 8;
    const size_t ELEMENTS_TO_PRODUCE = (1 << 24);
    const size_t ELEMENTS_PER_CONSUMER = ELEMENTS_TO_PRODUCE / NUM_CONSUMERS;

    LockedQueue producer_to_consumers(256);
    LockedQueue consumers_to_producer(NUM_CONSUMERS);
    int final_result = 0;

    std::thread producer([&producer_to_consumers, &final_result, &consumers_to_producer]() -> void {
        for (size_t i = 0; i < ELEMENTS_TO_PRODUCE; i++) {
            producer_to_consumers.enq(1); // Write some integers to the queue
        }
        for (size_t i = 0; i < NUM_CONSUMERS; i++) {
            final_result += consumers_to_producer.deq(); // Add results from the consumers
        }
    });

    std::vector<std::thread> consumers;
    consumers.reserve(NUM_CONSUMERS); // Avoid reallocations
    for (size_t i = 0; i < NUM_CONSUMERS; i++) {
        consumers.emplace_back([&producer_to_consumers, &consumers_to_producer]() -> void {
            int my_result = 0;
            for (size_t j = 0; j < ELEMENTS_PER_CONSUMER; j++) {
                my_result += producer_to_consumers.deq();
            }
            consumers_to_producer.enq(my_result);
        });
    }

    producer.join();
    for (auto& c : consumers) {
        c.join();
    }

    std::cout << "Final result: " << final_result << std::endl;

    return 0;
}

If we ran the above code, and it was thread safe, we should get 1 << 24 as the final result. However, since it is not thread-safe, the program has no semantics and is non-deterministic. To fix it, we can protect the critical sections with mutexes and condition variables:

// Thread-safe bounded FIFO queue using one mutex and a single
// condition variable shared by enqueuers and dequeuers.
class LockedQueue {
public:
    explicit LockedQueue(size_t capacity) {
        contents_.resize(capacity);
    }

    // Blocks while the queue is full, then appends at the tail.
    void enq(int element) {
        std::unique_lock<std::mutex> lock(mtx_);
        condition_.wait(lock, [this]() -> bool {
            // Use a condition variable to wait until the queue is not full
            return count_ < contents_.size();
        });

        contents_[tail_] = element;
        tail_ = (tail_ + 1) % contents_.size();
        count_++;

        // Now that we have added an element, notify a waiting dequeuer
        condition_.notify_one();
    }

    // Blocks while the queue is empty, then removes from the head.
    int deq() {
        std::unique_lock<std::mutex> lock(mtx_);
        condition_.wait(lock, [this]() -> bool {
            return count_ > 0;
        });

        int result = contents_[head_];
        head_ = (head_ + 1) % contents_.size();
        count_--;

        // Now that we have removed an element, notify a waiting enqueuer
        condition_.notify_one();

        return result;
    }

private:
    std::vector<int> contents_;
    size_t count_ = 0;
    size_t head_ = 0;
    size_t tail_ = 0;
    std::mutex mtx_;
    std::condition_variable condition_;
}; // Fixed: the class definition was missing its closing semicolon

However, now if the size of the queue is less than the number of threads, we can get a deadlock. This is because we have one condition variable and are calling notify_one, which may wake up a thread that cannot proceed. To fix this, we need two condition variables: one for not_empty_ and one for not_full_:

// Bounded FIFO queue with separate condition variables, so a notify
// always targets the kind of thread that can actually make progress
// (enqueuers wake dequeuers and vice versa).
class LockedQueue {
public:
    explicit LockedQueue(size_t capacity) {
        contents_.resize(capacity);
    }

    // Blocks while the queue is full, then appends at the tail.
    void enq(int element) {
        std::unique_lock<std::mutex> lock(mtx_);
        not_full_.wait(lock, [this]() -> bool {
            // Use a condition variable to wait until the queue is not full
            return count_ < contents_.size();
        });

        contents_[tail_] = element;
        tail_ = (tail_ + 1) % contents_.size();
        count_++;

        // Now that we have added an element, notify a waiting dequeuer
        not_empty_.notify_one();
    }

    // Blocks while the queue is empty, then removes from the head.
    int deq() {
        std::unique_lock<std::mutex> lock(mtx_);
        not_empty_.wait(lock, [this]() -> bool {
            return count_ > 0;
        });

        int result = contents_[head_];
        head_ = (head_ + 1) % contents_.size();
        count_--;

        // Now that we have removed an element, notify a waiting enqueuer
        not_full_.notify_one();

        return result;
    }

private:
    std::vector<int> contents_;
    size_t count_ = 0;
    size_t head_ = 0;
    size_t tail_ = 0;
    std::mutex mtx_;
    std::condition_variable not_empty_;
    std::condition_variable not_full_;
}; // Fixed: the class definition was missing its closing semicolon

Warning

We cannot do:

// WRONG: only notifying when count_ transitions to exactly 1 can miss
// wake-ups when several items are added while multiple threads wait.
if (count_ == 1) {
    not_empty_.notify_all();
}

This is because multiple threads may be waiting, and if we only notify when count_ == 1, we may miss notifying some threads when multiple items are added.

3. Atomics

Atomics allow well defined race conditions, and allow fine grained synchronisation between threads.

C++ Atomics

To use atomics in C++:

#include <atomic>

// std::atomic can wrap integers, pointers, and other trivially
// copyable types:
std::atomic<int> x(0);
std::atomic<MyStruct*> ptr(&my_struct); // atomic pointer (MyStruct declared elsewhere)
std::atomic<size_t> size(0u);
std::atomic<bool> flag(false);

A RMW operation can be performed on an atomic variable, which reads the old value and writes a new value (either a completely new value or a function of the old one). This is all performed in one indivisible operation.

C++ Atomic Operations

To perform atomic operations in C++:

1#include <atomic> 2 3void store(T value); // Atomic store 4T load(); // Atomic load 5T exchange(T value) // Store value and return old value (RMW) 6 7// If x == expected, set x = desired, else set expected = x (RMW) 8// Returns true on success, false on failure 9bool compare_exchange_strong(T& expected, T desired); 10bool compare_exchange_weak(T& expected, T desired);

The weak version may fail spuriously (behave as if compare failed even if it succeeded). On old architectures, this variant is cheaper and works fine in loops.

There are more operations on integral atomics:

1T fetch_add(T value); // Atomically add value and return old value (RMW) 2T fetch_sub(T value); // Atomically subtract value and return old value (RMW) 3T fetch_and(T value); // Atomically AND value and return old value (RMW 4T fetch_or(T value); // Atomically OR value and return old value (RMW) 5T fetch_xor(T value); // Atomically XOR value and return old value (R

Additionally, common operators are overloaded, but this should probably be avoided for clarity.

Recall that a data race occurs when two or more threads access the same memory location concurrently, at least one access is a write, and the accesses are not ordered by synchronisation.

1.1 Parallel Find

#include <vector>
#include <iostream>
#include <thread>
#include <atomic>
#include <functional>
#include <limits> // Fixed: required for std::numeric_limits

// Two-thread search: returns the index of an element satisfying
// predicate, or numeric_limits<size_t>::max() if none was found.
// NOTE: if both halves contain a match, either index may win the race
// to the atomic result.
template<typename T>
size_t find(const std::vector<T>& data, std::function<bool(T)> predicate) {
    std::atomic<size_t> result(std::numeric_limits<size_t>::max());

    // Each thread scans one half (assumes the size divides evenly).
    const size_t chunk_size = data.size() / 2;
    auto thread_body = [chunk_size, &predicate, &data, &result](size_t id) -> void {
        // Loop through a segment of the vector according to its ID
        for (size_t i = id * chunk_size; i < (id + 1) * chunk_size; i++) {
            if (predicate(data[i])) {
                result.store(i);
                return;
            }
        }
    };

    auto t1 = std::thread(thread_body, 0);
    auto t2 = std::thread(thread_body, 1);

    t1.join();
    t2.join();

    return result.load(); // Can also be implicitly converted to size_t
}

int main() {
    std::vector<int> data;
    for (size_t count = 0; count < 2; count++) {
        for (int i = 0; i < (1 << 24); i++) {
            data.push_back(i);
        }
    }
    size_t result = find<int>(data, [](int item) -> bool {
        return item == (1 << 23);
    });
    if (result != std::numeric_limits<size_t>::max()) {
        std::cout << "Found at index " << result << std::endl;
    } else {
        std::cout << "Not found" << std::endl;
    }
    return 0;
}

1.2 Relaxed Atomics

If we have:

// Two threads each store to one atomic and read the other.
std::atomic<int> x = 0;
std::atomic<int> y = 0;

// Thread 1:
x = 1;
print(y); // pseudocode for printing the loaded value

// Thread 2:
y = 1;
print(x);

Under sequential consistency we may see 10, 01 or 11. However, modern multicore CPUs are not SC, as memory barriers can be expensive.

In C++, atomic operations are sequentially consistent by default:

// The memory order is an optional second argument on atomic operations.
x.store(42, std::memory_order_seq_cst); // DEFAULT
x.store(42, std::memory_order_relaxed); // Only guarantees SC per location

An example of using relaxed atomics is store buffering:

A store buffer is a hardware mechanism that allows a CPU core to temporarily hold writes before they are made visible to other cores. This lets the processor continue executing instructions without waiting for the write to complete, improving performance. However, it can lead to memory reordering effects, which is why relaxed atomics are used to model such behavior in concurrent programs.

#include <thread>
#include <atomic>
#include <iostream>
#include <chrono>

// Shared atomics for the classic "store buffering" litmus test.
static std::atomic<int> x;
static std::atomic<int> y;

// Results read back by main after each round (only written while the
// worker threads run, only read after they join).
static int r1;
static int r2;

static void T1() {
    // Small sleep to make the two threads start at roughly the same time.
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    x.store(1);
    r1 = y.load();
}

static void T2() {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    y.store(1);
    r2 = x.load();
}

int main() {
    // Repeat rounds until the outcome r1 == r2 == 0 is observed —
    // which sequential consistency forbids, so this loops forever.
    while (true) {
        x.store(0);
        y.store(0);
        r1 = 0;
        r2 = 0;
        auto t1 = std::thread(T1);
        auto t2 = std::thread(T2);
        t1.join();
        t2.join();
        std::cout << r1 << " " << r2 << std::endl;
        if (r1 == 0 && r2 == 0) {
            break;
        }
    }
}

Here, this program will never halt, as std::memory_order_seq_cst is used by default, and sequential consistency is guaranteed, under which 00 is not possible. If we change it to:

// Same litmus test with relaxed ordering: the stores and loads are
// still atomic per location, but no cross-location order is enforced,
// so the 00 outcome becomes possible.
static void T1() {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    x.store(1, std::memory_order_relaxed);
    r1 = y.load(std::memory_order_relaxed);
}

static void T2() {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    y.store(1, std::memory_order_relaxed);
    r2 = x.load(std::memory_order_relaxed);
}

This program may halt, as under std::memory_order_relaxed sequential consistency is not guaranteed, so 00 is possible and the program halts.

1.3 Relaxed Parallel Find

Relaxed ordering is useful when you don't care in what order things happen. We can update our parallel find example to be relaxed:

#include <vector>
#include <iostream>
#include <thread>
#include <atomic>
#include <functional>
#include <limits> // Fixed: required for std::numeric_limits

// Parallel find with relaxed atomics: the result is only written by
// the workers and read after both joins, so no ordering between the
// store and other memory operations is needed.
template<typename T>
size_t find(const std::vector<T>& data, std::function<bool(T)> predicate) {
    std::atomic<size_t> result(std::numeric_limits<size_t>::max());
    const size_t chunk_size = data.size() / 2;
    auto thread_body = [chunk_size, &predicate, &data, &result](size_t id) -> void {
        for (size_t i = id * chunk_size; i < (id + 1) * chunk_size; i++) {
            if (predicate(data[i])) {
                result.store(i, std::memory_order_relaxed);
                return;
            }
        }
    };

    auto t1 = std::thread(thread_body, 0);
    auto t2 = std::thread(thread_body, 1);
    t1.join();
    t2.join();
    return result.load(std::memory_order_relaxed);
}

int main() {
    std::vector<int> data;
    // Fixed: a stray backtick had corrupted the loop variable `count`.
    for (size_t count = 0; count < 2; count++) {
        for (int i = 0; i < (1 << 24); i++) {
            data.push_back(i);
        }
    }
    size_t result = find<int>(data, [](int item) -> bool {
        return item == (1 << 23);
    });
    if (result != std::numeric_limits<size_t>::max()) {
        std::cout << "Found at index " << result << std::endl;
    } else {
        std::cout << "Not found" << std::endl;
    }
    return 0;
}

It is maybe slightly faster, but in this case won't matter much since we only store / load on max once per thread.

1.4 Release Acquire Consistency

We may want to pass a message between threads to signify something is ready. Sequential consistency easily facilitates message passing with an atomic boolean, but a spin wait requires a memory barrier on some architectures, making this very expensive. However, making it relaxed would be wrong: e.g. the flag may be set to true before the data is ready.

A release memory order can only be used on stores. An acquire can only be used on loads. It will synchronise any stores to loads. This may require fewer memory barriers than SC, but more memory barriers than a relaxed memory order. This is efficient and correct.

Difference to SC

If we have:

// Independent reads of independent writes (IRIW): two writers, two
// readers that read the locations in opposite orders.
// Thread 1:
x.store(1, std::memory_order_release);

// Thread 2:
y.store(1, std::memory_order_release);

// Thread 3:
a = x.load(std::memory_order_acquire);
b = y.load(std::memory_order_acquire);

// Thread 4:
c = y.load(std::memory_order_acquire);
d = x.load(std::memory_order_acquire);

Under release-acquire, it is possible to get a == 1, b == 0, c == 1, d == 0 — threads 3 and 4 observing the two stores in opposite orders — which is not possible under SC, as no interleaving would permit this outcome.

What it does do is establish an order between the operations before a release, and those after an acquire.

This release-acquire consistency prevents data races, as it violates "accesses are not ordered by synchronisation". This can be violated by:

For example:

#include <thread>
#include <atomic>
#include <functional>
#include <iostream>

int main() {
    int data = 0;
    std::atomic<bool> flag(false);

    // Producer: publish the data, then set the flag with release
    // ordering so the write to data is visible to an acquiring reader.
    auto producer = std::thread([&data, &flag]() -> void {
        data = 42;
        flag.store(true, std::memory_order_release);
    });

    // Consumer: acquire-spin on the flag, then the read of data is
    // ordered after the producer's write — no data race.
    auto consumer = std::thread([&data, &flag]() -> void {
        while (!flag.load(std::memory_order_acquire)) {
            // spin
        }
        std::cout << data << std::endl;
    });

    producer.join();
    consumer.join();
    return 0;
}

Here, using std::memory_order_seq_cst would be inefficient and using std::memory_order_relaxed would be wrong and cause a data race.

Multi-copy atomicity is where a store by one core becomes observable to all other cores simultaneously. Only some CPUs provide this guarantee (e.g. x86). On other architectures, a store may become visible to some cores before others.

4. Spinlocks

Spinlocks are useful when a lock delay is expected to be short, such as during short critical sections or under low contention. They avoid making syscalls, but can have bad worst-case behaviour.

#include <atomic>

// Minimal test-and-set spinlock built on a single atomic flag.
class SimpleSpinlock {
public:
    SimpleSpinlock() : lock_bit_(false) {}

    void lock() {
        // exchange returns the previous value: false means the lock
        // was free and we now own it; true means keep trying.
        for (;;) {
            if (!lock_bit_.exchange(true)) {
                return;
            }
        }
    }

    void unlock() {
        lock_bit_.store(false);
    }

private:
    std::atomic<bool> lock_bit_;
};

4.1 Local Spinning

Under high contention, this leads to poor performance due to cache thrashing - every test-and-set call invalidates the cache line on the other cores. To solve this, we can do local spinning - spin on a local variable, and only do a test-and-set when the lock appears to be free.

#include <atomic>

// Test-and-test-and-set spinlock: after a failed exchange, waiters
// poll with plain loads (served from the local cache) and only retry
// the expensive exchange once the lock looks free.
class LocalSpinlock {
public:
    LocalSpinlock() : lock_bit_(false) {}

    void lock() {
        for (;;) {
            if (!lock_bit_.exchange(true)) {
                return; // Previous value was false: lock acquired.
            }
            while (lock_bit_.load()) {
                // Spin locally until the holder releases.
            }
        }
    }

    void unlock() {
        lock_bit_.store(false);
    }

private:
    std::atomic<bool> lock_bit_;
};

4.2 Active Backoff

However, threads detect a lock is free at almost the same time, causing high bus traffic. Instead, we can do active backoff:

#include <atomic>

// Spinlock with "active" backoff: after losing the exchange, a thread
// burns cycles in a short delay loop between polls so that all
// waiters do not hammer the cache line at the same instant.
class ActiveBackoffSpinlock {
public:
    ActiveBackoffSpinlock() : lock_bit_(false) {}

    void lock() {
        while (lock_bit_.exchange(true)) {
            do {
                // volatile stops the compiler from deleting the delay loop
                for (volatile int spin = 0; spin < 100; spin++) {
                }
            } while (lock_bit_.load());
        }
    }

    void unlock() {
        lock_bit_.store(false);
    }

private:
    std::atomic<bool> lock_bit_;
};

4.3 Passive Backoff

Processors have instructions for backoff that allow it to do nothing more efficiently.

#include <atomic>
#include <emmintrin.h> // _mm_pause (x86 SSE2 intrinsic)

// Spinlock using the CPU's pause hint while backing off, which lets
// the core idle more efficiently than an arithmetic delay loop.
class PassiveBackoffSpinlock {
public:
    PassiveBackoffSpinlock() : lock_bit_(false) {}

    void lock() {
        while (lock_bit_.exchange(true)) {
            do {
                // Fixed: the intrinsic is _mm_pause (single leading
                // underscore); __mm_pause does not exist.
                _mm_pause();
                _mm_pause();
                _mm_pause();
                _mm_pause();
            } while (lock_bit_.load());
        }
    }

    void unlock() {
        lock_bit_.store(false);
    }

private:
    std::atomic<bool> lock_bit_;
};

4.4 Exponential Backoff

It's hard to choose the right amount of backoff. Instead, we can do exponential backoff - double the backoff time each time we fail to acquire the lock, up to a maximum.

#include <atomic>
#include <algorithm> // Fixed: required for std::min
#include <emmintrin.h> // _mm_pause (x86 SSE2 intrinsic)

// Spinlock whose backoff doubles after each failed poll, capped at a
// maximum, so contention falls off quickly without a tuned constant.
class ExponentialBackoffSpinlock {
public:
    ExponentialBackoffSpinlock() : lock_bit_(false) {}

    void lock() {
        const int kMinBackoffIterations = 4;
        const int kMaxBackoffIterations = 1 << 10;

        int backoff = kMinBackoffIterations;
        while (lock_bit_.exchange(true)) {
            do {
                for (int i = 0; i < backoff; i++) {
                    // Fixed: the intrinsic is _mm_pause, not __mm_pause.
                    _mm_pause();
                }
                backoff = std::min(backoff * 2, kMaxBackoffIterations);
            } while (lock_bit_.load());
        }
    }

    void unlock() {
        lock_bit_.store(false);
    }

private:
    std::atomic<bool> lock_bit_;
};

Backoff can underutilise the critical section when the lock is contended.

4.5 Ticket Locks

Unfairness leads to starvation but is often faster due to cache locality. In a ticket lock, each arriving thread atomically takes a ticket number and waits until the "now serving" counter reaches its ticket, so the lock is granted in FIFO order.

#include <atomic>

// Fair FIFO lock: each thread takes a ticket and spins until the
// "now serving" counter reaches it.
class TicketLock {
public:
    TicketLock() : next_ticket_(0), now_serving_(0) {}

    void lock() {
        // fetch_add hands out unique, monotonically increasing tickets.
        const unsigned my_ticket = next_ticket_.fetch_add(1);
        while (now_serving_.load() != my_ticket);
    }

    void unlock() {
        // Only the current holder writes now_serving_, so a separate
        // load-then-store is sufficient here.
        // Fixed: a stray 'z' character followed this statement.
        now_serving_.store(now_serving_.load() + 1);
    }

private:
    std::atomic<unsigned> next_ticket_;
    std::atomic<unsigned> now_serving_;
};

4.6 Memory Ordering

We can optimise it by using acquire and release semantics:

void lock() {
    // Acquire ordering on the winning exchange synchronises with the
    // release store in unlock(), so the critical section's reads see
    // the previous holder's writes.
    while (lock_bit_.exchange(true, std::memory_order_acquire)) {
        // The inner spin only observes the flag, so relaxed is enough.
        while (lock_bit_.load(std::memory_order_relaxed));
    }
}

void unlock() {
    lock_bit_.store(false, std::memory_order_release);
}

5. Futexes

Unlike spinlocks, sleeping locks allow threads to sleep when the lock is contended, however, they have a high overhead due to syscalls.

To create a hybrid lock, we can use a futex syscall (fast userspace mutex). It works on userspace data, can be used to implement synchronization primitives. It is very flexible, and can be used to implement more than just mutexes. The syscall provides a way for a thread to sleep until it is woken on an address (FUTEX_WAIT) and a way to wake threads sleeping on an address (FUTEX_WAKE).

5.1 Simple Mutex

Mutex state is an atomic int. Since sizeof(std::atomic&lt;int&gt;) == sizeof(int), we can use the atomic int directly as the futex word. A value of 0 means the lock is free; 1 means it is locked.

To lock the mutex, atomically exchange state_ with 1:

To unlock the mutex, store 0 to state_, then call futex_wake to wake up one waiting thread.

#include <atomic>
#include <unistd.h> // Fixed: required for syscall()
#include <sys/syscall.h>
#include <linux/futex.h>

// Futex-based mutex: the atomic int's storage doubles as the futex
// word. 0 = free, 1 = locked.
class MutexSimple {
public:
    MutexSimple() : state_(kFree) {}

    void lock() {
        // Try to take the lock; if it was already held, sleep in the
        // kernel until woken, then retry the exchange.
        while (state_.exchange(kLocked) != kFree) {
            // FUTEX_WAIT only sleeps if state_ still equals kLocked,
            // which avoids a lost-wakeup race with unlock().
            syscall(SYS_futex, reinterpret_cast<int*>(&state_), FUTEX_WAIT, kLocked, nullptr, nullptr, 0);
        }
    }

    void unlock() {
        state_.store(kFree);
        // Wake at most one sleeping waiter. NOTE: this syscall is made
        // even when no thread is actually waiting.
        syscall(SYS_futex, reinterpret_cast<int*>(&state_), FUTEX_WAKE, 1, nullptr, nullptr, 0);
    }

private:
    static constexpr int kFree = 0;
    static constexpr int kLocked = 1;
    std::atomic<int> state_;
}; // Fixed: the class definition was missing its closing semicolon

However, in this design we needlessly do a futex wake syscall.

5.2 Smarter Mutex

Now there are 3 state values: 0 (free), 1 (locked, no waiters), and 2 (locked, possibly with waiters).

To lock, do a compare exchange with state_ with 0, setting it to 1 if successful:

Before we unlock, state_ is either 1 or 2:

#include <atomic>
#include <unistd.h> // Fixed: required for syscall()
#include <sys/syscall.h>
#include <linux/futex.h>

// Futex mutex that tracks whether any thread may be waiting, so
// unlock only pays for a FUTEX_WAKE syscall when it could matter.
// States: 0 = free, 1 = locked (no waiters), 2 = locked (waiters).
class MutexSmart {
public:
    MutexSmart() : state_(kFree) {}

    void lock() {
        // Fast path: uncontended acquire (0 -> 1).
        int old_value = compare_exchange(kFree, kLockedNoWaiters);
        if (old_value == kFree) {
            return; // Lock acquired
        }

        do {
            // Advertise that a waiter exists (1 -> 2), then sleep.
            // Fixed: the original referenced an undeclared
            // kLockedWaiters constant here.
            if (old_value == kLockedWithWaiters ||
                compare_exchange(kLockedNoWaiters, kLockedWithWaiters) != kFree) {
                syscall(SYS_futex, reinterpret_cast<int*>(&state_), FUTEX_WAIT, kLockedWithWaiters, nullptr, nullptr, 0);
            }
            // After waking, acquire in the "with waiters" state since
            // other threads may still be sleeping.
            old_value = compare_exchange(kFree, kLockedWithWaiters);
        } while (old_value != kFree);
    }

    void unlock() {
        int old_value = state_.exchange(kFree);
        if (old_value == kLockedWithWaiters) {
            // Someone may be sleeping; wake one waiter.
            syscall(SYS_futex, reinterpret_cast<int*>(&state_), FUTEX_WAKE, 1, nullptr, nullptr, 0);
        }
    }

private:
    // If state_ == expected, set state_ = desired; either way, return
    // the value state_ held before the operation.
    int compare_exchange(int expected, int desired) {
        // On failure, compare_exchange_strong writes the observed
        // value of state_ back into expected.
        state_.compare_exchange_strong(expected, desired);
        return expected; // Old value of state_
    }

    static constexpr int kFree = 0;
    static constexpr int kLockedNoWaiters = 1;
    static constexpr int kLockedWithWaiters = 2;
    std::atomic<int> state_;
}; // Fixed: the class definition was missing its closing semicolon
Back to Home