开发者

Python实现常见网络通信的示例详解

目录
  • 一、HTTP/HTTPS 通信
    • 1. 客户端示例(requests 库)
    • 2. 服务端示例(Flask)
  • 二、UDP 通信
    • 1. 服务端
    • 2. 客户端
  • 三、WebSocket 通信
    • 1. 服务端
    • 2. 客户端
  • 四、Server-Sent Events (SSE)
    • 1. 服务端(Flask 实现)
    • 2. 客户端
  • 关键点说明

    一、HTTP/HTTPS 通信

    1. 客户端示例(requests 库)

    import requests
    
    # HTTP GET
    response = requests.get('http://httpbin.org/get')
    print(response.text)
    
    # HTTPS POST
    response = requests.post(
        'https://httpbin.org/post',
        data={'key': 'valuewww.devze.com'},
        verify=True  # 验证 SSL 证书(默认)
    )
    print(response.json())
    

    2. 服务端示例(Flask)

    from flask import Flask, request
    
    app = Flask(__name__)
    
    @app.route('/api', methods=['GET', 'POST'])
    def handle_request():
        if request.method == 'GET':
            return {'message': 'GET received'}
        elif request.method == 'POST':
            return {'data': request.json}
    
    if __name__ == '__main__':
        app.run(ssl_context='adhoc')  # 启用 HTTPS
    

    二、UDP 通信

    1. 服务端

    import socket
    
    server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server.bind(('0.0.0.0', 9999))
    
    while True:
        data, addr = server.recvfrom(1024)
        print(f"Received from {addr}: {data.decode()}")
        server.sendto(b'UDP response', addr)
    

    2. 客户端

    import socket
    
    client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    client.sendto(b'Hello UDP', ('localhost', 9999))
    response, addr = client.recvfrom(1024)
    print(f"Received: {response.decode()}")
    

    三、WebSocket 通信

    需要安装库:pip install websockets

    1. 服务端

    import asyncio
    import websockets
    
    async def handler(websocket):
        async for message in websocket:
            print(f"Received: {message}")
            await websocket.send(f"Echo: {message}")
    
    async def main():
        async phpwith websockets.serve(handler, "localhost", 8765):
            await asyncio.Future()  # 永久运行
    
    aswww.devze.comyncio.run(main())
    

    2. 客户端

    import asyncio
    import websockets
    
    async def client():
        async with websockets.connect("ws://localhost:8765") as ws:
            await ws.send("Hello WebSocket!")
            response = await ws.recv()
            print(f"Received: {response}")
    
    asyncio.run(client())
    

    四、Server-Sent Events (SSE)

    需要安装库:pip install sseclient-py

    1. 服务端(Flask 实现)

    from flask import Flask, Response
    
    app = Flask(__name__)
    
    @app.route('/stream')
    def stream():
        def event_stream():
            for i in range(5):
                yield f"data: Message {i}\n\n"
        return Response(event_stream(), mimetype="text/event-stream")
    
    if __name__ == '__main__':
        app.run()
    

    2. 客户端

    import requests
    from sseclient import SSEClient
    
    url = 'http://localhost:5000/stream'
    response = requests.get(url, stream=True)
    client = SSEClient(response)
    
    for event in client.events()android:
        print(f"Received event: {event.data}")
    

    关键点说明

    • HTTP/HTTPS:最常用的请求-响应模型
    • UDP:无连接协议,适合实时性要求高的场景
    • WebSocket:全双工通信协议,适合实时双向通信
    • SSE:服务器到客户端的单向推送技术
    • 安全建议
      • HTTPS 使用 requests 的 verify=True 验证证书
      • WebSocket 使用 wss:// 安全协议
      • 生产环境应使用正式 SSL 证书

    根据具体需求选择协议:

    • 简单数据请求:HTTP/HTTPS
    • 实时游戏/视频:UDP
    • 聊天应用:WebSocket
    • 实时通知:SSE

    建议根据实际场景配合使用异步框架(如 aiohttp、FastAPI)以获得更好的性能。

    到此这篇关于python实现常见网络通信的示例详解的文章就介绍到这了,更多相http://www.devze.com关Python网络通信内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜