开发者

How to create a custom aggregator in Mule?

What is the recommended way to create a completely custom aggregator in mule 3.x? By completely custom, I mean according to my own logic, not using correlation IDs, message counts, etc.

The documentation on the mulesoft site is outdated, saying to use AbstractEventAggregator which does not exist in 3.x:

http://www.mulesoft.org/documentation/display/MULE3USER/Message+Splitting+and+Aggregatio

Digging deeper, it looks like this class has been renamed to AbstractAggregator in 3.x:

http://www.mulesoft.org/docs/site/3.2.0/apidocs/org/mule/rout开发者_运维百科ing/AbstractAggregator.html

However, there are no examples that show how to use this. The LoanBroker example described in the first link above actually uses a correlation aggregator (in the 2.x examples, which I assume is what the document is referring to).

At one point, there was an abstract class that had abstract methods shouldAggregate and doAggregate. This is the kind of class I would like to extend.


Look at TestAggregator below for an example of subclassing AbstractAggregator.

import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.transformer.TransformerException;
import org.mule.routing.AbstractAggregator;
import org.mule.routing.AggregationException;
import org.mule.routing.EventGroup;
import org.mule.routing.correlation.CollectionCorrelatorCallback;
import org.mule.routing.correlation.EventCorrelatorCallback;
import org.mule.util.concurrent.ThreadNameHelper;

import java.util.Iterator;

public class TestAggregator extends AbstractAggregator
{
    @Override
    protected EventCorrelatorCallback getCorrelatorCallback(MuleContext muleContext)
    {
        return new CollectionCorrelatorCallback(muleContext,false,storePrefix)
        {
            @Override
            public MuleEvent aggregateEvents(EventGroup events) throws AggregationException
            {
                StringBuffer buffer = new StringBuffer(128);

                try
                {
                    for (Iterator<MuleEvent> iterator = events.iterator(); iterator.hasNext();)
                    {
                        MuleEvent event = iterator.next();
                        try
                        {
                            buffer.append(event.transformMessageToString());
                        }
                        catch (TransformerException e)
                        {
                            throw new AggregationException(events, null, e);
                        }
                    }
                }
                catch (ObjectStoreException e)
                {
                    throw new AggregationException(events,null,e);
                }

                logger.debug("event payload is: " + buffer.toString());
                return new DefaultMuleEvent(new DefaultMuleMessage(buffer.toString(), muleContext), events.getMessageCollectionEvent());
            }
        };
    }
}
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜