How to process rows of a CSV file using Groovy/GPars most efficiently?
The question is a simple one and I am surprised it did not pop up immediately when I searched for it.
I have a CSV file, a potentially really large one, that needs to be processed. Each line should be handed to a processor until all rows are processed. For reading the CSV file, I'll be using OpenCSV which essentially provides a readNext() method which gives me the next row. If no more rows are available, all processors should terminate.
For this I created a really simple Groovy script, defined a synchronous readNext() method (as the reading of the next line is not really time consuming) and then created a few threads that read the next line and process it. It works fine, but...
Shouldn't there be a built-in solution that I could just use? It's not the GPars collection processing, because that always assumes there is an existing collection in memory. I cannot afford to read the whole file into memory and then process it; that would lead to OutOfMemoryErrors.
So, does anyone have a nice template for processing a CSV file "line by line" using a couple of worker threads?
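For reference, the hand-rolled approach described above might look roughly like the sketch below. It is only an illustration: SharedRowReader, processAll and the in-memory sample data are made-up names, and the rowSource closure stands in for OpenCSV's readNext(), assuming it returns null once the input is exhausted.

```groovy
import java.util.concurrent.atomic.AtomicInteger

// Wraps any "give me the next row" source so that only one thread reads at a time.
// rowSource is a stand-in for OpenCSV's readNext(); it must return null when exhausted.
class SharedRowReader {
    private final Closure rowSource
    SharedRowReader(Closure rowSource) { this.rowSource = rowSource }
    synchronized Object readNext() { rowSource.call() }
}

// Starts `workers` threads; each one pulls rows until readNext() returns null.
def processAll(SharedRowReader reader, int workers, Closure handler) {
    def threads = (1..workers).collect {
        Thread.start {
            def row
            while ((row = reader.readNext()) != null) {
                handler.call(row)
            }
        }
    }
    threads*.join()
}

// Usage with in-memory sample data in place of a real file; rows are Lists here.
def lines = new BufferedReader(new StringReader("a,1\nb,2\nc,3\nd,4"))
def reader = new SharedRowReader({ lines.readLine()?.tokenize(',') })
def processed = new AtomicInteger()
processAll(reader, 3, { row -> processed.incrementAndGet() })
println "processed ${processed.get()} rows"
```

Only one row per worker is held in memory at any time, which is the property the question is after. The cost is that every worker contends on the same lock for each row.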
Concurrently accessing a file might not be a good idea, and GPars' fork/join processing is only meant for in-memory data (collections). My suggestion would be to read the file sequentially into a list. When the list reaches a certain size, process the entries in the list concurrently using GPars, clear the list, and then move on with reading lines.
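A minimal sketch of that batching idea, under a few assumptions: the GPars version (1.2.1) is just one that is known to exist, processInBatches is a hypothetical helper, and the nextRow closure stands in for OpenCSV's readNext() (returning null at end of input). The batch size and pool size are arbitrary.

```groovy
@Grab(group='org.codehaus.gpars', module='gpars', version='1.2.1')
import groovyx.gpars.GParsPool
import java.util.concurrent.atomic.AtomicInteger

// nextRow is a stand-in for OpenCSV's readNext(): the next row, or null at end of input.
def processInBatches(Closure nextRow, int batchSize, int poolSize, Closure handler) {
    GParsPool.withPool(poolSize) {
        def batch = []
        def row
        while ((row = nextRow.call()) != null) {
            batch << row
            if (batch.size() >= batchSize) {
                batch.eachParallel(handler)   // returns once the whole batch is processed
                batch.clear()
            }
        }
        if (batch) batch.eachParallel(handler) // leftover rows of the last, partial batch
    }
}

// Usage with ten in-memory sample rows: "row1,1" ... "row10,10".
def lines = new BufferedReader(new StringReader((1..10).collect { "row$it,$it" }.join('\n')))
def total = new AtomicInteger()
processInBatches({ lines.readLine()?.tokenize(',') }, 4, 3) { row ->
    total.addAndGet(row[1].toInteger())
}
println "sum of second column: ${total.get()}"
```

Memory use is bounded by the batch size, but reading pauses while each batch is being processed, so the workers idle briefly between batches.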
This might be a good problem for actors. A synchronous reader actor could hand off CSV lines to parallel processor actors. For example:
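@Grab(group='org.codehaus.gpars', module='gpars', version='0.12')
import groovyx.gpars.actor.DefaultActor
import groovyx.gpars.actor.Actor

// Single reader: answers every request with the next CSV row.
// readCsv() is a placeholder for your own code (e.g. OpenCSV's readNext());
// it must return null once there are no more rows.
class CsvReader extends DefaultActor {
    void act() {
        loop {
            react {
                reply readCsv()
            }
        }
    }
}

// Worker: asks the reader for a row, processes it, and repeats.
// processCsv() is a placeholder for the per-row work.
class CsvProcessor extends DefaultActor {
    Actor reader
    void act() {
        loop {
            reader.send(null)      // request the next row
            react {
                if (it == null)
                    terminate()    // no more rows: stop this worker
                else
                    processCsv(it)
            }
        }
    }
}

def N_PROCESSORS = 10
def reader = new CsvReader().start()
// Start the workers and wait until all of them have terminated.
(0..<N_PROCESSORS).collect { new CsvProcessor(reader: reader).start() }*.join()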
I'm just wrapping up an implementation of a problem just like this in Grails (you don't specify if you're using Grails, plain Hibernate, plain JDBC or something else).
There isn't anything out of the box that you can get that I'm aware of. You could look at integrating with Spring Batch, but the last time I looked at it, it felt very heavy to me (and not very groovy).
If you're using plain JDBC, doing what Christoph recommends probably is the easiest thing to do (read in N rows and use GPars to spin through those rows concurrently).
If you're using Grails or Hibernate, and want your worker threads to have access to the Spring context for dependency injection, things get a bit more complicated.
The way I solved it is using the Grails Redis plugin (disclaimer: I'm the author) and the Jesque plugin, which is a Java implementation of Resque.
The Jesque plugin lets you create "Job" classes that have a "process" method with arbitrary parameters that are used to process work enqueued on a Jesque queue. You can spin up as many workers as you want.
I have a file upload that an admin user can post a file to. It saves the file to disk and enqueues a job for the ProducerJob that I've created. That ProducerJob spins through the file and, for each line, enqueues a message for a ConsumerJob to pick up. The message is simply a map of the values read from the CSV file.
The ConsumerJob takes those values, creates the appropriate domain object for its line, and saves it to the database.
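The shape of that producer/consumer split can be sketched in a single process. This is not the Jesque or Redis plugin API: a java.util.concurrent queue stands in for the Redis-backed queue, a concurrent collection stands in for the database, and the POISON marker, column names and sample data are all made up for illustration.

```groovy
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.LinkedBlockingQueue

// In-memory stand-in for the Redis-backed queue; the bound keeps the producer
// from running far ahead of the consumers.
def queue = new LinkedBlockingQueue(100)
def POISON = [:].asImmutable()           // hypothetical end-of-input marker
def saved = new ConcurrentLinkedQueue()  // stand-in for the database
int consumers = 4

// Consumers: each takes one message (a map of column values) at a time and "saves" it.
def workers = (1..consumers).collect {
    Thread.start {
        while (true) {
            def message = queue.take()
            if (message.is(POISON)) break
            saved << "${message.name}:${message.amount}".toString()
        }
    }
}

// Producer: walks the input line by line and enqueues one map per line.
def header = ['name', 'amount']
new StringReader("alice,10\nbob,20\ncarol,30").eachLine { line ->
    queue.put([header, line.tokenize(',')].transpose().collectEntries())
}
consumers.times { queue.put(POISON) }    // one marker per consumer so all of them stop
workers*.join()
println "saved ${saved.size()} records"
```

With a real external queue the producer and consumers can live in separate jobs or processes, which is what the Jesque setup described above provides; the in-process version only shows the flow of messages.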
We already were using Redis in production so using this as a queueing mechanism made sense. We had an old synchronous load that ran through file loads serially. I'm currently using one producer worker and 4 consumer workers and loading things this way is over 100x faster than the old load was (with much better progress feedback to the end user).
I agree with the original question that there is probably room for something like this to be packaged up as this is a relatively common thing.
UPDATE: I put up a blog post with a simple example doing imports with Redis + Jesque.