Simple counter example using mapreduce in Google App Engine
I'm somewhat confused by the current state of mapreduce support in GAE. According to the docs at http://code.google.com/p/appengine-mapreduce/, the reduce phase isn't supported yet, but the description of the I/O 2011 session (http://www.youtube.com/watch?v=EIxelKcyCC0) says "It is now possible to run full Map Reduce jobs on App Engine". I wonder whether I can use mapreduce for this task:
What I want to do:
I have a model Car with a field color:
class Car(db.Model):
    color = db.StringProperty()
I want to run a mapreduce process from time to time (cron-defined) that computes how many cars there are of each color and stores the result in the datastore. It seems like a job well suited for mapreduce (but correct me if I'm wrong): the "map" phase would yield a (color_name, 1) pair for each Car entity, and the "reduce" phase would merge this data by color_name, giving me the expected results. The final result I want is a set of entities with the computed data stored in the datastore, something like this:
class CarsByColor(db.Model):
    color_name = db.StringProperty()
    cars_num = db.IntegerProperty()
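The map/shuffle/reduce plan described above can be sketched in plain Python, independent of App Engine, with a list of color strings standing in for Car entities. The function names are hypothetical; this only illustrates the data flow, not the appengine-mapreduce API.

```python
from collections import defaultdict
from itertools import chain


def map_car(color):
    """Map phase: emit a (color_name, 1) pair for one car."""
    if color:  # skip cars that have no color set
        yield (color, 1)


def reduce_color(color, counts):
    """Reduce phase: sum all the 1s emitted for a single color."""
    return color, sum(counts)


def count_by_color(colors):
    # Shuffle phase: group the mapped values by key (the color name).
    grouped = defaultdict(list)
    for color, one in chain.from_iterable(map_car(c) for c in colors):
        grouped[color].append(one)
    # Reduce each group independently: the result maps color_name -> cars_num.
    return dict(reduce_color(color, counts) for color, counts in grouped.items())


totals = count_by_color(["red", "blue", "red", None, "green", "red"])
```

Each (color_name, cars_num) pair in the result corresponds to one CarsByColor entity.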
Problem: I don't know how to implement this in App Engine. The video shows examples with defined map and reduce functions, but they seem to be very general examples not related to the datastore. All the other examples I found use a single function to process the data from DatastoreInputReader, which seems to be only the "map" phase; there is no example of how to do the "reduce" (or how to store the reduce results in the datastore).
You don't really need a reduce phase. You can accomplish this with a linear task chain, more or less as follows:
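# Requires the deferred builtin to be enabled in app.yaml.
from google.appengine.ext import db, deferred
from models import Car, CarsByColor  # assumed module name
def count_colors(limit=100, totals=None, cursor=None):
    # Don't use a mutable {} default: it would be shared between runs in
    # the same instance and keep counting on top of old totals.
    if totals is None:
        totals = {}
    query = Car.all()
    if cursor:
        query.with_cursor(cursor)
    cars = query.fetch(limit)
    for car in cars:
        if car.color:  # skip cars without a color so every key_name is a real string
            totals[car.color] = totals.get(car.color, 0) + 1
    if len(cars) == limit:
        # More cars may remain: pass the cursor and running tally to a new task.
        cursor = query.cursor()
        return deferred.defer(count_colors, limit, totals, cursor)
    # Last batch: one entity per color; key_name=color makes reruns overwrite.
    entities = []
    for color, num in totals.items():
        entities.append(CarsByColor(key_name=color, color_name=color, cars_num=num))
    db.put(entities)
# Kick off the chain, e.g. from a cron handler:
deferred.defer(count_colors)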
This should iterate over all your cars, pass a query cursor and a running tally to a series of ad-hoc tasks, and store the totals at the end.
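The cursor-and-running-tally pattern can be tried out locally without App Engine. In this sketch a list stands in for the Car query and a plain integer offset stands in for the datastore cursor; fetch_batch is a hypothetical helper, and the while loop replaces the chained deferred.defer call, so it shows the control flow only, not task-queue behavior.

```python
def fetch_batch(items, limit, cursor):
    """Stand-in for query.with_cursor(cursor).fetch(limit): return the next
    batch plus a cursor (here just a list offset) pointing past it."""
    start = cursor or 0
    batch = items[start:start + limit]
    return batch, start + len(batch)


def count_colors(items, limit=100, totals=None, cursor=None):
    # None default, so every top-level run starts from a fresh dict.
    totals = {} if totals is None else totals
    while True:
        batch, cursor = fetch_batch(items, limit, cursor)
        for color in batch:
            totals[color] = totals.get(color, 0) + 1
        if len(batch) < limit:
            # A short (or empty) batch means the query is exhausted.
            return totals
        # In the App Engine version, this is where
        # deferred.defer(count_colors, limit, totals, cursor) starts the next task.
```

With limit=2 and five cars, the loop runs three "tasks" (2 + 2 + 1 items) before returning the totals.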
A reduce phase might make sense if you had to merge data from multiple datastores, multiple models, or multiple indexes in a single model. As it is, I don't think it would buy you anything.
Another option: use the task queue to maintain live counters for each color. When you create a car, kick off a task to increment the total for that color. When you update a car, kick off one task to decrement the old color and another to increment the new color. Update counters transactionally to avoid race conditions.
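The bookkeeping for live counters can be sketched locally. This is not App Engine code: a threading.Lock stands in for the datastore transaction, and direct method calls stand in for the queued tasks; ColorCounters and its methods are hypothetical names.

```python
import threading
from collections import Counter


class ColorCounters(object):
    """In-memory stand-in for per-color counter entities."""

    def __init__(self):
        self._counts = Counter()
        # The lock plays the role of the transaction: concurrent
        # read-modify-write updates can't lose increments.
        self._lock = threading.Lock()

    def _apply(self, color, delta):
        with self._lock:
            self._counts[color] += delta

    def car_created(self, color):
        self._apply(color, 1)

    def car_updated(self, old_color, new_color):
        # Decrement the old color and increment the new one,
        # but only when the color actually changed.
        if old_color != new_color:
            self._apply(old_color, -1)
            self._apply(new_color, 1)

    def count(self, color):
        with self._lock:
            return self._counts[color]
```

Skipping unchanged colors avoids two tasks whose effects would cancel out anyway.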
Here is the solution I eventually figured out using mapreduce on GAE (without a reduce phase). If I had started from scratch, I probably would have used the solution provided by Drew Sears.
It works with the GAE Python SDK 1.5.0.
In app.yaml I added the handler for mapreduce:
- url: /mapreduce(/.*)?
  script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py
and a handler for my own mapreduce code (I use the URL /mapred_update to gather the results produced by mapreduce):
- url: /mapred_.*
  script: mapred.py
I created mapreduce.yaml for processing Car entities:
mapreduce:
- name: Color_Counter
  params:
  - name: done_callback
    value: /mapred_update
  mapper:
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader
    handler: mapred.process
    params:
    - name: entity_kind
      default: models.Car
Explanation: done_callback is a URL that is called after mapreduce finishes its operations. mapred.process is a function that processes an individual entity and updates the counters (it's defined in mapred.py). The Car model is defined in models.py.
mapred.py:
from models import CarsByColor
from google.appengine.ext import db
from google.appengine.ext.mapreduce import operation as op
from google.appengine.ext.mapreduce.model import MapreduceState
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app
COUNTER_PREFIX = 'car_color_'
def process(entity):
    """Process an individual Car: bump the counter for its color."""
    color = entity.color
    if color:
        yield op.counters.Increment(COUNTER_PREFIX + color)
class UpdateCounters(webapp.RequestHandler):
    """Create CarsByColor stats entities from the data
    gathered by the mapreduce counters."""
    def post(self):
        """Called after the mapreduce operation has finished."""
        # The finished mapreduce job id is passed in the request headers
        job_id = self.request.headers['Mapreduce-Id']
        state = MapreduceState.get_by_job_id(job_id)
        counters = state.counters_map.counters
        to_put = []
        for counter, value in counters.items():
            # Keep only our own color counters; this skips built-in counters
            # such as 'mapper_calls' without a KeyError if one is absent.
            if not counter.startswith(COUNTER_PREFIX):
                continue
            stat = CarsByColor.get_by_key_name(counter)
            if not stat:
                stat = CarsByColor(key_name=counter, name=counter)
            stat.value = value
            to_put.append(stat)
        db.put(to_put)
        self.response.headers['Content-Type'] = 'text/plain'
        self.response.out.write('Updated.')
application = webapp.WSGIApplication(
    [('/mapred_update', UpdateCounters)],
    debug=True)
def main():
    run_wsgi_app(application)
if __name__ == "__main__":
    main()
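Note that this uses a slightly different CarsByColor model than the one in the question: the code sets name and value rather than color_name and cars_num, so the model must define those properties.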
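To keep the question's original fields (color_name and cars_num) instead, the car_color_ prefix has to be stripped from each counter name. Here is a pure-Python sketch of that translation, assuming a counters dict shaped like state.counters_map.counters (counter name to value); the helper name counters_to_color_totals is hypothetical.

```python
COUNTER_PREFIX = 'car_color_'  # must match the prefix used in process()


def counters_to_color_totals(counters, prefix=COUNTER_PREFIX):
    """Map {'car_color_red': 4, 'mapper_calls': 6, ...} to {'red': 4, ...}.

    Counters without the prefix (the library's own bookkeeping counters)
    are dropped, so their exact names don't matter."""
    totals = {}
    for name, value in counters.items():
        if name.startswith(prefix):
            totals[name[len(prefix):]] = value
    return totals
```

In the done_callback handler, each resulting pair could then be stored as CarsByColor(key_name=color_name, color_name=color_name, cars_num=cars_num).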
You can start the mapreduce job manually from http://yourapp/mapreduce/, and presumably from cron as well (I haven't tested cron yet).