开发者

python读取Excel大文件的四种方法与优化

目录
  • 核心方法
  • 特殊场景处理
  • 关键优化技巧
  • 使用建议
  • Excel大文件读取方法
    • 1. pandas分块读取 (.xlsx, .xls)
    • 2. openpyxl逐行读取 (.xlsx) - 推荐
    • 3. xlrd处理 (.xls)
    • 4. pyxlsb处理 (.xlsb)
  • 新增功能特点
    • 安装依赖

      核心方法

      逐行读取 - 最常用,内存占用O(1)

      分块读取 - 适合超大文件,可控制内存使用

      内存映射 - 高性能,虚拟内存映射

      缓冲读取 - 平衡性能和内存

      特殊场景处理

      CSV文件 - 使用pandas的chunksize参数

      jsON Lines - 逐行解析JSON对象

      文本分析 - 内存高效的单词计数示例

      关键优化技巧

      使用生成器 - 避免一次性加载所有数据到内存

      合理设置块大小 - 平衡内存使用和IO效率

      进度监控 - 实时显示处理进度

      错误处理 - 处理编码错误、文件不存在等异常

      使用建议

      • 小于100MB: 直接读取到内存
      • 100MB-1GB: 使用逐行读取或小块读取
      • 大于1GB: 使用内存映射或大块分批处理
      • 结构化数据: 使用pandas的chunksize参数

      这些方法可以处理几GB甚至几十GB的文件而不会导致内存溢出。根据您的具体需求选择最适合的方法即可。

      示例代码:

      #!/usr/bin/env python3
      # -*- coding: utf-8 -*-
      """
      大文件读取示例 - 避免内存溢出的多种方法
      """
       
      import os
      import sys
      import mmap
      import csv
      import json
      from typing import Generator, Iterator
      import pandas as pd
       
       
      class LargeFileReader:
          """大文件读取工具类"""
          
          def __init__(self, file_path: str, encoding: str = 'utf-8'):
              self.file_path = file_path
              self.encoding = encoding
              
          def get_file_size(self) -> int:
              """获取文件大小(字节)"""
              return os.path.getsize(self.file_path)
          
          def get_file_size_mb(self) -> float:
              """获取文件大小(MB)"""
              return self.get_file_size() / (1024 * 1024)
       
       
      def method1_line_by_line(file_path: str, encoding: str = 'utf-8') -> Generator[str, None, None]:
          """
          方法1: 逐行读取 - 最常用的方法
          内存使用量: O(1) - 每次只加载一行
          适用场景: 文本文件、日志文件
          """
          try:
              with open(file_path, 'r', encoding=encoding) as file:
                  for line_num, line in enumerate(file, 1):
                      # 处理每一行
                      yield line.strip()  # 去除行尾换行符
                      
                      # 可选:显示进度
                      if line_num % 10000 == 0:
                          print(f"已处理 {line_num} 行")
                          
          except FileNotFoundError:
              print(f"文件未找到: {file_path}")
          except UnicodeDecodeError as e:
              print(f"编码错误: {e}")
       
       
      def method2_chunk_reading(file_path: str, chunk_size: int = 1024*1024, encoding: str = 'utf-8') -> Generator[str, None, None]:
          """
          方法2: 按块读取 - 适合处理二进制文件或超大文本文件
          内存使用量: O(chunk_size) - 每次加载指定大小的块
          适用场景: 二进制文件、超大文本文件
          """
          try:
              with open(file_path, 'r', encoding=encoding) as file:
                  while True:
                      chunk = file.read(chunk_size)
                      if not chunk:
                          break
                      yield chunk
                      
          except FileNotFoundError:
              print(f"文件未找到: {file_path}")
          except UnicodeDecodeError as e:
              print(f"编码错误: {e}")
       
       
      def method3_mmap_reading(file_path: str, encoding: str = 'utf-8') -> Iterator[str]:
          """
          方法3: 内存映射文件 - 高性能读取
          内存使用量: 虚拟内存映射,物理内存按需加载
          适用场景: 需要随机访问的大文件、高性能要求
          """
          try:
              with open(file_path, 'r', encoding=encoding) as file:
                  with mmap.mmap(file.fileno(), 0, Access=mmap.ACCESS_READ) as mmapped_file:
                      # 逐行读取
                      for line in iter(mmapped_file.readline, b""):
                          yield line.decode(encoding).strip()
                          
          except FileNotFoundError:
              print(f"文件未找到: {file_path}")
          except Exception as e:
              print(f"内存映射错误: {e}")
       
       
      def method4_buffered_reading(file_path: str, buffer_size: int = 8192, encoding: str = 'utf-8') -> Generator[str, None, None]:
          """
          方法4: 缓冲区读取 - 平衡性能和内存使用
          内存使用量: O(buffer_size)
          适用场景: 需要自定义缓冲区大小的场景
          """
          try:
              with open(file_path, 'r', encoding=encoding, buffering=buffer_size) as file:
          js        for line in file:
                      yield line.strip()
                      
          except FileNotFoundError:
              print(f"文件未找到: {file_path}")
          except UnicodeDecodeError as e:
              print(f"编码错误: {e}")
       
       
      def process_large_csv(file_path: str, chunk_size: int = 10000) -> None:
          """
          处理大型CSV文件的示例
          使用pandas的chunksize参数分块读取
          """
          print(f"开始处理CSV文件: {file_path}")
          
          try:
              # 分块读取CSV文件
              chunk_iter = pd.read_csv(file_path, chunksize=chunk_size)
              
              total_rows = 0
              for chunk_num, chunk in enumerate(chunk_iter, 1):
                  # 处理当前块
                  print(f"处理第 {chunk_num} 块,包含 {len(chunk)} 行")
                  
                  # 示例处理:统计每列的基本信息
                  print(f"列名: {list(chunk.columns)}")
                  print(f"数据类型: {chunk.dtypes.to_dict()}")
                  
                  # 这里可以添加你的数据处理逻辑
                  # 例如:数据清洗、计算、转换等
                  
                  total_rows += len(chunk)
                  
                  # 可选:限制处理的块数量(用于测试)
                  if chunk_num >= 5:  # 只处理前5块
                      break
                      
              print(f"总共处理了 {total_rows} 行数据")
              
          except FileNotFoundError:
              print(f"CSV文件未找到: {file_path}")
          except pd.errors.EmptyDataError:
              print("CSV文件为空")
          except Exception as e:
              print(f"处理CSV文件时出错: {e}")
       
       
      def process_large_json_lines(file_path: str) -> None:
          """
          处理大型JSON Lines文件 (.jsonl)
          每行是一个独立的JSON对象
          """
          print(f"开始处理JSON Lines文件编程客栈: {file_path}")
          
          try:
              with open(file_path, 'r', encoding='utf-8') as file:
                  for line_num, line in enumerate(file, 1):
                      line = line.strip()
                      if not line:
                          continue
                          
                      try:
                          # 解析JSON对象
                          json_obj = json.loads(line)
                          
                          # 处理JSON对象
                          # 这里添加你的处理逻辑
                          print(f"第 {line_num} 行: {type(json_obj)} - {len(str(json_obj))} 字符")
                          
                          # 示例:提取特定字段
                          if isinstance(json_obj, dict):
                              keys = list(json_obj.keys())[:5]  # 只显示前5个键
                              print(f"  键: {keys}")
                          
                      except json.JSONDecodeError as e:
                          print(f"第 {line_num} 行JSON解析错误: {e}")
                          continue
                      
                      # 显示进度
                      if line_num % 1000 == 0:
                          print(f"已处理 {line_num} 行")
                          
                      # 可选:限制处理行数(用于测试)
                      if line_num >= 10000:
                          break
                          
          except FileNotFoundError:
              print(f"JSON Lines文件未找到: {file_path}")
       
       
      def process_with_progress_callback(file_path: str, callback_interval: int = 10000) -> None:
          """
          带进度回调的文件处理示例
          """
          reader = LargeFileReader(file_path)
          file_size_mb = reader.get_file_size_mb()
          
          print(f"文件大小: {file_size_mb:.2f} MB")
          print("开始处理文件...")
          
          processed_lines = 0
          
          for line in method1_line_by_line(file_path):
              # 处理每一行
              # 这里添加你的处理逻辑
              line_length = len(line)
              
              processed_lines += 1
              
              # 进度回调
              if processed_lines % callback_interval == 0:
                  print(f"已处理 {processed_lines:,} 行")
                  
              # 可选:限制处理行数(用于测试)
              if processed_lines >= 50000:
                  print("达到处理限制,停止处理")
                  break
          
          print(f"处理完成,总共处理了 {processed_lines:,} 行")
       
       
      def memory_efficient_word_count(file_path: str) -> dict:
          """
          内存高效的单词计数示例
          适用于超大文本文件
          """
          word_count = {}
          
          print("开始统计单词频率...")
          
          for line_num, line in enumerate(method1_line_by_line(file_path), 1):
              # 简单的单词分割(可以根据需要改进)
              words = line.lower().split()
              
             nXyDLRn for word in words:
                  # 清理单词(去除标点符号等)
                  clean_word = ''.join(c for c in word if c.isalnum())
                  if clean_word:
                      word_count[clean_word] = word_count.get(clean_word, 0) + 1
              
              # 显示进度
              if line_num % 10000 == 0:
                  print(f"已处理 {line_num} 行,当前词汇量: {len(word_count)}")
          
          print(f"统计完成,总词汇量: {len(word_count)}")
          
          # 返回前10个最常用的单词
          sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)
          rhttp://www.devze.cometurn dict(sorted_words[:10])
       
       
      def main():
          """主函数 - 演示各种大文件读取方法"""
          
          # 注意:请替换为你的实际文件路径
          test_file = "large_file.txt"  # 替换为实际的大文件路径
          csv_file = "large_data.csv"   # 替换为实际的CSV文件路径
          json_file = "large_data.jsonl"  # 替换为实际的JSON Lines文件路径
          
          print("=== 大文件读取示例 ===\n")
          
          # 检查文件是否存在
          if not os.path.exists(test_file):
              print(f"测试文件 {test_file} 不存在")
              print("请创建一个测试文件或修改文件路径")
              return
          
          # 创建文件读取器
          reader = LargeFileReader(test_file)
          
          print(f"文件路径: {test_file}")
          print(f"文件大小: {reader.get_file_size_mb():.2f} MB\n")
          
          # 示例1: 逐行读取(推荐用于大多数文本文件)
          print("=== 方法1: 逐行读取 ===")
          line_count = 0
          for line in method1_line_by_line(test_file):
              line_count += 1
              if line_count <= 5:  # 只显示前5行
                  print(f"第{line_count}行: {line[:50]}...")
              if line_count >= 10000:  # 限制处理行数
                  break
          print(f"处理了 {line_count} 行\n")
          
          # 示例2: 块读取
          print("=== 方法2: 块读取 ===")
          chunk_count = 0
          for chunk in method2_chunk_reading(test_file, chunk_size=1024):
              chunk_count += 1
              if chunk_count <= 3:  # 只显示前3块
                  print(f"块{chunk_count}: {len(chunk)} 字符")
              if chunk_count >= 10:  # 限制处理块数
                  break
          print(f"处理了 {chunk_count} 个块\n")
          
          # 示例3: 内存映射
          print("=== 方法3: 内存映射 ===")
          mmap_count = 0
          try:
              for line in method3_mmap_reading(test_file):
                  mmap_count += 1
                  if mmap_count >= 10000:  # 限制处理行数
                      break
              print(f"使用内存映射处理了 {mmap_count} 行\n")
          except Exception as e:
              print(f"内存映射失败: {e}\n")
          
          # 示例4: 带进度的处理
          print("=== 方法4: 带进度回调的处理 ===")
          process_with_progress_callback(test_file, callback_interval=5000)
          print()
          
          # 示例5: CSV文件处理
          if os.path.exists(csv_file):
              print("=== CSV文件处理 ===")
              process_large_csv(csv_file, chunk_size=1000)
              print()
          
          # 示例6: JSON Lines文件处理
         android if os.path.exists(json_file):
              print("=== JSON Lines文件处理 ===")
              process_large_json_lines(json_file)
              print()
          
          # 示例7: 单词计数
          print("=== 内存高效单词计数 ===")
          try:
              top_words = memory_efficient_word_count(test_file)
              print("前10个最常用单词:")
              for word, count in top_words.items():
                  print(f"  {word}: {count}")
          except Exception as e:
              print(f"单词统计失败: {e}")
       
       
      if __name__ == "__main__":
          main()
      

      性能优化建议:

      1. 选择合适的方法:

      • 逐行读取: 适用于大多数文本文件
      • 块读取: 适用于二进制文件或需要自定义处理块的场景
      • 内存映射: 适用于需要随机访问或高性能要求的场景
      • pandas分块: 适用于结构化数据(CSV)

      2. 内存优化:

      • 及时释放不需要的变量
      • 使用生成器而不是列表
      • 避免一次性加载整个文件到内存

      3. 性能优化:

      • 合理设置缓冲区大小
      • 使用适当的编码
      • 考虑使用多进程/多线程处理

      4. 错误处理:

      • 处理文件不存在的情况
      • 处理编码错误
      • 处理磁盘空间不足等IO错误

      Excel大文件读取方法

      1. pandas分块读取 (.xlsx, .xls)

      适合中等大小的Excel文件

      可以处理多个工作表

      支持数据类型自动识别

      2. openpyxl逐行读取 (.xlsx) - 推荐

      内存效率最高的方法

      使用read_only=True模式

      真正的逐行处理,内存占用O(1)

      适合处理超大Excel文件

      3. xlrd处理 (.xls)

      专门处理旧版Excel格式

      分块读取支持

      适合Legacy Excel文件

      4. pyxlsb处理 (.xlsb)

      处理Excel二进制格式

      读取速度快,文件小

      需要额外安装pyxlsb库

      新增功能特点

      智能文件信息获取 - 不加载全部数据就能获取文件结构信息

      内存使用对比 - 实时监控不同方法的内存消耗

      批量数据处理 - 支持批次处理和进度监控

      多格式支持 - 支持.xlsx、.xls、.xlsb三种格式

      错误处理 - 完善的异常处理机制

      安装依赖

      pip install pandas openpyxl xlrd pyxlsb psutil
      

      使用建议

      • 小于50MB: 可以使用pandas直接读取
      • 50MB-500MB: 使用pandas分块读取
      • 大于500MB: 推荐使用openpyxl逐行读取
      • 超大文件: 考虑转换为CSV或Parquet格式

      这套方案可以处理几GB甚至更大的Excel文件而不会内存溢出,特别是openpyxl的逐行读取方法,是处理超大Excel文件的最佳选择!

      到此这篇关于python读取Excel大文件的四种方法与优化的文章就介绍到这了,更多相关python读取Excel内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

      0

      上一篇:

      下一篇:

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      最新开发

      开发排行榜