Bad Performance for Dedupe of 2 million records using mapreduce on Appengine

I have about 2 million records, each with about 4 string fields, that need to be checked for duplicates. More specifically, the fields are name, phone, address and fathername, and I must check each record against the rest of the data using all of these fields. The resulting unique records need to be written to the db.

I have been able to implement mapreduce and iterate over all records. Task rate is set to 100/s and bucket-size to 100. Billing is enabled.

Currently, everything is working, but performance is very slow. In 6 hours I have only been able to complete dedupe processing for 1,000 records out of a test dataset of 10,000 records.

The current design in Java is:

  1. In every map iteration, I compare the current record with the previous record.
  2. The previous record is a single record in the db that acts like a global variable; I overwrite it with another previous record in each map iteration.
  3. The comparison is done using an algorithm, and the result is written as a new entity to the db.
  4. At the end of one mapreduce job, I programmatically create another job.
  5. The previous-record variable lets the job compare the next candidate record with the rest of the data.

I am willing to pay for whatever GAE resources are needed to achieve this in the shortest time.

My questions are:

  1. Will the accuracy of the dedupe (checking for duplicates) be affected by parallel jobs/tasks?
  2. How can this design be improved?
  3. Will this scale to 20 million records?
  4. What's the fastest way to read/write variables (not just counters) during map iteration that can be used across one mapreduce job?

Freelancers most welcome to assist in this.

Thanks for your help.


You should take advantage of the Reducer to do the equivalent of a sort -u for each field. You'll need to do one M/R job per field. You would make the field you are comparing the key in the mapper, then in the reducer you'd get all of the records with the same name grouped together and you could mark them. The second pass would be for the phone, etc. Depending on your cluster size each pass should be very fast.
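The per-field pass described above can be sketched in memory as follows. This is only an illustration of the idea (emit the chosen field as the key, group by key, report groups with more than one record), not Hadoop or App Engine mapreduce code; the class name, the method name and the sample records are made up for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration of one pass: the "map" step emits (field value, record index),
// the grouping plays the role of the shuffle, and the "reduce" step keeps shared keys.
class FieldGroupingSketch {

    // fieldIndex selects which column acts as the key (0 = name, 1 = phone, ...).
    static Map<String, List<Integer>> duplicateGroups(List<String[]> records, int fieldIndex) {
        Map<String, List<Integer>> grouped = new LinkedHashMap<>();
        // "Map" phase: the chosen field is the key, the record's position is the value.
        for (int id = 0; id < records.size(); id++) {
            String key = records.get(id)[fieldIndex];
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(id);
        }
        // "Reduce" phase: keep only keys shared by two or more records.
        Map<String, List<Integer>> duplicates = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            if (e.getValue().size() > 1) {
                duplicates.put(e.getKey(), e.getValue());
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Made-up sample rows: name, phone, address, fathername.
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"Asha", "555-0100", "1 Main St", "Ravi"});
        records.add(new String[] {"Asha", "555-0199", "9 Oak Ave", "Mohan"});
        records.add(new String[] {"Bina", "555-0100", "1 Main St", "Ravi"});
        System.out.println("by name:  " + duplicateGroups(records, 0));
        System.out.println("by phone: " + duplicateGroups(records, 1));
    }
}
```

In a real job the grouping is done by the framework's shuffle, so each reducer only ever sees the records for its own keys.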

Edit: @Olaf pointed out the OP probably wants totally unique records. Using a multipart key this could be a one-line hadoop streaming command to get the unique set. I'll add that soon.

Edit2: Here is the promised streaming command, which performs a sort -u on the entire file. It assumes the records are in one or more files in the directory hdfs://example/dedup/input/, one record per line, with the fields (name, fathername, phone number and address) tab-delimited. The actual HDFS path can be anything, or you could use a single file. The output will be multiple part-* files in hdfs://example/dedup/output/. You might also need to adjust the command, as your hadoop-streaming.jar might be in a slightly different place. If you have more than 4 fields, change the value of stream.num.map.output.key.fields.

    # Generic options such as -D must come before the streaming options.
    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
        -D stream.num.map.output.key.fields=4 \
        -input hdfs://example/dedup/input/ \
        -output hdfs://example/dedup/output/ \
        -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
        -reducer /usr/bin/uniq

To copy the unique results to a file on the local filesystem, run this:

    $HADOOP_HOME/bin/hadoop fs -cat \
        'hdfs://example/dedup/output/part-*' > results.txt

One note: because every column is part of the key, streaming adds an empty value, so each row will have an extra tab at the end. That is easily stripped off.

If you want to do more than just get the unique output, you could supply your own Java class or command-line program instead of /usr/bin/uniq. That class could, for example, update all the records you find to be duplicated, provided you add a fifth column to your input holding the record's DB ID. Hadoop by default partitions results by the whole key, so each group of duplicate records will be streamed together to a reducer, and this will all happen in parallel. Please take a look at the streaming documentation for more info.


I see 2 ways to approach this problem:

  1. (If you only need to do it once) AppEngine creates a property index for every property in your entity (unless you ask it not to do that). Create a backend, run a query "SELECT * FROM ORDER BY " in batches using cursors, determine duplicated properties and fix/delete those. You might be able to parallelize this, but it's tricky on shard boundaries and you will probably have to write all the code yourself.

  2. You can use the mapper framework to do it more slowly, but in parallel. This approach also lets you efficiently dedupe data on insert. Introduce a new entity to hold unique property values, say "UniquePhoneNumber". The entity should use the phone number as its key and hold a reference to the entity with this phone number. Now run a map and do a lookup for UniquePhoneNumber. If it's found and its reference is valid, delete the duplicate. If not, create a new one with the correct reference. This way it's even possible to repoint a reference to the other record, if you need to. Make sure that you read UniquePhoneNumber and create or update it inside a single transaction. Otherwise duplicates won't be detected.
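The lookup-entity idea in the second approach can be sketched with an in-memory map standing in for the "UniquePhoneNumber" kind. This is not Datastore code: the class and method names are hypothetical, and ConcurrentHashMap.putIfAbsent is used only as a stand-in for the read-then-create transaction, since it performs the check and the insert atomically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for the "UniquePhoneNumber" kind: phone number -> id of the owning record.
class UniqueValueIndexSketch {
    private final ConcurrentMap<String, Long> ownerByPhone = new ConcurrentHashMap<>();

    // Returns true if recordId owns the phone number after the call,
    // false if another record already owns it (recordId is a duplicate).
    boolean claim(String phone, long recordId) {
        // putIfAbsent is atomic, so two parallel mappers cannot both become the owner.
        Long existing = ownerByPhone.putIfAbsent(phone, recordId);
        return existing == null || existing == recordId;
    }

    public static void main(String[] args) {
        UniqueValueIndexSketch index = new UniqueValueIndexSketch();
        // Made-up (id, phone) pairs processed as a mapper would see them.
        String[] phones = {"555-0100", "555-0199", "555-0100"};
        List<Long> duplicates = new ArrayList<>();
        for (int i = 0; i < phones.length; i++) {
            long id = i + 1;
            if (!index.claim(phones[i], id)) {
                duplicates.add(id); // a real job would delete or flag the entity here
            }
        }
        System.out.println("duplicate record ids: " + duplicates);
    }
}
```

The same check can run at insert time, which is what makes this approach usable for keeping new data deduplicated.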


Generate a hash code for each record. Loop through your records and insert each one into a Set based on the hash code. The Set is now a deduped list in O(N).
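A minimal single-machine sketch of the set-based approach above, assuming the records fit in memory. Each record is represented as a List<String> of its fields, so HashSet uses the list's hashCode to find the bucket and equals to confirm a match; relying on the hash code alone would merge distinct records that happen to collide. The class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class HashSetDedupSketch {

    // Returns the records with exact duplicates removed, keeping first occurrences in order.
    static List<List<String>> dedupe(List<List<String>> records) {
        Set<List<String>> seen = new HashSet<>();
        List<List<String>> unique = new ArrayList<>();
        for (List<String> record : records) {
            // add() returns false when an equal record is already in the set.
            if (seen.add(record)) {
                unique.add(record);
            }
        }
        return unique;
    }

    public static void main(String[] args) {
        // Made-up sample rows: name, phone, address, fathername.
        List<List<String>> records = new ArrayList<>();
        records.add(Arrays.asList("Asha", "555-0100", "1 Main St", "Ravi"));
        records.add(Arrays.asList("Bina", "555-0199", "9 Oak Ave", "Mohan"));
        records.add(Arrays.asList("Asha", "555-0100", "1 Main St", "Ravi"));
        System.out.println(dedupe(records).size() + " unique records");
    }
}
```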


You definitely shouldn't be using your current approach - only one process can update an entity at one time, so your entire mapreduce is bottlenecked on that one entity. Further, mapreduce doesn't currently allow you to specify the ordering of a result set, so you've got no guarantee you'll find all (or even most) duplicates.

For now, your best option is probably to build your own solution. Using cursors, perform a query on the kind sorted by the field you want to deduplicate, and scan over it, checking for duplicates and removing them (in batches, to reduce RPCs) as you encounter them. When you need to chain another task (due to the 10-minute task limit), use a cursor to ensure the new task picks up where you left off.

If you want to parallelize this, you can, by having each shard start by skipping over records until it discovers a change in the value you're deduplicating on, and starting from there. At the end of a shard, wait until you reach the end of a group before stopping. This way, you make sure you don't miss removing duplicates positioned on the edge of a shard boundary.
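The shard-boundary rule can be sketched over an in-memory sorted list, with list indexes standing in for query cursors. This is one possible reading of the rule, not App Engine code: every shard except the first starts at the first change of value after its nominal start, and every shard except the last keeps going until the first change of value after its nominal end, so one shard's real end is the next shard's real start. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ShardedSortedScanSketch {

    // Smallest index i > pos where the value differs from the one before it, or size if none.
    static int firstChangeAfter(List<String> sorted, int pos) {
        for (int i = pos + 1; i < sorted.size(); i++) {
            if (!sorted.get(i).equals(sorted.get(i - 1))) {
                return i;
            }
        }
        return sorted.size();
    }

    // Indexes of duplicates found by the shard whose nominal range is [start, end).
    static List<Integer> duplicatesInShard(List<String> sorted, int start, int end) {
        // Skip the group that straddles (or begins at) the nominal start; the previous shard owns it.
        int begin = (start == 0) ? 0 : firstChangeAfter(sorted, start);
        // Run past the nominal end until the current group is finished.
        int stop = (end >= sorted.size()) ? sorted.size() : firstChangeAfter(sorted, end);
        List<Integer> duplicates = new ArrayList<>();
        for (int i = begin + 1; i < stop; i++) {
            if (sorted.get(i).equals(sorted.get(i - 1))) {
                duplicates.add(i); // same value as its predecessor: not the first of its group
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Made-up values, already sorted by the field being deduplicated.
        List<String> sorted = Arrays.asList("a", "a", "b", "b", "b", "c");
        System.out.println("shard 0: " + duplicatesInShard(sorted, 0, 3));
        System.out.println("shard 1: " + duplicatesInShard(sorted, 3, 6));
    }
}
```

Because both sides of a boundary compute the same change point, each group is handled by exactly one shard, even when it straddles the boundary.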


Here is a solution based on a hashed self-join with MapReduce. It can also do fuzzy duplicate matching with an edit distance algorithm. You can choose the fields from the record that you want to use for duplicate detection. The reducer will output a duplicate score.

https://pkghosh.wordpress.com/2013/09/09/identifying-duplicate-records-with-fuzzy-matching/
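For readers unfamiliar with edit distance, here is a sketch of the textbook Levenshtein distance between two field values. It does not reproduce the linked post's implementation or its scoring; the similarity method, which scales the distance into the range 0 to 1, is a hypothetical helper added only for illustration.

```java
class EditDistanceSketch {

    // Levenshtein distance: minimum number of single-character insertions,
    // deletions and substitutions needed to turn a into b.
    static int distance(String a, String b) {
        int[] prev = new int[b.length() + 1];
        int[] curr = new int[b.length() + 1];
        for (int j = 0; j <= b.length(); j++) {
            prev[j] = j; // turning the empty prefix of a into b[0..j) takes j insertions
        }
        for (int i = 1; i <= a.length(); i++) {
            curr[0] = i; // turning a[0..i) into the empty string takes i deletions
            for (int j = 1; j <= b.length(); j++) {
                int cost = (a.charAt(i - 1) == b.charAt(j - 1)) ? 0 : 1;
                curr[j] = Math.min(Math.min(curr[j - 1] + 1, prev[j] + 1), prev[j - 1] + cost);
            }
            int[] tmp = prev;
            prev = curr;
            curr = tmp;
        }
        return prev[b.length()];
    }

    // Hypothetical helper: 1.0 for identical strings, 0.0 when every character differs.
    static double similarity(String a, String b) {
        int longest = Math.max(a.length(), b.length());
        return longest == 0 ? 1.0 : 1.0 - (double) distance(a, b) / longest;
    }

    public static void main(String[] args) {
        // Made-up address values that differ only in spelling.
        System.out.println(distance("12 Main Street", "12 Main Stret"));
        System.out.println(similarity("12 Main Street", "12 Main Stret"));
    }
}
```

A fuzzy dedupe would treat two records as candidate duplicates when the similarity of the chosen fields exceeds some threshold, which has to be tuned on real data.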
