How to speed up the processing of a large CSV using ruby
For a project I need to parse some pretty large CSV files. The contents of some of the entries is stored in a MySQL database. I am trying to speed this up using multithreading, but up to now this only slows things down.
I parse a CSV file (up to 10GB), and some of these records (aprox. 5M out of a 20M+ record CSV) need to be inserted into a MySQL database. To determine which record needs to be inserted we use a Redis server with sets that contain the correct id's / references.
Since we process around 30 of these files at any given time, and there are some dependencies, we store each file on a Resque queue and have multiple servers handling these (prioritized) queues.
In a nutshell:
class Worker
def self.perform(file)
CsvParser.each(file) do |line|
next unless check_line_with_redis(line)
a = ObjectA.find_or_initialize_by_reference(line[:reference])
a.object_bs.destroy_all
a.update_attributes(line)
end
end
This works, scales nice horizontally (more CSV files = more servers), but larger CSV files pose a problem. We currently have files that take over 75 hours to parse this way. There are a number of optimizations I thought of already:
One is cutting down on the MySQL queries; we instantiate AR objects while an insert with plain SQL, if we know the objects Id, is much faster. This way we can probably get rid of most of AR and maybe even Rails to remove overhead this way. We can't use a plain MySQL load data since we have to map the CSVs records to other entities that might have different Ids by now (we combine a dozen legacy databases into a new database).
The other is trying to do more in the same time. There is some IO wait time, network wait time for both Redis and MySQL, and even while MRI uses green threads this might allow us to schedule our MySQL queries at开发者_运维百科 the same time as the IO reads etc. But using the following code:
class Worker
def self.perform(file)
CsvParser.each(file) do |line|
next unless check_line_with_redis(line)
create_or_join_thread(line) do |myLine|
a = ObjectA.find_or_initialize_by_reference(myLine[:reference])
a.object_bs.destroy_all
a.update_attributes(myLine)
end
end
def self.create_or_join_thread(line)
@thread.join if @thread.present?
@thread = Thread.new(line) do |myLine|
yield myLine
end
end
end
This slowly slows down the process. When I ps au
it starts off at 100% CPU, but as time progresses it goes down to just 2-3%. At that moment it does not insert new records at all, it just appears to hang.
I have strace
d the process, and at first I see MySQL queries pass by, after a while it appears it is not executing my ruby code at all. Could be a deadlock (it hung after parsing the last line of the CSV, but the process kept on running at 5% CPU and did not quit), or something I read here: http://timetobleed.com/ruby-threading-bugfix-small-fix-goes-a-long-way/
I am using Rails 2.3.8, REE, 1.8.7-2010.02 on Ubuntu 10.10. Any insights on how to handle large numbers of threads (or maybe why not to use threads here at all) is greatly appreciated!
Do you have any indexes on these tables?
Could you temporarily disable these indexes during your bulk inserts?
Before we do bulk inserts we disable index keys:
ALTER TABLE foo DISABLE KEYS
After we're finished, we enable index keys:
ALTER TABLE foo ENABLE KEYS
From the docs:
ALTER TABLE ... DISABLE KEYS tells MySQL to stop updating non-unique indexes. ALTER TABLE ... ENABLE KEYS then should be used to re-create missing indexes. MySQL does this with a special algorithm that is much faster than inserting keys one by one, so disabling keys before performing bulk insert operations should give a considerable speedup. Using ALTER TABLE ... DISABLE KEYS requires the INDEX privilege in addition to the privileges mentioned earlier. While the non-unique indexes are disabled, they are ignored for statements such as SELECT and EXPLAIN that otherwise would use them.
You could try wrapping the whole thing up in a single transaction - that would probably make quite a difference:
class Worker
def self.perform(file)
ObjectA.transaction do
CsvParser.each(file) do |line|
next unless check_line_with_redis(line)
a = ObjectA.find_or_initialize_by_reference(line[:reference])
a.object_bs.destroy_all
a.update_attributes(line)
end
end
end
end
Otherwise every save will be wrapped in it's own transaction. Although, for a 10GB file you probably want to break it into hunks, of say 1000 inserts per transaction or something.
精彩评论