开发者

Python异步库asyncio、aiohttp详解

目录
  • asyncio
    • 版本支持
    • 关键概念
    • 工作流程
    • 并发
  • aiohttp
    • ClientSession 会话管理
    • URL 参数传递
    • 获取响应内容
    • 自定义请求头
    • 自定义 cookies
    • 设置代理
  • 异步爬虫示例
    • 总结

      asyncio

      版本支持

      • asyncio 模块在 python3.4 时发布。
      • async 和 await 关键字最早在 Python3.5 中引入。
      • Python3.3 之前不支持。

      关键概念

      • event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数(协程)注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
      • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
      • future 对象: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别
      • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。Task 对象是 Future 的子类,它将 coroutine 和 Future 联系在一起,将 coroutine 封装成一个 Future 对象。
      • async/await 关键字:python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。其作用在一定程度上类似于yield。

      工作流程

      • 定义/创建协程对象
      • 将协程转为task任务
      • 定义事件循环对象容器
      • 将task任务放到事件循环对象中触发
      import asyncio
      
      async def hello(name):
          print('Hello,', name)
      
      # 定义协程对象
      coroutine = hello("World")
      
      # 定义事件循环对象容器
      loop = asyncio.get_event_loop()
      
      # 将协程转为task任务
      # task = asyncio.ensure_future(coroutine)
      task = loop.create_task(coroutine)
      
      # 将task任务扔进事件循环对象中并触发
      loop.run_until_complete(task)

      并发

      1. 创建多个协程的列表 tasks:

      import asyncio
      
      
      async def do_some_work(x):
          print('Waiting: ', x)
          await asyncio.sleep(x)
          return 'Done after {}s'.format(x)
      
      tasks = [do_some_work(1), do_some_work(2), do_some_work(4)]

      2. 将协程注册到事件循环中:

      • 方法一:使用 asyncio.wait()
      loop = asyncio.get_event_loop()
      loop.run_until_complete(asyncio.wait(tasks))
      • 方法二:使用 asyncio.gather()
      loop = asyncio.get_event_loop()
      loop.run_until_complete(asyncio.gather(*tasks))

      3. 查看 return 结果:

      for task in tasks:
          print('Task ret: ', task.result())

      4. asyncio.wait()asyncio.gather() 的区别:

      接收参数不同:

      • asyncio.wait():必须是一个 list 对象,list 对象里存放多个 task 任务。
      # 使用 asyncio.ensure_future 转换为 task 对象
      tasks=[
             asyncio.ensure_future(factorial("A", 2)),
             asyncio.ensure_future(factorial("B", 3)),
             asyncio.ensure_future(factorial("C", 4))
      ]
      
      # 也可以不转为 task 对象
      # tasks=[
      #        factorial("A", 2),
      #        factorial("B", 3),
      #        factorial("C", 4)
      # ]
      
      loop = asyncio.get_event_loop()
      loop.run_until_complete(asyncio.wait(tasks))
      • asyncio.gather():比较广泛,注意接收 list 对象时 * 不能省略。
      tasks=[
             asyncio.ensure_future(factorial("A", 2)),
             asyncio.ensure_future(factorial("B", 3)),
             asyncio.ensure_future(factorial("C", 4))
      ]
      
      # tasks=[
      #        factorial("A", 2),
      #        factorial("B", 3),
      #        factorial("C", 4)
      # ]
      
      loop = asyncio.get_event_loop()
      loop.run_until_complete(asyncio.gather(*tasks))
      loop = asyncio.get_event_loop()
      
      group1 = asyncio.gather(*[factorial("A" ,i) for i in range(1, 3)])
      group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)])
      group3 = asyncio.gather(*[factorial("B", i) for i in range(1, 7)])
      
      loop.run_until_complete(asyncio.gather(group1, group2, group3))

      返回结果不同:

      • asyncio.wait():返回 dones(已完成任务) 和 pendings(未完成任务)
      dones, pendings = await asyncio.wait(tasks)
      
      for task in dones:
          print('Task ret: ', task.result())
      • asyncio.gather():直接返回结果
      results = await asyncio.gather(*tasks)
      
      for result in results:
          print('Task ret: ', result)

      aiohttp

      ClientSession 会话管理

      import aiohttp
      import asyncio
      
      
      async def main():
          async with aiohttp.ClientSession() as session:
              async with session.get('http://httpbin.org/get') as resp:
                  print(resp.status)
                  print(await resp.text())
      
      asyncio.run(main())

      其他请求:

      session.post('http://httpbin.org/post', data=b'data')
      session.put('http://httpbin.org/put', data=b'data')
      session.delete('http://httpbin.org/delete')
      session.head('http://httpbin.org/gjavascriptet')
      session.options('http://httpbin.org/get')
      session.patch('http://httpbin.org/patch', data=b'data')

      URL 参数传递

      async def main():
          async with aiohwww.devze.comttp.ClientSession() as ses编程客栈sion:
              params = {'key1': 'value1', 'key2': 'value2'}
              async with session.get('http://httpbin.org/get', params=params) as r:
                  expect = 'http://httpbin.org/get?key1=value1&key2=value2'
                  assert str(r.url) == expect
      async def main():
          async with aiohttp.ClientSession() as session:
              params = [('key', 'value1'), ('key', 'value2')]
              async with session.get('http://httpbin.org/get', params=params) as r:
                  expect = 'http://httpbin.org/get?key=value2&key=value1'
                  assert str(r.url) == expect

      获取响应内容

      async def main():
          async with aiohttp.ClientSession() as session:
              async with session.get('http://httpbin.org/get') as r:
                  # 状态码
                  print(r.status)
                  # 响应内容,可以自定义编码
                  print(await r.text(encoding='utf-8'))
                  # 非文本内容
                  print(await r.read())
                  # jsON 内容
                  print(await r.json())

      自定义请求头

      headers = {
              "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (Khtml, like Gecko) Chrome/91.0.4472.106 Safari/537.36"
          }
      
      
      async def main():
          async with aiohttp.ClientSession() as session:
              async with session.get('http://httpbin.org/get', headers=headers) as r:
                  print(r.status)

      为所有会话设置请求头:

      headers = {
              "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"
          }
      
      
      async def main():
          async with aiohttp.ClientSession(headers=headers) as session:
              async with session.get('http://httpbin.org/get') as r:
                  print(r.status)

      自定义 cookies

      async def main():
          cookies = {'cookies_are': 'working'}
          async with aiohttp.ClientSession() as session:
              async with session.get('http://httpbin.org/cookies', cookies=cookies) as resp:
                  assert await resp.json() == {"cookies": {"cookies_are": "working"}}

      为所有会话设置 cookies:

      async def main():
          cookies = {'cookies_are': 'working'}
          async with aiohttp.ClientSession(cookies=cookies) as session:
              async with session.get('http://httpbin.org/cookies') as resp:
                  assert await resp.json() == {"cookies": {"cookies_are": "working"}}

      设置代理

      注意:只支持 http 代理。

      async def main():
          async with aiohttp.ClientSession() as session:
              proxy = "http://127.0.0.1:1080"
              async with session.get("http://python.org", proxy=proxy) as r:
                  print(r.status)

      需要用户名密码授权的代理:

      asyhttp://www.devze.comnc def main():
          async with aiohttp.ClientSession() as session:
              proxy = "http://127.0.0.1:1080"
              proxy_auth = aiohttp.BasicAuth('username', 'password')
              async with session.get("http://python.org", proxy=proxy, proxy_auth=proxy_auth) as r:
                  print(r.status)

      也可以直接传递:

      async def main():
          async with aiohttp.ClientSession() as session:
              proxy = "http://username:password@127.0.0.1:1080"
              async with session.get("http://python.org", proxy=proxy) as r:
                  print(r.status)

      异步爬虫示例

      import asyncio
      import aiohttp
      
      from lXML import etree
      from datetime import datetime
      
      headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"}
      
      
      async def get_movie_url():
          req_url = "https://movie.douban.com/chart"
          async with aiohttp.ClientSession() as session:
              async with session.get(url=req_url, headers=headers) as response:
                  result = await response.text()
                  result = etree.HTML(result)
              return result.xpath("//*[@id='content']/div/div[1]/div/div/table/tr/td/a/@href")
      
      
      async def get_movie_content(movie_url):
          async with aiohttp.ClientSession() as session:
              async with session.get(url=movie_url, headers=headers) as response:
                  result = await response.text()
                  result = etree.HTML(result)
              movie = dict()
              name = result.xpath('//*[@id="content"]/h1/span[1]//text()')
              author = result.xpath('//*[@id="info"]/span[1]/span[2]//text()')
              movie["name"] = name
              movie["author"] = author
          return movie
      编程
      
      def run():
          start = datetime.now()
          loop = asyncio.get_event_loop()
          movie_url_list = loop.run_until_complete(get_movie_url())
          tasks = [get_movie_content(url) for url in movie_url_list]
          movies = loop.run_until_complete(asyncio.gather(*tasks))
          print(movies)
          print("异步用时为:{}".format(datetime.now() - start))
      
      
      if __name__ == '__main__':
          run()

      总结

      以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

      0

      上一篇:

      下一篇:

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      最新开发

      开发排行榜