Synchronization

Concurrency problems

  • race conditions
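    • the result depends on the timing of code execution
  • interleaved executions
    • threads interleave their executions arbitrarily and at different rates
    • scheduling is not under program control
    • fine-grained
      • a switch can happen between any two machine instructions; each single instruction is assumed atomic, but one source statement may compile to several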
  • shared resources
    • NOT shared:
      • local variables
    • shared:
      • global variables
      • static variables
      • heap objects
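
To make the lost-update race concrete, here is a small deterministic sketch. It does not use real threads: each simulated thread is a generator that performs `counter += 1` as three separate steps (load, add, store), and a schedule string decides who runs next. The names `increment_steps` and `run` are made up for this illustration.

```python
def increment_steps(shared, local):
    """One simulated thread executing counter += 1 in three steps."""
    local["tmp"] = shared["counter"]    # load the shared value
    yield
    local["tmp"] += 1                   # add, using a local (unshared) variable
    yield
    shared["counter"] = local["tmp"]    # store back to the shared variable
    yield


def run(schedule):
    """Run simulated threads A and B in the order given by `schedule`."""
    shared = {"counter": 0}
    threads = {
        "A": increment_steps(shared, {}),
        "B": increment_steps(shared, {}),
    }
    for name in schedule:
        next(threads[name])             # execute one step of that thread
    return shared["counter"]


serial = run("AAABBB")       # A finishes before B starts: 2
interleaved = run("ABABAB")  # both load 0 before either stores: 1
print(serial, interleaved)
```

The same two increments give 2 or 1 depending only on the schedule, which is exactly what "result depends on timing" means.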

Solution: sync

  • control proper coordination among threads
  • mechanisms
  • patterns

critical section

  • one thread only
    • assume: every thread that enters the critical section eventually leaves it, and every thread that wants to enter eventually gets in
  • solution: mutual exclusion → build larger atomic blocks with locks

Atomic instructions - Test And Set

  • record the old value
  • set the value to true
  • return the old value
// Must execute atomically (in practice a single hardware instruction):
// returns the old value of *flag and leaves *flag set to true.
bool test_and_set(bool *flag) {
    bool old = *flag; // (1)
    *flag = true; // (2)
    return old;
}
  1. reads the old value into old, a local variable that is not shared, so no other thread can see or change it
  2. sets *flag to true. Steps (1) and (2) must happen as one indivisible unit: if a thread could be interrupted between them, another thread could also read false, and both would believe they acquired the lock
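struct lock {
    bool held; // must be initialized to false (lock is free)
};
void acquire(struct lock *lock) {
    while (test_and_set(&lock->held))
        ; // spin until the old value was false
}
void release(struct lock *lock) {
    lock->held = false;
}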
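
As a rough, runnable counterpart, the sketch below builds the same spin lock in Python. Python exposes no hardware test-and-set, so the hypothetical `AtomicFlag` class simulates the atomicity with an internal `threading.Lock`; that guard is an assumption standing in for the hardware guarantee, not something from the notes. `SpinLock` and `run_workers` are likewise illustrative names.

```python
import threading
import time


class AtomicFlag:
    """Boolean flag with a simulated atomic test-and-set."""

    def __init__(self):
        self._value = False
        self._guard = threading.Lock()  # stands in for the hardware guarantee

    def test_and_set(self):
        with self._guard:               # read and write as one indivisible step
            old = self._value
            self._value = True
            return old

    def clear(self):
        with self._guard:
            self._value = False


class SpinLock:
    def __init__(self):
        self._held = AtomicFlag()

    def acquire(self):
        while self._held.test_and_set():
            time.sleep(0)               # let other threads run while spinning

    def release(self):
        self._held.clear()


def run_workers(num_threads=4, increments=1000):
    lock = SpinLock()
    total = 0

    def worker():
        nonlocal total
        for _ in range(increments):
            lock.acquire()
            total += 1                  # critical section
            lock.release()

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return total


result = run_workers()
print(result)
```

With the lock in place, every increment is preserved, so the total equals `num_threads * increments`.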
  • why do we need busy waiting?

    • test_and_set only reports whether the lock was held at the moment of the call. A thread that sees true has no other way to learn when the lock is released, so it keeps retrying (spinning) until test_and_set returns false. Because the instruction is atomic, only one of several competing threads can see false after a release.
  • suppose another thread calls test_and_set while the lock is held. Why does it not enter the critical section?

    • If another thread calls test_and_set:

      It reads True (lock already held),
      
      test_and_set returns True,
      
      The while loop continues spinning,
      
      Therefore, it does not pass the while loop to enter the critical section.
      
  • simulate the steps when another thread tries to acquire the lock while the current thread is in the critical section

Scenario: Two Threads (A and B)

  1. Initial State: lock.held is false. The lock is free.
  2. Thread A wants the lock:
    • Calls acquire(&lock).
    • Enters the while loop.
    • Calls test_and_set(&lock.held).
    • Atomically: Reads the current value (false), sets lock.held to true.
    • test_and_set returns the original value, which was false.
    • The while condition becomes while(false), which is false.
    • Thread A exits the loop and has successfully acquired the lock. lock.held is now true.
  3. Thread B wants the lock (while A holds it):
    • Calls acquire(&lock).
    • Enters the while loop.
    • Calls test_and_set(&lock.held).
    • Atomically: Reads the current value (true), sets lock.held to true (no change).
    • test_and_set returns the original value, which was true.
    • The while condition becomes while(true), which is true.
    • Thread B stays in the loop and immediately calls test_and_set again. It will keep reading true and returning true, effectively "spinning" and consuming CPU cycles while waiting.
  4. Thread A releases the lock:
    • Finishes its critical section.
    • Calls release(&lock).
    • Sets lock.held to false.
  5. Thread B (still spinning) tries again:
    • Calls test_and_set(&lock.held).
    • Atomically: Reads the current value (false), sets lock.held to true.
    • test_and_set returns the original value, which was false.
    • The while condition becomes while(false), which is false.
    • Thread B exits the loop and has successfully acquired the lock. lock.held is now true.

locks

  • acquire (lock): called before entering the critical section
  • release (unlock): called to leave the critical section
  • calls to acquire and release must be paired

spin lock

void acquire(struct lock *lock) {
    while (lock->held)
        ; // spin while the lock is held
    lock->held = true; // not atomic with the check above
}
  • this does not work, because:
    1. the check and the set are separate steps, so if several threads spin on the same lock, once it is released they may all see it as free, all set it, and all enter the critical section together, which is not wanted.
    2. busy waiting is wasteful
    3. the lock has its own critical section
      1. release/acquire need to be atomic
        1. they are not atomic by default because of involuntary context switches
          1. disable/restore interrupts
            1. disabling interrupts only works on a per-CPU-core basis
          2. atomic instructions: test and set
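
The first failure can be reproduced deterministically. In this sketch, simulated threads are generators and a schedule string picks who runs next; the explicit `yield` between the check and the set marks the point where a context switch can occur. `naive_acquire` and `run` are illustrative names, not part of the original code.

```python
def naive_acquire(lock, name, in_critical):
    while lock["held"]:         # step 1: check
        yield                   # still held, keep spinning
    yield                       # a context switch can happen right here
    lock["held"] = True         # step 2: set (not atomic with step 1)
    in_critical.append(name)    # the thread now believes it owns the lock
    yield


def run(schedule):
    """Run simulated threads A and B in the order given by `schedule`."""
    lock = {"held": False}
    in_critical = []
    threads = {n: naive_acquire(lock, n, in_critical) for n in "AB"}
    for name in schedule:
        next(threads[name])
    return in_critical


print(run("AABB"))   # A checks and sets before B checks: only A gets in
print(run("ABAB"))   # both check before either sets: both get in
```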

lock queue

  • use a queue to make sure that only one thread can enter the critical section at a time
  • when a thread wants to acquire a lock, it adds itself to the queue
  • when a thread releases the lock, it removes itself from the queue and allows the next thread in the queue to acquire the lock
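
A minimal sketch of such a queue-based lock, assuming one `threading.Event` per waiting thread and direct hand-off on release. The class name `QueueLock` and its `waiting` helper are hypothetical. Unlike the description above, a thread only joins the queue when the lock is already held.

```python
import threading
import time
from collections import deque


class QueueLock:
    def __init__(self):
        self._guard = threading.Lock()  # protects the two fields below
        self._held = False
        self._waiters = deque()         # one Event per waiting thread, FIFO

    def acquire(self):
        with self._guard:
            if not self._held:
                self._held = True       # lock was free: take it immediately
                return
            ticket = threading.Event()
            self._waiters.append(ticket)
        ticket.wait()                   # sleep until the lock is handed over

    def release(self):
        with self._guard:
            if self._waiters:
                self._waiters.popleft().set()   # hand off to the next thread
            else:
                self._held = False

    def waiting(self):
        with self._guard:
            return len(self._waiters)


lock = QueueLock()
order = []


def worker(name):
    lock.acquire()
    order.append(name)
    lock.release()


lock.acquire()                          # main thread holds the lock first
threads = []
for i, name in enumerate(["T1", "T2", "T3"]):
    t = threading.Thread(target=worker, args=(name,))
    t.start()
    while lock.waiting() < i + 1:       # wait until this thread has queued up
        time.sleep(0.001)
    threads.append(t)
lock.release()
for t in threads:
    t.join()
print(order)
```

Waiting threads sleep instead of spinning, and the lock is granted in arrival order.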

semaphore

A semaphore is a synchronization primitive that is used to control access to a shared resource by multiple threads.

  • supports 2 operations: wait and signal
  • wait - decrement the semaphore value; if the value is now less than 0, the thread blocks until another thread signals (wait returns without blocking only when s > 0 before the call)
  • signal - increment the semaphore value; if the value is still less than or equal to 0, one blocked thread is unblocked and can continue execution
  • s is the semaphore value; while it is non-negative, it counts the number of available resources
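
A quick way to see the counting behaviour is Python's `threading.Semaphore`, where `acquire` plays the role of wait and `release` the role of signal. Note one difference from the definition above: Python's counter never drops below zero, so a thread blocks when the value is 0. The non-blocking calls below only show whether a blocking wait would have gone through.

```python
import threading

s = threading.Semaphore(2)          # two resources available

results = [
    s.acquire(blocking=False),      # s: 2 -> 1, succeeds
    s.acquire(blocking=False),      # s: 1 -> 0, succeeds
    s.acquire(blocking=False),      # s is 0: a blocking wait would sleep here
]
s.release()                         # signal, s: 0 -> 1
results.append(s.acquire(blocking=False))   # a resource is free again

print(results)
```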

producer-consumer problem

  • one or more producers produce data and one or more consumers consume data

download and run the producer-consumer code

Different Rate of Execution
  • producer and consumer may not run at the same rate
  • producer may produce data faster than consumer can consume it, or vice versa
  • there must be a buffer to store the data produced by the producer until the consumer can consume it
  • TASKS ARE INDEPENDENT and can be executed out of order

  • potential problems:

    • buffer overflow
    • buffer underflow

constraints

  1. consumer must wait for the producer to produce data when the buffer is empty
  2. producer must wait for the consumer to consume data when the buffer is full
  3. only one thread can remove/add data from/to the buffer at a time
spinning version
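wait(s) {
    while (s <= 0) {
        // busy-wait (spin) until another thread signals
    }
    s--; // the check and the decrement must happen atomically
}

signal(s) {
    s++; // a spinning waiter now sees s > 0 and proceeds
}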
blocking version
wait(s) {
    while (s <= 0) {
        sleep(); // block the thread; recheck s after waking up
    }
    s--;
}

signal(s) {
    s++;
    if (queued thread)
        wakeup(); // unblock one waiting thread
}
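
One possible runnable version of the blocking semaphore, sketched with `threading.Condition` (a lock plus a wait queue). `CountingSemaphore`, `stats`, and `worker` are illustrative names, and the worker count and sleep time are arbitrary. The demo starts six threads against a semaphore of 2 and records how many are inside at once.

```python
import threading
import time


class CountingSemaphore:
    def __init__(self, value):
        self._value = value
        self._cond = threading.Condition()   # lock + wait queue in one object

    def wait(self):
        with self._cond:
            while self._value <= 0:          # recheck after every wake-up
                self._cond.wait()            # releases the lock while asleep
            self._value -= 1

    def signal(self):
        with self._cond:
            self._value += 1
            self._cond.notify()              # wake one waiting thread, if any


sem = CountingSemaphore(2)
stats_lock = threading.Lock()
stats = {"inside": 0, "max_inside": 0}


def worker():
    sem.wait()
    with stats_lock:
        stats["inside"] += 1
        stats["max_inside"] = max(stats["max_inside"], stats["inside"])
    time.sleep(0.01)                         # pretend to use the resource
    with stats_lock:
        stats["inside"] -= 1
    sem.signal()


threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(stats)
```

At no point are more than two workers holding the semaphore, and the value returns to 2 once all of them have signalled.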
  • the diagram below shows how a semaphore works
  • suppose the initial semaphore value is 2
flowchart TD
    Start("Start: Semaphore s = 2") --> Thread1["Thread 1 calls wait()"]
    Thread1 --> Dec1["Decrement s to 1"]
    Dec1 --> CS1["Thread 1 enters critical section"]

    Start --> Thread2["Thread 2 calls wait()"]
    Thread2 --> Dec2["Decrement s to 0"]
    Dec2 --> CS2["Thread 2 enters critical section"]

    Start --> Thread3["Thread 3 calls wait()"]
    Thread3 --> Check3{"Is s > 0?"}
    Check3 -->|"No (s = 0)"| Block["Thread 3 blocks and waits"]

    CS1 --> Sig1["Thread 1 calls signal()"]
    Sig1 --> Inc1["Increment s to 1"]
    Inc1 --> WakeT3["Wake up Thread 3"]
    WakeT3 --> Dec3["Thread 3 decrements s to 0"]
    Dec3 --> CS3["Thread 3 enters critical section"]

    CS2 --> Sig2["Thread 2 calls signal()"]
    Sig2 --> Inc2["Increment s to 1"]

    CS3 --> Sig3["Thread 3 calls signal()"]
    Sig3 --> Inc3["Increment s to 2"]

    Inc2 -.-> End("End: Semaphore s = 2")
    Inc3 -.-> End
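
The three producer-consumer constraints can also be met with semaphores alone. In this sketch, with one producer and one consumer, `empty_slots` makes the producer wait when the buffer is full, `filled_slots` makes the consumer wait when it is empty, and `mutex` lets only one thread touch the buffer at a time. All names and sizes here are assumptions chosen for the illustration.

```python
import threading
from collections import deque

BUFFER_SIZE = 3
NUM_ITEMS = 20

buffer = deque()
empty_slots = threading.Semaphore(BUFFER_SIZE)  # producer waits when this is 0
filled_slots = threading.Semaphore(0)           # consumer waits when this is 0
mutex = threading.Semaphore(1)                  # one thread in the buffer at a time
consumed = []
max_len = [0]                                   # largest buffer length observed


def producer():
    for i in range(NUM_ITEMS):
        empty_slots.acquire()       # wait for a free slot
        mutex.acquire()
        buffer.append(i)
        max_len[0] = max(max_len[0], len(buffer))
        mutex.release()
        filled_slots.release()      # signal: one more item available


def consumer():
    for _ in range(NUM_ITEMS):
        filled_slots.acquire()      # wait for an item
        mutex.acquire()
        item = buffer.popleft()
        mutex.release()
        empty_slots.release()       # signal: one more free slot
        consumed.append(item)


threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(consumed, max_len[0])
```

The order of the two waits matters: taking `mutex` before `empty_slots` or `filled_slots` could leave a thread asleep while holding the mutex.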

proper use of semaphore

consumer
while True:
    lock.acquire()
    while len(buffer) == 0: # (2)
        if items_produced_count >= MAX_ITEMS_TO_PRODUCE: # no such exit condition in production
            print(f'consumer {consumer_id} exiting')
            lock.release() # (1)
            return
        print(f"BUFFER EMPTY, consumer {consumer_id} waiting ...")
        buffer_not_empty.wait() # this will auto release lock
    item = buffer.popleft()
    buffer_not_full.notify() # (3)
    items_consumed_count += 1
    lock.release()
    print(f'consumer {consumer_id} consumes {item}')
    time.sleep(CONSUMER_DELAY_MAX) # (4)
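  1. you MUST release the lock before the thread exits; otherwise no other thread can ever acquire it and the program deadlocks. (wait() releases the lock automatically while the thread is blocked and reacquires it before returning.)

  2. can we replace this with if len(buffer) == 0?

    • no. Several consumers may race for the same item: between the notification and the moment a woken consumer reacquires the lock, another consumer may already have emptied the buffer. This matters less with notify, but with notifyAll every waiting consumer wakes up, so each must recheck that the buffer is not empty before taking an item.
  3. can you notify the producer after releasing the lock?

    • no. In Python, notify() must be called while holding the lock; calling it after the release raises RuntimeError. Under Mesa semantics, always assume another thread takes the lock as soon as it is released. Keeping the buffer update and the notify in one critical section avoids a lost wake-up, where the producer is interrupted just before wait and misses the notification.
  4. can we put time.sleep in the critical section?

    • no. Sleeping while holding the lock keeps every other thread out of the critical section for the whole delay, so producers and consumers can no longer run concurrently.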
producer
for i in range(items_to_make): # this should be infinite loop in production
    time.sleep(PRODUCER_DELAY_MAX) # (1)
    item = f'item-{producer_id}-{i}'
    lock.acquire()
    while len(buffer) >= BUFFER_SIZE: # (2)
        print(f'Buffer FULL, producer {producer_id} Waiting ...')
        buffer_not_full.wait() # this will auto release lock

    buffer.append(item)
    items_produced_count += 1
    buffer_not_empty.notify()
    lock.release()

    print(f"Producer {producer_id}: produced {item}.")

print(f"Producer {producer_id}: Finished producing {items_to_make} items.")
  1. can we put time.sleep in the critical section?

    • no. Sleeping while holding the lock keeps every other thread out of the critical section for the whole delay, so producers and consumers can no longer run concurrently.
  2. can we replace this with if len(buffer) >= BUFFER_SIZE?

    • in this demo it may appear to work, but in production, no. A woken producer must reacquire the lock before it continues, and another producer may have refilled the buffer in the meantime, so the producer has to check again whether the buffer is full after being woken.
      • with notifyAll this is even more likely, because every waiting producer wakes up for a single free slot.
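
For reference, here is a compact, self-contained variant of the two snippets above that runs as is. It is a sketch, not the downloadable demo: the `put`/`get` helpers, the `DONE` sentinel used to stop consumers, and the sizes are assumptions. The one structural detail it relies on is that both `Condition` objects share the same lock.

```python
import threading
from collections import deque

BUFFER_SIZE = 2
ITEMS_PER_PRODUCER = 10
NUM_PRODUCERS = 2
NUM_CONSUMERS = 2
DONE = object()                     # sentinel telling a consumer to exit

buffer = deque()
lock = threading.Lock()
buffer_not_full = threading.Condition(lock)    # both conditions share one lock
buffer_not_empty = threading.Condition(lock)
consumed = []


def put(item):
    with lock:
        while len(buffer) >= BUFFER_SIZE:      # recheck after every wake-up
            buffer_not_full.wait()             # releases the lock while waiting
        buffer.append(item)
        buffer_not_empty.notify()              # notify while holding the lock


def get():
    with lock:
        while len(buffer) == 0:
            buffer_not_empty.wait()
        item = buffer.popleft()
        buffer_not_full.notify()
        return item


def producer(producer_id):
    for i in range(ITEMS_PER_PRODUCER):
        put(f"item-{producer_id}-{i}")


def consumer():
    while True:
        item = get()
        if item is DONE:
            return
        with lock:
            consumed.append(item)


producers = [threading.Thread(target=producer, args=(p,)) for p in range(NUM_PRODUCERS)]
consumers = [threading.Thread(target=consumer) for _ in range(NUM_CONSUMERS)]
for t in producers + consumers:
    t.start()
for t in producers:
    t.join()
for _ in consumers:
    put(DONE)                       # one sentinel per consumer
for t in consumers:
    t.join()
print(len(consumed))
```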

Producer-Consumer with Condition Variables

  • condition variables are used to block a thread until a particular condition is met
producer
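class Producer implements Runnable {
    private Condition notFull;
    private Condition notEmpty;
    private Lock lock; // this is shared
    private BufferQueue buffer; // this is shared
    private int count; // this is shared
    ...
    public void run() {
        while (true) {
            Item item = produceItem(); // assumed to return the new item; runs outside the critical section
            lock.lock();
            try {
                while (count == buffer.capacity) { // recheck after every wake-up
                    notFull.await(); // wait until the buffer is not full
                }
                buffer.add(item);
                count++;
                if (count == 1) {
                    notEmpty.signal(); // signal that the buffer is not empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop
                return;
            } finally {
                lock.unlock(); // always release, even on an exception
            }
        }
    }
}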
consumer
class Consumer implements Runnable {
    private Condition notFull;
    private Condition notEmpty;
    private Lock lock; // this is shared
    private BufferQueue buffer; // this is shared
    private int count; // this is shared
    ...
    public void run() {
        while (true) {
            Item item;
            lock.lock();
            try {
                while (count == 0) { // recheck after every wake-up
                    notEmpty.await(); // wait until the buffer is not empty
                }
                item = buffer.remove();
                count--;
                if (count == buffer.capacity - 1) {
                    notFull.signal(); // signal that the buffer is not full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop
                return;
            } finally {
                lock.unlock(); // always release, even on an exception
            }
            consumeItem(item); // consume outside the critical section
        }
    }
}

Signal Semantics

monitor

a programming language construct that controls access to shared data

  • Encapsulates:
    • shared data structures
    • procedures that operate on the shared data structures
    • synchronization between concurrent threads that invoke the procedures
  • guarantees mutual exclusion
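
Python has no built-in monitor construct, but the idea can be approximated with a class that keeps its data private and takes one lock in every method. The `AccountMonitor` below is a made-up example: the balance is the shared data, `deposit` and `withdraw` are the procedures, and a condition variable handles the synchronization between them.

```python
import threading


class AccountMonitor:
    """Shared data plus the only procedures allowed to touch it."""

    def __init__(self):
        self._lock = threading.Lock()                     # one lock for the whole monitor
        self._funds_added = threading.Condition(self._lock)
        self._balance = 0                                 # shared data, private to the monitor

    def deposit(self, amount):
        with self._lock:                                  # mutual exclusion on entry
            self._balance += amount
            self._funds_added.notify_all()                # let waiting withdrawals recheck

    def withdraw(self, amount):
        with self._lock:
            while self._balance < amount:                 # wait until there is enough money
                self._funds_added.wait()
            self._balance -= amount

    def balance(self):
        with self._lock:
            return self._balance


account = AccountMonitor()
withdrawer = threading.Thread(target=account.withdraw, args=(50,))
withdrawer.start()          # waits inside the monitor if the balance is too low
account.deposit(30)
account.deposit(40)         # balance can now cover the withdrawal
withdrawer.join()
print(account.balance())
```

Callers never see the lock; they can only go through the monitor's procedures, which is what guarantees mutual exclusion.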

Reader and Writer Problem

download and run the reader-writer code

multiple readers, one writer

  • multiple readers can read the data at the same time, but only one writer can write the data at a time

constraints

  1. a writer proceeds only when there are no readers and no other writers
  2. a reader proceeds only when there is no active writer

solution

# --- Semaphores ---
# Controls access to read_count (ensures atomic updates)
# Initial value 1 means only one thread can access read_count at a time
mutex = threading.Semaphore(1)

# Controls writer access. Also used by the first reader to block writers.
# Initial value 1 means one writer OR the first reader can acquire it.
block_write = threading.Semaphore(1)

# --- State ---
# Number of readers currently accessing the shared data
read_count = 0
reader
def reader(reader_id):
    global read_count
    print(f"Reader {reader_id}: Wants to read.")

    # --- Entry Section ---
    mutex.acquire() # Lock access to read_count
    read_count += 1
    if read_count == 1:
        # First reader blocks any waiting/incoming writers
        print(f"Reader {reader_id}: Is the first reader, blocking writers.")
        block_write.acquire()
    mutex.release() # Unlock access to read_count
    # --- End Entry Section ---

    # --- Critical Section (Reading) ---
    print(f"Reader {reader_id}: READING data -> {shared_data}")
    time.sleep(random.uniform(0.1, READ_TIME_MAX))
    print(f"Reader {reader_id}: Finished reading.")
    # --- End Critical Section ---

    # --- Exit Section ---
    mutex.acquire() # Lock access to read_count
    read_count -= 1
    if read_count == 0:
        # Last reader unblocks writers
        print(f"Reader {reader_id}: Is the last reader, unblocking writers.")
        block_write.release()
    mutex.release() # Unlock access to read_count
    # --- End Exit Section ---
writer
# --- Writer Thread ---
def writer(writer_id):
    print(f"Writer {writer_id}: Wants to write.")

    # --- Entry Section ---
    # Request exclusive access; blocks if readers are active or another writer is writing
    block_write.acquire()
    print(f"Writer {writer_id}: Acquired write lock.")
    # --- End Entry Section ---

    # --- Critical Section (Writing) ---
    print(f"Writer {writer_id}: WRITING data...")
    time.sleep(random.uniform(0.1, WRITE_TIME_MAX))
    new_value = random.randint(1, 100)
    shared_data["value"] = new_value
    shared_data["last_updated_by"] = f"Writer {writer_id}"
    print(f"Writer {writer_id}: Finished writing -> {shared_data}")
    # --- End Critical Section ---

    # --- Exit Section ---
    block_write.release() # Release exclusive access
    print(f"Writer {writer_id}: Released write lock.")
    # --- End Exit Section ---
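
To check that this scheme really enforces both constraints, the stripped-down sketch below reuses the same two semaphores and counts violations while several readers and writers run. The `state` dictionary, the `check` helper, and the thread counts are additions made for this test harness only.

```python
import threading
import time

mutex = threading.Semaphore(1)        # protects read_count
block_write = threading.Semaphore(1)  # held by one writer or by the group of readers
state = {"read_count": 0, "readers": 0, "writers": 0,
         "violations": 0, "max_readers": 0}
state_lock = threading.Lock()         # protects the bookkeeping below


def check(delta_readers, delta_writers):
    """Record who is inside and count any broken constraint."""
    with state_lock:
        state["readers"] += delta_readers
        state["writers"] += delta_writers
        state["max_readers"] = max(state["max_readers"], state["readers"])
        if state["writers"] > 1 or (state["writers"] == 1 and state["readers"] > 0):
            state["violations"] += 1


def reader():
    mutex.acquire()
    state["read_count"] += 1
    if state["read_count"] == 1:
        block_write.acquire()         # first reader blocks writers
    mutex.release()
    check(+1, 0)
    time.sleep(0.005)                 # reading
    check(-1, 0)
    mutex.acquire()
    state["read_count"] -= 1
    if state["read_count"] == 0:
        block_write.release()         # last reader unblocks writers
    mutex.release()


def writer():
    block_write.acquire()
    check(0, +1)
    time.sleep(0.005)                 # writing
    check(0, -1)
    block_write.release()


threads = [threading.Thread(target=reader) for _ in range(5)]
threads += [threading.Thread(target=writer) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(state)
```

This arrangement favours readers: as long as new readers keep arriving, `read_count` never reaches 0 and a waiting writer can be delayed indefinitely.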

More Problems

Hiker-Biker problem

download and run the hiker-biker code

Make Pizza Problem

download and run the make pizza code