Hadoop Distributed Cache (Cloudera CDH3)
I am trying to run a simple example using a binary executable and the cached archive, and it does not seem to be working.
The example I am trying to run has a mapper which generates three random doubles and a key, and the reducer averages those three numbers together and logs the average. Very simple stuff. I wrote a simple executable in C to generate the random numbers:
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

int main(int argc, char *argv[]) {
    /* seed the RNG so each run produces different numbers */
    srand( time(NULL) );
    int rand1 = rand() % 10 + 1;
    int rand2 = rand() % 10 + 1;
    int rand3 = rand() % 10 + 1;
    /* emit: key, r1, r2, r3 (the key is taken from argv[1]) */
    printf("%s, %f, %f, %f", argv[1], (float)rand1/5, (float)rand2/5, (float)rand3/5);
    return 0;
}
So if I call ./a.out [key], I will see:
key, random1, random2, random3
I am using Python streaming, and here is my mapper, written in Python:
#!/usr/bin/python
import os
import sys
import random
import shlex, subprocess
from subprocess import PIPE
from misc import *

for line in sys.stdin:
    line = line.strip()
    sline = line.split(',')
    # parse out the key, au and cost from the line ....
    key = int( sline[0] )
    au = int( sline[1] )
    cost = float( sline[2] )
    # hand the key to the cached binary and echo its output
    command = "./a.out %d" % ( key )
    cli_parts = shlex.split(command)
    mp = subprocess.Popen(cli_parts, stdin=PIPE, stderr=PIPE, stdout=PIPE)
    print mp.communicate()[0].strip('\r\n')
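(As a debugging aid — a minimal sketch, not part of the original script — the child's stderr and exit status can be surfaced instead of being silently dropped:)

# surface the subprocess's stderr and return code for debugging
out, err = mp.communicate()
if mp.returncode != 0:
    sys.stderr.write("a.out failed with code %d: %s\n" % (mp.returncode, err))
else:
    print out.strip('\r\n')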
Here is the reducer, which just does the averaging:
#!/usr/bin/python
import os
import sys
import math
import re
from misc import *

for line in sys.stdin:
    line = line.strip()
    # expect mapper output of the form: key, r1, r2, r3
    m = re.match("(\w+),\s+(.*),\s+(.*),\s+(.*)", line)
    if m:
        average = ( float(m.groups(0)[1]) + float(m.groups(0)[2]) +
                    float(m.groups(0)[3]) ) / 3
        print "key: %d average: %f" % ( int(m.groups(0)[0]), average )
    else:
        print "not found"
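(For reference, a quick interactive check of that regex against one illustrative mapper output line — the numbers are made up:)

>>> import re
>>> m = re.match("(\w+),\s+(.*),\s+(.*),\s+(.*)", "7, 1.400000, 0.200000, 1.800000")
>>> m.groups()
('7', '1.400000', '0.200000', '1.800000')

so the reducer would print key: 7 average: 1.133333 for that line.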
So after reading the documentation, it seems like I need to compile the binary and then tar.gz it:
tar cvaf a.out.tar.gz a.out
Now I should be able to pass this to the datanodes via the -cacheArchive parameter and everything should work fine. Here is my Hadoop command:
hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2+737.jar \
    -numReduceTasks 1 \
    -mapper mapper1.py \
    -file mapper1.py \
    -reducer reducer1.py \
    -file reducer1.py \
    -file misc.py \
    -cacheArchive a.out.tar.gz \
    -input input/* \
    -output testsvmoutput \
    -verbose
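(A side note on -cacheArchive semantics, as far as I can tell from the streaming docs: it expects a path that already lives in HDFS, optionally with a #symlink fragment, e.g. -cacheArchive hdfs:///user/me/a.out.tar.gz#bin, where the HDFS path and the "bin" name are illustrative. The archive is unpacked on each tasktracker and the symlink points at the unpacked directory, so the mapper would then have to call the binary through that directory, along these lines:)

# minimal sketch, assuming the archive was shipped with a "#bin" symlink fragment;
# "bin" is illustrative, not from the original job
command = "./bin/a.out %d" % ( key )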
Needless to say, this does not work, and it seems to be because the mapper is not generating any data.
I confirmed my code works by testing it on the command line:
cat input/svminput1.txt | python mapper1.py | sort | python reducer1.py
I would love for someone to explain why this is not working, how passing an exe via the -cacheArchive parameter works on the datanodes, and/or how to debug this, because the error messages coming out of the Cloudera HTML panel are not that helpful.
Thanks
Here is the error I am seeing:
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:136)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:317)
at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1063)
at org.apache.hadoop.mapred.Child.main(Child.java:211)
I see a few things you are doing wrong. You want to chmod a+x your Python scripts and test them like this:
cat input/svminput1.txt | ./mapper1.py | sort | ./reducer1.py
because that is basically what Hadoop does in streaming: it launches the script, and the OS handles executing it with the right interpreter.
Now, for the other files moving into the job for use by your mapper and reducer, you just add them through the command line with -file whateveryouwant (like you have with misc.py), and when your map/reduce launches, those files are local "." to your script, so you can import them, open them as text files, or whatever you want. You should do this with the cacheArchive stuff as well; just pushing each file with -file should be fine, as shown below.
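(A sketch of what that would look like, simply shipping the binary alongside the scripts; not tested against this particular cluster:)

hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2+737.jar \
    -numReduceTasks 1 \
    -mapper mapper1.py -file mapper1.py \
    -reducer reducer1.py -file reducer1.py \
    -file misc.py \
    -file a.out \
    -input input/* \
    -output testsvmoutput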
Here is a very basic write-up on Python streaming, if you have not seen it already: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
And here is a little more advanced Python streaming, with joins and keys, that might also be helpful: http://allthingshadoop.com/2010/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/
Hope this helps; if not, I think the specific errors would be needed to do any more.
Are you sure python is available at /usr/bin/python on the cluster machines? A good practice is to always use
#!/usr/bin/env python
at the top of your scripts... that way it's not hardcoded. Also be sure to check the Python installation on your cluster machines and make sure the imports work.
You aren't using any try/except blocks in your code, so it will be very difficult to debug what the problem is. I suggest wrapping your code in try/except and printing log messages to a well-known location such as /tmp.
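(Something along these lines — a minimal sketch; the log path and message format are illustrative:)

#!/usr/bin/env python
import sys, traceback

# append to a well-known location on the task node
log = open("/tmp/mapper1.log", "a")
for line in sys.stdin:
    try:
        pass  # ... existing mapper logic for one input line goes here ...
    except Exception:
        # record the full traceback instead of letting the task die silently
        log.write(traceback.format_exc())
        log.flush()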
For more, you can check out davidvhill.com/articles; my actual production code is captured there.