开发者

Python中异步HTTP客户端/服务器框架aiohttp的使用全面指南

目录
  • 什么是 aiohttp
  • 核心优势
  • 基础用法 - http://www.devze.comHTTP客户端
    • 安装
    • 基本GET请求
    • POST请求示例
  • 高级用法
    • 并发请求
    • 超时控制
    • 流式处理大响应
  • 服务器端开发
    • 基本HTTP服务器
    • REST API示例
    • WebSocket服务器
  • 进阶扩展
    • 中间件示例
    • HTTP/2客户端支持
    • 性能优化配置
  • 最佳实践
    • 完整示例
      • 总结

        什么是 aiohttp

        aiohttp 是一个基于 python asyncio 的异步 HTTP 客户端/服务器框架,专为高性能网络编程设计。它提供了:

        • 异步 HTTP 客户端(类似异步版 requests)
        • 异步 HTTP 服务器(类似异步版 Flask/Django)
        • 完整的 WebSocket 支持
        • 高效的连接池管理

        核心优势

        特性描述
        异步非阻塞单线程处理数千并发连接
        高性能远超同步框架(如 requests)的吞吐量
        轻量级简洁的API,无复杂依赖
        全面协议支持HTTP/1.1, HTTP/2(客户端), WebSocket
        生态完善良好文档和活跃社区

        基础用法 - HTTP客户端

        安装

        pip install aiohttp
        

        基本GET请求

        import aiohttp
        import asyncio
        
        async def main():
            async with aiohttp.ClientSession() as session:
                async with session.get('https://api.example.com/data') as response:
                    print("状态码:", response.status)
                    print("响应内容:", await response.text())
        
        asyncio.run(main())
        

        POST请求示例

        async def post_example():
            async with aiohttp.ClientSession() as session:
                # 表单数据
                async with session.post(
                    'https://httpbin.org/post', 
                    data={'key': 'value'}
                ) as response:
                    print(await response.json())
                
                # JSON数据
                async with session.post(
                    'https://api.example.com/users',
                    json={'name': 'Alice', 'age': 30}
                ) as response:
                    print(await response.json())
        

        高级用法

        并发请求

        async def fetch(url):
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return await response.text()
        
        async def concurrent_requests():
            urls = [
                'https://api.example.com/item/1',
                'https://api.example.com/item/2',
                'https://api.example.com/item/3'
            ]
            
            tasks = [fetch(url) for url in urls]
            results = await asyncio.gather(*tasks)
            
            for url, content in zip(urls, results):
                print(f"{url}: {content[:50]}...")
        
        asyncio.run(concurrent_requests())
        

        超时控制

        python
        async def timeout_example():
            timeout = aiohttp.ClientTimeout(total=5)  # 5秒总超时
            
            async with aiohttp.ClientSession(timeout=timeout) as session:
                try:
                    async with session.get('https://slow-api.example.com') as response:
                        return await response.text()
                except asyncio.TimeoutError:
                    print("请求超时!")
        

        流式处理大响应

        async def stream_response():
            async with aiohttp.ClientSession() as session:
                async with session.get('https://large-file.example.com') as response:
                    with open('large_file.txt', 'wb') as f:
                        async for chunk in response.content.iter_chunked(1024):
                            f.write(chunk)
                            print(f"已接收 {len(chunk)} 字节")
        

        服务器端开发

        基本HTTP服务器

        from aiohttp import web
        
        async def handle(request):
            name = request.match_info.get('name', "World")
            return web.Response(text=f"Hello, {name}!")
        
        app = web.Application()
        app.add_routes([
            web.get('/', handle),
            web.get('/{name}', handle)
        ])
        
        if __name__ == '__main__':
            web.run_app(app, port=8080)
        

        REST API示例

        async def get_users(request):
            users = [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]
            return web.json_response(users)
        
        async def create_user(request):
            data = await request.json()
            # 实际应用中这里会保存到数据库
            return web.json_response({'id': 3, **data}, status=201)
        
        app = web.Application()
        app.add_routes([
            web.get('/api/users', get_users),
            web.post('/api/users', create_user)
        ])
        

        WebSocket服务器

        async def websocket_handler(request):
            ws = web.WebSocketResponse()
            await ws.prepare(request)
            
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    if msg.data == 'close':
                        await ws.close()
                    else:
                        await ws.send_str(f"ECHO: {msg.data}")
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print('WebSocket连接异常关闭')
            
            return ws
        
        ​​​​​​​app.add_routes([web.get('/ws', websocket_handler)])

        进阶扩展

        中间件示例

        async def auth_middleware(app, handler):
            async def middleware(request):
                # 验证API密钥
                if request.headers.get('X-API-Key') != 'SECRET_KEY':
                    return web.json_response({'error': 'Unauthoriz编程客栈ed'}, status=401)
                return await handler(request)
            return middleware
        
        app = web.Application(middlewares=[auth_middleware])
        

        HTTP/2客户端支持

        async def http2_request():
            conn = aiohttp.TCPConnector(force_close=True, enable_cleanup_closed=True)
            async with aiohttp.ClientSession(connector=conn) as session:
                async with session编程.get(
                    'https://http2.akamai.com/',
                    headers={'accept': 'text/html'}
                ) as response:
                    print("HTTP版本:", response.version)
                    print("内容:", await response.text()[:200])
        

        性能优化配置

        # 自定义连接器配置
        connector = aiohttp.编程客栈TCPConnector(
            limit=100,  # 最大并发连接数
            limit_per_host=20,  # 单主机最大连接数
            ssl=False,  # 禁用SSL验证(仅用于测试)
            force_close=True  # 避免连接延迟关闭
        )
        
        # 自定义会话配置
        session = aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=30),
            headers={'User-Agent': 'MyApp/1.0'},
            cookie_jar=aiohttp.CookieJar(unsafe=True)
        )
        

        最佳实践

        重用ClientSession:避免为每个请求创建新会话

        使用连接池:合理配置TCPConnector参数

        超时设置:总是配置合理的超时时间

        资源清理:使用async with确保资源释放

        错误处理:捕获并处理常见网络异常

        try:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
        except aiohttp.ClientError as e:
            print(f"请求错误: {e}")
        

        完整示例

        import aiohttp
        import asyncio
        from aiohttp import web
        
        # 客户端示例
        async def fetch_data():
            async with aiohttp.ClientSession() as session:
                # 并发请求多个API
                urls = [
                    'https://jsonplaceholder.typicode.com/posts/1',
                    'https://jsonplaceholder.typicode.com/comments/1',
                    'https://jsonplaceholder.typicode.com/albums/1'
                ]
                
                tasks = []
                for url in urls:
                    tasks.append(session.get(url))
                
                responses = await asyncio.gather(*tasks)
                
                results = []
                for response in responses:
                    results.append(await response.json())
                
                return results
        
        # 服务器示例
        async def handle_index(request):
            return web.Response(text="Welcome to aiohttp server!")
        
        async def handle_api(request):
            data = await fetch_data()
            return web.json_response(data)
        
        # 创建应用
        app = web.Application()
        app.add_routes([
            web.get('/', handle_index),
            web.get('/api', handle_api)
        ])
        
        # 启动服务器
        async def start_server():
            runner = web.AppRunner(app)
            await runner.setup()
            site = web.TCPSite(runner, 'localhost', 8080)
            await site.start()
            print("Server running at http://localhost:8080")
            
            # 保持运行
            while True:
                await asyncio.sleep(3600)  # 每小时唤醒一次
        
        if __name__ == '__main__':
            asyncio.run(start_server())
        

        总结

        aiohttp 是 Python 异步生态中处理 HTTP 通信的首选工具,它提供了:

        • 高效客户端:用于高性能爬虫、API调用
        • 轻量级服务器:构建高性能Web服务和API
        • WebSocket支持:实现实时双向通信
        • 连接池管理:优化资源利用率

        通过合理利用 aiohttp 的异步特性,开发者可以轻松构建出能够处理数万并发连接的高性能网络应用,同时保持代码的简洁性和可维护性。

        到此这篇关于Python中异步HTTP客户端/服务器框架aiohttp的使用全面指南的文章就介绍到这了,更多相关Python异步框架aiohttp内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

        0

        上一篇:

        下一篇:

        精彩评论

        暂无评论...
        验证码 换一张
        取 消

        最新开发

        开发排行榜