hadoop-streaming : writing output to different files
Here is the scenario:

          Reducer1
        /
Mapper - - Reducer2
        \
          ReducerN
In the reducer I want to write the data to different files. Let's say the reducer looks like:
def reduce():
    for line in sys.stdin:
        if line == type1:
            create_type_1_file(line)
        if line == type2:
            create_type_2_file(line)
        if line == type3:
            create_type_3_file(line)
        # ... and so on

def create_type_1_file(line):
    # writes to file1

def create_type_2_file(line):
    # writes to file2

def create_type_3_file(line):
    # writes to file3
Consider the paths to write to as:
file1 = /home/user/data/file1
file2 = /home/user/data/file2
file3 = /home/user/data/file3
When I run in pseudo-distributed mode (a single machine with one node and the HDFS daemons running), things are fine, since all the tasks write to the same set of files.
Question:
- If I run this on a cluster of 1000 machines, will they still write to the same set of files? I am writing to the local filesystem in this case. Is there a better way to perform this operation in Hadoop Streaming?
Typically the output of reduce is written to a reliable storage system like HDFS, because if one of the nodes goes down, the reduce data associated with that node is lost and it is not possible to re-run that particular reduce task outside the context of the Hadoop framework. Also, once the job is complete, the output from the 1000 nodes has to be consolidated for the different input types.
Concurrent writing is not supported in HDFS: multiple reducers might end up writing to the same file in HDFS, which can corrupt it. When multiple reduce tasks run on a single node, concurrency can also be a problem when they write to a single local file.
One solution is to give each reduce task its own output file name and later combine all the files for a specific input type; a sketch of this approach follows below.
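As a rough, hypothetical sketch of that idea for a streaming reducer: Hadoop Streaming exports the job configuration to the script as environment variables with dots replaced by underscores, so the task id should be available under a name like mapreduce_task_id (or mapred_task_id on older releases). The exact variable name, the classify_line() helper and the output directory below are assumptions for illustration only.

#!/usr/bin/env python
# Hypothetical streaming reducer: one output file per (record type, reduce task).
import os
import sys

# Task id exported by Hadoop Streaming; the exact variable name depends on the
# Hadoop version, so this is an assumption, with a fallback for older releases.
task_id = os.environ.get("mapreduce_task_id") or os.environ.get("mapred_task_id", "unknown_task")

out_dir = "/home/user/data"   # assumed local output directory
handles = {}                  # record type -> open file handle

def classify_line(line):
    # Stand-in for the type1/type2/type3 test; here the type is assumed to be
    # the first tab-separated field of the record.
    return line.split("\t", 1)[0]

def file_for(record_type):
    # Lazily open one file per record type, suffixed with the task id so that
    # no two reduce tasks ever write to the same file.
    if record_type not in handles:
        path = os.path.join(out_dir, "%s-%s" % (record_type, task_id))
        handles[record_type] = open(path, "w")
    return handles[record_type]

for line in sys.stdin:
    line = line.rstrip("\n")
    file_for(classify_line(line)).write(line + "\n")

for f in handles.values():
    f.close()

Once the per-task files have been copied into HDFS, the files for one type can be merged into a single local file, for example with hadoop fs -getmerge.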
Output can be written from the Reducer to more than one location using the MultipleOutputs class. You can regard file1, file2 and file3 as three folders and write the 1000 reducers' output data to these folders separately.
Usage pattern for job submission:
Job job = new Job();

FileInputFormat.setInputPath(job, inDir);
// outDir is the root path; in this case outDir = "/home/user/data/"
FileOutputFormat.setOutputPath(job, outDir);

// Assign the output format class. Using MultipleOutputs in this way will still
// create zero-sized default output files, e.g. part-00000. To prevent this, use
// LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class) instead of
// job.setOutputFormatClass(TextOutputFormat.class) in your Hadoop job configuration.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MOMap.class);
job.setReducerClass(MOReduce.class);
...
job.waitForCompletion(true);
Usage in Reducer:
private MultipleOutputs<Text, Text> out;

public void setup(Context context) {
    out = new MultipleOutputs<Text, Text>(context);
    ...
}

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    // '/' characters in baseOutputPath are translated into directory levels in
    // your file system. Also, append your custom-generated path with "part" or
    // similar, otherwise the output files will be named -00000, -00001 etc.
    // No call to context.write() is necessary.
    for (Text line : values) {
        if (line.equals(type1))
            out.write(key, new Text(line), "file1/part");
        else if (line.equals(type2))
            out.write(key, new Text(line), "file2/part");
        else if (line.equals(type3))
            out.write(key, new Text(line), "file3/part");
    }
}

protected void cleanup(Context context) throws IOException, InterruptedException {
    out.close();
}
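With outDir set to "/home/user/data/" as above, each reduce task should then produce files named something like /home/user/data/file1/part-r-00000, so the output of all 1000 reducers for one type ends up collected under a single folder.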
ref:https://hadoop.apache.org/docs/r2.6.3/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html