Implementing a Custom Process Strategy with Apache Camel File Component
Problem Background
I am currently working on a Camel-based ETL application that processes groups of files as they appear in a dated directory. The files must be processed together as a group, determined by the beginning of the file name, and a group can only be processed once its done file (".flag") has been written to the directory. I know the Camel file component has a done file option, but that only lets you retrieve files with the same name as the done file. The application needs to run continuously and start polling the next day's directory when the date rolls over.
Example Directory Structure:
/process-directory
    /03-09-2011
    /03-10-2011
        /GROUPNAME_ID1_staticfilename.xml
        /GROUPNAME_staticfilename2.xml
        /GROUPNAME.flag
        /GROUPNAME2_ID1_staticfilename.xml
        /GROUPNAME2_staticfilename2.xml
        /GROUPNAME2_staticfilename3.xml
        /GROUPNAME2.flag
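The grouping rule implied by this layout can be sketched in plain Java, independent of Camel. This is only a minimal sketch: FlagGroups and its methods are hypothetical names, and it assumes the group name is everything before the flag extension and that member files start with the group name followed by an underscore.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not part of Camel): maps a flag file to the files of its group.
final class FlagGroups {
    private FlagGroups() {
    }

    // "GROUPNAME.flag" -> "GROUPNAME". Assumes the group name is the part before the extension.
    static String groupName(String flagFileName, String flagExtension) {
        String suffix = "." + flagExtension;
        if (!flagFileName.endsWith(suffix)) {
            throw new IllegalArgumentException("Not a flag file: " + flagFileName);
        }
        return flagFileName.substring(0, flagFileName.length() - suffix.length());
    }

    // A file belongs to a group when its name starts with "<group>_".
    // The trailing underscore keeps GROUPNAME from also matching GROUPNAME2_... files.
    static boolean belongsToGroup(String fileName, String group) {
        return fileName.startsWith(group + "_");
    }

    // Lists the regular files of one group in a directory, sorted by path.
    static List<Path> groupFiles(Path directory, String group) throws IOException {
        try (Stream<Path> entries = Files.list(directory)) {
            return entries
                    .filter(Files::isRegularFile)
                    .filter(p -> belongsToGroup(p.getFileName().toString(), group))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        String group = groupName("GROUPNAME.flag", "flag");
        // Pass a directory as the first argument to list that group's files.
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        System.out.println(group + " -> " + groupFiles(dir, group));
    }
}
```

The underscore in the prefix matters for this layout, because a bare "GROUPNAME" prefix would also pick up the GROUPNAME2 files.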
Attempts Thus Far
I have the following route (names obfuscated) that kicks off the processing:
@Override
public void configure() throws Exception
{
    getContext().addEndpoint("processShare", createProcessShareEndpoint());
    from("processShare")
        .process(new InputFileRouter())
        .choice()
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE1 + "'")
                .to("seda://type1?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE2 + "'")
                .to("seda://type2?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE3 + "'")
                .to("seda://type3?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE4 + "'")
                .to("seda://type4?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE5 + "'")
                .to("seda://type5?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE6 + "'")
                .to("seda://type6?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE7 + "'")
                .to("seda://type7?size=1")
            .otherwise()
                .log(LoggingLevel.FATAL, "Unknown file type encountered during processing! --> ${body}");
}
My problems are around how to configure the file endpoint. I'm currently trying to configure the endpoint programmatically, without much luck. My experience with Camel so far has been predominantly with the Spring DSL, not the Java DSL.
I went down the route of trying to instantiate a FileEndpoint object, but whenever the route builds I get an error saying that the file property is null. I believe this is because I should be creating a FileComponent and not an endpoint. I'm creating the endpoint without a URI because I am not able to specify the dynamic date in the directory name using the URI.
private FileEndpoint createProcessShareEndpoint() throws ConfigurationException
{
    FileEndpoint endpoint = new FileEndpoint();
    //Custom directory "ready to process" implementation.
    endpoint.setProcessStrategy(getContext().getRegistry().lookup(
        "inputFileProcessStrategy", MyFileInputProcessStrategy.class));
    try
    {
        //Controls the number of files returned per directory poll.
        endpoint.setMaxMessagesPerPoll(Integer.parseInt(
            PropertiesUtil.getProperty(
                AdapterConstants.OUTDIR_MAXFILES, "1")));
    }
    catch (NumberFormatException e)
    {
        throw new ConfigurationException(String.format(
            "Property %s is required to be an integer.",
            AdapterConstants.OUTDIR_MAXFILES), e);
    }
    Map<String, Object> consumerPropertiesMap = new HashMap<String, Object>();
    //Controls the delay between directory polls.
    consumerPropertiesMap.put("delay", PropertiesUtil.getProperty(
        AdapterConstants.OUTDIR_POLLING_MILLIS));
    //Controls which files are included in directory polls.
    //Regex that matches file extensions (eg. {SOME_FILE}.flag); the dot is escaped so it matches literally.
    consumerPropertiesMap.put("include", "^.*(\\." + PropertiesUtil.getProperty(
        AdapterConstants.OUTDIR_FLAGFILE_EXTENSION, "flag") + ")");
    endpoint.setConsumerProperties(consumerPropertiesMap);
    GenericFileConfiguration configuration = new GenericFileConfiguration();
    //Controls the directory to be polled by the endpoint.
    if(CommandLineOptions.getInstance().getInputDirectory() != null)
    {
        configuration.setDirectory(CommandLineOptions.getInstance().getInputDirectory());
    }
    else
    {
        SimpleDateFormat dateFormat = new SimpleDateFormat(PropertiesUtil.getProperty(AdapterConstants.OUTDIR_DATE_FORMAT, "MM-dd-yyyy"));
        configuration.setDirectory(
            PropertiesUtil.getProperty(AdapterConstants.OUTDIR_ROOT) + "\\" +
            dateFormat.format(new Date()));
    }
    endpoint.setConfiguration(configuration);
    return endpoint;
}
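Since the include option is a regular expression, the extension is safest built with the literal parts quoted. The following is a minimal sketch using only java.util.regex; FlagFilePattern and includeRegex are hypothetical names, and it assumes the pattern is matched against the whole file name.

```java
import java.util.regex.Pattern;

// Hypothetical helper (not part of Camel): builds a regex that matches "<anything>.<extension>".
final class FlagFilePattern {
    private FlagFilePattern() {
    }

    // Pattern.quote wraps ".<extension>" so the dot and any other
    // regex metacharacters in the extension are matched literally.
    static String includeRegex(String extension) {
        return "^.*" + Pattern.quote("." + extension) + "$";
    }

    public static void main(String[] args) {
        String regex = includeRegex("flag");
        System.out.println(regex);
        System.out.println("GROUPNAME.flag".matches(regex));
        System.out.println("GROUPNAMEXflag".matches(regex));
    }
}
```

With an unescaped dot, a name such as GROUPNAMEXflag would also match, because "." in a regex stands for any character.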
Questions
1. Is implementing a GenericFileProcessStrategy the correct thing to do in this situation? If so, is there an example of this somewhere? I have looked through the Camel file unit tests and didn't see anything that jumped out at me.
2. What am I doing wrong with configuring the endpoint? I feel like the answer to cleaning up this mess is tied in with question 3.
3. Can you configure the file endpoint to roll dated folders when polling and the date changes?
As always thanks for the help.
You can refer to a custom ProcessStrategy from the endpoint URI using the processStrategy option, e.g. file:xxxx?processStrategy=#myProcess. Notice the # prefix on the value, which tells Camel to look it up in the registry. So in Spring XML you just add a <bean id="myProcess" ...> tag.
In Java it's probably easier to grab the endpoint from the CamelContext API:
FileEndpoint file = context.getEndpoint("file:xxx?aaa=123&bbb=456", FileEndpoint.class);
This allows you to preconfigure the endpoint through the URI. Afterwards you can use the API on FileEndpoint to set any other configuration.
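For the dated directory in the question, one option is to compute the directory name in plain Java and build the endpoint URI from it. The sketch below assumes Java 8+ (java.time) and the MM-dd-yyyy layout from the question; DatedDirectory is a hypothetical helper, not Camel API. It only computes the path and detects a date roll. How the route is then pointed at the new directory is left open here.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not part of Camel): resolves "<root>/<formatted date>" for the current day.
final class DatedDirectory {
    private final Path root;
    private final DateTimeFormatter format;
    private final Clock clock;

    DatedDirectory(Path root, String datePattern, Clock clock) {
        this.root = root;
        this.format = DateTimeFormatter.ofPattern(datePattern);
        this.clock = clock; // injected so the date roll can be tested with a fixed clock
    }

    // Directory for "today" according to the clock.
    Path current() {
        return root.resolve(LocalDate.now(clock).format(format));
    }

    // True when the directory polled so far is no longer today's directory.
    boolean hasRolled(Path previouslyPolled) {
        return !current().equals(previouslyPolled);
    }

    public static void main(String[] args) {
        DatedDirectory dated = new DatedDirectory(
                Paths.get("process-directory"), "MM-dd-yyyy", Clock.systemDefaultZone());
        // The resulting path could be appended to a "file:" URI before calling getEndpoint.
        System.out.println(dated.current());
    }
}
```

Injecting the Clock keeps the roll-over check testable without waiting for midnight.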
In Java, this is how to implement a custom GenericFileProcessStrategy by extending GenericFileProcessStrategySupport:
@Component
public class CustomGenericFileProcessingStrategy<T> extends GenericFileProcessStrategySupport<T> {
    public CustomGenericFileProcessingStrategy() {
    }

    public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
        // Return true only if the file can be processed now.
        boolean proceed = super.begin(operations, endpoint, exchange, file);
        ...
        return proceed;
    }

    public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
        super.commit(operations, endpoint, exchange, file);
        ...
    }

    public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
        super.rollback(operations, endpoint, exchange, file);
        ...
    }
}
Then create your RouteBuilder class:
public class MyRoutes extends RouteBuilder {
    // Injected strategy; File is the type parameter used by FileEndpoint.
    private final CustomGenericFileProcessingStrategy<File> customGenericFileProcessingStrategy;

    public MyRoutes(CustomGenericFileProcessingStrategy<File> customGenericFileProcessingStrategy) {
        this.customGenericFileProcessingStrategy = customGenericFileProcessingStrategy;
    }

    @Override
    public void configure() throws Exception {
        FileEndpoint fileEndPoint = getContext().getEndpoint("file://mySourceDirectory", FileEndpoint.class);
        fileEndPoint.setProcessStrategy(customGenericFileProcessingStrategy);
        from(fileEndPoint).setBody(...).process(...).toD(...);
        ...
    }
}