开发者

C#实现千万数据秒级导入的代码

目录
  • 前言
  • 一、数据存储
  • 二、处理逻辑
    • 优化前代码处理逻辑
    • 优化后的代码
  • 总结

    前言

    在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,这里我就给大家分享一下千万数据秒级导入怎么实现

    一、数据存储

    这里使用到的数据库是clickhouse,因为像mysql、sqlserver这类关系型数据库对于大数据的支持不怎么好所以这里我使用的是clickhouse,它是由俄罗斯的yindex公司开发的列式存储数据库。查询语句这些和mysql差不多上手容易学起来简单。需要注意的是clickhouse对数据的删除比较严格一般都不会去删除它里面的数据库,如果要删除数据,我们一般都是采用用一个字段标记数据是否被删除,然后过滤掉这些被删除的数据,clickhouse的四种引擎也可以去了解一下

    二、处理逻辑

    优化前导入耗时两分钟以上优化后导入耗时30秒以内

    优化前代码处理逻辑

    优化后导入一千万数据大约需要两分钟完成处理。处理逻辑是将数据循环装入List中,当数据量达到预设的chunkSize阈值(150万)时开始批量插入。值得注意的是,1000万数据需要循环处理7次。这里存在一个常见误区:认为提高150万的阈值能加快处理速度。实际上,处理速度慢的原因并非循环次数,而是由于单线程运行无法充分发挥ClickHouse的高并发优势。

    private async Task InsertContentsAsync2(string filePath, BlackListCreateDto dto, ulong maxId, long BATchId)
     {
         Stopwatch stopwatch = new();
         var regex = RegexConst.Phone();
         var userId = CurrentUser.Id.ToString();
         var isTxt = filePath.Split(".").Last() == "txt";
         var chunkSize = 1500000;
         List<string> blackLists = new();
         injst offset = 0;
         int total = 0;
         stopwatch.Start();
         if (isTxt)
         {
             using StreamReader sr = new(filePath);
             string line;
             while ((line = await sr.ReadLineAsync()) != null)
             {
                 if (regex.IsMatch(line))
                 {
                  编程客栈   blackLists.Add(line);
                     offset++;
                     total++;
                 }
                 if (offset >= chunkSize)
                 {
                     await InsertDataAsync(blackLists);
                     offset = 0;
                     blackLists.Clear();
                 }
             }
    
             if (offset >= 0)
             {
                 await InsertDataAsync(blackLists);
                 offset = 0;
                 blackLists.Clear();
             }
         }
         else
         {
    
             var rows = await MiniExcel.QueryAsync(filePath);
             foreach (IEnumerable<dynamic> row in rows)
             {
                 var pair = (IDictionary<string, object>)row;
                 var cells = pair.Values;
                 if (cells != null && cells.Any())
                 {
                     foreach (var cell in cells)
                     {
                         if (cell is not null && cell is string str)
                         {
                             blackLists.Add(str);
                             offset++;
                             total++;
                         }
    
                     }
                 }
                 if (offset >= chunkSize)
                 {
                     await InsertDataAsync(blackLists);
                     offset = 0;
                     blackLists.Clear();
                 }
             }
    
             if (offset >= 0)
             {
                 await InsertDataAsync(blackLists);
                 offset = 0;
                 blackLists.Clear();
             }
         }
    
         async Task InsertDataAsync(List<string> blackLists)
         {
             List<BlackList> list = blackLists.Select(it => CreateBlackList(dto, batchId, ++maxId, userId, it.ToString())).ToList();
             var result = await _clickHouseClientRepository.BulkInsertAsync(list, _clickHouseTables.Value.SMS_BLACK_LIST_TABLE);
             blackLists.Clear();
         }
         stopwatch.Stop();
         Logger.LogInformation("文件路径:{filePath},{total}条数据, 耗时:{Elapsed} ", filePath, total, stopwatch.Elapsed);
     }
    

    优化后的代码

    优化后的代码将单线程执行模式改为多消费者并行处理,采用生产者-消费者模式分离IO和计算操作。文件读取时使用最大缓冲区和顺序扫描:

    await using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 81920, FileOptions.SequentialScan);
    

    关键优化点包括:

    • 使用异步文件读取(FileStream异步API)
    • 预分配列表内存(new List(chunkSize))
    • 采用ClickHouse批量并行提交而非单条插入
    • 设置chunkSize为10万条
    • 使用BoundedChannel进行管道容量控制

    关于管道容量控制的作用:

    1. 防止内存溢出:限制管道中积压的未处理数据量,避免生产者过快导致内存问题
    2. 实现自动背压(Backpressure):管道满时生产者会自动阻塞或丢弃数据(取决于配置),直到消费者处理部分数据

    选择10万而非150万的原因:

    原因1:内存考量

    • 150万条数编程客栈据(假设每条20字节)单批次占用约30MB
    • 并行处理时多个批次会显著增加内存压力,可能触发GC影响性能
    • 10万条仅占用约2MB,对GC更友好,适合长期批处理

    原因2:ClickHouse性能

    • BulkInsert在5万~10万批次时吞吐量最佳

    原因3:并行处理效率

    • 10万条/批次能更好分配任务到多线程/核心
    • 150万条/批次可能导致任务分配不均

    原因4:I/O优化

    • 大数据批次可能导致I/O阻塞,CPU空闲等待
    • 小批次(10万)能更好重叠I/O和计算,提升吞吐量
    private async Task InsertContentsAsync(string filePath, BlackListCreateDto dto, ulong maxId, long batchId)
     {
         var stopwatch = Stopwatch.StartNew();
         var regex = RegexConst.Phone();
         var userId = CurrentUser.Id.ToString();
         var isTxt = filePath.EndsWith(".txt", StringComparison.OrdinalIgnoreCase);
         const int chunkSize = 100_000; 
         var totalProcessed = 0L;
    
         // 并行处理管道
         var processingChannel = Channel.CreateBounded<List<string>>(new BoundedChannelOptions(5)
         {
             SingleWriter = true,
             SingleReader = false,
             FullMode = BoundedChannelFullMode.Wait
         });
    
         // 启动并行消费者
         var processingTasks = Enumerable.Range(0, Environment.ProcessorCount)
             .Select(_ => Task.Run(async () =>
             {
                 await foreach (var batch in processingChannel.Reader.ReadAllAsync())
                 {
                     await ProcessBatchAsync(batch);
                     Interlocked.Add(ref totalProcessed, batch.Cou编程客栈nt);
                 }
             })).ToArray();
    
         try
         {
             if (isTxt)
             {
                 await ProcessTextFileAsync(filePath, regex, processingChannel.Writer, chunkSize);
             }
             else
             {
                 await ProcessExcelFileAsync(filePath, processingChannel.Writer, chunkSize);
             }
         }
         finally
         {
             processingChannel.Writer.Complete();
             await Task.WhenAll(processingTasks);
             stopwatch.Stop();
    
             Logger.LogInformation("文件路径:{FilePath}, {Total}条数据, 耗时:{Elapsed}ms",
                 filePath, totalProcessed, stopwatch.ElapsedMilliseconds);
         }
    
         async Task ProcessBatchAsync(List<string> batch)
         {
             var entities = batch
                 .Select(phone => CreateBlackList(dto, batchId, ++maxId, userId, phone))
                 .ToList();
    
             await _clickHouseClientRepository.BulkInsertAsync(
                 entities,
                 _clickHouseTables.Value.SMS_BLACK_LIST_TABLE);
         }
     }
    
     private async Task ProcessTextFileAsync(string filePath, Regex regex, ChannelWriter<List<string>> writer, int chunkSize)
     {
         await using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 81920, FileOptions.SequentialScan);
         using var sr = new StreamReader(fs);
    
         var batch = new List<string>(chunkSize);
         string line;
    
         while ((line = await sr.ReadLineAsync()) != null)
         {
             if (regex.IsMatch(line))
             {
                 batch.Add(line);
    
                 if (batch.Count >= chunkSize)
                 {
                     await writer.WriteAsync(batch);
                     batch = new List<string>(chunkSize);
                 }
             编程}
         }
    
         if (batch.Count > 0)
         {
             await writer.WriteAsync(batch);
         }
     }
    
     private async Task ProcessExcelFileAsync(string filePath, ChannelWriter<List<string>> writer, int chunkSize)
     {
         await using var stream = File.Open(filePath, FileMode.Open, FileAccess.Read);
         using var reader = MiniExcel.QueryAsDataTable(stream);
         var batch = new List<string>(chunkSize);
         foreach (DataRow row in reader.Rows)
         {
             var cell = row[0].ToString();
             if (cell is string str && !string.IsNullOrWhiteSpace(str))
             {
                 batch.Add(str);
    
                 if (batch.Count >= chunkSize)
                 {
                     await writer.WriteAsync(batch);
                     batch = new List<string>(chunkSize);
                 }
             }
         }
    
         if (batch.Count > 0)
         {
             await writer.WriteAsync(batch);
         }
     }
    

    总结

    到此这篇关于C#实现千万数据秒级导入的代码的文章就介绍到这了,更多相关C#千万数据秒级导入内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜