
How to speed up the processing of a large CSV using Ruby

For a project I need to parse some pretty large CSV files. The contents of some of the entries are stored in a MySQL database. I am trying to speed this up with multithreading, but so far that only slows things down.

I parse a CSV file (up to 10GB), and some of these records (approx. 5M out of a 20M+ record CSV) need to be inserted into a MySQL database. To determine which records need to be inserted, we use a Redis server with sets that contain the correct IDs / references.
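
For context, that Redis check is essentially a set membership test. A minimal sketch, assuming the redis-rb gem and a set key such as valid_references (the actual key name is ours, not from the project), might look like:

require 'redis'

REDIS = Redis.new(:host => 'localhost')

# Returns true if this line's reference is in the Redis set of IDs we care about.
def check_line_with_redis(line)
  REDIS.sismember('valid_references', line[:reference])
end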

Since we process around 30 of these files at any given time, and there are some dependencies between them, we put each file on a Resque queue and have multiple servers working through these (prioritized) queues.
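
For illustration, the enqueueing side is just Resque.enqueue with a worker class (which carries a @queue name) and the file path; the queue name and path here are placeholders:

class Worker
  @queue = :csv_import
end

Resque.enqueue(Worker, '/path/to/export.csv')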

In a nutshell:

class Worker
  def self.perform(file)
    CsvParser.each(file) do |line|
      next unless check_line_with_redis(line)
      a = ObjectA.find_or_initialize_by_reference(line[:reference])
      a.object_bs.destroy_all
      a.update_attributes(line)
    end
  end
end

This works, and it scales nicely horizontally (more CSV files = more servers), but larger CSV files pose a problem. We currently have files that take over 75 hours to parse this way. There are a number of optimizations I have thought of already:

One is cutting down on the number of MySQL queries: we instantiate AR objects, while a plain SQL insert, if we already know the object's ID, is much faster. That way we could probably get rid of most of AR and maybe even Rails, and remove that overhead. We can't use a plain MySQL LOAD DATA, since we have to map the CSV records to other entities that might have different IDs by now (we are combining a dozen legacy databases into one new database).
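
As a rough sketch of what bypassing AR for the insert itself could look like (the table and column names are placeholders, and the mapped reference is assumed to come from the Redis lookup):

# Escape the values and issue the insert directly, skipping AR instantiation.
conn = ActiveRecord::Base.connection
reference = conn.quote(line[:reference])
name      = conn.quote(line[:name])
conn.execute(<<-SQL)
  INSERT INTO object_as (reference, name)
  VALUES (#{reference}, #{name})
  ON DUPLICATE KEY UPDATE name = VALUES(name)
SQL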

The other is trying to do more at the same time. There is some IO wait time and network wait time for both Redis and MySQL, and even though MRI uses green threads, this might allow us to schedule our MySQL queries at the same time as the IO reads, etc. But using the following code:

class Worker
  def self.perform(file)
    CsvParser.each(file) do |line|
      next unless check_line_with_redis(line)
      create_or_join_thread(line) do |myLine|
        a = ObjectA.find_or_initialize_by_reference(myLine[:reference])
        a.object_bs.destroy_all
        a.update_attributes(myLine)
      end
    end
  end

  def self.create_or_join_thread(line)
    @thread.join if @thread.present?
    @thread = Thread.new(line) do |myLine|
      yield myLine
    end
  end
end

This gradually slows the process down. When I run ps au it starts off at 100% CPU, but as time progresses it drops to just 2-3%. At that point it does not insert new records at all; it just appears to hang.

I have straced the process, and at first I see MySQL queries pass by; after a while it appears it is not executing my Ruby code at all. It could be a deadlock (it hung after parsing the last line of the CSV, but the process kept running at 5% CPU and did not quit), or something like what I read here: http://timetobleed.com/ruby-threading-bugfix-small-fix-goes-a-long-way/

I am using Rails 2.3.8, REE 1.8.7-2010.02, on Ubuntu 10.10. Any insight on how to handle large numbers of threads (or maybe why not to use threads here at all) is greatly appreciated!


Do you have any indexes on these tables?

Could you temporarily disable these indexes during your bulk inserts?

Before we do bulk inserts we disable index keys:

ALTER TABLE foo DISABLE KEYS

After we're finished, we enable index keys:

ALTER TABLE foo ENABLE KEYS

From the docs:

ALTER TABLE ... DISABLE KEYS tells MySQL to stop updating non-unique indexes. ALTER TABLE ... ENABLE KEYS then should be used to re-create missing indexes. MySQL does this with a special algorithm that is much faster than inserting keys one by one, so disabling keys before performing bulk insert operations should give a considerable speedup. Using ALTER TABLE ... DISABLE KEYS requires the INDEX privilege in addition to the privileges mentioned earlier. While the non-unique indexes are disabled, they are ignored for statements such as SELECT and EXPLAIN that otherwise would use them.
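
In Ruby this could be wrapped around the whole import; object_as is only a guess at the table name, and Worker.perform is the job from the question:

# Disable non-unique index updates while the bulk insert runs, and make sure
# the keys are re-enabled even if the import raises.
conn = ActiveRecord::Base.connection
conn.execute('ALTER TABLE object_as DISABLE KEYS')
begin
  Worker.perform(file)
ensure
  conn.execute('ALTER TABLE object_as ENABLE KEYS')
end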


You could try wrapping the whole thing up in a single transaction; that would probably make quite a difference:

class Worker
  def self.perform(file)
    ObjectA.transaction do 
      CsvParser.each(file) do |line|
        next unless check_line_with_redis(line)
        a = ObjectA.find_or_initialize_by_reference(line[:reference])
        a.object_bs.destroy_all
        a.update_attributes(line)
      end
    end
  end
end

Otherwise every save will be wrapped in its own transaction. Although, for a 10GB file you probably want to break it into chunks of, say, 1000 inserts per transaction, along the lines of the sketch below.
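
A rough sketch of that batching, assuming CsvParser.each yields one parsed line at a time; the batch size of 1000 is arbitrary and worth measuring:

class Worker
  BATCH_SIZE = 1000

  def self.perform(file)
    batch = []
    CsvParser.each(file) do |line|
      next unless check_line_with_redis(line)
      batch << line
      if batch.size >= BATCH_SIZE
        import_batch(batch)
        batch.clear
      end
    end
    import_batch(batch) unless batch.empty?
  end

  # Wraps one slice of lines in a single transaction instead of one per save.
  def self.import_batch(lines)
    ObjectA.transaction do
      lines.each do |line|
        a = ObjectA.find_or_initialize_by_reference(line[:reference])
        a.object_bs.destroy_all
        a.update_attributes(line)
      end
    end
  end
end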
