MapReduce pairwise comparison of all lines in multiple files
I'm getting started with using Python's mrjob to convert some of my long-running Python programs into MapReduce Hadoop jobs. I've gotten the simple word-count examples to work and I conceptually understand the 'text-classification' example.
However, I'm having a little trouble figuring out the steps I need to do to get my problem working.
I have multiple files (about 6000), each of which has 2 to 800 lines. In this case each line is a simple space-delimited 'signal'. I need to compute the correlation between each line in each file and EVERY other line in ALL files (including itself). Then, based on the correlation coefficient, I'll output the results.
An example of one file:
1 2 3 4 2 3 1 2 3 4 1 2
2 2 3 1 3 3 1 2 3 1 4 1
2 3 4 5 3 2 1 3 4 5 2 1
...
I need to yield each LINE of this file paired with EVERY OTHER LINE from every other file ... or I could concatenate all files into one file if that makes things easier, but I would still need the pairwise iteration.
I understand how to do the calculation and how to use the final reduce step to aggregate and filter the results. The difficulty I'm having is how to yield all pairwise items to successive steps without reading all files in a single step. I guess I could prepare an input file ahead of time using itertools.product, but this file would be prohibitively large.
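To make concrete what I mean by the pairwise iteration, here is roughly what the single-machine version looks like (just a sketch; the glob pattern, the threshold, and the use of numpy.corrcoef for the correlation coefficient are illustrative, not my actual code, and it assumes all signals have the same length):

    from glob import glob
    from itertools import product

    import numpy as np

    # Read every line of every file as a (filename, signal) pair.
    signals = []
    for fname in glob('data/*.txt'):          # illustrative path
        with open(fname) as fh:
            for line in fh:
                signals.append((fname, [float(x) for x in line.split()]))

    # Compare every line against every line (including itself).
    for (f1, sig1), (f2, sig2) in product(signals, repeat=2):
        r = np.corrcoef(sig1, sig2)[0, 1]     # Pearson correlation coefficient
        if r > 0.9:                           # illustrative threshold
            print(f1, f2, r)

This is exactly the product over all lines that I don't know how to express without having one step read everything.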
Well, since nobody has come up with an answer I'll post my current work-around in case anybody else out there needs it. I'm not sure how 'canonical' or efficient this is, but it's worked so far.
I put the filename as the first item of each line of the file, followed by a \t, followed by the rest of the data. For this example I'm just using a single number on each line and then averaging each pair, as a very trivial example.
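For reference, the preprocessing that prepends the filename is just something along these lines (a sketch; the input pattern and output path are made up):

    from glob import glob

    # Prepend each line with its source filename and a tab so the mapper
    # can recover which file a value came from.
    with open('combined_input.txt', 'w') as out:      # illustrative output path
        for fname in glob('data/*.txt'):              # illustrative input pattern
            with open(fname) as fh:
                for line in fh:
                    out.write('%s\t%s' % (fname, line))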
Then I made the following MapReduce job in mrjob:
    from itertools import product

    from mrjob.job import MRJob


    class MRAvgPairwiseLines(MRJob):

        def input_mapper(self, _, value):
            """Takes each input line and converts it to (fnum, val) with a key of 'ALL'."""
            fnum, val = value.split('\t')
            yield 'ALL', (fnum, val)

        def input_reducer(self, key, values):
            # Every record shares the key 'ALL', so this reducer sees them all
            # and can emit every ordered pair.
            for (fnum1, val1), (fnum2, val2) in product(values, repeat=2):
                yield fnum1, (fnum1, fnum2, val1, val2)

        def do_avg(self, key, value):
            fnum1, fnum2, val1, val2 = value
            res = (float(val1) + float(val2)) / 2
            yield key, (fnum2, res)

        def get_max_avg(self, key, values):
            max_fnum, max_avg = max(values, key=lambda x: x[1])
            yield key, (max_fnum, max_avg)

        def steps(self):
            return [self.mr(mapper=self.input_mapper, reducer=self.input_reducer),
                    self.mr(mapper=self.do_avg, reducer=self.get_max_avg)]


    if __name__ == '__main__':
        MRAvgPairwiseLines.run()
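If this is saved as, say, mr_avg_pairwise.py (the name is arbitrary), it can be run locally with `python mr_avg_pairwise.py combined_input.txt` or sent to a cluster with mrjob's `-r hadoop` or `-r emr` runners.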
This way, all of the output from the input_mapper function gets grouped into the same input_reducer call, which then yields the successive pairs. These then pass through to the proper places to finally return the largest average (which is actually the largest item in all of the other files).
Hope that helps someone.