Synchronization
Concurrency problems
- race conditions
  - the result depends on the timing of the code's execution
- interleaved executions
  - threads interleave their executions arbitrarily and at different rates
  - scheduling is not under program control
- fine-grained
  - only individual machine instructions are atomic; a statement such as `x = x + 1` becomes several instructions (load, add, store) that can interleave with other threads
- shared resources
  - NOT shared: local variables
  - shared: global variables, static variables, heap objects
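To make the race concrete, here is a small deterministic simulation. It is a sketch, not real threading. `counter += 1` is split into load, add and store. Each character of the schedule string decides which "thread" runs its next step. The names `increment` and `run` are hypothetical, and generators stand in for threads so that the interleaving is reproducible.

```python
# Shared memory for the simulation.
shared = {"counter": 0}

def increment(name, trace):
    """One 'thread' executing counter += 1 as three separate steps."""
    tmp = shared["counter"]           # step 1: load the shared value
    trace.append(f"{name} loads {tmp}")
    yield                             # a context switch may happen here
    tmp = tmp + 1                     # step 2: add, on a private copy
    yield                             # ... or here
    shared["counter"] = tmp           # step 3: store back to shared memory
    trace.append(f"{name} stores {tmp}")

def run(schedule):
    """Advance thread A or B by one step per character of `schedule`."""
    shared["counter"] = 0
    trace = []
    threads = {"A": increment("A", trace), "B": increment("B", trace)}
    for name in schedule:
        next(threads[name], None)     # returns None once that thread has finished
    return shared["counter"], trace

print(run("AAABBB"))  # → (2, ['A loads 0', 'A stores 1', 'B loads 1', 'B stores 2'])
print(run("ABABAB"))  # → (1, ['A loads 0', 'B loads 0', 'A stores 1', 'B stores 1'])
```

Both schedules run the same code. Only the interleaving differs, and in the second one B's load happens before A's store, so one update is lost.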
Solution: synchronization
- ensure proper coordination among threads
- mechanisms
- patterns
critical section
- only one thread at a time may execute inside the critical section
- assume: every thread eventually leaves the critical section, and every thread that wants to enter eventually enters it
solution: mutual exclusion → build larger atomic units (locks) from small atomic instructions
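Here is a minimal Python sketch of mutual exclusion using the standard `threading.Lock`. The worker function and the iteration counts are illustrative. The lock turns the read-modify-write of `counter` into a single critical section, so no increment is lost.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def worker(n):
    global counter
    for _ in range(n):
        with counter_lock:      # acquire before, release after the critical section
            counter += 1        # load/add/store can no longer interleave with other workers

threads = [threading.Thread(target=worker, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # → 40000
```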
Atomic instructions - Test And Set
- record the old value
- set the value to true
- return the old value
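#include <stdbool.h>

// Executed atomically by the hardware: returns the old value of *flag
// and leaves *flag set to true.
bool test_and_set(bool *flag) {
    bool old = *flag;  // (1) record the old value
    *flag = true;      // (2) set the value to true
    return old;        // return the old value
}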
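- the C function above only describes what the instruction does: the hardware executes (1) and (2) as one indivisible instruction, so no other thread can run in between
  - if this were ordinary code and a thread were interrupted between (1) and (2), another thread could also read `false`, and both threads would believe they had acquired the lock
  - `old` is a local variable, so it is not shared and no other thread can change it; only `*flag` is shared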
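struct lock {
    bool held;  // false = free, true = held; must start out false
};

void acquire(struct lock *lock) {
    // spin until test_and_set returns false, i.e. the lock was free and is now ours
    while (test_and_set(&lock->held))
        ;
}

void release(struct lock *lock) {
    lock->held = false;  // mark the lock free so a spinning thread can take it
}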
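The same spin lock can be sketched in Python to watch it work with real threads. CPython exposes no test-and-set instruction. So this sketch assumes a hypothetical `AtomicFlag` whose private `threading.Lock` makes the read and the write indivisible, which is the guarantee the hardware provides. Building a spin lock on top of a real lock is purely illustrative. `SpinLock` and `worker` are also hypothetical names.

```python
import threading

class AtomicFlag:
    """Stand-in for a hardware test-and-set flag (assumption: a private lock
    emulates the indivisible read + write the CPU would provide)."""

    def __init__(self):
        self._value = False
        self._guard = threading.Lock()

    def test_and_set(self):
        with self._guard:
            old = self._value      # (1) record the old value
            self._value = True     # (2) set the value to true
            return old             # return the old value

    def clear(self):
        with self._guard:
            self._value = False


class SpinLock:
    def __init__(self):
        self._held = AtomicFlag()

    def acquire(self):
        while self._held.test_and_set():  # old value True: someone holds it, keep spinning
            pass

    def release(self):
        self._held.clear()


spin = SpinLock()
total = 0

def worker(n):
    global total
    for _ in range(n):
        spin.acquire()
        total += 1                 # critical section
        spin.release()

threads = [threading.Thread(target=worker, args=(5_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(total)  # → 20000
```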
- Why do we need busy waiting?
  - This simple lock has no place where a waiting thread could sleep, and multiple threads may attempt to acquire the lock at the same time. So a thread waits by retrying. Busy waiting (spinning) keeps calling `test_and_set` until it returns `false`, which means the lock was free and the caller has just taken it. This way the thread cannot miss the moment the lock is released.
- Suppose another thread calls `test_and_set` while the lock is held: why does it not enter the critical section?
  - It reads `true` (the lock is already held), so `test_and_set` returns `true`; writing `true` again changes nothing. The `while` loop keeps spinning, so the thread never gets past it into the critical section.
Simulate the steps when another thread tries to acquire the lock while the current thread is in the critical section
Scenario: two threads (A and B)
- Initial state:
  - `lock.held` is `false`. The lock is free.
- Thread A wants the lock:
  - Calls `acquire(&lock)` and enters the `while` loop.
  - Calls `test_and_set(&lock.held)`, which atomically reads the current value (`false`) and sets `lock.held` to `true`.
  - `test_and_set` returns the original value, `false`.
  - The `while` condition is false, so Thread A exits the loop and has acquired the lock. `lock.held` is now `true`.
- Thread B wants the lock (while A holds it):
  - Calls `acquire(&lock)` and enters the `while` loop.
  - Calls `test_and_set(&lock.held)`, which atomically reads the current value (`true`) and sets `lock.held` to `true` (no change).
  - `test_and_set` returns the original value, `true`.
  - The `while` condition is true, so Thread B stays in the loop and immediately calls `test_and_set` again. It keeps reading `true` and returning `true`, "spinning" and consuming CPU cycles while it waits.
- Thread A releases the lock:
  - Finishes its critical section.
  - Calls `release(&lock)`, which sets `lock.held` to `false`.
- Thread B (still spinning) tries again:
  - Calls `test_and_set(&lock.held)`, which atomically reads the current value (`false`) and sets `lock.held` to `true`.
  - `test_and_set` returns the original value, `false`.
  - The `while` condition is false, so Thread B exits the loop and has acquired the lock. `lock.held` is now `true`.
locks
- `acquire` (`lock`) to enter the critical section, `release` (`unlock`) to leave it
- calls to `acquire` and `release` must be paired
spin lock
- a spin lock that checks `held` and then sets it in two separate (non-atomic) steps does not work, because:
  - once the lock is released, several spinning threads may all see it free before any of them sets it. They all think they acquired the lock and enter the critical section together, which is not wanted.
  - busy waiting is wasteful: a spinning thread burns CPU cycles without doing useful work
- locks have their own critical section
  - release/acquire need to be atomic
    - if they are not atomic, an involuntary context switch in the middle of them can break mutual exclusion
  - disable/restore interrupts
    - interrupts are disabled only on a per-CPU-core basis, so this does not stop threads running on other cores
  - atomic instructions: test and set
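The failure of the check-then-set spin lock can be reproduced deterministically with the same generator trick used for races. This is a hypothetical simulation, not real threads. Each `yield` marks a point where the scheduler may switch, and the gap between checking `held` and setting it is exactly where things go wrong.

```python
# Shared state for the simulation.
state = {"held": False, "in_cs": []}

def naive_thread(name):
    """Naive spin lock: check and set are two separate steps."""
    while state["held"]:           # step 1: spin while the lock looks held
        yield
    yield                          # a context switch can happen right here
    state["held"] = True           # step 2: set the flag, possibly too late
    state["in_cs"].append(name)    # the thread is now inside the critical section

def run(schedule):
    """Advance thread A or B by one step per character of `schedule`."""
    state["held"] = False
    state["in_cs"] = []
    threads = {"A": naive_thread("A"), "B": naive_thread("B")}
    for name in schedule:
        next(threads[name], None)
    return state["in_cs"]

print(run("AABB"))  # → ['A']       B sees held == True and keeps spinning
print(run("ABAB"))  # → ['A', 'B']  both saw held == False: mutual exclusion broken
```

With `test_and_set`, step 1 and step 2 become one indivisible instruction, so the "ABAB" interleaving cannot happen.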
lock queue
- use a queue to make sure that only one thread can enter the critical section at a time
- when a thread wants to acquire a lock, it adds itself to the queue
- when a thread releases the lock, it removes itself from the queue and allows the next thread in the queue to acquire the lock
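The queue idea can be sketched in Python as follows. The sketch makes several assumptions:
- A private `threading.Lock` acts as the short internal guard for the lock's own state (the role that disabling interrupts or test-and-set plays in a kernel).
- Each waiting thread sleeps on its own `threading.Event` instead of spinning.
- In this variant the releasing thread hands the lock directly to the next waiter, rather than the holder sitting in the queue itself.

`QueueLock` is a hypothetical name.

```python
import threading
from collections import deque

class QueueLock:
    """Lock with a FIFO wait queue: waiters sleep instead of spinning."""

    def __init__(self):
        self._guard = threading.Lock()   # protects _held and _waiters (the lock's own critical section)
        self._held = False
        self._waiters = deque()          # one Event per waiting thread, in arrival order

    def acquire(self):
        with self._guard:
            if not self._held:
                self._held = True        # lock is free: take it immediately
                return
            ticket = threading.Event()
            self._waiters.append(ticket) # lock is busy: join the queue
        ticket.wait()                    # sleep until release() hands the lock to us

    def release(self):
        with self._guard:
            if self._waiters:
                self._waiters.popleft().set()  # hand over directly; _held stays True
            else:
                self._held = False


qlock = QueueLock()
count = 0

def worker(n):
    global count
    for _ in range(n):
        qlock.acquire()
        count += 1                       # critical section
        qlock.release()

threads = [threading.Thread(target=worker, args=(2_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(count)  # → 8000
```

Because the `Event` is created and queued while holding the guard, a `release` that happens before the waiter reaches `ticket.wait()` still works: the event is already set, so `wait()` returns immediately.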
semaphore
A semaphore is a synchronization primitive that is used to control access to a shared resource by multiple threads.
- supports 2 operations: `wait` and `signal`
  - `s` is the semaphore value; when positive, it counts the number of available resources (a negative `s` means `-s` threads are blocked)
  - `wait`: decrement `s`; if `s` becomes negative, the calling thread blocks until another thread calls `signal` (`wait` returns without waiting only when `s > 0` before the call)
  - `signal`: increment `s`; if `s` is still less than or equal to 0, some thread is blocked in `wait`, so one of them is unblocked and continues execution
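A counting semaphore can be sketched in Python on top of a condition variable. `CountingSemaphore` is a hypothetical name; Python's own `threading.Semaphore` offers the same behavior as `acquire`/`release`. Unlike the formulation above, `value` here never goes negative. Blocked threads wait on the condition instead of being counted by a negative value, which gives the same observable behavior.

```python
import threading
import time

class CountingSemaphore:
    """Counting semaphore built on a condition variable (a sketch)."""

    def __init__(self, value):
        self.value = value
        self._cond = threading.Condition()

    def wait(self):
        with self._cond:
            while self.value == 0:     # no resource available: block
                self._cond.wait()
            self.value -= 1            # take one resource

    def signal(self):
        with self._cond:
            self.value += 1            # give one resource back
            self._cond.notify()        # unblock one waiting thread, if any


sem = CountingSemaphore(2)             # two resources available
active = 0
max_active = 0
stats_lock = threading.Lock()

def use_resource():
    global active, max_active
    sem.wait()
    with stats_lock:
        active += 1
        max_active = max(max_active, active)
    time.sleep(0.01)                   # hold the resource for a moment
    with stats_lock:
        active -= 1
    sem.signal()

threads = [threading.Thread(target=use_resource) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(max_active, sem.value)           # max_active never exceeds 2; value is back to 2
```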
producer-consumer problem
- one or more producers produce data and one or more consumers consume data
download and run the producer-consumer code
Different Rate of Execution
- producer and consumer may not run at the same rate
- producer may produce data faster than consumer can consume it, or vice versa
- there must be a buffer to store the data produced by the producer until the consumer can consume it
- tasks are independent and can be executed out of order
potential problems:
- buffer overflow
- buffer underflow
constraints
- consumer must wait for producer to produce data (buffer empty)
- producer must wait for consumer to consume data (buffer full)
- only one thread at a time can add data to or remove data from the buffer
- here is a diagram showing how a semaphore works, assuming an initial semaphore value of 2 (at most two threads in the critical section at once):
flowchart TD
Start("Start: Semaphore s = 2") --> Thread1["Thread 1 calls wait()"]
Thread1 --> Dec1["Decrement s to 1"]
Dec1 --> CS1["Thread 1 enters critical section"]
Start --> Thread2["Thread 2 calls wait()"]
Thread2 --> Dec2["Decrement s to 0"]
Dec2 --> CS2["Thread 2 enters critical section"]
Start --> Thread3["Thread 3 calls wait()"]
Thread3 --> Check3{"Is s > 0?"}
Check3 -->|"No (s = 0)"| Block["Thread 3 blocks and waits"]
CS1 --> Sig1["Thread 1 calls signal()"]
Sig1 --> Inc1["Increment s to 1"]
Inc1 --> WakeT3["Wake up Thread 3"]
WakeT3 --> Dec3["Thread 3 decrements s to 0"]
Dec3 --> CS3["Thread 3 enters critical section"]
CS2 --> Sig2["Thread 2 calls signal()"]
Sig2 --> Inc2["Increment s to 1"]
CS3 --> Sig3["Thread 3 calls signal()"]
Sig3 --> Inc3["Increment s to 2"]
Inc2 -.-> End("End: Semaphore s = 2")
Inc3 -.-> End
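The producer-consumer constraints listed above can all be met with semaphores. In the sketch below, Python's `threading.Semaphore.acquire`/`release` play the roles of `wait`/`signal`:
- `empty_slots` makes the producer wait when the buffer is full (no overflow).
- `full_slots` makes the consumer wait when it is empty (no underflow).
- `mutex` lets only one thread touch the buffer at a time.

`BUFFER_SIZE = 3` and the item values are example choices.

```python
import threading
from collections import deque

BUFFER_SIZE = 3                                  # example capacity (assumed)
buffer = deque()
empty_slots = threading.Semaphore(BUFFER_SIZE)   # counts free slots
full_slots = threading.Semaphore(0)              # counts items ready to consume
mutex = threading.Lock()                         # one thread in the buffer at a time

def producer(items):
    for item in items:
        empty_slots.acquire()      # wait for a free slot (prevents overflow)
        with mutex:
            buffer.append(item)
        full_slots.release()       # one more item available

def consumer(n, out):
    for _ in range(n):
        full_slots.acquire()       # wait for an item (prevents underflow)
        with mutex:
            item = buffer.popleft()
        empty_slots.release()      # one more free slot
        out.append(item)

consumed = []
items = list(range(10))
p = threading.Thread(target=producer, args=(items,))
c = threading.Thread(target=consumer, args=(len(items), consumed))
p.start()
c.start()
p.join()
c.join()
print(consumed)  # → [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

With one producer and one consumer the items come out in order. With several of each, every item is still consumed exactly once, though not necessarily in order.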
proper use of locks and condition variables (consumer)
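# Consumer thread body. Assumes shared globals defined elsewhere in the demo:
# lock = threading.Lock(), buffer = collections.deque(), and
# buffer_not_empty / buffer_not_full = threading.Condition(lock)
def consumer(consumer_id):
    global items_consumed_count
    while True:
        lock.acquire()
        while len(buffer) == 0:  # (2)
            if items_produced_count >= MAX_ITEMS_TO_PRODUCE:  # no such exit condition in production
                print(f'consumer {consumer_id} exiting')
                lock.release()  # (1)
                return
            print(f"BUFFER EMPTY, consumer {consumer_id} waiting ...")
            buffer_not_empty.wait()  # releases the lock while waiting, reacquires it before returning
        item = buffer.popleft()
        buffer_not_full.notify()  # (3)
        items_consumed_count += 1
        lock.release()
        print(f'consumer {consumer_id} consumes {item}')
        time.sleep(CONSUMER_DELAY_MAX)  # (4)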
- (1) you MUST release the lock before the thread exits or blocks; otherwise no other thread can ever acquire it, causing a deadlock (`buffer_not_empty.wait()` releases the lock automatically while waiting)
- (2) can we replace this with `if len(buffer) == 0`?
  - no, because consumers race each other. Under Mesa semantics a woken thread must reacquire the lock before it runs, so another consumer may take the item first. With `notify_all`, several consumers wake up for a single item. The condition must be rechecked after every wake-up, so use `while`.
- (3) can you notify the producer after releasing the lock?
  - no. In Python, `Condition.notify()` must be called while holding the lock; otherwise it raises `RuntimeError`. Keeping the state change and the notification under the same lock also rules out a lost wake-up. A producer that saw a full buffer holds the lock until `wait()` atomically releases it, so the consumer cannot slip its notification in before the producer is actually waiting.
- (4) can we put `time.sleep` in the critical section?
  - no. It would not deadlock, since the lock is eventually released. But sleeping while holding the lock keeps every other thread out of the critical section for the whole delay, which serializes producers and consumers and defeats concurrency.
# Producer thread body; uses the same shared globals as the consumer.
def producer(producer_id, items_to_make):
    global items_produced_count
    for i in range(items_to_make):  # this should be an infinite loop in production
        time.sleep(PRODUCER_DELAY_MAX)  # (1)
        item = f'item-{producer_id}-{i}'
        lock.acquire()
        while len(buffer) >= BUFFER_SIZE:  # (2)
            print(f'Buffer FULL, producer {producer_id} Waiting ...')
            buffer_not_full.wait()  # releases the lock while waiting, reacquires it before returning
        buffer.append(item)
        items_produced_count += 1
        buffer_not_empty.notify()
        lock.release()
        print(f"Producer {producer_id}: produced {item}.")
    print(f"Producer {producer_id}: Finished producing {items_to_make} items.")
- (1) can we put `time.sleep` in the critical section?
  - no. It would not deadlock, but holding the lock while sleeping blocks every other producer and consumer for the whole delay and defeats concurrency.
- (2) can we replace this with `if len(buffer) >= BUFFER_SIZE`?
  - it may happen to work in a small demo (e.g. with a single producer), but not in general. Under Mesa semantics, another producer may fill the freed slot between the notify and this producer reacquiring the lock. With `notify_all`, several producers wake up for a single free slot. The buffer must be rechecked after waking, so use `while`.
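The rule from the consumer's question about notifying after releasing the lock can be checked directly in Python. `threading.Condition.notify()` raises `RuntimeError` when the calling thread does not hold the condition's lock. The helper function names below are illustrative.

```python
import threading

cond = threading.Condition()   # creates and wraps its own lock

def notify_outside_lock():
    try:
        cond.notify()          # the lock is NOT held here
    except RuntimeError:
        return "RuntimeError"
    return "ok"

def notify_inside_lock():
    with cond:                 # acquire the condition's lock first
        cond.notify()          # allowed; a no-op if nobody is waiting
    return "ok"

print(notify_outside_lock())   # → RuntimeError
print(notify_inside_lock())    # → ok
```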
Producer-Consumer with Condition Variables
- condition variables are used to block a thread until a particular condition is met
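import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

class Producer implements Runnable {
    private Condition notFull;
    private Condition notEmpty;
    private Lock lock; // this is shared
    private BufferQueue buffer; // this is shared
    private int count; // must be shared: a plain int field is per-object, so real code keeps it in a shared object
    ...
    public void run() {
        while (true) {
            Item item = produceItem();
            lock.lock();
            try {
                while (count == buffer.capacity) { // while, not if: recheck after every wake-up
                    notFull.await(); // releases the lock while waiting until the buffer is not full
                }
                buffer.add(item);
                count++;
                notEmpty.signal(); // wake one waiting consumer, if any (signalling only when count == 1 can leave consumers asleep)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag and stop
                return;
            } finally {
                lock.unlock(); // always release, even on exceptions
            }
        }
    }
}
class Consumer implements Runnable {
    private Condition notFull;
    private Condition notEmpty;
    private Lock lock; // this is shared
    private BufferQueue buffer; // this is shared
    private int count; // must be shared (see Producer)
    ...
    public void run() {
        while (true) {
            Item item;
            lock.lock();
            try {
                while (count == 0) { // while, not if: recheck after every wake-up
                    notEmpty.await(); // releases the lock while waiting until the buffer is not empty
                }
                item = buffer.remove();
                count--;
                notFull.signal(); // wake one waiting producer, if any
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
            consumeItem(item); // consume outside the lock
        }
    }
}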
Signal Semantics

monitor
a programming language construct that controls access to shared data
- Encapsulates:
- shared data structures
- procedures that operate on the shared data structures
- synchronization between concurrent threads that invoke the procedures
- guarantees mutual exclusion
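Python has no `monitor` keyword. The monitor pattern can still be sketched as a class that encapsulates the shared data, the procedures that operate on it, and the synchronization. Every public method holds the same lock, and condition variables on that lock handle waiting. `BoundedBuffer`, `put` and `get` are hypothetical names. The `while` loops reflect Mesa semantics.

```python
import threading
from collections import deque

class BoundedBuffer:
    """Monitor-style bounded buffer: data, procedures and synchronization together."""

    def __init__(self, capacity):
        self._items = deque()                         # shared data structure
        self._capacity = capacity
        self._lock = threading.Lock()                 # one lock guards the whole monitor
        self._not_full = threading.Condition(self._lock)
        self._not_empty = threading.Condition(self._lock)

    def put(self, item):
        with self._lock:                              # mutual exclusion for every procedure
            while len(self._items) >= self._capacity: # recheck after each wake-up
                self._not_full.wait()
            self._items.append(item)
            self._not_empty.notify()

    def get(self):
        with self._lock:
            while not self._items:
                self._not_empty.wait()
            item = self._items.popleft()
            self._not_full.notify()
            return item


buf = BoundedBuffer(capacity=2)
results = []
results_lock = threading.Lock()

def produce(pid):
    for i in range(50):
        buf.put((pid, i))

def consume():
    for _ in range(50):
        item = buf.get()
        with results_lock:
            results.append(item)

threads = [threading.Thread(target=produce, args=(p,)) for p in range(2)]
threads += [threading.Thread(target=consume) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(results))  # → 100
```

Callers never touch the lock or the conditions. This is the point of a monitor: synchronization cannot be forgotten at a call site.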
Reader and Writer Problem
download and run the reader-writer code
multiple readers, one writer
- multiple readers can read the data at the same time, but only one writer can write the data at a time
constraints
- a writer may proceed only when there are no active readers and no other writer
- a reader may proceed only when there is no active writer
solution
import random
import threading
import time

# Example settings (assumed values; the downloadable demo defines its own)
READ_TIME_MAX = 0.5
WRITE_TIME_MAX = 0.5
shared_data = {"value": 0, "last_updated_by": None}

# --- Semaphores ---
# Controls access to read_count (ensures atomic updates)
# Initial value 1 means only one thread can access read_count at a time
mutex = threading.Semaphore(1)
# Controls writer access. Also used by the first reader to block writers.
# Initial value 1 means one writer OR the first reader can acquire it.
block_write = threading.Semaphore(1)
# --- State ---
# Number of readers currently accessing the shared data
read_count = 0

def reader(reader_id):
    global read_count
    print(f"Reader {reader_id}: Wants to read.")
    # --- Entry Section ---
    mutex.acquire()  # Lock access to read_count
    read_count += 1
    if read_count == 1:
        # First reader blocks any waiting/incoming writers
        print(f"Reader {reader_id}: Is the first reader, blocking writers.")
        block_write.acquire()
    mutex.release()  # Unlock access to read_count
    # --- End Entry Section ---
    # --- Critical Section (Reading) ---
    print(f"Reader {reader_id}: READING data -> {shared_data}")
    time.sleep(random.uniform(0.1, READ_TIME_MAX))
    print(f"Reader {reader_id}: Finished reading.")
    # --- End Critical Section ---
    # --- Exit Section ---
    mutex.acquire()  # Lock access to read_count
    read_count -= 1
    if read_count == 0:
        # Last reader unblocks writers (a semaphore may be released by a different thread than the one that acquired it)
        print(f"Reader {reader_id}: Is the last reader, unblocking writers.")
        block_write.release()
    mutex.release()  # Unlock access to read_count
    # --- End Exit Section ---

# --- Writer Thread ---
def writer(writer_id):
    print(f"Writer {writer_id}: Wants to write.")
    # --- Entry Section ---
    # Request exclusive access; blocks if readers are active or another writer is writing
    block_write.acquire()
    print(f"Writer {writer_id}: Acquired write lock.")
    # --- End Entry Section ---
    # --- Critical Section (Writing) ---
    print(f"Writer {writer_id}: WRITING data...")
    time.sleep(random.uniform(0.1, WRITE_TIME_MAX))
    new_value = random.randint(1, 100)
    shared_data["value"] = new_value
    shared_data["last_updated_by"] = f"Writer {writer_id}"
    print(f"Writer {writer_id}: Finished writing -> {shared_data}")
    # --- End Critical Section ---
    # --- Exit Section ---
    block_write.release()  # Release exclusive access
    print(f"Writer {writer_id}: Released write lock.")
    # --- End Exit Section ---
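The same two-semaphore scheme can be packaged as a reusable lock and checked against the constraints. The test counts how many readers and writers are inside at once and records a violation whenever a reader overlaps a writer or two writers overlap. `ReadersPreferenceRWLock` and the checking harness are hypothetical names. As in the original, readers are preferred: a steady stream of readers can keep `block_write` held and starve writers.

```python
import threading
import time

class ReadersPreferenceRWLock:
    """Two-semaphore readers-writers scheme wrapped in a class (a sketch)."""

    def __init__(self):
        self._mutex = threading.Semaphore(1)        # protects _read_count
        self._block_write = threading.Semaphore(1)  # held by one writer, or by the readers as a group
        self._read_count = 0

    def acquire_read(self):
        with self._mutex:
            self._read_count += 1
            if self._read_count == 1:
                self._block_write.acquire()  # first reader locks out writers

    def release_read(self):
        with self._mutex:
            self._read_count -= 1
            if self._read_count == 0:
                self._block_write.release()  # last reader lets writers in

    def acquire_write(self):
        self._block_write.acquire()

    def release_write(self):
        self._block_write.release()


rw = ReadersPreferenceRWLock()
stats = {"readers": 0, "writers": 0, "violations": 0}
stats_lock = threading.Lock()

def reader():
    rw.acquire_read()
    with stats_lock:
        stats["readers"] += 1
        if stats["writers"]:                          # no writer may be inside now
            stats["violations"] += 1
    time.sleep(0.01)                                  # "read"
    with stats_lock:
        stats["readers"] -= 1
    rw.release_read()

def writer():
    rw.acquire_write()
    with stats_lock:
        stats["writers"] += 1
        if stats["writers"] > 1 or stats["readers"]:  # a writer must be alone
            stats["violations"] += 1
    time.sleep(0.01)                                  # "write"
    with stats_lock:
        stats["writers"] -= 1
    rw.release_write()

threads = [threading.Thread(target=reader) for _ in range(6)]
threads += [threading.Thread(target=writer) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(stats)  # → {'readers': 0, 'writers': 0, 'violations': 0}
```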
More Problems
Hiker-Biker problem

download and run the hiker-biker code
Make Pizza Problem
