开发者

由浅入深介绍python asyncio的各种用法与代码示例

目录
  • 1. 基础概念速览
  • 2. 最小可运行示例:async编程客栈/await+asyncio.run
  • 3. 并发执行:create_task、gather、as_completed
  • 4. 超时控制与取消:wait_for、asyncio.timeout、Task.cancel
  • 5. 限流与同步原语:Semaphore、Lock、Event、Condition
  • 6. 生产者-消费者:asyncio.Queue
  • 7. 任务编组(python 3.11+):asyncio.TaskGroup
  • 8. 与线程/同步代码协作:to_thread、run_in_executor
  • 9. TCP/UDP 网络编程(内置 Streams / Protocols)
    • 9.1 TCP Echo(Server & Client,基于 Streams)
    • 9.2 UDP(Datagram)
  • 10. 子进程(异步等待):asyncio.create_subprocess_exec
    • 11. 异步上下文管理器与迭代器:async with、async for
      • 12. 超实用模式集
        • 12.1 背压与批处理
        • 12.2 幂等重试 + 指数退避
      • 13. 常见坑与最佳实践
        • 14. 进阶延伸(可选)

          下面是一份“由浅入深”的 asyncio 实战手册。先解释核心概念,再给出可直接运行的、循序渐进的示例代码。全部示例都只依赖标准库(除少数标注处),适配 Python 3.10+(如使用 TaskGroupasyncio.timeout() 的段落需要 3.11+)。

          1. 基础概念速览

          • async def:定义协程函数(coroutine function)。
          • await:在协程中等待另一个可等待对象(协程、asyncio.Taskasyncio.Future 等)。
          • 事件循环(Event Loop):调度协程、I/O 事件的核心;用 asyncio.run(main()) 启动。
          • 并发 ≠ 并行:asyncio 是单线程协作式并发,靠 I/O 等待时让出控制权。
          • 任务(Task):把协程“提交给”事件循环让其并发执行:asyncio.create_task(coro())

          2. 最小可运行示例:async/await+asyncio.run

          # demo_basic.py
          import asyncio
          
          async def io_job(name, delay):
              print(f"[{name}] start")
              await asyncio.sleep(delay)  # 模拟I/O等待
              print(f"[{name}] done after {delay}s")
              return name, delay
          
          async def main():
              r1 = await io_job("A", 1)
              r2 = await io_job("B", 2)  # 串行等待,总耗时约3秒
              print("results:", r1, r2)
          
          if __name__ == "__main__":
              asyncio.run(main())
          

          3. 并发执行:create_task、gather、as_completed

          # demo_concurrency.py
          import asyncio, random
          
          async def fetch(i):
              d = random.uniform(0.5, 2.0)
              await asyncio.sleep(d)
              return f"task-{i}", round(d, 2)
          
          async def main():
              # 3.1 create_task + await
              t1 = asyncio.create_task(fetch(1))
              t2 = asyncio.create_task(fetch(2))
              r1 = await t1
              r2 = await t2
              print("create_task results:", r1, r2)
          
              # 3.2 gather(顺序返回结果;遇异常默认会立刻抛出)
              results = await asyncio.gather(*(fetch(i) for i in range(3, 8)))
              print("gather results:", results)
          
              # 3.3 as_completed(谁先完成先拿谁)
              tasks = [asyncio.create_task(fetch(i)) for i in range(8, 13)]
              for fut in asyncio.as_completed(tasks):
                  print("as_completed:", await fut)
          
          if __name__ == "__main__":
              asyncio.run(main())
          

          4. 超时控制与取消:wait_for、asyncio.timeout、Task.cancel

          # demo_timeout_cancel.py
          import asyncio
          
          async def slow():
              try:
                  await asyncio.sleep(5)
                  return "ok"
              except asyncio.CancelledError:
                  print("slow() got cancelled!")
                  raise
          
          async def main():
              # 4.1 wait_for(3.8+)
              try:
                  res = await asyncio.wait_for(slow(), timeout=2)
                  print("result:", res)
              except asyncio.TimeoutError:
                  print("wait_for timeout!")
          
              # 4.2 asyncio.timeout 上下文(3.11+)
              try:
                  async with asyncio.timeout(2):  # 注:需 Python 3.11+
                      await slow()
              except TimeoutError:
                  print("context timeout!")
          
              # 4.3 手动取消
              t = asyncio.create_task(slow())
              await asyncio.sleep(1)
              t.cancel()
              try:
                  await t
              except asyncio.CancelledError:
                  print("task cancelled confirmed")
          
          if __name__ == "__main__":
              asyncio.run(main())
          

          小贴士:被取消的任务应正确处理 CancelledError,并在需要时做清理(finally)。

          5. 限流与同步原语:Semaphore、Lock、Event、Condition

          # demo_sync_primitives.py
          import asyncio, random
          
          sem = asyncio.Semaphore(3)  # 同时最多3个并发
          lock = asyncio.Lock()
          evt = asyncio.Event()
          
          async def worker(i):
              async with sem:  # 限制并发
                  await asyncio.sleep(random.uniform(0.2, 1.0))
                  async with lock:  # 保护共享输出(示意)
                      print(f"worker {i} done")
          
          async def notifier():
              await asyncio.sleep(1)
              evt.set()  # 广播事件
          
          async def waiter():
              print("waiting for event...")
              await evt.wait()
              print("event received!")
          
          async def main():
              tasks = [asyncio.create_task(worker(i)) for i in range(10)]
              tasks += [asyncio.create_task(notifier()), asyncio.create_task(waiter())]
              await asyncio.gather(*tasks)
          
          if __name__ == "__main__":
              asyncio.run(main())
          

          Condition 适合更复杂的“等待某条件成立”的场景,用法与 threading.Condition 类似(只是换成 async with / await)。

          6. 生产者-消费者:asyncio.Queue

          # demo_queue.py
          import asyncio, random
          
          async def producer(q: asyncio.Queue):
              for i in range(10):
                  await asyncio.sleep(random.uniform(0.1, 0.4))
                  await q.put((i, f"data-{i}"))
                  print(f"produced {i}")
              await q.put(None)  # 结束哨兵
          
          async def consumer(q: asyncio.Queue):
              while True:
                  item = await q.get()
                  if item is None:
                      q.task_done()
                      break
                  i, data = item
                  await asyncio.sleep(0.3)
                  print(f"consumed {i} -> {data}")
                  q.task_done()
          
          async def main():
              q = asyncio.Queue(maxsize=5)
              prod = asyncio.create_task(producer(q))
              cons = asyncio.create_task(consumer(q))
              await asyncio.gather(prod)
              await q.join()        # 等全部消费完成
              await cons            # 等消费者退出
          
          if __name__ == "__main__":
              asyncio.run(main())
          

          7. 任务编组(Python 3.11+):asyncio.TaskGroup

          # demo_taskgroup.py
          import asyncio, random
          
          async def job(n):
              await asyncio.sleep(random.uniform(0.2, 1.0))
              if n == 3:
                  raise RuntimeError("boom at 3")
              return n
          
          async def main():
              try:
                  async with asyncio.TaskGroup() as tg:
                      tasks = [tg.create_task(job(i)) for i in range(5)]
                      # 出错会自动取消其余任务并向外传播异常
              except* RuntimeError as eg:  # PEP 654(ExceptionGroup)
                  print("caught:", eg)
          
          if __name__ == "__main__":
              asyncio.run(main())
          

          对比 gatherTaskGroup 在结构化并发上更可靠,失败会自动收拢与传播。

          8. 与线程/同步代码协作:to_thread、run_in_executor

          # demo_thread_bridge.py
          import asyncio, time, concurrent.futures
          
          def blocking_io(n):
              time.sleep(n)
              return f"blocking {n}s"
          
          async def main():
              # 8.1 Python 3.9+ 推荐:to_thread
              r1 = await asyncio.to_thread(blocking_io, 1)
              print("to_thread:", r1)
          
              # 8.2 传统:run_in_executor
              loop = asyncio.get_running_loop()
              with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
                  futs = [loop.run_in_executor(pool, blocking_io, i) for i in (1, 2, 1)]
                  for r in await asyncio.gather(*futs):
                      print("executor:", r)
          
          if __name__ == "__main__":
              asyncio.run(main())
          

          原则:CPU 密集就放线程/进程池;I/O 密集await 原生异步接口。

          9. TCP/UDP 网络编程(内置 Streams / Protocols)

          9.1 TCP Echo(Server & Client,基于 Streams)

          # demo_tcp_echo.py
          import asyncio
          
          async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
              addr = writer.get_extra_info('peername'www.devze.com)
              print(f"client connected: {addr}")
              try:
                  while data := await reader.readline():
                      msg = data.decode().rstrip()
                      print(f"recv: {msg}")
                      writer.write((msg + "\n").encode())
                      await writer.drain()
              except asyncio.CancelledError:
                  raise
              finally:
                  writer.close()
                  await writer.wait_closed()
                  print("client closed", addr)
          
          async def run_server():
              server = await asyncio.start_server(handle_echo, "127.0.0.1", 8888)
              addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets)
              print(f"Serving on {addrs}")
              async with server:
                  await server.serve_forever()
          
          async def run_client():
              reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
              for i in range(3):
                  writer.write(f"hello {i}\n".encode())
                  await writer.drain()
                  echo = await reader.readline()
                  print("echo:", echo.decode().rstrip())
              writer.close()
              await writer.wait_closed()
          
          async def main():
              server_task = asyncio.create_task(run_server())
              await asyncio.sleep(0.2)
              await run_client()
              server_task.cancel()
              with contextlib.suppress(asyncio.CancelledError):
                  await server_task
          
          if __name__ == "__main__":
              import contextlib
              asyncio.run(main())
          

          9.2 UDP(Datagram)

          # demo_udp.py
          import asyncio
          
          class EchoServer(asyncio.DatagramProtocol):
              def datagram_received(self, data, addr):
                  print("server recv:", data, "from", addr)
                  self.transport.sendto(data, addr)
          
          async def main():
              loop = asyncio.get_running_loop()
              transport, _ = await loop.create_datagram_endpoint(
                  lambda: EchoServer(), local_addr=("127.0.0.1", 9999)
              )
              # client
              on_resp = loop.create_future()
              class Client(asyncio.DatagramProtocol):
                  def datagram_received(self, data,php addr):
                      print("client recv:", data)
                      on_resp.set_result(None)
              ctransport, _ = await loop.create_datagram_endpoint(
                  lambda: Client(), remote_addr=("127.0.0.1", 9999)
              )
              ctransport.sendto(b"hello-udp")
              await on_resp
              transport.close()
              ctransport.close()
          
          if __name__ == "__main__":
              asyncio.run(main())
          

          10. 子进程(异步等待):asyncio.create_subprocess_exec

          # demo_subprocess.py
          import asyncio, sys
          
          async def main():
              # 跨平台示例:调用 python -c 'print("hi")'
              proc = await asyncio.create_subprocess_exec(
                  sys.executable, "-c", 'print("hi from child")',
                  stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
              )
              out, err = await proc.communicate()
              print("stdout:", out.decode().strip(), "| code:", proc.returncode)
          
          if __name__ == "__main__":
              asyncio.run(main())
          

          11. 异步上下文管理器与迭代器:async with、async for

          # demo_async_with_for.py
          import asyncio
          from contextlib import asynccontextmanager
          
          @asynccontextmanager
          async def open_resource():
              print("acquire resource")
              await asyncio.sleep(0.2)
              try:
                  yield "RESOURCE"
              finally:
                  await asyncio.sleep(0.2)
                  print("release resource")
          
          class AsyncCounter:
              def __init__(self, n): self.n=n; self.i=0
              def __aiter__(self): return self
              async def __anext__(self):
                  if self.i >= self.n:
                      raise StopAsyncIteration
                  await asyncio.sleep(0.1)
                  self.i += 1
                  return self.i
          
          async def main():
              async with open_resource() as r:
                  print("using:", r)
              async for x in AsyncCounter(5):
                  print("got:", x)
          
          if __name__ == "__main__":
              asyncio.run(main())
          

          12. 超实用模式集

          12.1 背压与批处理

          # demo_backpressure.py
          import asyncio, random
          
          async def producer(q):
              for i in range(30):
                  await q.put(i)             # maxsize 限制可形成背压
                  await asyncio.sleep(0.05)
              await q.put(None)
          
          async def consumer(q):
              BATch = []
              while True:
                  item = await q.get()
                  if item is None:
                      if batch:
                          print("flush batch:", batch)
                      q.task_done()
                      break
                  batch.append(item)
                  if len(batch) >= 8:
                      # 模拟批量处理
                      await asyncio.sleep(random.uniform(0.1, 0.3))
                      print("process batch:", batch)
                      batch.clear()
                  q.task_done()
          
          async def main():
              q = asyncio.Queue(maxsize=10)
              await asyncio.gather(producer(q), consumer(q))
              await q.join()
          
          if __name__ == "__main__":
              asyncio.run(main())
          WkFUIJANE

          12.2 幂等重试 + 指数退避

          # demo_retry.py
          import asyncio, random
          
          async def fragile_call():
              await asyncio.sleep(0.1)
              if random.random() < 0.7:
                  raise RuntimeError("transient")
              return "ok"
          
          async def retry(coro_func, attempts=5, base=0.2):
              for n in range(attempts):
                  try:
                      return await coro_func()
                  except Exception as e:
                      if n == attempts - 1:
                          raise
                      await asyncio.sleep(base * (2 ** n))  # 退避
              raise RuntimeError("unreachable")
          
          async def main():
              try:
                  r = await retry(fragile_call)
                  print("result:", r)
              except Exception as e:
                  print("failed:", e)
          
          if __name__ == "php__main__":
              asyncio.run(main())
          

          13. 常见坑与最佳实践

          入口统一用 asyncio.run(main()),不要混用早期 API(如手动获取 loop、run_until_complete),除非有特殊需求。

          避免阻塞调用(如 time.sleep()、重 CPU 任务)直接出现在协程里;改用 await asyncio.sleep()asyncio.to_thread()/进程池。

          正确处理取消:在 try/except/finally 中传播 CancelledError,避免吞掉取消导致任务“僵尸化”。

          使用限流:对外部服务/磁盘/网络做 Semaphore、队列背压,保护系统。

          结构化并发:优先考虑 TaskGroup(3.11+)来让“任务生命周期”明确,异常聚合安全。

          日志与超时:给关键 I/O 加 timeout,并使用 asyncio.create_task() 后保存句柄,便于监控和取消。

          14. 进阶延伸(可选)

          文件异步:标准库没有真正异步文件 I/O;可选三方库 aiofiles(仅示意):

          # pip install aiofiles
          import aiofiles, asyncio
          async def read_file(p):
              async with aiofiles.open(p, "r", encoding="utf-8") as f:
                  return await f.read()
          

          HTTP 客户端/服务端:aiohttphttpx[http2] 等第三方库更贴近真实网络场景。

          与 GUI/Qt 的集成:可通过 qasync 把 Qt 事件循环与 asyncio 融合(适合你在 QML/PySide6 下需要异步网络/IO 的情况)。

          到此这篇关于由浅入深介绍python asyncio的各种用法与代码示例的文章就介绍到这了,更多相关python asyncio用法内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

          0

          上一篇:

          下一篇:

          精彩评论

          暂无评论...
          验证码 换一张
          取 消

          最新开发

          开发排行榜