Is there an enterprise integration pattern for combining items from 2 queues?
I have two systems that produce different types of messages that I need to consume and then generate new messages based on them.
- The first produces NEWFILE messages, each containing a Filename, which arrive on queue:file.new.
- The second produces TASK messages, each containing a Filename and a TaskType, which go to queue:task.
- There will only be one NEWFILE message for a given Filename.
- There will be multiple TASK messages for a given Filename, each with a different TaskType.
- The messages can arrive in any order.
I need to forward each TASK message that turns up to queue:perform but only when a NEWFILE message for the matching Filename has arrived.
At the moment we do this matching in another (non-queue-based) system that uses polling to find the FILEs and TASKs. It maintains two tables, one for FILEs and one for TASKs, and when either turns up, we check what matches there are and fire off the relevant task.
We are looking to replace this with a messaging solution that maintains as little state as possible and makes as much use of enterprise integration patterns (Apache Camel) as possible.
It's not clear to me which pattern / components would satisfy my requirements.
Thanks, Tom
It seems that you should use the Aggregator pattern. You can use the component's various options to specify the completion criteria and the correlation between the messages.
For instance, the following requirements indicate that you should probably use the Filename as your correlationExpression:
There will only be one NEWFILE message for a given Filename. There will be multiple TASK messages for a given Filename.
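The correlation step can be sketched without Camel in plain Java (Java 16+ for records). The hypothetical Message record and correlationKey method stand in for a real exchange and for a correlationExpression. In a Camel route the rough equivalent would be aggregate(header("Filename"), strategy), assuming both queues are routed into the same aggregator and the Filename travels as a message header.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CorrelationSketch {
    // Hypothetical message shape: type is "NEWFILE" or "TASK"; taskType is null for NEWFILE.
    record Message(String type, String filename, String taskType) {}

    // Plays the role of a correlationExpression: every message is keyed by its Filename.
    static String correlationKey(Message m) {
        return m.filename();
    }

    // Groups messages by correlation key, preserving arrival order within each group.
    static Map<String, List<Message>> correlate(List<Message> arrivals) {
        Map<String, List<Message>> groups = new LinkedHashMap<>();
        for (Message m : arrivals) {
            groups.computeIfAbsent(correlationKey(m), k -> new ArrayList<>()).add(m);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Message> arrivals = List.of(
            new Message("TASK", "a.csv", "validate"),
            new Message("NEWFILE", "b.csv", null),
            new Message("NEWFILE", "a.csv", null),
            new Message("TASK", "b.csv", "index"),
            new Message("TASK", "a.csv", "archive"));
        // Messages arriving in any order still end up in the group for their Filename.
        correlate(arrivals).forEach((file, msgs) ->
            System.out.println(file + " -> " + msgs.size() + " messages"));
    }
}
```

Because the key is the Filename alone, the single NEWFILE and all of its TASK messages land in one group regardless of arrival order.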
To process the tasks, you can temporarily store the TASK messages on the newExchange object and then, when appropriate, flush the stored messages to their target queue. I believe you have two main options for the appropriate moment:
1. Wait for all the messages of a given context (the NEWFILE and its TASK messages) to be aggregated, using a completion criterion such as size or timeout. For that you can use, for instance, the completionPredicate option to evaluate whether the context has completed. After completion, send the tasks on to the next queue.
2. Wait until the NEWFILE message arrives, flush the temporarily stored TASK messages to the target queue, and then wait for the remaining TASK messages (again using an appropriate criterion to decide when the message context has ended).
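The first option can be modelled as a small framework-free aggregator: each message is folded into the state for its Filename (roughly what Camel's AggregationStrategy does in aggregate(oldExchange, newExchange)), and a completion predicate decides when the group is released. The question does not say how many TASK messages a file gets, so the "NEWFILE plus exactly two tasks" rule below is a hypothetical completion criterion; in practice a size, timeout, or domain-specific predicate would be needed. The names Message, Group, and perform are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class WaitForCompletionSketch {
    record Message(String type, String filename, String taskType) {}

    // Aggregated state for one Filename: whether NEWFILE has arrived plus the buffered TASKs.
    static final class Group {
        boolean fileArrived;
        final List<Message> tasks = new ArrayList<>();
    }

    private final Map<String, Group> groups = new HashMap<>();
    private final Predicate<Group> completionPredicate;
    final List<Message> perform = new ArrayList<>(); // stand-in for queue:perform

    WaitForCompletionSketch(Predicate<Group> completionPredicate) {
        this.completionPredicate = completionPredicate;
    }

    // Fold the new message into the group for its Filename.
    void onMessage(Message m) {
        Group g = groups.computeIfAbsent(m.filename(), k -> new Group());
        if (m.type().equals("NEWFILE")) {
            g.fileArrived = true;
        } else {
            g.tasks.add(m);
        }
        // Only a completed group releases its tasks; its state is then discarded.
        if (completionPredicate.test(g)) {
            perform.addAll(g.tasks);
            groups.remove(m.filename());
        }
    }

    public static void main(String[] args) {
        // Hypothetical completion rule: NEWFILE seen and two tasks collected.
        WaitForCompletionSketch agg =
            new WaitForCompletionSketch(g -> g.fileArrived && g.tasks.size() == 2);
        agg.onMessage(new Message("TASK", "a.csv", "validate"));
        agg.onMessage(new Message("NEWFILE", "a.csv", null));
        System.out.println("forwarded after NEWFILE: " + agg.perform.size());
        agg.onMessage(new Message("TASK", "a.csv", "archive"));
        System.out.println("forwarded after last TASK: " + agg.perform.size());
    }
}
```

Note that the first task is held even after its NEWFILE has arrived; nothing reaches queue:perform until the whole group is complete.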
I haven't tried this in code, but I believe that while the first option might be easier, the second may give you better performance. Because the second option forwards TASK messages to their target queue as soon as possible, the first tasks can be processed earlier, whereas with the first option you have to wait for all the tasks to be produced before any processing starts. Besides that, because you also flush the temporarily stored messages as soon as possible, the second option has lower storage requirements than the first. This means that if you are dealing with large messages and high volumes, you will need less memory; if that is not the case, both solutions are probably similar.
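To make the latency and memory argument concrete, here is a hedged plain-Java sketch of the second option that also records the peak number of TASK messages held at once. The names (knownFiles, pending, peakBuffered) are hypothetical. As a simplification, it never forgets a Filename once its NEWFILE has arrived; a real solution would still need the "context has ended" criterion (for example a timeout) to expire that entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FlushOnNewFileSketch {
    record Message(String type, String filename, String taskType) {}

    private final Set<String> knownFiles = new HashSet<>();             // Filenames whose NEWFILE has arrived
    private final Map<String, List<Message>> pending = new HashMap<>(); // TASKs waiting for their NEWFILE
    private int buffered;   // TASKs currently held
    int peakBuffered;       // largest number of TASKs held at any one time
    final List<Message> perform = new ArrayList<>(); // stand-in for queue:perform

    void onMessage(Message m) {
        if (m.type().equals("NEWFILE")) {
            knownFiles.add(m.filename());
            // Flush every TASK that arrived before its NEWFILE.
            List<Message> waiting = pending.remove(m.filename());
            if (waiting != null) {
                perform.addAll(waiting);
                buffered -= waiting.size();
            }
        } else if (knownFiles.contains(m.filename())) {
            // NEWFILE already seen: forward immediately, nothing is stored.
            perform.add(m);
        } else {
            pending.computeIfAbsent(m.filename(), k -> new ArrayList<>()).add(m);
            buffered++;
            peakBuffered = Math.max(peakBuffered, buffered);
        }
    }

    public static void main(String[] args) {
        FlushOnNewFileSketch agg = new FlushOnNewFileSketch();
        agg.onMessage(new Message("TASK", "a.csv", "validate"));
        agg.onMessage(new Message("NEWFILE", "a.csv", null));
        System.out.println("forwarded after NEWFILE: " + agg.perform.size());
        agg.onMessage(new Message("TASK", "a.csv", "archive"));
        System.out.println("forwarded after last TASK: " + agg.perform.size()
            + ", peak buffered: " + agg.peakBuffered);
    }
}
```

With the same arrival order as a wait-for-everything aggregator, the early task is released as soon as NEWFILE appears and the later task is never stored at all, which is where the lower memory footprint comes from.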