Modern Python has come up to free-threaded version. We can achieve real multi-thread programming in Python. Programmers need to be more careful. Generally speaking, multi-thread is easier than multi-process since resource-sharing is straightforward because all threads are in one process. Python offers lots of thread synchronization and coordination primitives!
Race Condition and Atomicity
Race Condition is a result of uncontrolled access to shared resources. When the wrong access pattern happens, something unexpected occurs. Atomicity is the key to avoid race conditions. If an operation is atomic, no other thread can interrupt it mid-way. It either hasn’t started, or it is completely finished.
Even in machine code level, not every machine code is guaranteed to be atomic. E.g. in x86_64 Instruction Set Architecture (ISA), to make Read-Modify-Write instruction aotmic, we have to prefix it with LOCK instruction. Under each machine code, there might be multiple micro-operations as well.
In C/C++, we literally write high-level machine code. Therefore, we have to explicily avoid race condtions by making all critical sections atomic. However, in Python, the mental picture is a bit different and high-level. No matter how you write python code, you cannot break python interpreter! In other words, you cannot trigger a segmentation fault of CPython implementations no matter how you write you python script (except bugs in CPython)! Everything in Python is an object. CPython guarantees that all objects are always in a memory-safe state. E.g. one thread is accessing a list; another thread is updating it; no mutex applied; you get either new value or old value, but not anything in between. CPython protects its code so anytime a list is always a correct list structure in terms of low-level memory and C-level implementation.
No matter how you write you C/C++ code, you cannot break the machine! (No consider burn it.)
Even CPython guarantees memory safety of all objects whenever accessing them, it doesn’t mean that the logic of your code is right. We still need consider race conditions and how to avoid them. That’s the mindset of multi-thread programming in Python.
Bytecode, GIL, Free Thread
Python is not a raw interpreted language. It compiles script to middle bytecode and run them later on. In Global Interpreter Lock (GIL) protected Python, we can say that each bytecode is atomic. However, we still need to establish critical section for a simple updating.
# Disassembler of Python byte code into mnemonics.
>>> import dis
>>> a = 1
# multi-bytecode for a simple updating
>>> dis.dis('a += 1')
1 0 LOAD_NAME 0 (a)
2 LOAD_CONST 0 (1)
4 INPLACE_ADD
6 STORE_NAME 0 (a)
...
Start from 3.13, Python offers free-threaded version, and no bytecode instructions are atomic by default in free-threaded environment. If your code have race conditions, it will becomes more obvious and reproducible. However, for writing high quality mutl-threading script, the mindset keeps the same: When doubt, add a lock!
Daemon Thread (only in Python)
daemon: A boolean value indicating whether this thread is a daemon thread. This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread. Python main thread is not a daemon thread and therefore all threads created in the main thread default to
daemon=False.
This is not daemon process, such as sshd, systemd or httpd, which is always running in the background rather than waiting for a user to launch them and interact with them. These processes are called daemons and the programs that run as daemons often end with a d to indicate so.
Python daemon thread borrows this concept. Daemon threads are background threads used to help the main non-daemon thread. Therefore, when main thread dies, they are all abruptly stopped. Just like if linux system dies, all daemon processes cannot live anymore. So, the critical things is to remember that Python interpreter waits all non-daemon threads but kill daemon threads instantly when main thread dies. The consequence is that daemon thread cannot do any cleanup job.
Tudou
Mutex (threading.Lock)
The most simple thread synchronization is to use a lock (mutex), which can be hold by only one thread!
import threading
mutex = threading.Lock()
if mutex.locked():
...
mutex.acquire(blocking=True, timeout=-1) # default parameter
# critical section
mutex.release()
with mutex:
... # critical section
if mutex.acquire(blocking=False):
...
mutex.release()
Avoid Deadlock When There Are More Than One Lock
Make sure all threads acquire locks in the exactly the same sequence!
Reentrant Mutex (threading.RLock)
rlock = threading.RLock()
When the lock acquisition happens in a recursive function. Please make sure that the times of acquisition have to be the same of the times of release.
Semaphore (threading.Semaphore)
Semaphore is the oldest software synchronization mechanism in computer history. Unlike lock or rlock, which is used to control the access of one single shared resource, semaphore is perfect for control the access of multiple same shared resources. Therefore, when initializing semaphore, you should specify a number which represents the quantity of shared resources.
sema = threading.Semaphore(4) # default is 1, just like a lock
Acquiring a semaphore means to decrease the internal counter by 1 (P operation). Releasing a semaphore means to increase the internal counter by n (V operation). We can specify n value while releasing, and wake up n waiting threads simultaneously.
BoundedSemaphore
Semaphore could be released to surpass the original initialized resource number. BoundedSemaphore raises when this happens as a secure mechanism.
Event (threading.Event)
Threads could wait an event. When the event is set, they continue to run.
wait2go = threading.Event() # default status is unset, so can be wait
wait2go.is_set() # True is set
wait2go.set() # set the event, wake up waiting threads
wait2go.clear() # make event to be unset
wait2go.wait(timeout=None) # return True only if event is set
Timer (threading.Timer)
Threads could wait a time period before starting.
# thread entry
def job(a,b,c,*,d=9):
print("hello",a,b,c,d)
# threading.Timer(interval, function, args=None, kwargs=None)
t = threading.Timer(3, job, (1,2,3),{'d':4} )
t.start() # run job after 3 seconds
t.cancel() # if job is running, cancel is useless!
Condition (threading.Condition)
Condition is a very efficient way to manage multi-thread scenario, such as complicated producer-consumer case. It could avoid busy-waiting, and save lots of CPU cycles by notification mechanism.
If using a simple lock, you might have this code snippet:
# When this thread can always get the lock,
# but queue is always empty,
# it is a potential busy-waiting loop!
while True:
with lock:
if not queue.empty():
... # do something
Condition solves this issue by introducing another waiting chance when job cannot be done in critical section.
# a lock inside condition variable
conditon = threading.Condition()
# no busy-waiting with condition
while True:
with condition: # block if the lock is holding by other threads
if queue.empty():
condition.wait() # block again in critical section,
# release the lock,
# wake up until it is notified.
... # do something
contidiont.wait() can only be waken up by notification! (Python interpreter couldn’t run these waiting threads if there is no notification.) This avoids busy-waiting, repeatedly acquire, check and release loop. We have two notification interfaces:
with condition: # call notify interface with condition
notify(n=1) # notify 1 to n waiting threads Randomly
# no-op if no waiting threads
# safe if n > number of waiting threads
notify_all() # notify all waiting threads
Be careful, notification couldn’t wake up a specfic waiting thread, super random!
Barrier (threading.Barrier)
If there are fixed number threads which need to wait for each other. Each of the threads tries to pass the barrier by calling the wait() method and will block until all of the threads have made their wait() calls. At this point, the threads are released simultaneously.
# A revsered thread barrier case,
# one thread waits all others finish.
barrier = threading.Barrier(101)
mutex = threading.Lock()
value = 0
def add():
global value
with mutex:
value += 1
barrier.wait()
def show():
barrier.wait()
print(value)
th = threading.Thread(target=show, args=())
th.start()
for i in range(100):
th = threading.Thread(target=add, args=())
th.start()
Thread-Local Data (threading.local)
Thread-local data is also an object in which its properties are thread-local.
# local object
t_local = threading.local()
def add():
for i in range(1000):
t_local.x += 1 # no lock needed
def go():
t_local.x = 0
add()
print(t_local.x) # always 1000
for i in range(10):
t = threading.Thread(target=go, args=())
t.start()
Thread Pool
from concurrent.future import ThreadPoolExecutor
Read-Write Lock (RWLock)
A RWLock is a specialized synchronization primitive designed to optimize performance in multi-threaded applications where data is read frequently but modified infrequently. Reading data doesn’t alter it. Therefore, multiple threads should be allowed to read the data simultaneously without blocking each other. However, modifying data requires absolute isolation. In Python, there is no standard RWLock implementation in threading module because GIL heavily mitigate its performance. We can implement one by leveraging condition.
import threading
# write first means once there is a thread
# want to write, read threads have to wait!
class RWLock_WriteFirst():
def __init__(self):
self._lock = threading.Lock()
self._read = threading.Lock()
self._write = threading.Lock()
self._condition = threading.Condition()
self.read_num = 0
self.write_num = 0
def read_acquire(self):
while True:
with self._write:
if self.write_num != 0:
is_writing = True
else:
is_writing = False
with self._read:
self.read_num += 1
if self.read_num == 1:
self._lock.acquire()
if is_writing:
with self._condition:
self._condition.wait()
else:
break
def read_release(self):
with self._read:
self.read_num -= 1
if self.read_num == 0:
self._lock.release()
def write_acquire(self):
with self._write:
self.write_num += 1
self._lock.acquire()
def write_release(self):
self._lock.release()
with self._write:
self.write_num -= 1
if self.write_num == 0:
with self._condition:
self._condition.notify_all()
When ^C is Not Working
While waiting threads by thread.join() calls, sometimes the process doesn’t response to your ^C. This is because the C-level system call pthread_join is also waiting without a timeout. The bytecode processing of main thread who deals with SIGINT is stuck. We can fix it like this:
# make ^C responsive
for t in theads:
while t.is_alive():
t.join(timeout=1)