Python Docs

Multithreading

Threads run concurrently inside a single process and share the same memory space. Because of the Global Interpreter Lock (GIL), CPU-bound code doesn't scale well with threads, but I/O-bound work (network, disk, waiting) can improve significantly.

Basic Threads

Create Thread objects with a target function, start them with .start(), and use .join() to wait for them to finish.

import threading

def worker(n):
    print("worker", n)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]

for t in threads:
    t.start()

for t in threads:
    t.join()
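
Threads pay off when each task spends most of its time waiting, as the introduction notes. Below is a minimal sketch (using time.sleep as a stand-in for real network or disk latency) that compares running four waits sequentially versus in threads:

import threading
import time

def io_task():
    time.sleep(0.5)  # pretend we're waiting on a network response

start = time.perf_counter()
for _ in range(4):
    io_task()
print("sequential:", round(time.perf_counter() - start, 2))  # ~2.0 seconds

start = time.perf_counter()
threads = [threading.Thread(target=io_task) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("threaded:", round(time.perf_counter() - start, 2))  # ~0.5 seconds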

Locks & Race Conditions

When multiple threads modify shared state, you can get race conditions. Use Lock (or RLock) to protect critical sections.

import threading

counter = 0
lock = threading.Lock()

def inc():
    global counter
    for _ in range(100_000):
        with lock:
            counter += 1

threads = [threading.Thread(target=inc) for _ in range(4)]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)

  • Without the with lock: block, the final counter would likely end up less than expected; the sketch after this list shows why.
  • Locks serialize access to shared variables to keep them consistent.
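
To make the lost updates easy to observe, this sketch drops the lock and deliberately widens the read-modify-write gap with time.sleep(0) (an illustrative example; the exact shortfall varies by run and interpreter version):

import threading, time

counter = 0

def inc_unsafe():
    global counter
    for _ in range(1_000):
        tmp = counter      # read
        time.sleep(0)      # yield the GIL so another thread can run in between
        counter = tmp + 1  # write back a possibly stale value

threads = [threading.Thread(target=inc_unsafe) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # almost always far less than 4_000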

Thread-safe Queues (Producer–Consumer)

queue.Queue is a synchronized FIFO queue designed for multi-threaded producer–consumer workflows.

import threading, queue, time

q = queue.Queue()

def producer():
    for i in range(5):
        q.put(i)
        time.sleep(0.1)
    q.put(None)  # Sentinel

def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        print("got", item)
        q.task_done()

prod = threading.Thread(target=producer)
cons = threading.Thread(target=consumer)
prod.start()
cons.start()
prod.join()
cons.join()

A sentinel value (like None) is a common way to signal consumers to stop.
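
If several consumers share the queue, one common pattern (a sketch, not the only option) is to put one sentinel per consumer and pair task_done() with q.join() so the main thread can wait until every item has been handled:

import threading, queue

q = queue.Queue()
NUM_CONSUMERS = 3

def consumer():
    while True:
        item = q.get()
        try:
            if item is None:
                break          # one sentinel per consumer
            print("got", item)
        finally:
            q.task_done()      # mark the item (or sentinel) as handled

consumers = [threading.Thread(target=consumer) for _ in range(NUM_CONSUMERS)]
for t in consumers:
    t.start()

for i in range(10):
    q.put(i)
for _ in range(NUM_CONSUMERS):
    q.put(None)                # tell each consumer to stop

q.join()                       # blocks until task_done() was called for every item
for t in consumers:
    t.join()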

ThreadPoolExecutor

For many small I/O tasks (HTTP calls, file reads, etc.), use a thread pool instead of manually creating/joining threads.

from concurrent.futures import ThreadPoolExecutor
import requests

urls = [
    "https://example.com",
    "https://httpbin.org/get",
    "https://github.com",
]

def fetch(url):
    resp = requests.get(url, timeout=5)
    return url, resp.status_code

with ThreadPoolExecutor(max_workers=5) as ex:
    for url, code in ex.map(fetch, urls):
        print(url, code)
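
ex.map() yields results in input order and re-raises the first exception it hits while you iterate. To handle failures per URL instead, submit() with as_completed() is an alternative; here is a sketch reusing the fetch helper and urls above:

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=5) as ex:
    futures = {ex.submit(fetch, url): url for url in urls}
    for fut in as_completed(futures):
        url = futures[fut]
        try:
            _, code = fut.result()
            print(url, code)
        except Exception as exc:
            print(url, "failed:", exc)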

Best Practices

  • Use threads for I/O-bound tasks (network calls, disk I/O, waiting), not heavy CPU math.
  • Always synchronize shared state using locks, queues, or other thread-safe primitives.
  • Prefer ThreadPoolExecutor over manual thread creation for task pools.
  • Avoid sharing too many global variables; pass data via queues or function arguments where possible (see the sketch below).
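
For the last point, a minimal sketch: each worker receives a queue as an argument and pushes its result into it, so no global state is mutated:

import threading, queue

results = queue.Queue()

def square(n, out):
    out.put((n, n * n))  # hand the result back through the queue

threads = [threading.Thread(target=square, args=(i, results)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results.get() for _ in range(4)))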