How Can I Load Every File In a Folder Using PIG?
I have a folder of files created daily that all store the same type of information. I'd like to make a script that loads the newest 10 of them, UNIONs them, and then runs some other code on them. Since Pig already has an ls method, I was wondering if there is a simple way for me to get the 10 most recently created files and load them all under generic names using the same loader and options. I'm guessing it would look something like:
REGISTER /usr/local/lib/hadoop/hadoop-lzo-0.4.13.jar;
REGISTER /usr/local/lib/hadoop/elephant-bird-2.0.5.jar;
FOREACH file in some_path:
file = LOAD 'file'
USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t')
AS (i1, i2, i3);
This is not something I've been able to do out of the box, but it can be done outside of the script with some sort of wrapper or helper script (bash, perl, etc.). Suppose you write a script, called last10.sh, that outputs your last 10 files, comma separated:
$ ./last10.sh
/input/file38,/input/file39,...,/input/file48
Something like this should do the trick for the most recent 10 files:
hadoop fs -ls /input/ | sort -k6,7 | tail -n10 | awk '{print $8}' | paste -sd, -
With that in place, you could do:
$ pig -p files="`last10.sh`" my_mr.pig
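For concreteness, here is a sketch of what last10.sh could contain (the script name and the /input/ path come from the answer above; the hadoop CLI being on the PATH, and its fs -ls column layout, are assumptions). It joins the paths with paste so the output has no trailing comma:

```shell
#!/bin/sh
# last10.sh -- print the 10 most recently modified files under /input/,
# comma separated, suitable for pig -p files="`./last10.sh`".
# Assumes `hadoop fs -ls` prints the modification date and time in
# columns 6-7 and the file path in column 8.
hadoop fs -ls /input/ \
    | sort -k6,7 \
    | tail -n10 \
    | awk '{print $8}' \
    | paste -sd, -
```

Sorting on columns 6-7 works because the date/time format (YYYY-MM-DD HH:MM) sorts chronologically when sorted lexically.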
Then, in your pig script, do:
data = LOAD '$files'
USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t')
AS (i1, i2, i3);
Pig loads the separate files when the paths are comma separated like this. It would be equivalent to doing:
data = LOAD '/input/file38,/input/file39,...,/input/file48'
USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t')
AS (i1, i2, i3);
Donald Miner's answer still works perfectly well, but IMO there's a better approach to this now using Embedded Pig in Python. O'Reilly has a brief explanation here. There's also a presentation on why this is something you'd want to do, and how it works, here. Long story short, there's a lot of functionality it would be nice to have access to before running a pig script, in order to determine parts of the script. Wrapping and/or dynamically generating parts of the script in Jython lets you do that. Rejoice!
I like the above 2 approaches. I just wanted to give one more option for Oozie enthusiasts. A Java action in Oozie writes a file to the location configured by "oozie.action.output.properties", and the Pig action picks it up and passes it to the Pig script. This is definitely not an elegant solution compared to the above 2, but I had trouble configuring embedded Pig with a Java scheduler in Oozie, so I had to go with this one.
<workflow-app xmlns='uri:oozie:workflow:0.1' name='java-wf'>
    <start to='java1' />
    <action name='java1'>
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>org.apache.oozie.test.MyTest</main-class>
            <arg>${outputFileName}</arg>
            <capture-output/>
        </java>
        <ok to="pig1" />
        <error to="fail" />
    </action>
    <action name='pig1'>
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <script>script.pig</script>
            <param>MY_VAR=${wf:actionData('java1')['PASS_ME']}</param>
        </pig>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Pig failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end' />
</workflow-app>