开发者

Python threading全景指南分享

目录
  • 1. 为什么需要线程
  • 2. 从 0 开始写线程
    • 2.1 创建线程的两种姿势
    • 2.2 join:别让主线程提前跑路
  • 3. 线程同步:共享变量的“安全带”
    • 3.1 Lock(互斥锁)
    • 3.2 RLock(可重入锁)
    • 3.3 Condition(条件变量)
    • 3.4 Semaphore(信号量)
    • 3.5 Event(事件)
    • 3.6 Barrier(栅栏)
  • 4. 线程局部变量:ThreadLocal
    • 5. 定时器 Timer:延时任务
      • 6. 线程池:高并发下的“资源管家”
        • 7. 调试与最佳实践
          • 7.1 死锁排查
          • 7.2 GIL 与性能
          • 7.3 守护线程 daemon
          • 7.4 日志线程名
          • 7.5 不要滥用
        • 8. 总结

          “并发不等于并行,但并发能让生活更美好。”——《python 并发编程实战》

          1. 为什么需要线程

          CandroidPU 单核性能已逼近物理极限,要想让程序在相同时间内做更多事,必须“同时”做多件事。

          • 多进程 Process:利用多核并行,资源隔离但开销大。
          • 协程 Coroutine:单线程内切换,极致 I/O 友好,但无法利用多核。
          • 线程 Thread:介于两者之间,共享内存、切换快,是 I/O 密集型任务的首选。

          在 Python 里,GIL(Global Interpreter Lock)限制了同一进程内只能有一条字节码在执行,进而“弱化”了线程在多核javascript CPU 上的并行能力。然而:

          • 线程在等待 I/O 时会主动释放 GIL,因此下载、爬虫、聊天服务器等网络/磁盘 I/O 场景依旧收益巨大。
          • 对 CPU 密集型任务,可用 multiprocessing 或 C 扩展绕开 GIL。

          一句话:

          当你想让程序“边读边写”“边收边发”“边阻塞边响应”,就用 threading。

          2. 从 0 开始写线程

          2.1 创建线程的两种姿势

          import threading, time
          
          # 方式一:把函数塞给 Thread
          def worker(n):
              print(f'Worker {n} start')
              time.sleep(1)
              print(f'Worker {n} done')
          
          for i in range(3):
              t = threading.Thread(target=worker, args=(i,))
              t.start()
          # 方式二:继承 Thread 并重写 run
          class MyThread(threading.Thread):
              def __init__(self, n):
                  super().__init__()
                  self.n = n
              def run(self):
                  print(f'MyThread {self.n} start')
                  time.sleep(1)
                  print(f'MyThread {self.n} done')
          
          MyThread(10).start()

          2.2 join:别让主线程提前跑路

          start() 只是告诉操作系统“可以调度了”,不保证立即执行。

          threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
          [t.start() for t in threads]
          [t.join() for t in threads]  # 等全部结束
          print('all done')

          3. 线程同步:共享变量的“安全带”

          3.1 Lock(互斥锁)

          竞争最激烈的原语,解决“读写交叉”问题。

          counter = 0
          lock = threading.Lock()
          
          def add():
              global counter
              for _ in range(100000):
                  with lock:             # 等价于 lock.acquire(); try: ... finally: lock.release()
                      counter += 1
          
          threads = [threading.Thread(target=add) for _ in range(2)]
          [t.start() for t in threads]
          [t.join() for t in threads]
          print(counter)   # 200000

          没有 lock 时,大概率得到 <200000 的错误结果。

          3.2 RLock(可重入锁)

          同一个线程可以多次 acquire,避免死锁。

          rlock = threading.RLock()
          def foo():
              with rlock:
                  bar()
          
          def bar():
              with rlock:   # 同一线程,再次获取成功
                  pass

          3.3 Condition(条件变量)

          经典“生产者-消费者”模型:

          import random, time
          q, MAX = [], 5
          cond = threading.Condition()
          
          def producer():
              while True:
                  with cond:
                      while len(q) == MAX:
                          pythoncond.wait()          # 等待队列有空位
                      item = random.randint(1, 100)
                      q.append(item)
                      print('+', item, q)
                      cond.notify()            # 通知消费者
                  time.sleep(0.5)
          
          def consumer():
              while True:
                  with cond:
                      while not q:
                          cond.wait()
                      item = q.pop(0)
                      print('-', item, q)
                      cond.notify()
                  time.sleep(0.6)
          
          threading.Thread(target=producer, daemon=True).start()
          threading.Thread(target=consumer, daemon=True).start()
          time.sleep(5)

          3.4 Semaphore(信号量)

          控制并发数量,例如“最多 编程3 个线程同时下载”。

          sem = threading.Semaphore(3)
          def downloader(url):
              with sem:
                  print('downloading', url)
                  time.sleep(2)

          3.5 Event(事件)

          线程间“发令枪”机制:

          event = threading.Event()
          
          def waiter():
              print('wait...')
              event.wait()          # 阻塞
              print('go!')
          
          threading.Thread(target=waiter).start()
          time.sleep(3)
          event.set()               # 发令

          3.6 Barrier(栅栏)

          N 个线程同时到达某点后再一起继续,适合分阶段任务。

          barrier = threading.Barrier(3)
          
          def phase(name):
              print(name, 'ready')
              barrier.wait()
              print(name, 'go')
          
          for i in range(3):
              threading.Thread(target=phase, args=(i,)).start()

          4. 线程局部变量:ThreadLocal

          共享虽好,可有时我们想让每个线程拥有“私有副本”。

          local = threading.local()
          
          def show():
              print(f'{threading.current_thread().name} -> {local.x}')
          
          def task(n):
              local.x = n
              show()
          
          for i in range(3):
              threading.Thread(target=task, args=(i,)).start()

          5. 定时器 Timer:延时任务

          def hello():
              print('hello, timer')
          threading.Timer(3.0, hello).start()

          常用于“超时取消”“心跳包”等场景。

          6. 线程池:高并发下的“资源管家”

          频繁创建/销毁线程代价高昂,Python 3.2+ 内置 concurrent.futures.ThreadPoolExecutor 提供池化能力。

          from concurrent.futures import ThreadPoolExecutor
          import requests, time
          
          URLS = ['https://baidu.com'] * 20
          
          def fetch(url):
              return requests.get(url).status_code
          
          with ThreadPoolExecutor(max_workers=10) as pool:
              for code in pool.map(fetch, URLS):
                  print(code)
          • max_workers 默认为 min(32, os.cpu_count() + 4),I/O 密集场景可调高。
          • submit + as_completed 组合可实现“谁先完成谁处理”。

          7. 调试与最佳实践

          7.1 死锁排查

          • 保持加锁顺序一致。
          • 使用 try-lock + 超时。
          • 借助第三方库 deadlock-debug 或 faulthandler。

          7.2 GIL 与性能

          • CPU 密集:换多进程、Cython、NumPy、multiprocessing。
          • I/O 密集:放心用线程,瓶颈在网络延迟而非 GIL。

          7.3 守护线程 daemon

          • 当只编程客栈剩守护线程时,程序直接退出。
          • 常用于后台心跳、日志写入,但不要做重要数据持久化。

          7.4 日志线程名

          logging.basicConfig(
              format='%(asctime)s [%(threadName)s] %(message)s',
              level=logging.INFO)

          7.5 不要滥用

          • GUI 程序:UI 线程勿阻塞,耗时操作放后台线程。
          • Web 服务:WSGI 服务器(uWSGI、gunicorn)已帮你管理进程/线程,业务代码慎用线程。

          8. 总结

          维度线程进程协程
          内存开销极低
          数据共享难(需 IPC)
          切换成本极低
          适合场景I/O 密集CPU 密集超高并发 I/O
          Python 限制GIL

          使用 threading 的黄金法则:

          • 明确任务是 I/O 密集。
          • 共享变量就用锁,或者别共享。
          • 用 ThreadPoolExecutor 减少手工创建。
          • 守护线程只干辅助活。
          • 调试时给线程起名字、打日志。

          以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

          0

          上一篇:

          下一篇:

          精彩评论

          暂无评论...
          验证码 换一张
          取 消

          最新开发

          开发排行榜