Hadoop Distributed Cache (Cloudera CDH3)

I am trying to run a simple example using a binary executable and the cached archive, and it does not seem to be working.

The example I am trying to run has a mapper that emits a key and three random doubles, and a reducer that averages those three numbers together and logs the average. Very simple stuff. I wrote a simple executable in C to generate the random numbers:

#include <stdio.h>
#include <stdlib.h>
#include <time.h>

int main(int argc, char *argv[]) {
    /* seed so each invocation produces different numbers */
    srand(time(NULL));
    int rand1 = rand() % 10 + 1;
    int rand2 = rand() % 10 + 1;
    int rand3 = rand() % 10 + 1;
    /* emit: the key, then three random values in [0.2, 2.0];
       the trailing newline makes each record a complete line */
    printf("%s, %f, %f, %f\n", argv[1], (float)rand1 / 5, (float)rand2 / 5, (float)rand3 / 5);
    return 0;
}

So if I call ./a.out [key]

I will see

key, random1, random2, random3

I'm using Python streaming, and here is my mapper, written in Python:


#!/usr/bin/python

import sys
import shlex, subprocess
from subprocess import PIPE

from misc import *

for line in sys.stdin:
    line = line.strip()
    sline = line.split(',')

    # parse out the key, au and cost from the line ...
    key = int(sline[0])
    au = int(sline[1])
    cost = float(sline[2])

    # shell out to the cached binary and emit its output as the map record
    command = "./a.out %d" % (key)
    cli_parts = shlex.split(command)
    mp = subprocess.Popen(cli_parts, stdin=PIPE, stderr=PIPE, stdout=PIPE)
    print mp.communicate()[0].strip('\r\n')

Here is the reducer, which will just do the averaging:


#!/usr/bin/python

import sys
import re

from misc import *

for line in sys.stdin:
    line = line.strip()
    m = re.match(r"(\w+),\s+(.*),\s+(.*),\s+(.*)", line)
    if m:
        # average the three random values emitted by the mapper
        average = (float(m.group(2)) + float(m.group(3)) + float(m.group(4))) / 3
        print "key: %d average: %f" % (int(m.group(1)), average)
    else:
        print "not found"

So after reading the documentation, it seems like I need to compile the binary and then tar.gz it:

1) tar cvaf a.out.tar.gz a.out
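
To double-check the archive before shipping it, listing its contents should show just the binary:

tar tzf a.out.tar.gz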

Now I should be able to pass this to the datanodes via the -cacheArchive parameter, and everything should work fine. Here is my Hadoop command:

hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2+737.jar \
    -numReduceTasks 1 \
    -mapper mapper1.py \
    -file mapper1.py \
    -reducer reducer1.py \
    -file reducer1.py \
    -file misc.py \
    -cacheArchive a.out.tar.gz \
    -input input/* \
    -output testsvmoutput \
    -verbose

Needless to say, this does not work, and it seems it's because the mapper is not generating data.

I confirmed my code works by testing it on the command line:

cat input/svminput1.txt | python mapper1.py | sort | python reducer1.py

I would love for someone to explain why this is not working, how passing an exe via the -cacheArchive parameter works on the datanodes, and/or how to debug this, because the error messages coming out of the Cloudera HTML panel are not that helpful.

Thanks

Here is the error I am seeing:

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:136)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:317)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1063)
    at org.apache.hadoop.mapred.Child.main(Child.java:211)


I see a few things you are doing wrong. You want to make your Python scripts executable (chmod a+x) and test them like this:

cat input/svminput1.txt | ./mapper1.py | sort | ./reducer1.py

because that is basically what Hadoop does in streaming: it launches the script itself, and the OS handles executing it with the right interpreter.

Now, for the other files you move into the job for use by your mapper and reducer, you just add them through the command line with -file whateveryouwant (like you have with misc.py). When your map/reduce launches, those files are local to "." for your script, so you can import them, open them as text files, or do whatever you want with them. You could handle the cacheArchive stuff the same way; just pushing each file with -file should be fine. See the sketch below.
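
If you do want the -cacheArchive route instead, the streaming docs have the archive living in HDFS and referenced as a URI with a #linkname fragment; Hadoop unpacks the archive and symlinks the unpacked directory into the task's working directory under that name. Here is a minimal sketch of how the mapper would then call the binary, assuming a hypothetical HDFS path and a job launched with -cacheArchive hdfs:///user/you/a.out.tar.gz#bindir:

#!/usr/bin/python
# Minimal sketch, not the poster's exact code. Assumes the job was started with
#   -cacheArchive hdfs:///user/you/a.out.tar.gz#bindir   (hypothetical HDFS path)
# so the unpacked archive is reachable via the ./bindir symlink in the task dir.
import shlex
import subprocess
from subprocess import PIPE

def run_cached_binary(key):
    # a.out lives inside the unpacked archive, not in the working dir itself
    cmd = shlex.split("./bindir/a.out %d" % key)
    proc = subprocess.Popen(cmd, stdout=PIPE, stderr=PIPE)
    out, err = proc.communicate()
    return out.strip()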

Here is a very basic write-up on Python streaming, if you have not seen it already: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

And this is a little more advanced Python streaming, with joins and keys, which might be helpful also: http://allthingshadoop.com/2010/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

Hope this helps; if not, specific errors would be needed to do any more, I think.


Are you sure python is available at /usr/bin/python on the cluster machines? A good practice is to always use #!/usr/bin/env python at the top of your scripts... that way it's not hardcoded.

Also be sure to check the python installation on your cluster machines... make sure the imports work.
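
For instance, a quick smoke test on a task node (hypothetical, run from the directory holding your scripts) is:

python -c "import misc"

which fails immediately if misc.py or its own imports are not resolvable.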

You aren't using any try/excepts in your code so it will be very difficult to debug what the problem is... I suggest try/excepting your code and printing log messages to a well-known location such as /tmp....
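
A minimal sketch of that pattern, with a hypothetical log path:

#!/usr/bin/env python
# Minimal sketch: wrap the mapper body so any failure lands in a well-known
# local file; /tmp/mapper1.log is a hypothetical choice, not from the poster.
import sys
import traceback

try:
    for line in sys.stdin:
        # ... normal mapper work goes here ...
        pass
except Exception:
    log = open('/tmp/mapper1.log', 'a')
    log.write(traceback.format_exc())
    log.close()
    sys.exit(1)  # a nonzero exit is what surfaces as "subprocess failed with code 1"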

For more, you can check out davidvhill.com/articles.... My actual production code is captured there....
