开发者

PyMySQL数据库连接与优化方式

目录
  • 一、Pymysql 简介
  • 二、数据库连接配置
    • 基础连接方式
    • 完整连接示例
  • 三、数据库基础操作
    • 创建示例数据表
    • 数据库操android作封装类
    • CRUD 操作示例
    • 事务处理示例
    • 批量操作
  • 四、连接池优化
    • 为什么需要连接池
    • 使用 DBUtils 实现连接池
  • 五、应用示例
    • Flask 集成示例
    • 连接池实践配置
    • 错误重试机制
  • 六、SQL事务操作对比
    • 事务影响
    • 事务特性
  • 七、总结

    本文将介绍如何使用 PyMySQL 连接和操作 MySQL 数据库,包括基本连接CRUD 操作事务处理以及如何在高并发环境下使用连接池优化性能。

    通过合理的连接池配置和错误处理机制,可以构建出稳定高效的数据库应用。

    一、PyMySQL 简介

    PyMySQL 是一个纯 python 实现的 MySQL 客户端库,用于连接和操作 MySQL 数据库。它完全兼容 Python DB API 2.0 规范,提供了简单易用的接口来执行 SQL 查询和操作。

    核心优势

    • 纯 Python 实现:无需外部依赖,跨平台兼容性好
    • Python 3 全面支持:兼容最新 Python 特性和语法
    • 线程安全:支持多线程并发操作
    • 完整功能支持:事务、存储过程、预处理语句等
    • 广泛兼容:支持 MySQL 5.5+ 和 mariadb

    安装方法

    pip install pymysql
    

    二、数据库连接配置

    基础连接方式

    import pymysql
    from pymysql.cursors import DictCursor
    
    # 推荐配置方式
    def create_connection():
        return pymysql.connect(
            host='localhost',      # 数据库地址
            user='username',       # 用户名
            password='password',   # 密码
            database='test_db',    # 数据库名
            port=3306,            # 端口,默认3306
            charset='utf8mb4',     # 字符集,推荐utf8mb4
            autocommit=False,     # 是否自动提交
            cursorclass=DictCursor # 返回字典格式结果
        )
    	
    

    连接参数说明

    参数说明
    host数据库服务器地址‘localhost’
    user用户名根据实际配置
    password密码根据实际配置
    database数据库名称项目数据库名
    charset字符编码‘utf8mb4’(支持表情符号)
    autocommit自动提交事务False(建议手动控制)
    cursorclass游标类型DictCursor(结果以字典返回)

    cursorclass参数说明

    cursorclass说明返回结果格式适用场景
    Cursor (默认)普通游标元组格式 (value1, value2, …)基础查询,需要最高性能时
    DictCursor字典游标字典格式 {‘column’: value}需要按列名访问数据时
    SSCursor无缓冲游标元组格式,流式读取处理大量数据,内存有限时
    SSDictCursor无缓冲字典游标字典格式,流式读取大量数据且需要按列名访问
    Cursor 子类自定义游标自定义格式特殊数据处理需求

    完整连接示例

    import pymysql
    from pymysql.cursors import DictCursor
    
    def get_db_connection():
        """获取数据库连接"""
        return pymysql.connect(
            host='localhost',
            user='myuser',
            password='mypassword',
            database='mydatabase',
            charset='utf8mb4',
            autocommit=False,
            cursorclass=DictCursor,
            connect_timeout=10  # 连接超时10秒
        )
    
    # 使用示例
    def test_connection():
        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1 as test")
                result = cursor.fetchone()
                print("连接测试成功:", result)
        finally:
            conn.close()
    
    test_connection()
    

    输出:

    连接测试成功: {'test': 1}
    

    三、数据库基础操作

    创建示例数据表

    CREATE TABLE mydb.users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        email VARCHAR(100) UNIQUE NOT NULL,
        age INT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    

    相关说明

    关键字类型说明
    INT数据类型整数类型,用于存储整数值
    AUTO_INCREMENT约束/属性自动递增,每次插入新记录时自动生成唯一ID
    PRIMARY KEY约束主键,唯一标识每条记录
    VARCHAR(100)数据类型可变长度字符串,最大100字符
    NOT NULL约束该字段不能为空,必须包含值
    UNIQUE约束确保每个值唯一,不允许重复
    TIMESTAMP数据类型时间戳类型,用于存储日期和时间
    DEFAULT CURRENT_TIMESTAMP默认值默认值为当前系统时间

    数据库操作封装类

    import pymysql
    from pymysql.cursors import DictCursor
    from typing import List, Dict, Any, Optional, Tuple
    android
    class MySQLManager:
        """MySQL 数据库管理类"""
    
        def __init__(self, config: Dict[str, Any]):
            self.config = config
    
        def execute_query(self, sql: str, params: Tuple = None) -> List[Dict]:
            """执行查询语句(SELECT)"""
            conn = pymysql.connect(**self.config)
            try:
                with conn.cursor(DictCursor) as cursor:
                    cursor.execute(sql, params or ())
                    return cursor.fetchall()
            finally:
                conn.close()
    
        def execute_update(self, sql: str, params: Tuple = None) -> int:
            """执行更新语句(INSERT/UPDATE/DELETE)"""
            conn = pymysql.connect(**self.config)
            try:
                with conn.cursor() as cursor:
                    affected_rows = cursor.execute(sql, params or ())
                    conn.commit()
                    return affected_rows
            except Exception as e:
                conn.rollback()
                raise e
            finally:
                conn.close()			
    

    CRUD 操作示例

    操作英文中文对应 SQL描述
    CCreate创建INSERT创建新记录
    RRead读取SELECT查询/读取数据
    UUpdate更新UPDATE修改现有记录
    DDelete删除DELETE删除记录
    # 数据库配置
    db_config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'password',
        'database': 'test_db',
        'charset': 'utf8mb4',
        'cursorclass': DictCursor 
    }
    
    db = MySQLManager(db_config)
    
    # 1. 插入数据
    def add_user(name: str, email: str, age: int) -> int:
        sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
        return db.execute_update(sql, (name, email, age))
    
    # 2. 查询数据
    def get_all_users() -> List[Dict]:
        return db.execute_query("SELECT * FROM users")
    
    # 3. 更新数据
    def update_user_email(user_id: int, new_email: str) -> int:
        sql = "UPDATE users SET email = %s WHERE id = %s"
        return db.execute_update(sql, (new_email, user_id))
    
    # 4. 删除数据
    def delete_user(user_id: int) -> int:
        return db.execute_update("DELETE FROM users WHERE id = %s", (user_id,))
    	
    if __name__ == '__main__':
        users = get_all_users()
        print(f"查询所有用户: {users}")
    	
        user_id = add_user("张三", "zhangsan@example.com", 25)
        print(f"执行:插入新用户")
    	
        users = get_all_users()
        print(f"查询所有用户: {users}")
    	
        user_id = users[0]['id']
        affected_rows = update_user_email(user_id, "zhangsan2@example.com")
        print(f"执行:更新邮箱,影响行数: {affected_rows}")
    	
        users = get_all_users()
        print(f"查询所有用户: {users}")
    	
        affected_rows = delete_user(user_id)
        print(f"执行:删除用户,影响行数: {affected_rows}")
    	
        users = get_all_users()
        print(f"查询所有用户: {users}")
    

    输出:

    查询所有用户: ()
    执行:插入新用户
    查询所有用户: [{'id': 3, 'name': '张三', 'email': 'zhangsan@example.com', 'age': 25, 'created_at': datetime.datetime(2025, 9, 23, 19, 25, 11)}]
    执行:更新邮箱,影响行数: 1
    查询所有用户: [{'id': 3, 'name': '张三', 'email': 'zhangsan2@example.com', 'age': 25, 'created_at': datetime.datetime(2025, 9, 23, 19, 25, 11)}]
    执行:删除用户,影响行数: 1
    查询所有用户: ()
    

    事务处理示例

    模拟简单的转账操作,从一个用户账户转移到另一个用户账户。

    def transfer_points(sender_id: int, receiver_id: int, points: int) -> bool:
        """转账操作(事务示例)"""
        conn = pymysql.connect(**db_config)
        try:
            with conn.cursor(DictCursor) as cursor:
                # 检查发送者余额
                cursor.execute("SELECT points FROM accounts WHERE user_id = %s", (sender_id,))
                sender = cursor.fetchone()
                
                if not sender or sender['points'] < points:
                    raise ValueError("余额不足")
                
                # 执行转账
                cursor.execute("UPDATE accounts SET points = points - %s WHERE user_id = %s", 
                             (points, sender_id))
                cursor.execute("UPDATE accounts SET points = points + %s WHERE user_id = %s", 
                             (points, receiver_id))
                
                conn.commit()
                return True
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()
    

    批量操作

    def BATch_insert_users(users: List[tuple]) -> int:
        """批量插入用户数据"""
        sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
        conn = pymysql.connect(**db_config)
        try:
            with conn.cursor() as cursor:
                affected_rows = cursor.executemany(sql, users)
                conn.commit()
                return affected_rows
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()
    
    # 使用示例
    users_data = [
        ('张三', 'zhangsan@example.com', 25),
        ('李四', 'lisi@example.com', 30)
    ]
    batch_insert_users(users_data)
    

    四、连接池优化

    为什么需要连接池

    频繁创建和关闭数据库连接会导致:

    • 资源浪费(TCP 连接建立开销)
    • 性能下降(连接初始化时间)
    • 连接数耗尽(超过数据库最大连接数)

      连接池通过复用连接解决这些问题。

    使用 DBUtils 实现连接池

    安装方法

    pip install DBUtils
    

    实现示例

    from dbutils.pooled_db import PooledDB
    import pymysql
    import threading
    from typing import List, Dict, Any, Tuple
    from pymysql.cursors import DictCursor
    
    class ConnectionPool:
        """数据库连接池python"""
        
        _instance = None
        _lock = threading.Lock()
        
        def __new__(cls, config: Dict[str, Any]):
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance.pool_config = config.copy()
                    cls._instance._pool = PooledDB(
                        creator=pymysql,
                        maxconnections=20,  # 最大连接数
                        mincached=2,  # 初始空闲连接
                        maxcached=10,  # 最大空闲连接
                        blocking=True,  # 连接耗尽时等待
                        ping=1,  # 使用时检查连接
                        **config
                    )
            return cls._instance
                
        def get_connection(self):
            """从连接池获取连接"""
            return self._pool.connection()
    
    # 使用连接池的数据库管理器
    class PooledDBManager:
        def __init__(self, pool_config: Dict[str, Any]):
            self.pool = ConnectionPool(pool_config)
        
        def execute_query(self, sql: str, params: Tuple = None) -> List[Dict]:
            """执行查询"""
            conn = self.pool.get_connection()
            try:
                with conn.cursor(DictCursor) as cursor:
                    cursor.execute(sql, params or ())
                    return cursor.fetchall()
            finally:
                conn.close()  # 实际是放回连接池
        
        def execute_update(self, sql: str, params: Tuple = None) -> int:
            """执行更新"""
            conn = self.pool.get_connection()
            try:
                with conn.cursor() as cursor:
                    affected_rows = cursor.execute(sql, params or ())
                    conn.commit()
                    return affected_rows
            except Exception as e:
                conn.rollback()
                raise e
            finally:
                conn.close()
    

    ping 参数说明

    0 = 不检查
    1 = 每次请求时检查(推荐)
    2 = 每次游标创建时检查
    4编程客栈 = 每次执行时检查
    7 = 1+2+4(所有检查)
    

    五、应用示例

    Flask 集成示例

    from dbutils.pooled_db import PooledDB
    from flask import Flask, request, jsonify
    from pymysql.cursors impandroidort DictCursor
    
    app = Flask(__name__)
    
    db_config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'password',
        'database': 'test_db',
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor
    }
    
    # 初始化连接池
    db_manager = PooledDBManager(db_config)
    
    @app.route('/users', methods=['GET'])
    def get_users():
        """获取所有用户"""
        try:
            users = db_manager.execute_query("SELECT * FROM users")
            return jsonify({'success': True, 'data': users})
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)}), 500
    
    @app.route('/users', methods=['POST'])
    def create_user():
        """创建用户"""
        try:
            data = request.json
            sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
            result = db_manager.execute_update(sql, (data['name'], data['email'], data['age']))
            return jsonify({'success': True, 'affected_rows': result})
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)}), 500
    
    if __name__ == '__main__':
        app.run(debug=True)
    

    连接池实践配置

    # 优化后的连接池配置
    optimal_pool_config = {
        'maxconnections': 20,      # 根据并发量调整
        'mincached': 2,           # 减少初始资源占用
        'maxcached': 10,          # 控制最大空闲连接
        'blocking': True,         # 避免连接耗尽错误
        'ping': 1,                # 使用前检查连接健康
        **db_config              # 基础数据库配置
    }
    

    错误重试机制

    数据库操作重试装饰器:当数据库连接出现临时故障时,会自动进行最多3次重试,并且每次重试间隔时间按指数增长(1秒、2秒、4秒),提高程序的容错能力。

    import time
    from functools import wraps
    import pymysql
    
    def retry_on_failure(max_retries=3, initial_delay=1):
        """数据库操作重试装饰器"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                for attempt in range(max_retries):
                    try:
                        return func(*args, **kwargs)
                    except (pymysql.OperationalError, pymysql.InterfaceError) as e:
                        if attempt == max_retries - 1:
                            raise e
                        time.sleep(initial_delay * (2 ** attempt))  # 指数退避
                return None
            return wrapper
        return decorator
    
    # 使用示例
    @retry_on_failure(max_retries=3)
    def robust_query(sql, params=None):
        return db_manager.execute_query(sql, params)
    

    指数退避:当操作失败时,不立即重试,而是等待一段时间,且每次重试的等待时间呈指数级增长。等待 1 秒, 2 秒, 4 秒,8 秒…

    六、SQL事务操作对比

    事务影响

    操作类型语法示例主要用途返回值事务影响性能考虑使用场景
    SELECT

    (查询)

    SELECT * FROM users WHERE age > 18;从数据库中检索数据结果集(0行或多行)只读操作,不影响数据索引优化很重要,避免全表扫描数据查询、报表生成、数据分析
    UPDATE

    (更新)

    UPDATE users SET age = 20 WHERE id = 1;修改现有记录受影响的行数需要事务控制,会锁定行WHERE 条件要精确,避免锁表修改用户信息、更新状态、调整数值
    INSERT

    (插入)

    INSERT INTO users (name, age) VALUES (‘张三’, 25);添加新记录插入的行数(通常是1)需要事务控制批量插入比单条插入高效新增用户、创建订单、记录日志
    DELETE

    (删除)

    DELETE FROM users WHERE id = 1;删除记录受影响的行数需要事务控制,谨慎使用建议软删除,避免物理删除删除用户、清理数据、撤销操作

    事务特性

    操作是否自动提交锁级别回滚支持并发影响
    SELECT是(可设置)共享锁可回滚到快照低(读写不阻塞)
    UPDATE排他锁完全支持高(会阻塞其他写操作)
    INSERT排他锁完全支持中(可能触发索引重建)
    DELETE排他锁完全支持高(会阻塞其他操作)
    • 排他锁(X锁):写锁,一个事务独占资源,其他事务不能读写
    • 共享锁(S锁):读锁,多个事务可同时读取,但不能写入
    • 排他锁 = 独占,共享锁 = 共享读

    普通 SELECT 是完全无锁的,不会阻塞其他事务的写操作,也不会被写操作阻塞。只有显式加锁的SELECT才会影响并发。

    七、总结

    连接管理

    • 使用连接池管理数据库连接
    • 合理配置连接池参数
    • 及时释放连接回池

    事务控制

    • 明确控制事务边界
    • 及时提交或回滚事务
    • 处理并发场景下的数据一致性

    错误处理

    • 实现适当的重试机制
    • 记录详细的错误日志
    • 区分业务错误和系统错误

    性能优化

    • 使用预处理语句防止 SQL 注入
    • 合理使用批量操作
    • 监控连接池使用情况

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新数据库

    数据库排行榜