开发者

Python基于FastAPI和WebSocket实现实时聊天应用

目录
  • 项目简介
  • 技术栈
  • 项目结构
  • 详细代码实现
    • 1. 后端实现 (main.py)
    • 2. 前端实现 (templates/chat.html)
  • 功能特点
    • 如何运行
      • 使用流程
        • 效果图

          项目简介

          这是一个基于 FastAPI 和 WebSocket 实现的实时聊天应用,支持一对一聊天、离线消息存储等功能。

          技术栈

          后端:FastAPI (python)

          前端:HTML、JavaScript、css

          通信:WebSocket

          认证:简单的 token 认证

          项目结构

          ├── main.py              # 后端主程序

          └── templates/           # 前端模板目录

              └── chat.html       # 聊天页面

          详细代码实现

          1. 后端实现 (main.py)

          from fastapi import FastAPI, WebSocket, Depends, WebSocketDisconnect
          from typing import Dict
          import json
          from fastapi.responses import HTMLResponse, FileResponse
          from fastapi.staticfiles import StaticFiles
          
          app = FastAPI()
          app.mount("/templates", StaticFiles(directory="templates"), name="templates")
          
          class ConnectionManager:
              def __init__(self):
                  self.active_connections: Dict[str, WebSocket] = {}
                  self.offline_messages: Dict[str, list] = {}  # 离线消息存储
          
              async def connect(self, user: str, websocket: WebSocket):
                  await websocket.accept()
                  self.active_connections[user] = websocket
          
              async def disconnect(self, user: str, websocket: WebSocket):
                  if user in self.active_connections:
                      del self.active_connections[user]
          
              async def send_personal_message(self, message: str, to_user: str):
                  if to_user in self.active_connections:
                      await self.active_connections[to_user].send_text(message)
                      return {"success": True}
                  http://www.devze.comelse:
                      # 存储离线消息
                      if to_user not in self.offline_messages:
                          self.offline_messages[to_user] = []
                      self.offline_messages[to_user].append(message)
                      return {"success": False, "reason": "offline", "stored": True}
          
              async def get_offline_messages(self, user: str):
                  if user in self.offline_messages:
                      messages = self.offline_messages[user]
                      del self.offline_messages[user]  # 取出后删除
                      return messages
                  return []
          
          # Token 认证
          async def get_cookie_or_token(token: str = None):
              if not token:
                  from fastapi import HTTPException
                  raise HTTPException(status_code=401, detail="未授权")
              return token
          
          # 初始化连接管理器
          ws_manager = ConnectionManager()
          
          @app.websocket("/ws/{user}")
          async def websocket_one_to_one(
                  websocket: WebSocket,
                  user: str,
                  cookie_or_token: str = Depends(get_cookie_or_token)
          ):
              await ws_manager.connect(user, websocket)
              
              # 发送离线消息
              offline_msgs = await ws_manager.get_offline_messages(user)
              for msg in offline_msgs:
                  await websocket.send_text(msg)
              
              try:
                  while True:
                      data = await websocket.receive_text()
                      message_data = json.loads(data)
                      
                      # 防止自己给自己发消息
                      if message_data["to_user"] == user:
                          error_msg = {
                              "type": "error",
                              "content": "不能发送消息给自己"
                          }
                          await websocket.send_text(json.dumps(error_msg))
                          continue
                          
                      response_message = {
                          "from": user,
                          "content": message_data["content"],
                          "timestamp": message_data.get("timestamp"),
                          "type": "message"
                      }
                      
                      # 发送消息
                      result = await ws_manager.send_personal_message(
                          message=json.dumps(response_message),
                          to_user=message_data["to_user"]
                      )
                      
                      # 处理离线消息
                      if not result["success"]:
                          if result.get("stored"):
                              success_msg = {
                                  "type": "info",
                                  "content": f"消息已保存,将在用户 {message_data['to_user']} 上线时送达"
                              }
                              await websocket.send_text(json.dumps(success_msg))
              except WebSocketDisconnect:
                  await ws_manager.disconnect(user, websocket)
              except Exception as e:
                  print(f"WebSocket 错误: {e}")
                  await ws_manager.disconnect(user, websocket)
          
          @app.get("/", response_class=FileResponse)
          async def root():
              return "templates/chat.html"
          

          2. 前端实现 (templates/chat.html)

          <!DOCTYPE html>
          <html>
          <head>
              <title>WebSocket Chat</title>
              <style>
                  body {
                      font-family: Arial, sans-serif;
                      max-width: 800px;
                      margin: 0 auto;
                      padding: 20px;
                  }
                  #loginSection {
                      text-align: center;
                      margin-top: 100px;
                  }
                  #chatSection {
                      display: none;
                  }
                  #messages {
                      list-style-type: none;
                      padding: 0;
                      height: 400px;
                      overflow-y: auto;
                      border: 1px solid #ccc;
                      margin-bottom: 20px;
                      padding: 10px;
                  }
                  .message {
                      margin: 10px 0;
                      padding: 10px;
                      border-radius: 5px;
                      background-color: #f0f0f0;
                  }
                  .input-group {
                      margin-bottom: 10px;
                  }
                  input[type="text"] {
                      padding: 8px;
                      margin-right: 10px;
                      width: 200px;
                  }
                  button {
                      padding: 8px 15px;
                      background-color: #4CAF50;
                      color: white;
                      border: none;
                      border-radius: 4px;
                      cursor: pointer;
                  }
                  button:hover {
                      background-color: #4编程客栈5a049;
                  }
                  #logoutBtn {
                      background-color: #f44336;
                  }
                  #logoutBtn:hover {
                      background-color: #da190b;
                  }
                  .error {
                      编程color: red;
                      margin: 10px 0;
                  }
                  .info {
             WsjDlr         color: blue;
                      margin: 10px 0;
                  }
              </style>
          </head>
          <body>
              <!-- 登录部分 -->
              <div id="loginSection">
                  <h1>WebSocket 聊天</h1>
                  <div class="input-group">
                      <input type="text" id="loginUsername" placeholder="输入用户名" autocomplete="off"/>
                      <button onclick="login()">登录</button>
                  </div>
                  <div id="loginError" class="error"></div>
              </div>
          
              <!-- 聊天部分 -->
              <div id="chatSection">
                  <h1>WebSocket 聊天</h1>
                  <div id="currentUser"></div>
                  <form action="" onsubmit="sendMessage(event)">
                      <div class="input-group">
                          <input type="text" id="messageText" placeholder="输入消息" autocomplete="off"/>
                          <input type="text" id="usernandroidame" placeholder="接收者用户名" autocomplete="off"/>
                          <button type="submit">发送</button>
                      </div>
                  </form>
                  <button id="logoutBtn" onclick="logout()">退出</button>
                  <ul id='messages'></ul>
              </div>
          
              <script>
                  let ws = null;
          
                  function checkLogin() {
                      const token = localStorage.getItem('token');
                      if (token) {
                          document.getElementById('loginSection').style.display = 'none';
                          document.getElementById('chatSection').style.display = 'block';
                          document.getElementById('currentUser').textContent = `当前用户: ${token}`;
                          connectWebSocket();
                      } else {
                          document.getElementById('loginSection').style.display = 'block';
                          document.getElementById('chatSection').style.display = 'none';
                      }
                  }
          
                  function login() {
                      const username = document.getElementById('loginUsername').value.trim();
                      if (!username) {
                          document.getElementById('loginError').textContent = '请输入用户名';
                          return;
                      }
                      localStorage.setItem('token', username);
                      checkLogin();
                  }
          
                  function logout() {
                      if (ws && ws.readyState === WebSocket.OPEN) {
                          ws.close();
                      }
                      localStorage.removeItem('token');
                      document.getElementById('messages').innerHTML = '';
                      document.getElementById('messageText').value = '';
                      document.getElementById('username').value = '';
                      document.getElementById('loginSection').style.display = 'block';
                      document.getElementById('chatSection').style.display = 'none';
                      document.getElementById('loginUsername').value = '';
                      document.getElementById('currentUser').textContent = '';
                  }
          
                  function connectWebSocket() {
                      const token = localStorage.getItem('token');
                      ws = new WebSocket(`ws://localhost:8000/ws/${token}?token=${token}`);
                      
                      ws.onmessage = function(event) {
                          const messages = document.getElementById('messages');
                          const message = document.createElement('li');
                          message.className = 'message';
                          
                          const data = JSON.parse(event.data);
                          
                          if (data.type === 'error') {
                              message.style.color = 'red';
                              message.textContent = data.content;
                          } else if (data.type === 'info') {
                              message.style.color = 'blue';
                              message.textContent = data.content;
                          } else {
                              message.textContent = `${data.from}: ${data.content}`;
                          }
                          
                          messages.appendChild(message);
                          messages.scrollTop = messages.scrollHeight;
                      };
          
                      ws.onerror = function(error) {
                          console.error('WebSocket 错误:', error);
                      };
          
                      ws.onclose = function() {
                          console.log('WebSocket 连接已关闭');
                      };
                  }
          
                  function sendMessage(event) {
                      event.preventDefault();
                      
                      const messageInput = document.getElementById("messageText");
                      const usernameInput = document.getElementById("username");
                      
                      if (!messageInput.value.trim() || !usernameInput.value.trim()) {
                          return;
                      }
                      
                      const messageData = {
                          content: messageInput.value,
                          to_user: usernameInput.value,
                          timestamp: new Date().toISOString()
                      };
          
                      ws.send(JSON.stringify(messageData));
                      
                      const messages = document.getElementById('messages');
                      const message = document.createElement('li');
                      message.className = 'message';
                      message.style.backgroundColor = '#e3f2fd';
                      message.textContent = `我: ${messageInput.value}`;
                      messages.appendChild(message);
                      messages.scrollTop = messages.scrollHeight;
                      
                      messageInput.value = '';
                  }
          
                  checkLogin();
              </script>
          </body>
          </html>
          

          功能特点

          1.用户管理

          • 简单的用户名登录系统
          • 用户在线状态管理
          • 用户会话保持

          2.消息功能

          • 实时一对一聊天
          • 离线消息存储
          • 上线自动接收离线消息
          • 防止自己给自己发消息

          3.界面特性

          • 响应式设计
          • 消息实时显示
          • 错误信息提示
          • 消息状态反馈

          4.技术特性

          • WebSocket 实时通信
          • 状态管理
          • 错误处理
          • 会话管理

          如何运行

          1.安装依赖

          pip install fastapi uvicorn
          

          2.启动服务器

          uvicorn main:app --reload
          

          3.访问应用

          • 打开浏览器访问 http://localhost:8000
          • 输入用户名登录
          • 开始聊天

          使用流程

          1.登录

          • 输入用户名
          • 点击登录按钮

          2.发送消息

          • 输入接收者用户名
          • 输入消息内容
          • 点击发送

          3.接收消息

          • 实时接收其他用户发送的消息
          • 上线时接收离线消息

          4.退出

          • 点击退出按钮
          • 清除登录状态

          效果图

          Python基于FastAPI和WebSocket实现实时聊天应用

          Python基于FastAPI和WebSocket实现实时聊天应用

          以上就是Python基于FastAPI和WebSocket实现实时聊天应用的详细内容,更多关于Python FastAPI WebSocket实时聊天的资料请关注编程客栈(www.devze.com)其它相关文章!

          0

          上一篇:

          下一篇:

          精彩评论

          暂无评论...
          验证码 换一张
          取 消

          最新开发

          开发排行榜