MapReduce on more than one datastore kind in Google App Engine
I just watched Batch data processing with App Engine session of Google I/O 2010, read some parts of MapReduce article from Google Research and now I am thinking to use MapReduce on Google App Engine to implement a recommender system in Python.
I prefer using appengine-mapreduce instead of Task Queue API because the former offers easy iteration over all instances of some kind, automatic batching, automatic task chaining, etc. The problem is: my recommender system needs to calculate correlation between instances of two different Models, i.e., instances of 开发者_运维百科two distinct kinds.
Example:
I have these two Models: User and Item. Each one has a list of tags as an attribute. Below are the functions to calculate correlation between users and items. Note that calculateCorrelation
should be called for every combination of users and items:
def calculateCorrelation(user, item):
return calculateCorrelationAverage(u.tags, i.tags)
def calculateCorrelationAverage(tags1, tags2):
correlationSum = 0.0
for (tag1, tag2) in allCombinations(tags1, tags2):
correlationSum += correlation(tag1, tag2)
return correlationSum / (len(tags1) + len(tags2))
def allCombinations(list1, list2):
combinations = []
for x in list1:
for y in list2:
combinations.append((x, y))
return combinations
But that calculateCorrelation
is not a valid Mapper in appengine-mapreduce and maybe this function is not even compatible with MapReduce computation concept. Yet, I need to be sure... it would be really great for me having those appengine-mapreduce advantages like automatic batching and task chaining.
Is there any solution for that?
Should I define my own InputReader? A new InputReader that reads all instances of two different kinds is compatible with the current appengine-mapreduce implementation?
Or should I try the following?
- Combine all keys of all entities of these two kinds, two by two, into instances of a new Model (possibly using MapReduce)
- Iterate using mappers over instances of this new Model
- For each instance, use keys inside it to get the two entities of different kinds and calculate the correlation between them.
Following Nick Johnson suggestion, I wrote my own InputReader. This reader fetch entities from two different kinds. It yields tuples with all combinations of these entities. Here it is:
class TwoKindsInputReader(InputReader):
_APP_PARAM = "_app"
_KIND1_PARAM = "kind1"
_KIND2_PARAM = "kind2"
MAPPER_PARAMS = "mapper_params"
def __init__(self, reader1, reader2):
self._reader1 = reader1
self._reader2 = reader2
def __iter__(self):
for u in self._reader1:
for e in self._reader2:
yield (u, e)
@classmethod
def from_json(cls, input_shard_state):
reader1 = DatastoreInputReader.from_json(input_shard_state[cls._KIND1_PARAM])
reader2 = DatastoreInputReader.from_json(input_shard_state[cls._KIND2_PARAM])
return cls(reader1, reader2)
def to_json(self):
json_dict = {}
json_dict[self._KIND1_PARAM] = self._reader1.to_json()
json_dict[self._KIND2_PARAM] = self._reader2.to_json()
return json_dict
@classmethod
def split_input(cls, mapper_spec):
params = mapper_spec.params
app = params.get(cls._APP_PARAM)
kind1 = params.get(cls._KIND1_PARAM)
kind2 = params.get(cls._KIND2_PARAM)
shard_count = mapper_spec.shard_count
shard_count_sqrt = int(math.sqrt(shard_count))
splitted1 = DatastoreInputReader._split_input_from_params(app, kind1, params, shard_count_sqrt)
splitted2 = DatastoreInputReader._split_input_from_params(app, kind2, params, shard_count_sqrt)
inputs = []
for u in splitted1:
for e in splitted2:
inputs.append(TwoKindsInputReader(u, e))
#mapper_spec.shard_count = len(inputs) #uncomment this in case of "Incorrect number of shard states" (at line 408 in handlers.py)
return inputs
@classmethod
def validate(cls, mapper_spec):
return True #TODO
This code should be used when you need to process all combinations of entities of two kinds. You can also generalize this for more than two kinds.
Here it is a valid the mapreduce.yaml for TwoKindsInputReader
:
mapreduce:
- name: recommendationMapReduce
mapper:
input_reader: customInputReaders.TwoKindsInputReader
handler: recommendation.calculateCorrelationHandler
params:
- name: kind1
default: kinds.User
- name: kind2
default: kinds.Item
- name: shard_count
default: 16
It's difficult to know what to recommend without more details of what you're actually calculating. One simple option is to simply fetch the related entity inside the map call - there's nothing preventing you from doing datastore operations there.
This will result in a lot of small calls, though. Writing a custom InputReader, as you suggest, will allow you to fetch both sets of entities in parallel, which will significantly improve performance.
If you give more details as to how you need to join these entities, we may be able to provide more concrete suggestions.
精彩评论