开发者

基于Python构建一个智能对话系统

目录
  • 系统概述
  • 环境准备与依赖安装
  • 项目结构设计
  • 核心代码解析
    • 1. 数据模型设计
    • 2. 数据库管理
    • 3. 核心功能工具
    • 4. 多智能体系统
    • 5. 主协调函数
  • 系统特色与优势
    • 1. 模块化设计
    • 2. 智能话题识别
    • 3. 灵活的数据持久化
    • 4. 多智能体协作
  • 实际应用示例
    • 扩展方向
      • 完整代码
        • 并行
          • 总结

            在当今信息爆炸的时代,如何有效管理和组织对话记录成为了一个重要挑战。本文将介绍如何使用python构建一个智能对话系统,该系统能够自动识别对话话题、生成摘要,并提供智能对话管理功能。

            系统概述

            这个智能对话系统的核心功能包括:

            • 对话记录管理:使用SQLite数据库持久化存储聊天记录
            • 话题自动识别:将相关对话内容聚类成话题
            • 智能摘要生成:对每个话题生成简洁的摘要
            • 多智能体协作:使用多个AI智能体分工处理不同任务

            下面我们来详细解析系统的各个组成部分。

            环境准备与依赖安装

            首先,确保你已安装Python 3.7+,然后安装必要的依赖包:

            pip install langchain langchain-community langchain-openai
            

            如果你的OpenAI模型是通过LM Studio本地部署的,还需要配置相应的API端点。

            项目结构设计

            合理的项目结构是代码可维护性的基础,我们的项目结构如下:

            chat_system/

            ├── chat_manager.py     # 主程序文件

            ├── requirements.txt    # 依赖列表

            └── chat_topics.db     # SQLite数据库(自动生成)

            核心代码解析

            1. 数据模型设计

            我们首先定义两个核心数据模型:话题(Topic)和聊天消息(ChatMessage)。

            class Topic:
                def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):
                    self.topic_id = topic_id
                    self.topic = topic
                    self.summary = summary
                    self.message_ids = message_ids
            
            class ChatMessage:
                def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):
                    self.message_id = message_id
                    self.role = role
                    self.content = content
                    self.timestamp = timestamp or datetime.now()
            

            2. 数据库管理

            我们使用SQLite数据库进行数据持久化存储,通过DatabaseManager类统一管理数据库操作。

            class DatabaseManager:
                def __init__(self, db_path: str = "chat_topics.db"):
                    self.db_path = db_path
                    self.init_db()
                
                def init_db(self):
                    """初始化数据库表"""
                    conn = sqlite3.connect(self.db_path)
                    cursor = conn.cursor()
                    
                    # 创建topics表
                    cursor.execute("""
                        CREATE TABLE IF NOT EXISTS topics (
                            topic_id TEXT PRIMARY KEY,
                            topic TEXT NOT NULL,
                            summary TEXT,
                            message_ids TEXT
                        )
                    """)
                    
                    # 创建chat_messages表
                    cursor.execute("""
                        CREATE TABLE IF NOT EXISTS chat_messages (
                            message_id TEXT PRIMARY KEY,
                            session_id TEXT NOT NULL,
                            role TEXT NOT NULL,
                            content TEXT NOT NULL,
                            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                        )
                    """)
                    
                    conn.commit()
                    conn.close()
            

            3. 核心功能工具

            系统提供了三个核心工具函数,分别负责话题选择、历史记录获取和话题压缩。

            def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:
                """根据topic_ids选择话题的工具"""
                # 实现细节...
            
            def get_history(session_id: str) -> List[Dict[str, Any]]:
                """获取当前会话的聊天记录"""
                # 实现细节...
            
            def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
                """将聊天记录压缩成话题"""
                # 实现细节...
            

            4. 多智能体系统

            系统采用多智能体架构,每个智能体负责不同的任务:

            • 历史信息检索智能体:负责检索相关的历史话题
            • 问答智能体:基于上下文信息回答用户问题
            • 话题总结智能体:将对话记录压缩成结构化话题
            # 创建智能体执行器
            history_retrieval_agent = AgentExecutor(
                agent=history_retrieval_agent,
                tools=tools,
                verbose=True,
                handle_parsing_errors=True
            )
            
            # 类似创建其他智能体...
            

            5. 主协调函数

            super_agent函数作为系统的主入口,协调各个智能体协作处理用户输入。

            def super_agent(user_input: str, session_id: str):
                """超级智能体主函数"""
                if not session_id:
                    session_id = str(uuid.uuid4())
                
                # 保存用户输入
                user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)
                db_manager.save_chat_message(session_id, user_message)
                
                # 1. 历史信息检索智能体
                print("=== 历史信息检索智能体 ===")
                # ... 检索相关话题
                
                # 2. 使用检测信息回答用户问题智能体
                print("=== 问答智能体 ===")
                # ... 生成回答
                
                # 3. 处理历史信息 话题总结智能体
                print("=== 话题总结智能体 ===")
                # ... 压缩对话为话题
                
                return {
                    "session_id": session_id,
                    "qa_response": qa_response,
                    "new_topics": new_topic
                }
            

            系统特色与优势

            1. 模块化设计

            系统采用高度模块化的设计,各个组件职责分明,便于维护和扩展。

            2. 智能话题识别

            系统能够自动识别对话中的主题,并将相关对话内容聚类,大大提高了对话记录的可管理性。

            3. 灵活的数据持久化

            使用SQLite数据库,既保证了数据的持久化,又避免了复杂数据库的部署成本。

            4. 多智能体协作

            通过多个专门化的智能体分工合作,提高了系统的整体性能和准确性。

            实际应用示例

            下面是一个简单的使用示例:

            if __name__ == "__main__":
                result = super_agent("你好,能告诉我天气怎么样吗?", "session_1")
                print(f"最终结果: {result}")
            

            系统会自动处理用户输入,检索相关历史话题,生成回答,并将当前对话压缩成新话题。

            扩展方向

            这个基础系统有很多可能的扩展方向:

            • 用户认证系统:添加用户登录和权限管理
            • 话题可视化:提供图形化界面展示话题关系
            • 高级摘要算法:使用更先进的NLP技术生成质量更高的摘要
            • 多语言支持:扩展系统以支持多种语言
            • 实时协作:支持多用户同时使用和协作

            完整代码

            import os
            import uuid
            import sqlite3
            import json
            from datetime import datetime
            from typing import List, Dict, Any, Union
            
            from langchain.agents import AgentExecutor, create_react_agent
            from langchain.prompts import PromptTemplate
            from langchain.tools import Tool
            
            # 为避免弃用警告,使用条件导入
            try:
                from langchain_openai import ChatOpenAI
            except ImportError:
                from langchain_community.chat_models import ChatOpenAI
            
            
            # 配置连接到LM Studio的LLM
            llm = ChatOpenAI(
                model="qwen/qwen3-1.7b",
                openai_api_key="lm-studio",
                openai_ajspi_base="http://127.0.0.1:1234/v1",
                temperature=0.7
            )
            
            
            class Topic:
                def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):
                    self.topic_id = topic_id
                    self.topic = topic
                    self.summary = summary
                    self.message_ids = message_ids
            
            
            class ChatMessage:
                def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):
                    self.message_id = message_id
                    self.role = role
                    self.content = content
                    self.timestamp = timestamp or datetime.now()
            
            
            class DatabaseManager:
                def __init__(self, db_path: str = "chat_topics.db"):
                    self.db_path = db_path
                    self.init_db()
            
                def init_db(self):
                    """初始化数据库表"""
                    conn = sqlite3.connect(self.db_path)
                    cursor = conn.cursor()
                    
                    # 创建topics表
                    cursor.execute("""
                        CREATE TABLE IF NOT EXISTS topics (
                            topic_id TEXT PRIMARY KEY,
                            topic TEXT NOT NULL,
                            summary TEXT,
                            message_ids TEXT
                        )
                    """)
                    
                    # 创建chat_messages表
                    cursor.execute("""
                        CREATE TABLE IF NOT EXISTS chat_messages (
                            message_id TEXT PRIMARY KEY,
                            session_id TEXT NOT NULL,
                            role TEXT NOT NULL,
                            content TEXT NOT NULL,
                            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                        )
                    """)
                    
                    conn.commit()
                    conn.close()
            
                def save_topic(self, topic: Topic):
                    """保存话题"""
                    conn = sqlite3.connect(self.db_path)
                    cursor = conn.cursor()
                    cursor.execute(
                        "INSERT OR REPLACE INTO topics (topic_id, topic, summary, message_ids) VALUES (?, ?, ?, ?)",
                        (topic.topic_id, topic.topic, topic.summary, json.dumps(topic.message_ids))
                    )
                    conn.commit()
                    conn.close()
            
                def get_all_topics(self) -> List[Topic]:
                    """获取所有话题"""
                    conn = sqlite3.connect(self.db_path)
                    cursor = conn.cursor()
                    cursor.execute("SELECT topic_id, topic, summary, message_ids FROM topics")
                    rows = cursor.fetchall()
                    conn.close()
                    
                    topics = []
                    for row in rows:
                        topic_id, topic_str, summary, message_ids_str = row
                        message_ids = json.loads(message_ids_str) if message_ids_str else []
                        topics.append(Topic(topic_id, topic_str, summary, message_ids))
                    return topics
            
                def save_chat_message(self, session_id: str, message: ChatMessage):
                    """保存聊天消息"""
                    conn = sqlite3.connect(self.db_path)
                    cursor = conn.cursor()
                    # 使用INSERT OR IGNORE避免重复插入
                    cursor.execute(
                        "INSERT OR IGNORE INTO chat_messages (message_id, session_id, role, content) VALUES (?, ?, ?, ?)",
                        (message.message_id, session_id, message.role, message.content)
                    )
                    conn.commit()
                    conn.close()
            
                def get_session_messages(self, session_id: str) -> List[ChatMessage]:
                    """获取会话消息"""
                    conn = sqlite3.connect(self.db_path)
                    cursor = conn.cursor()
                    cursor.execute(
                        "SELECT message_id, role, content, timestamp FROM chat_messages WHERE session_id = ? ORDER BY timestamp",
                        (session_id,)
                    )
                    rows = cursor.fetchall()
                    conn.close()
                    
                    messages = []
                    for row in rows:
                        message_id, role, content, timestamp = row
                        # 处理时间戳格式
                        if isinstance(timestamp, str):
                            try:
                                timestamp = datetime.fromisoformat(timestamp)
                            except ValueError:
                                timestamp = datetime.now()
                        messages.append(ChatMessage(message_id, role, content, timestamp))
                    return messages
            
            
            # 创建数据库管理器实例
            db_manager = DatabaseManager()
            
            
            def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:
                """根据topic_ids选择话题的工具,支持为每个topic_id指定summary_only参数"""
                topics = db_manager.get_all_topics()
                
                # 如果topic_ids为空,处理所有话题
                if not topic_ids:
                    selected_topics = topics
                    # 如果summary_only是布尔值,则应用于所有话题
                    if isinstance(summary_only, bool):
                        summary_flags = [summary_only] * len(selected_topics)
                    # 如果summary_only是列表,则按顺序应用
                    elif isinstance(summary_only, list):
                        summary_flags = summary_only
                    else:
                        summary_flags = [False] * len(selected_topics)
                else:
                    # 根据指定的topic_ids筛选话题
                    selected_topics = [topic for topic in topics if topic.topic_id in topic_ids]
                    # 如果summary_only是布尔值,则应用于所有选定话题
                    if isinstance(summary_only, bool):
                        summary_flags = [summary_only] * len(selected_topics)
                    # 如果summary_only是列表,则按顺序应用(如果长度不够则用False填充)
                    elif isinstance(summary_only, list):
                        summary_flags = summary_only[:len(selected_topics)]
                        # 如果summary_only列表长度不足,用False填充
                        summary_flags.extend([False] * (len(selected_topics) - len(summary_flags)))
                    else:
                        summary_flags = [False] * len(selected_topics)
                
                result = []
                for i, topic in enumerate(selected_topics):
                    # 获取对应的summary_only标志
                    show_summary_only = summary_flags[i] if i < len(summary_flags) else False
                    
                    if show_summary_only:
                        result.append({
                            "topic_id": topic.topic_id,
                            "topic": topic.topic,
                            "summary": topic.summary
                        })
                    else:
                        result.append({
                            "topic_id": topic.topic_id,
                            "topic": topic.topic,
                            "summary": topic.summary,
                            "message_ids": topic.message_ids
                        })
                return result
            
            
            def get_history(session_id: str) -> List[Dict[str, Any]]:
                """获取当前会话的聊天记录"""
                messages = db_manager.get_sesandroidsion_messages(session_id)
                return [{"role": msg.role, "content": msg.content, "message_id": msg.message_id} for msg in messages]
            
            
            def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
                """将新的聊天记录压缩成话题"""
                # 这里应该调用LLM来生成话题摘要,为了简化,我们使用简单的方法
                if not new_messages:
                    return []
                
                # 创建新话题
                topic_id = str(uuid.uuid4())
                topic_content = new_messages[0]['content'][:20] + "..." if len(new_messages[0]['content']) > 20 else new_messages[0]['content']
                
                message_ids = [msg.get('message_id', str(uuid.uuid4())) for msg in new_messages]
                
                # 保存消息到数据库
                for msg in new_messages:
                    if 'message_id' not in msg:
                        msg['message_id'] = str(uuid.uuid4())
                    chat_msg = ChatMessage(msg['message_id'], msg['role'], msg['content'])
                    db_manager.save_chat_message(session_id, chat_msg)
                
                # 创建话题对象
                topic = Topic(
                    topic_id=topic_id,
                    topic=topic_content,
                    summary=f"包含{len(new_messages)}条消息的话题",
                    message_ids=message_ids
                )
                
                # 保存话题到数据库
                db_manager.save_topic(topic)
                
                # 返回话题信息
                return [{
                    "topic_id": topic.topic_id,
                    "topic": topic.topic,
                    "summary": topic.summary,
                    "message_ids": topic.message_ids
                }]
            
            
            # 定义工具列表
            tools = [
                Tool.from_function(
                    func=get_history,
                    name="get_history",
                    description="获取指定会话的聊天历史记录"
                ),
                Tool.from_function(
                    func=select_topic_tool,
                    name="select_topic_tool",
                    description="根据topic_ids选择话题,支持为每个topic指定是否只显示摘要"
                ),
                Tool.from_function(
                    func=compress_history_to_topic,
                    name="compress_history_to_topic",
                    description="将聊天记录压缩成话题"
                )
            ]
            
            # 创建提示模板
            template = """Answer the following questions as best you can. You have Access to the following tools:
            
            {tools}
            
            Use the following format:
            
            Question: the input question you must answer
            Thought: you should always think about what to do
            Action: the action to take, should be one of [{tool_names}]
            Action Input: the input to the action
            Observation: the result of the action
            ... (this Thought/Action/Action Input/Observation can repeat N times)
            Thought: I now know the final answer
            Final Answer: the final answer to the original input question
            
            Begin!
            
            Question: {input}
            Thought:{agent_scratchpad}"""
            
            prompt = PromptTemplate.from_template(template)
            
            # 创建智能体
            history_retrieval_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
            qa_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
            topic_summary_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
            
            # 创建智能体执行器
            history_retrieval_agent = AgentExecutor(
                agent=history_retrieval_agent,
                tools=tools,
                verbose=True,
                handle_parsing_errors=True
            )
            
            qa_agent = AgentExecutor(
                agent=qa_agent,
                tools=tools,
                verbose=True,
                handle_parsing_errors=True
            )
            
            topic_summary_agent = AgentExecutor(
                agent=topic_summary_agent,
                tools=tools,
                verbose=True,
                handle_parsing_errors=True
            )
            
            
            def super_agent(user_input: str, session_id: str):
                """超级智能体主函数"""
                if not session_id:
                    session_id = str(uuid.uuid4())
                
                # 保存用户输入
                user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)
                db_manager.save_chat_message(session_id, user_message)
                
                # 1. 历史信息检索智能体
                print("=== 历史信息检索智能体 ===")
                # 检索相关话题
                all_topics = db_manager.get_all_topics()
                topic_ids = [topic.topic_id for topic in all_topics]
                
                if topic_ids:
                    # 为每个topic_id指定不同的summary_only值
                    summary_flags = [True, False] * (len(topic_ids) // 2 + 1)
                    summary_flags = summary_flags[:len(topic_ids)]
                    related_topics = select_topic_tool(topic_ids, summary_flags)
                    print(f"相关话题: {related_topics}")
                else:
                    print("未找到相关话题")
                    related_topics = []
                
                # 2. 使用检测信息回答用户问题智能体
                print("=== 问答智能体 ===")
                history = get_history(session_id)
                print(f"会话历史: {history}")
                
                # 构造问答上下文
                qa_context = f"用户问题: {user_input}\n相关话题: {related_topics}\n会话历史: {history}"
                try:
                    qa_response = qa_agent.invoke({
                        "input": f"基于以下信息回答用户问题:\n{qa_context}\n\n用户问题: {user_input}",
                        "tools": tools,
                        "tool_names": ", ".join([t.name for t in tools])
                    })
                    print(f"问答结果: {qa_response}")
                except Exception as e:
                    print(f"问答智能体执行出错: {e}")
                    qa_response = {"output": "抱歉,我无法生成回答。"}
                
                # 3. 处理历史信息 话题总结智能体
                print("=== 话题总结智能体 ===")
                # 将当前会话消息压缩成话题
                session_messages = get_history(session_id)
                if session_messages:
                    new_topic = compress_history_to_topic(session_id, session_messages)
                    print(f"新话题: {new_topic}")
                else:
                    print("没有新消息需要压缩成话题")
                    new_topic = []
                
                return {
                    "session_id": session_id,
                    "qa_response": qa_response.get("output", qa_response.get("result", "无回答")),
                    "new_topics": new_topic
                }
            
            
            # 示例使用
            if __name__ == "__main__":
                result = super_agent("你好,能告诉我天气怎么样吗?", "session_1")
                print(f"最终结果: {result}")
            

            并行

            import os
            import uuid
            import sqlite3
            import json
            from datetime import datetime
            from typing import List, Dict, Any, Union
            from concurrent.futures import ThreadPoolExecutor, as_completed
            
            from langchain.agents import AgentExecutor, create_react_agent
            from langchain.prompts import PromptTemplate
            from langchain.tools import Tool
            
            # 为避免弃http://www.devze.com用警告,使用条件导入
            try:
                from langchain_openai import ChatOpenAI
            except ImportError:
                from langchain_community.chat_models import ChatOpenAI
            
            
            # 配置连接到LM Studio的LLM
            llm = ChatOpenAI(
                model="qwen/qwen3-1.7b",
                openai_api_key="lm-studio",
                openai_api_base="http://127.0.0.1:1234/v1",
                temperature=0.7
            )
            
            
            class Topic:
                def __init__(self, topic_id: str, topic: str, summary: str, message_ids: List[str]):
                    self.topic_id = topic_id
                    self.topic = topic
                    self.summary = summary
                    self.message_ids = message_ids
            
            
            class ChatMessage:
                def __init__(self, message_id: str, role: str, content: str, timestamp: datetime = None):
                    self.message_id = message_id
                    self.role = role
                    self.content = content
                    self.timestamp = timestamp or datetime.now()
            
            
            class DatabaseManager:
                def __init__(self, db_path: str = "chat_topics.db"):
                    self.db_path = db_path
                    self.init_db()
            
                def init_db(self):
                    """初始化数据库表"""
                    conn = sqlite3.connect(self.db_path)
                    cursor = conn.cursor()
                    
                    # 创建topics表
                    cursor.execute("""
                        CREATE TABLE IF NOT EXISTS topics (
                            topic_id TEXT PRIMARY KEY,
                            topic TEXT NOT NULL,
                            summary TEXT,
                            message_ids TEXT
                        )
                    """)
                    
                    # 创建chat_messages表
                    cursor.execute("""
                        CREATE TABLE IF NOT EXISTS chat_messages (
                            message_id TEXT PRIMARY KEY,
                            session_id TEXT NOT NULL,
                            role TEXT NOT NULL,
                            content TEXT NOT NULL,
                            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                        )
                    """)
                    
                    conn.commit()
                    conn.close()
            
                def save_topic(self, topic: Topic):
                    """保存话题"""
                    conn = sqlite3.connect(self.db_path)
                    cursor = conn.cursor()
                    cursor.execute(
                        "INSERT OR REPLACE INTO topics (topic_id, topic, summary, message_ids) VALUES (?, ?, ?, ?)",
                        (topic.topic_id, topic.topic, topic.summary, json.dumps(topic.message_ids))
                    )
                    conn.commit()
                    conn.close()
            
                def get_all_topics(self) -> List[Topic]:
                    """获取所有话题"""
                    conn = sqlite3.connect(self.db_path)
                    cursor = conn.cursor()
                    cursor.execute("SELECT topic_id, topic, summary, message_ids FROM topics")
                    rows = cursor.fetchall()
                    conn.close()
                    
                    topics = []
                    for row in rows:
                        topic_id, topic_str, summary, message_ids_str = row
                        message_ids = json.loads(message_ids_str) if message_ids_str else []
                        topics.append(Topic(topic_id, topic_str, summary, message_ids))
                    return topics
            
                def save_chat_message(self, session_id: str, message: ChatMessage):
                    """保存聊天消息"""
                    conn = sqlite3.connect(self.db_path)
                    cursor = conn.cursor()
                    # 使用INSERT OR IGNORE避免重复插入
                    cursor.execute(
                        "INSERT OR IGNORE INTO chat_messages (message_id, session_id, role, content) VALUES (?, ?, ?, ?)",
                        (message.message_id, session_id, message.role, message.content)
                    )
                    conn.commit()
                    conn.close()
            
                def get_session_messages(self, session_id: str) -> List[ChatMessage]:
                    """获取会话消息"""
                    conn = sqlite3.connect(self.db_path)
                    cursor = conn.cursor()
                    cursor.execute(
                        "SELECT message_id, role, content, timestamp FROM chat_messages WHERE session_id = ? ORDER BY timestamp",
                        (session_id,)
                    )
                    rows = cursor.fetchall()
                    conn.close()
                    
                    messages = []
                    for row in rows:
                        message_id, role, content, timestamp = row
                        # 处理时间戳格式
                        if isinstance(timestamp, str):
                            try:
                                timestamp = datetime.fromisoformat(timestamp)
                            except ValueError:
                                timestamp = datetime.now()
                        messages.append(ChatMessage(message_id, role, content, timestamp))
                    return messages
            
            
            # 创建数据库管理器实例
            db_manager = DatabaseManager()
            
            
            def select_topic_tool(topic_ids: List[str], summary_only: Union[List[bool], bool] = False) -> List[Dict[str, Any]]:
                """根据topic_ids选择话题的工具,支持为每个topic_id指定summary_only参数"""
                try:
                    topics = db_manager.get_all_topics()
                    
                    # 如果topic_ids为空,处理所有话题
                    if not topic_ids:
                        selected_topics = topics
                        # 如果summary_only是布尔值,则应用于所有话题
                        if isinstance(summary_only, bool):
                            summary_flags = [summary_only] * len(selected_topics)
                        # 如果summary_only是列表,则按顺序应用
                        elif isinstance(summary_only, list):
                            summary_flags = summary_only
                        else:
                            summary_flags = [False] * len(selected_topics)
                    else:
                        # 根据指定的topic_ids筛选话题
                        selected_topics = [topic for topic in topics if topic.topic_id in topic_ids]
                        # 如果summary_only是布尔值,则应用于所有选定话题
                        if isinstance(summary_only, bool):
                            summary_flags = [summary_only] * len(selected_topics)
                        # 如果summary_only是列表,则按顺序应用(如果长度不够则用False填充)
                        elif isinstance(summary_only, list):
                            summary_flags = summary_only[:len(selected_topics)]
                            # 如果summary_only列表长度不足,用False填充
                            summary_flags.extend([False] * (len(selected_topics) - len(summary_flags)))
                        else:
                            summary_flags = [False] * len(selected_topics)
                    
                    result = []
                    for i, topic in enumerate(selected_topics):
                        # 获取对应的summary_only标志
                        show_summary_only = summary_flags[i] if i < len(summary_flags) else False
                        
                        if show_summary_only:
                            result.append({
                                "topic_id": http://www.devze.comtopic.topic_id,
                                "topic": topic.topic,
                                "summary": topic.summary
                            })
                        else:
                            result.append({
                                "topic_id": topic.topic_id,
                                "topic": topic.topic,
                                "summary": topic.summary,
                                "message_ids": topic.message_ids
                            })
                    return result
                except Exception as e:
                    print(f"select_topic_tool执行出错: {e}")
                    return []
            
            
            def get_history(session_id: str) -> List[Dict[str, Any]]:
                """获取当前会话的聊天记录"""
                try:
                    messages = db_manager.get_session_messages(session_id)
                    return [{"role": msg.role, "content": msg.content, "message_id": msg.message_id} for msg in messages]
                except Exception as XDdtwrie:
                    print(f"get_history执行出错: {e}")
                    return []
            
            
            def compress_history_to_topic(session_id: str, new_messages: List[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
                """将新的聊天记录压缩成话题,并合并相似话题"""
                try:
                    # 如果没有提供new_messages,则从数据库获取会话消息
                    if new_messages is None:
                        new_messages = get_history(session_id)
                    
                    # 这里应该调用LLM来生成话题摘要,为了简化,我们使用简单的方法
                    if not new_messages:
                        return []
                    
                    # 创建新话题
                    topic_id = str(uuid.uuid4())
                    topic_content = new_messages[0]['content'][:20] + "..." if len(new_messages[0]['content']) > 20 else new_messages[0]['content']
                    
                    message_ids = [msg.get('message_id', str(uuid.uuid4())) for msg in new_messages]
                    
                    # 保存消息到数据库
                    for msg in new_messages:
                        if 'message_id' not in msg:
                            msg['message_id'] = str(uuid.uuid4())
                        chat_msg = ChatMessage(msg['message_id'], msg['role'], msg['content'])
                        db_manager.save_chat_message(session_id, chat_msg)
                    
                    # 创建话题对象
                    new_topic = Topic(
                        topic_id=topic_id,
                        topic=topic_content,
                        summary=f"包含{len(new_messages)}条消息的话题",
                        message_ids=message_ids
                    )
                    
                    # 检查是否已存在相似话题
                    all_topics = db_manager.get_all_topics()
                    merged = False
                    for existing_topic in all_topics:
                        # 简单的相似性检查:比较话题标题的前几个字符
                        if existing_topic.topic[:10] == topic_content[:10] and existing_topic.topic_id != topic_id:
                            # 合并到现有话题
                            existing_topic.message_ids.extend(message_ids)
                            existing_topic.summary = f"包含{len(existing_topic.message_ids)}条消息的话题"
                            db_manager.save_topic(existing_topic)
                            merged = True
                            break
                    
                    # 如果没有合并,则保存新话题
                    if not merged:
                        db_manager.save_topic(new_topic)
                    
                    # 返回话题信息
                    return [{
                        "topic_id": new_topic.topic_id,
                        "topic": new_topic.topic,
                        "summary": new_topic.summary,
                        "message_ids": new_topic.message_ids
                    }]
                except Exception as e:
                    print(f"compress_history_to_topic执行出错: {e}")
                    return []
            
            
            # 定义工具列表
            tools = [
                Tool.from_function(
                    func=get_history,
                    name="get_history",
                    description="获取指定会话的聊天历史记录"
                ),
                Tool.from_function(
                    func=select_topic_tool,
                    name="select_topic_tool",
                    description="根据topic_ids选择话题,支持为每个topic指定是否只显示摘要"
                ),
                Tool.from_function(
                    func=lambda session_id: compress_history_to_topic(session_id),
                    name="compress_history_to_topic",
                    description="将聊天记录压缩成话题"
                )
            ]
            
            # 创建提示模板
            template = """Answer the following questions as best you can. You have access to the following tools:
            
            {tools}
            
            Use the following format:
            
            Question: the input question you must answer
            Thought: you should always think about what to do
            Action: the action to take, should be one of [{tool_names}]
            Action Input: the input to the action
            Observation: the result of the action
            ... (this Thought/Action/Action Input/Observation can repeat N times)
            Thought: I now know the final answer
            Final Answer: the final answer to the original input question
            
            Begin!
            
            Question: {input}
            Thought: {agent_scratchpad}"""
            
            prompt = PromptTemplate.from_template(template)
            
            # 创建智能体
            history_retrieval_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
            qa_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
            topic_summary_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
            
            # 创建智能体执行器
            history_retrieval_agent = AgentExecutor(
                agent=history_retrieval_agent,
                tools=tools,
                verbose=True,
                handle_parsing_errors=True
            )
            
            qa_agent = AgentExecutor(
                agent=qa_agent,
                tools=tools,
                verbose=True,
                handle_parsing_errors=True
            )
            
            topic_summary_agent = AgentExecutor(
                agent=topic_summary_agent,
                tools=tools,
                verbose=True,
                handle_parsing_errors=True
            )
            
            
            def super_agent(user_input: str, session_id: str):
                """超级智能体主函数"""
                if not session_id:
                    session_id = str(uuid.uuid4())
                
                # 保存用户输入
                user_message = ChatMessage(str(uuid.uuid4()), "user", user_input)
                db_manager.save_chat_message(session_id, user_message)
                
                # 1. 历史信息检索智能体(并行搜索)
                print("=== 历史信息检索智能体(并行搜索) ===")
                # 检索相关话题
                all_topics = db_manager.get_all_topics()
                topic_ids = [topic.topic_id for topic in all_topics]
                
                related_topics = []
                if topic_ids:
                    # 将话题ID分组,每组10个
                    topic_groups = [topic_ids[i:i+10] for i in range(0, len(topic_ids), 10)]
                    
                    # 并行执行多个搜索任务,每组话题作为一个任务
                    with ThreadPoolExecutor(max_workers=min(5, len(topic_groups))) as executor:
                        # 提交多个任务
                        future_to_topic = {}
                        
                        # 为每组话题创建一个任务
                        for i, group in enumerate(topic_groups):
                            # 交替使用摘要和完整信息模式
                            summary_mode = (i % 2 == 0)  # 偶数索引组使用摘要模式
                            future = executor.submit(select_topic_tool, group, summary_mode)
                            future_to_topic[future] = f"话题组{i}搜索({'摘要' if summary_mode else '完整'})"
                        
                        # 任务: 获取会话历史
                        future3 = executor.submit(get_history, session_id)
                        future_to_topic[future3] = "会话历史"
                        
                        # 收集并行任务结果
                        search_results = {}
                        for future in as_completed(future_to_topic):
                            task_name = future_to_topic[future]
                            try:
                                result = future.result()
                                search_results[task_name] = result
                                print(f"{task_name} 完成,结果数: {len(result) if isinstance(result, list) else 'N/A'}")
                            except Exception as exc:
                                print(f'{task_name} 执行时发生异常: {exc}')
                                search_results[task_name] = []
                        
                        # 合并所有话题搜索结果
                        for task_name, result in search_results.items():
                            if "话题组" in task_name and isinstance(result, list):
                                related_topics.extend(result)
                        
                        print(f"合并后相关话题数: {len(related_topics)}")
                else:
                    print("未找到相关话题")
                    related_topics = []
                
                # 2. 使用检测信息回答用户问题智能体
                print("=== 问答智能体 ===")
                history = get_history(session_id)
                print(f"会话历史: {history}")
                
                # 构造问答上下文
                qa_context = f"用户问题: {user_input}\n相关话题: {related_topics}\n会话历史: {history}"
                try:
                    qa_response = qa_agent.invoke({
                        "input": f"基于以下信息回答用户问题:\n{qa_context}\n\n用户问题: {user_input}"
                    })
                    print(f"问答结果: {qa_response}")
                except Exception as e:
                    print(f"问答智能体执行出错: {e}")
                    qa_response = {"output": "抱歉,我无法生成回答。"}
                
                # 3. 处理历史信息 话题总结智能体
                print("=== 话题总结智能体 ===")
                # 将当前会话消息压缩成话题
                new_topic = compress_history_to_topic(session_id)
                if new_topic:
                    print(f"新话题: {new_topic}")
                else:
                    print("没有新消息需要压缩成话题")
                
                # 确保qa_response不为None并且有合适的键
                if qa_response is None:
                    qa_response = {"output": "抱歉,我无法生成回答。"}
                
                # 处理可能缺少output或result键的情况
                if "output" in qa_response:
                    qa_result = qa_response["output"]
                elif "result" in qa_response:
                    qa_result = qa_response["result"]
                else:
                    qa_result = "无回答"
                
                return {
                    "session_id": session_id,
                    "qa_response": qa_result,
                    "new_topics": new_topic
                }
            
            
            # 示例使用
            if __name__ == "__main__":
                result = super_agent("你好,能告诉我天气怎么样吗?", "session_1")
                print(f"最终结果: {result}")
            

            总结

            本文详细介绍了一个基于Python的智能对话系统的设计与实现。系统利用LangChain框架和SQLite数据库,实现了对话记录管理、话题识别和摘要生成等核心功能。通过多智能体协作架构,系统能够高效地处理用户查询并管理对话历史。

            这种系统可以广泛应用于客服系统、个人知识管理、团队协作等多种场景,帮助用户更好地组织和利用对话信息。希

            提示:在实际部署时,建议添加错误处理、日志记录和性能监控等生产环境需要的功能,以确保系统的稳定性和可维护性。

             以上就是基于Python构建一个智能对话系统的详细内容,更多关于Python智能对话系统的资料请关注编程客栈(www.devze.com)其它相关文章!

            0

            上一篇:

            下一篇:

            精彩评论

            暂无评论...
            验证码 换一张
            取 消

            最新开发

            开发排行榜