Should I learn/use MapReduce, or some other type of parallelization for this task?
After talking with a friend of mine from Google, I'd like to implement some kind of Job/Worker model for updating my dataset.
This dataset mirrors a 3rd party service's data, so, to do the update, I need to make several remote calls to their API. I think a lot of time will be spent waiting for responses from this 3rd party service. I'd like to speed things up, and make better use of my compute hours, by parallelizing these requests and keeping many of them open at once, as they wait for their individual responses.
Before I explain my specific dataset and get into the problem, I'd like to clarify what answers I'm looking for:
- Is this a flow that would be well suited to parallelizing with MapReduce?
- If yes, would this be cost effective to run on Amazon's MapReduce module, which bills by the hour and rounds hours up when the job is complete? (I'm not sure exactly what counts as a "Job", so I don't know exactly how I'll be billed.)
- If no, is there another system/pattern I should use, and is there a library that will help me do this in Python (on AWS, using EC2 + EBS)?
- Are there any problems you see with the way I've designed this job flow?
Ok, now onto the details:
The dataset consists of users who have favorite items and who follow other users. The aim is to be able to update each user's queue -- the list of items the user will see when they load the page, based on the favorite items of the users she follows. But, before I can crunch the data and update a user's queue, I need to make sure I have the most up-to-date data, which is where the API calls come in.
There are two calls I can make:
- Get Followed Users -- Which returns all the users being followed by the requested user, and
- Get Favorite Items -- Which returns all the favorite items of the requested user.
After I call get followed users for the user being updated, I need to update the favorite items for each user being followed. Only when all of the favorites are returned for all the users being followed can I start processing the queue for that original user. This flow looks like:
Jobs in this flow include:
- Start Updating Queue for user -- kicks off the process by fetching the users followed by the user being updated, storing them, and then creating Get Favorites jobs for each user.
- Get Favorites for user -- Requests, and stores, a list of favorites for the specified user, from the 3rd party service.
- Calculate New Queue for user -- Processes a new queue, now that all the data has been fetched, and then stores the results in a cache which is used by the application layer.
So, again, my questions are:
- Is this a flow that would be well suited to parallelizing with MapReduce? I don't know if it would let me start the process for UserX, fetch all the related data, and come back to processing UserX's queue only after that's all done.
- If yes, would this be cost effective to run on Amazon's MapReduce module, which bills by the hour and rounds hours up when the job is complete? Is there a limit on how many "threads" I can have waiting on open API requests if I use their module?
- If no, is there another system/pattern I should use, and is there a library that will help me do this in Python (on AWS, using EC2 + EBS)?
- Are there any problems you see with the way I've designed this job flow?
Thanks for reading, I'm looking forward to some discussion with you all.
Edit, in response to JimR:
Thanks for a solid reply. In my reading since I wrote the original question, I've leaned away from using MapReduce. I haven't decided for sure yet how I want to build this, but I'm beginning to feel MapReduce is better suited to distributing / parallelizing computing load, whereas I'm really just looking to parallelize HTTP requests.
What would have been my "reduce" task, the part that takes all the fetched data and crunches it into results, isn't that computationally intensive. I'm pretty sure it's going to wind up being one big SQL query that executes for a second or two per user.
So, what I'm leaning towards is:
- A non-MapReduce Job/Worker model, written in Python. A friend of mine at Google turned me on to learning Python for this, since it's low overhead and scales well.
- Using Amazon EC2 as a compute layer. I think this means I also need an EBS volume to store my database.
- Possibly using Amazon's Simple Queue Service (SQS). It sounds like this third Amazon service is designed to keep track of job queues, move results from one task into the inputs of another, and gracefully handle failed tasks. It's very cheap, and may be worth using instead of a custom job-queue system.
The work you describe is probably a good fit for either a queue, or a combination of a queue and job server. It certainly could work as a set of MapReduce steps as well.
For a job server, I recommend looking at Gearman. The documentation isn't awesome, but the presentations do a great job documenting it, and the Python module is fairly self-explanatory too.
Basically, you create functions in the job server, and these functions get called by clients via an API. The functions can be called either synchronously or asynchronously. In your example, you probably want to asynchronously add the "Start update" job. That will do whatever preparatory tasks, and then asynchronously call the "Get followed users" job. That job will fetch the users, and then call the "Update followed users" job. That will submit all the "Get Favourites for UserA" and friend jobs together in one go, and synchronously wait for the result of all of them. When they have all returned, it will call the "Calculate new queue" job.
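To make the chain of jobs above a bit more concrete, here is a minimal sketch that uses Python's standard `concurrent.futures` thread pool as a stand-in for a job server. It is not Gearman's API. The lookup tables and function names are made up for illustration and take the place of the real calls to the 3rd party service.

```python
from concurrent.futures import ThreadPoolExecutor

# Made-up data standing in for the 3rd party service (assumption).
FOLLOWS = {"UserX": ["UserA", "UserB"]}
FAVOURITES = {"UserA": ["Favourite1"], "UserB": ["Favourite4"]}


def get_followed_users(user):
    """Stand-in for the "Get followed users" job."""
    return FOLLOWS.get(user, [])


def get_favourites(user):
    """Stand-in for the "Get Favourites for UserA" job."""
    return FAVOURITES.get(user, [])


def calculate_new_queue(user, favourites_by_user):
    """Stand-in for the "Calculate new queue" job: flatten all favourites."""
    return [fav
            for followed in sorted(favourites_by_user)
            for fav in favourites_by_user[followed]]


def start_update(pool, user):
    """Fetch followed users, submit all favourites jobs at once, wait, then calculate."""
    followed = pool.submit(get_followed_users, user).result()
    # Submit every favourites job in one go; each call returns a future immediately.
    jobs = {name: pool.submit(get_favourites, name) for name in followed}
    # result() blocks, so this line only finishes once every job has returned.
    favourites = {name: job.result() for name, job in jobs.items()}
    return calculate_new_queue(user, favourites)


def run_update(user, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return start_update(pool, user)
```

Note that `start_update` is called directly rather than submitted to the same pool. A job that waits on other jobs in a small shared pool can end up waiting for a worker that it is itself occupying.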
This job-server-only approach will initially be a bit less robust, since ensuring that you handle errors and any down servers and persistence properly is going to be fun.
For a queue, SQS is an obvious choice. It is rock-solid, and very quick to access from EC2, and cheap. And way easier to set up and maintain than other queues when you're just getting started.
Basically, you will put a message onto the queue, much like you would submit a job to the job server above, except you probably won't do anything synchronously. Instead of making the "Get Favourites For UserA" and so forth calls synchronously, you will make them asynchronously, and then have a message that says to check whether all of them are finished. You'll need some sort of persistence (a SQL database you're familiar with, or Amazon's SimpleDB if you want to go fully AWS) to track whether the work is done - you can't check on the progress of a job in SQS (although you can in other queues). The message that checks whether they are all finished will do the check - if they're not all finished, don't do anything, and then the message will be retried in a few minutes (based on the visibility_timeout). Otherwise, you can put the next message on the queue.
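Here is a rough, in-memory sketch of that "check whether they're all finished" pattern. A `deque` stands in for SQS and a plain dict stands in for the persistence store, so none of this is real AWS code. The message names and the `fetch_favourites` callable are hypothetical. Re-appending the check message simulates it becoming visible again after the visibility timeout.

```python
from collections import deque


def process_user(user, followed, fetch_favourites):
    """Drive one user's update through a simulated message queue."""
    queue = deque()   # stand-in for SQS (assumption)
    done = {}         # stand-in for the persistence store: followed user -> favourites

    # The check message goes on first so that it fails at least once below.
    queue.append(("check_done", user))
    for name in followed:
        queue.append(("get_favourites", name))

    result = None
    while queue:
        kind, arg = queue.popleft()
        if kind == "get_favourites":
            done[arg] = fetch_favourites(arg)
        elif kind == "check_done":
            if all(name in done for name in followed):
                # Everything is fetched: put the next message on the queue.
                queue.append(("calculate_queue", arg))
            else:
                # Not finished: do nothing; the message comes back later.
                queue.append(("check_done", arg))
        elif kind == "calculate_queue":
            result = [fav for name in followed for fav in done[name]]
    return result
```

In a real deployment the "do nothing" branch would simply not delete the message, and the workers handling each message type would run as separate processes.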
This queue-only approach should be robust, assuming you don't consume queue messages by mistake without doing the work. Making a mistake like that is hard to do with SQS - you really have to try. Don't use auto-consuming queues or protocols - if you error out, you might not be able to ensure that you put a replacement message back on the queue.
A combination of queue and job server may be useful in this case. You can get away with not having a persistence store to check job progress - the job server will allow you to track job progress. Your "get favourites for users" message could place all the "get favourites for UserA/B/C" jobs into the job server. Then, put a "check all favourites fetching done" message on the queue with a list of tasks that need to be complete (and enough information to restart any jobs that mysteriously disappear).
For bonus points:
Doing this as a MapReduce should be fairly easy.
Your first job's input will be a list of all your users. The map will take each user, get the followed users, and output lines for each user and their followed user:
"UserX" "UserA"
"UserX" "UserB"
"UserX" "UserC"
An identity reduce step will leave this unchanged. This will form the second job's input. The map for the second job will get the favourites for each line (you may want to use memcached so that UserA's favourites aren't fetched via the API twice, once for the UserX/UserA pair and again for UserY/UserA), and output a line for each favourite:
"UserX" "UserA" "Favourite1"
"UserX" "UserA" "Favourite2"
"UserX" "UserA" "Favourite3"
"UserX" "UserB" "Favourite4"
The reduce step for this job will convert this to:
"UserX" [("UserA", "Favourite1"), ("UserA", "Favourite2"), ("UserA", "Favourite3"), ("UserB", "Favourite4")]
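The two jobs can be tried out locally in plain Python before touching Hadoop. The sketch below is only an illustration: the lookup tables replace the real API calls, a dict replaces memcached, and all function names are made up.

```python
from collections import defaultdict

# Made-up data standing in for the API responses (assumption).
FOLLOWS = {"UserX": ["UserA", "UserB", "UserC"]}
FAVOURITES = {
    "UserA": ["Favourite1", "Favourite2", "Favourite3"],
    "UserB": ["Favourite4"],
    "UserC": [],
}


def map_followed(user):
    """Job 1 map: emit one (user, followed user) pair per followed user."""
    for followed in FOLLOWS.get(user, []):
        yield (user, followed)


def map_favourites(pair, cache):
    """Job 2 map: emit (user, followed user, favourite), caching per followed user."""
    user, followed = pair
    if followed not in cache:  # the dict plays the role of memcached here
        cache[followed] = FAVOURITES.get(followed, [])
    for favourite in cache[followed]:
        yield (user, followed, favourite)


def reduce_by_user(records):
    """Job 2 reduce: group (followed user, favourite) tuples under each user."""
    grouped = defaultdict(list)
    for user, followed, favourite in records:
        grouped[user].append((followed, favourite))
    return dict(grouped)


def run_jobs(users):
    pairs = [pair for user in users for pair in map_followed(user)]
    cache = {}
    records = [rec for pair in pairs for rec in map_favourites(pair, cache)]
    return reduce_by_user(records)
```

Calling `run_jobs(["UserX"])` on this sample data gives the same grouping as the reduce output shown above.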
At this point, you might have another MapReduce job to update your database for each user with these values, or you might be able to use some of the Hadoop-related tools like Pig, Hive, and HBase to manage your database for you.
I'd recommend using Cloudera's Distribution for Hadoop's ec2 management commands to create and tear down your Hadoop cluster on EC2 (their AMIs have Python set up on them), and use something like Dumbo (on PyPI) to create your MapReduce jobs, since it allows you to test your MapReduce jobs on your local/dev machine without access to Hadoop.
Good luck!
Seems that we're going with Node.js and the Seq flow control library. It was very easy to move from my map/flowchart of the process to a stub of the code, and now it's just a matter of filling out the code to hook into the right APIs.
Thanks for the answers, they were a lot of help finding the solution I was looking for.
I am working on a similar problem. I was also looking at MapReduce and using the Elastic MapReduce service from Amazon.
I'm pretty convinced MapReduce will work for this problem. The implementation is where I'm getting hung up, because I'm not sure my reducer even needs to do anything.
I'll answer your questions as I understand your (and my) problem, and hopefully it helps.
Yes, I think it's well suited. You could look at leveraging the Elastic MapReduce service's multiple steps option. You could use one step to fetch the people a user is following, and another step to compile a list of tracks for each of those people; the reducer for that second step would probably be the one to build the cache.
It depends on how big your data-set is and how often you'll be running it. Without knowing how big the data-set is (or is going to get), it's hard to say whether it'll be cost effective. Initially, it'll probably be quite cost-effective, as you won't have to manage your own Hadoop cluster, nor pay for EC2 instances (assuming that's what you use) to be up all the time. Once you reach the point where you're crunching this data for long periods, it will probably make less and less sense to use Amazon's MapReduce service, because you'll have nodes online all the time anyway.
A job is basically your MapReduce task. It can consist of multiple steps (each MapReduce task is a step). Once your data has been processed and all steps have been completed, your job is done. So you're effectively paying for CPU time for each node in the Hadoop cluster: T*n, where T is the time (in hours) it takes to process your data, and n is the number of nodes you tell Amazon to spin up.
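As a back-of-the-envelope sketch of that T*n estimate, assuming every node runs for the whole job and each node's time is rounded up to a full hour (as the question describes the billing). The hourly rate is a placeholder you would fill in yourself, not a real Amazon price.

```python
import math


def billed_node_hours(runtime_hours, nodes):
    """T*n, with T rounded up to whole hours (assumed billing granularity)."""
    return math.ceil(runtime_hours) * nodes


def estimated_cost(runtime_hours, nodes, hourly_rate):
    """Billed node-hours times a per-node hourly rate (placeholder value)."""
    return billed_node_hours(runtime_hours, nodes) * hourly_rate
```

For example, a job that takes 1.2 hours on 4 nodes would be billed as 2 hours on each node, so 8 node-hours in total.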
I hope this helps, good luck. I'd like to hear how you end up implementing your Mappers and Reducers, as I'm solving a very similar problem and I'm not sure my approach is really the best.