Read CSV data in batches and process it

I have a CSV file that looks like this:

#DELTA,1#    
Risk1,10
Risk2,10
Risk3,10
Risk4,10
Risk5,10
#DELTA,1#    
Risk6,10
Risk7,10
Risk8,10
Risk9,10
Risk10,10

and so on. These are very large files (on the order of gigabytes).

What I want to do is read them in batches, like this:

Start a StreamReader on the CSV file and read from the first line to just before the next #DELTA line starts.

---Batch 1---
#DELTA,1
Risk1,10
Risk2,10
Risk3,10
Risk4,10
Risk5,10
--Batch 2-----
#DELTA,1
Risk6,10
Risk7,10
Risk8,10
Risk9,10
Risk10,10
----------------------

Once I have a batch, I want to hand that subset off for processing, then come back and start preparing the next batch, and so on until the end of the file is reached.

I have tried using LINQ's Take and TakeWhile, but with my understanding of LINQ I am not getting far.

In summary, I have to stream data in batches based on a pattern in my stream. Maybe my brain cells are dead, or maybe it is too late in the evening. I would really appreciate anyone's help.


The easiest approach would be a TextReader and ReadLine().

For positioning, I would just leave the Reader open between processing the batches. If that's not an option, save the (stream) Position and restore it later.

With a StreamReader, if you have to close the file, you'd have to keep a lineCount and read-and-skip from the beginning again. Not too attractive.
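
The keep-the-reader-open idea can be sketched roughly as follows. This is only an illustration, not a definitive implementation: the BatchReader class and its ReadNextBatch method are hypothetical names, and it assumes that any line starting with '#' begins a new batch. It remembers the delimiter line it has already read, so each call returns exactly one batch and the reader's position is preserved between calls.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper: wraps an open TextReader and returns one batch per call.
public sealed class BatchReader
{
    private readonly TextReader _reader;
    private string _pending; // first line of the next batch, already read

    public BatchReader(TextReader reader)
    {
        _reader = reader;
        _pending = reader.ReadLine();
    }

    // Returns the next batch (delimiter line included), or null at end of input.
    public List<string> ReadNextBatch()
    {
        if (_pending == null)
            return null;

        var batch = new List<string> { _pending };

        // Read until the next delimiter line or the end of the input.
        // Assumption: a line starting with '#' opens a new batch.
        while ((_pending = _reader.ReadLine()) != null
               && !_pending.StartsWith("#", StringComparison.Ordinal))
        {
            batch.Add(_pending);
        }
        return batch;
    }
}

public static class Program
{
    public static void Main()
    {
        // StringReader stands in for File.OpenText(...) in this sketch.
        using (TextReader reader = new StringReader(
            "#DELTA,1\nRisk1,10\nRisk2,10\n#DELTA,1\nRisk3,10\n"))
        {
            var batches = new BatchReader(reader);
            List<string> batch;
            while ((batch = batches.ReadNextBatch()) != null)
            {
                Console.WriteLine("Batch of {0} lines", batch.Count);
            }
        }
    }
}
```

Because the reader stays open, no stream position has to be saved or restored; the only state carried between calls is the one pending line.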


Assuming you can keep one batch in memory at a time, and can keep the StreamReader open all the time, you might want to write something like this:

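public static void ProcessBatches(TextReader reader,
    Func<string, bool> delimiterDetector,
    Action<List<string>> batchAction)
{
    string line;
    List<string> batch = new List<string>();
    while ((line = reader.ReadLine()) != null)
    {
        if (delimiterDetector(line))
        {
            // A delimiter closes the current batch. Skip the empty batch
            // that precedes the first delimiter in the file.
            if (batch.Count > 0)
            {
                batchAction(batch);
            }
            batch = new List<string>();
        }
        else
        {
            // Ordinary data line: collect it into the current batch.
            batch.Add(line);
        }
    }
    // Hand over whatever remains after the last delimiter.
    if (batch.Count > 0)
    {
        batchAction(batch);
    }
}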

This is assuming that the delimiter isn't required when processing the batch.

You'd then call it like this:

using (TextReader reader = File.OpenText("foo.csv"))
{
    ProcessBatches(reader, line => line.StartsWith("#DELTA"), BatchAction);
}
...
private static void BatchAction(List<string> batch)
{
    ...
}


There is (at least) one tool on CodePlex that may be of use here: KBCsv


This might be a useful approach. If you write a method that returns IEnumerable, you can use yield return to let your caller process each item before the method continues. So, for example, if you write a method like ReadBatches below...

    static IEnumerable<IEnumerable<string>> ReadBatches(string fileName)
    {
        // using closes the file even if the caller stops enumerating early
        using (var file = File.OpenText(fileName))
        {
            while (!file.EndOfStream)
            {
                // start a fresh list so batches already handed out are not overwritten
                var batchItems = new List<string>();

                // read file in batches of 3
                // your logic on splitting batches might differ
                for (int i = 0; i < 3; i++)
                {
                    if (file.EndOfStream)
                        break;

                    batchItems.Add(file.ReadLine());
                }

                // this allows the caller to perform processing, and only
                // returns back here when they pull on the next item in the
                // IEnumerable
                yield return batchItems;
            }
        }
    }

... then you can call this method like so...

    static void Main(string[] args)
    {
        foreach (IEnumerable<string> batch in ReadBatches("data.txt"))
        {
            Console.WriteLine("*** Processing Batch ***");
            foreach (var item in batch)
            {
                Console.WriteLine(item);
            }
        }                      
    }

... and with a data.txt that looks like this...

Row1
Row2
Row3
Row4
Row5
Row6
Row7

... then you see this on the console...

*** Processing Batch ***
Row1
Row2
Row3
*** Processing Batch ***
Row4
Row5
Row6
*** Processing Batch ***
Row7

Press any key to continue . . .

Your method ReadBatches is an iterator block. The compiler builds a state machine that allows execution to return to this method. It lets you write ReadBatches as if the thread of execution jumps back to the caller and then back again to get the next batch. This isn't really what happens (the compiler has sprinkled magic here to give that impression), but it's a very powerful way to write streaming APIs.

I've not dealt with your batching logic (my logic simply batches 3 rows of the file together) but hopefully this will give you the idea.
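
For what it's worth, here is a rough sketch of how the same iterator idea might look with the delimiter-based batching from the question. It is an illustration under assumptions, not a tested solution for your files: the DeltaBatches class name is made up, a line starting with '#' is assumed to open a new batch, and the method takes a TextReader (rather than a file name) so the sketch can run against an in-memory string.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class DeltaBatches
{
    // Yields one list per batch. A new batch starts at each line beginning
    // with '#', and that delimiter line is kept as the batch's first item.
    public static IEnumerable<List<string>> ReadBatches(TextReader reader)
    {
        var batch = new List<string>();
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            bool isDelimiter = line.StartsWith("#", StringComparison.Ordinal);
            if (isDelimiter && batch.Count > 0)
            {
                // Hand the finished batch to the caller; execution resumes
                // here when the caller asks for the next batch.
                yield return batch;
                batch = new List<string>();
            }
            batch.Add(line);
        }

        // Emit the final batch, if the input was not empty.
        if (batch.Count > 0)
            yield return batch;
    }

    public static void Main()
    {
        // StringReader stands in for File.OpenText("data.txt") here.
        using (TextReader reader = new StringReader(
            "#DELTA,1\nRisk1,10\nRisk2,10\n#DELTA,1\nRisk3,10\n"))
        {
            foreach (List<string> batch in ReadBatches(reader))
            {
                Console.WriteLine("*** Processing Batch ***");
                foreach (string item in batch)
                {
                    Console.WriteLine(item);
                }
            }
        }
    }
}
```

Only one batch is held in memory at a time, provided the caller does not keep references to earlier batches.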

More on yield return here: http://msdn.microsoft.com/en-us/library/9k7k7cf0.aspx

And the hot SO questions on yield return here: https://stackoverflow.com/tags/yield-return/hot


Memory mapped files are ideal for reading portions of very large files, and with .NET 4.0 there is now managed support for them, so you won't have to use the Windows API directly.
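
As a minimal sketch of what that could look like (the MappedCsv class and CountBatchHeaders method are hypothetical names, and the '#' delimiter convention is taken from the question), the file can be mapped and then read through a view stream with an ordinary StreamReader. This example maps the whole file in one view for simplicity; for multi-gigabyte files, especially in a 32-bit process, you would probably map smaller windows by passing different offsets and sizes to CreateViewStream.

```csharp
using System;
using System.IO;
using System.IO.MemoryMappedFiles;

public static class MappedCsv
{
    // Counts the lines that start with '#' by reading through a mapped view.
    public static int CountBatchHeaders(string path)
    {
        long length = new FileInfo(path).Length;
        if (length == 0)
            return 0; // an empty file cannot be mapped

        using (var mmf = MemoryMappedFile.CreateFromFile(path, FileMode.Open))
        // Pass the exact file length: a view created with size 0 can be
        // rounded up to a page boundary and expose trailing zero bytes.
        using (var view = mmf.CreateViewStream(0, length, MemoryMappedFileAccess.Read))
        using (var reader = new StreamReader(view))
        {
            int count = 0;
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                if (line.StartsWith("#", StringComparison.Ordinal))
                    count++;
            }
            return count;
        }
    }

    public static void Main()
    {
        // Write a small sample file so the sketch is self-contained.
        string path = Path.GetTempFileName();
        try
        {
            File.WriteAllText(path, "#DELTA,1\nRisk1,10\n#DELTA,1\nRisk2,10\n");
            Console.WriteLine(CountBatchHeaders(path));
        }
        finally
        {
            File.Delete(path);
        }
    }
}
```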


I have finally managed to resolve this. Thanks, everyone, for your input. I went with an enumerable collection and two loops:

Loop 1: get the indexes of every start-of-batch line (# in this case), plus the index of the last line.
Loop 2: take chunks, skipping the ones already taken.

I plan to enhance this to use an observable collection and invoke a call to load once a batch is done, then carry on.

This is the final code, in case it might be useful (or someone can look at it and suggest improvements):

using System;
using System.Collections.Generic;
using System.Text;
using System.IO;
using System.Linq;

namespace LinqToText
{
  class Program
  {

    static void Main(string[] args)
    {
        var csvLines = new List<string>();
        var contextIndexes = new List<int>();
        int counter = 0;


        var instream = FastReadCsvFile(@"D:\Data\Mock\mock_data.csv");
        foreach (var str in instream)
        {
            if(str.Contains("#"))
            {
                contextIndexes.Add(counter);
            }
            counter++;
        }
        contextIndexes.Add(instream.Count());

        foreach (var indexes in contextIndexes)
        {
            Console.WriteLine(indexes);
        }
        int[] ixpos = contextIndexes.ToArray();


        for(int i = 0 ;i< ixpos.Length-1;i++)
        {
            int strtPos = ixpos[i];
            int endPos = ixpos[i+1];
            var batch = instream.Skip(strtPos).Take(endPos - strtPos);
            foreach (var dt in batch)
            {
                Console.WriteLine(dt);
            }
        }
        Console.WriteLine("End Of Processing");
        Console.Read();

    }

      private static IEnumerable<string> FastReadCsvFile(string file)
      {
          using (var reader = new StreamReader(file, Encoding.Default))
          {
              string line;
              while ((line = reader.ReadLine()) != null)
              {
                  yield return line;
              }
          }
      }
  }
}


So the final answer, after Jon Skeet's help, is (in case it is useful to someone):

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace LinqToText
{
  class Program
  {
    static void Main(string[] args)
    {
        using (TextReader reader = File.OpenText(@"D:\Data\Mock\mock_data.csv"))
        {
            ProcessBatches(reader, line => line.Contains("#"), BatchAction);
        }
        Console.WriteLine("End Of Processing");
        Console.Read();
    }

    public static void ProcessBatches(TextReader reader, Func<string, bool> delimiterDetector, Action<List<string>> batchAction)
    {
        string line;
        var batch = new List<string>();
        var counter = 0;
        while ((line = reader.ReadLine()) != null)
        {
            if (delimiterDetector(line) && counter != 0)
            {
                batchAction(batch);
                batch = new List<string>();
            }
            batch.Add(line);
            counter++;
        }
        batchAction(batch);
    }

    private static void BatchAction(List<string> batch)
    {
        Console.WriteLine("Processing a single batch...................");
        foreach (var str in batch)
        {
            Console.WriteLine(str);
        }
        Console.WriteLine("End of single batch processing...................");
        Thread.Sleep(1000);
    }
  }
}

The result is

Processing a single batch...................
#ROW1,1,0,CNO,CURVE CNO #0,CNO6M,Tenor set CNO #0,ON|TN|1D|1W|1M|2M|3M|1Y|2Y|3Y|
4Y|5Y|6Y|7Y|8Y|9Y|10Y|11Y|12Y|13Y|14Y|15Y|16Y|17Y|18Y|19Y|20Y|21Y|
Risk1,10
Risk2,10
Risk3,10
Risk4,10
Risk5,10
End of single batch processing...................
Processing a single batch...................
#ROW2,1,0,CNO,CURVE CNO #0,CNO6M,Tenor set CNO #0,ON|TN|1D|1W|1M|2M|3M|1Y|2Y|3Y|
4Y|5Y|6Y|7Y|8Y|9Y|10Y|11Y|12Y|13Y|14Y|15Y|16Y|17Y|18Y|19Y|20Y|21Y|
Risk6,10
Risk7,10
Risk8,10
Risk9,10
Risk10,10
End of single batch processing...................
Processing a single batch...................
#ROW3,1,0,CNO,CURVE CNO #0,CNO6M,Tenor set CNO #0,ON|TN|1D|1W|1M|2M|3M|1Y|2Y|3Y|
4Y|5Y|6Y|7Y|8Y|9Y|10Y|11Y|12Y|13Y|14Y|15Y|16Y|17Y|18Y|19Y|20Y|21Y|
Risk11,10
Risk12,10
Risk13,10
Risk14,10
Risk15,10
End of single batch processing...................
Processing a single batch...................
#ROW4,1,0,CNO,CURVE CNO #0,CNO6M,Tenor set CNO #0,ON|TN|1D|1W|1M|2M|3M|1Y|2Y|3Y|
4Y|5Y|6Y|7Y|8Y|9Y|10Y|11Y|12Y|13Y|14Y|15Y|16Y|17Y|18Y|19Y|20Y|21Y|
Risk16,10
Risk17,10
Risk18,10
Risk19,10
Risk20,10
End of single batch processing...................
End Of Processing

Thanks everyone for their help and Jon for giving the final solution.


Here's an implementation based on KBCsv:

using (var csvReader = new CsvReader(@"D:\Data\Mock\mock_data.csv"))
{
    while (csvReader.HasMoreRecords)
    {
        var record = csvReader.ReadDataRecord();

        if (record[0].StartsWith("#"))
        {
            if (csvReader.RecordCount > 0)
            {
                EndBatch();
            }

            BeginBatch();
        }
        else
        {
            ProcessRecord(record);
        }
    }
}

private void BeginBatch()
{
    Console.WriteLine("Beginning batch");
}

private void EndBatch()
{
    Console.WriteLine("Ending batch");
}

private void ProcessRecord(DataRecord record)
{
    Console.WriteLine("Processing record: {0}", record);
}