Effective JMS processing
We have a JMS queue which receives a very large number of messages.
The listener has to save each message to a database inside a database transaction and then commit the JMS transaction.
How can I do this more efficiently, so that I don't have to commit to the database and to JMS for every message?
Don't do it on each message, do it in batches. JMS supports transactions just like your DB does; start a JMS transaction, read N messages. Start DB transaction, insert N messages. Commit to JMS, commit to DB.
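This obviously introduces a window for a race (a crash between the two commits). You have that window now, but only for a single message. The order of the commits decides what the failure looks like: if the JMS commit succeeds and the DB commit does not, the batch is lost; if the DB commit succeeds and the JMS commit does not, the batch is redelivered and inserted a second time. If you want to solve that problem you're faced with either XA transactions (two-phase commit) or at the very least some sort of duplicate detection scheme. For an introduction, take a look at: http://activemq.apache.org/should-i-use-xa.html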
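One possible duplicate detection scheme is to key every row on the JMS message ID, so that a redelivered batch inserts nothing new. Here is a minimal in-memory sketch of that idea. `DedupStore` is a hypothetical stand-in for a table with a unique constraint on the message ID column, and the IDs and payloads are made up for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory stand-in for a table with a unique key on the message ID. */
class DedupStore {
    private final Map<String, String> rows = new LinkedHashMap<>();

    /** Inserts the batch, skipping IDs that are already stored; returns the number of new rows. */
    int insertBatch(Map<String, String> batch) {
        int inserted = 0;
        for (Map.Entry<String, String> entry : batch.entrySet()) {
            // putIfAbsent returns null only when the ID was not stored before
            if (rows.putIfAbsent(entry.getKey(), entry.getValue()) == null) {
                inserted++;
            }
        }
        return inserted;
    }

    int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        DedupStore store = new DedupStore();
        Map<String, String> batch = new LinkedHashMap<>();
        batch.put("ID:1", "first");
        batch.put("ID:2", "second");
        System.out.println(store.insertBatch(batch)); // first delivery: prints 2
        System.out.println(store.insertBatch(batch)); // redelivery of the same batch: prints 0
    }
}
```

With a real database the same effect would usually come from a unique index plus an insert that tolerates conflicts, but the exact statement depends on your database.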
The premise behind async messaging, especially when using an MDB, is that each message is atomic. That is to say, the outcome of processing any one message is supposed to be independent of the outcome of processing any other message. The ideal solution to your problem will preserve this atomicity of messages.
If you were to process multiple messages in the same unit of work, then you would lose this atomicity. For example, suppose you decided to syncpoint at every 25 messages. If the 25th message had an error, such as a code page conversion problem that prevented it from being retrieved from the queue, the entire batch of messages would be backed out. They would then all be redelivered. The redelivery count for the messages would increment with each read/backout cycle. Once the redelivery count had exceeded the threshold set in your app server, all 25 of the messages would be discarded or requeued, depending on your configuration. The larger the batch, the more messages are potentially affected in an error situation because the entire batch lives or dies together. Set your batch size to 100 and 100 messages will be at risk in the event of a single poison message.
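To make the "lives or dies together" point concrete, here is a small simulation of the read/backout cycle. It uses no JMS at all; `BatchBackout`, the message names and the threshold value are invented for illustration. It only counts how often each message is delivered when one message in the batch cannot be processed:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BatchBackout {
    /**
     * Re-reads the same batch until it commits or the backout threshold is reached.
     * Returns how many times each message was delivered.
     */
    static Map<String, Integer> run(List<String> batch, int backoutThreshold) {
        Map<String, Integer> deliveries = new HashMap<>();
        for (int attempt = 1; attempt <= backoutThreshold; attempt++) {
            boolean failed = false;
            for (String msg : batch) {
                deliveries.merge(msg, 1, Integer::sum); // message is read (again)
                if (msg.startsWith("poison")) {         // simulated processing error
                    failed = true;
                    break;
                }
            }
            if (!failed) {
                break; // commit: the batch is done
            }
            // otherwise: rollback, everything read so far will be redelivered
        }
        return deliveries;
    }
}
```

With 24 good messages followed by one poison message and a threshold of 3, every one of the 25 messages ends up with a delivery count of 3, even though only one of them was actually bad.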
An alternative solution is to allow for many processing threads in your MDB. With JMS you can spawn many sessions under the same connection. Each session can manage its own unit of work, therefore each session can independently start an XA transaction, get a message, update the database and then commit the transaction. If one message is bad, only that message and database update are affected.
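The shape of that approach can be sketched without a broker. In the example below each worker thread plays the role of one session: it takes one message, processes it, and commits or backs out that message alone. This is only a simulation under stated assumptions; `ParallelConsumers`, the in-memory queues and the "poison" prefix are hypothetical, and a real MDB would get this behaviour from the container rather than from hand-written threads:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelConsumers {
    /** Drains the queue with the given number of workers; returns the messages that failed. */
    static Queue<String> drain(Queue<String> queue, Queue<String> stored, int workers) {
        Queue<String> failed = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                String msg;
                while ((msg = queue.poll()) != null) {
                    try {
                        process(msg);    // stands in for the database update
                        stored.add(msg); // "commit": affects this message only
                    } catch (RuntimeException e) {
                        failed.add(msg); // "rollback": affects this message only
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failed;
    }

    static void process(String msg) {
        if (msg.startsWith("poison")) {
            throw new IllegalArgumentException("cannot handle " + msg);
        }
    }
}
```

Feeding it nine good messages and one poison message leaves nine entries in `stored` and one in the returned queue, regardless of which worker picked up the bad one.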
There are exceptions to this. For example, if processing a large batch and the messages all originate from the same producer, it is common to use something other than an MDB to fetch many messages and update many rows under the same unit of work. Similarly, if the messages are sequence-dependent then parallel processing is not possible because it would not preserve sequence. But then again, sequence-dependent messages are not atomic, so in this case too an MDB is not the ideal solution.
Depending on your transport provider, the number of threads supported may be limited only by available memory. WebSphere MQ for example can easily handle hundreds of simultaneous getter threads on a queue. Check the tuning for your app server's MDB configuration to see how many threads you can spin up and then verify that your transport can handle the load. Then play around a bit to find the optimum number of threads. Performance will increase dramatically as threads increase from one, but only up to a point. Past that point you generally see a plateau, then a decline as thread management overhead offsets the performance gains. Where the sweet spot lies depends on how heavily the messaging broker is loaded and whether it is most constrained by CPU, memory, disk or network.
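If you want to measure rather than guess, a small harness that runs the same workload with different thread counts is enough to get started. The sketch below is an assumption-laden placeholder: `ThreadSweep` is a made-up name, and `Thread.sleep` stands in for the real receive, insert and commit, so it will only show the initial speed-up and not the plateau or decline you would see against a real broker and database:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadSweep {
    /** Runs `messages` simulated units of work on `threads` workers; returns elapsed milliseconds. */
    static long timeRun(int threads, int messages, long workMillis) {
        AtomicInteger remaining = new AtomicInteger(messages);
        CountDownLatch done = new CountDownLatch(threads);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    while (remaining.getAndDecrement() > 0) {
                        Thread.sleep(workMillis); // placeholder for receive + insert + commit
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        for (int threads : new int[] {1, 2, 4, 8, 16}) {
            System.out.printf("%2d threads: %d ms%n", threads, timeRun(threads, 200, 5));
        }
    }
}
```

Replace the sleep with your actual per-message work and run it against a test queue to see where throughput stops improving in your environment.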
Here is a JMS processor that takes messages from one queue, collects them in a list, and sends that list as a single message to another queue. You can adjust how the values are read and aggregated in the respective methods:
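    import java.util.ArrayList;
    import java.util.List;
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.ObjectMessage;
    import javax.jms.Queue;
    import javax.jms.Session;

    public class JmsBatcher<T> {
        private final Session session;
        private final MessageConsumer consumer;
        private final MessageProducer producer;
        private final int batchSize;

        public JmsBatcher(final Connection connection,
                          final String sourceQueue,
                          final String destQueue,
                          final int batchSize) throws JMSException {
            this.batchSize = batchSize;
            // Transacted session: receives and sends are committed or rolled back together.
            // The acknowledge mode argument is ignored when the session is transacted.
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            final Queue source = session.createQueue(sourceQueue);
            final Queue dest = session.createQueue(destQueue);
            consumer = session.createConsumer(source);
            producer = session.createProducer(dest);
        }

        public void processBatch() {
            final List<T> values = new ArrayList<>();
            try {
                // Blocks until batchSize messages have arrived.
                while (values.size() < batchSize) {
                    final Message message = consumer.receive();
                    values.add(readObject(message));
                    // No message.acknowledge() here: it is ignored in a transacted
                    // session; session.commit() acknowledges the whole batch.
                }
                producer.send(createAggregate(values));
                session.commit();
            } catch (Exception e) {
                // Log the exception
                try {
                    session.rollback();
                } catch (JMSException re) {
                    // Rollback failed, so something is fatally wrong.
                    throw new IllegalStateException(re);
                }
            }
        }

        private Message createAggregate(final List<T> values) throws JMSException {
            // Copy into an ArrayList, which is Serializable.
            return session.createObjectMessage(new ArrayList<T>(values));
        }

        @SuppressWarnings("unchecked")
        private T readObject(final Message message) throws JMSException {
            return (T) ((ObjectMessage) message).getObject();
        }
    }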
This can be started in a separate thread, and just run forever:
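    // The connection must have been started (connection.start()) for messages to arrive.
    // String payloads are assumed here for illustration.
    final JmsBatcher<String> jmsBatcher =
            new JmsBatcher<>(connection, "single", "batch", 25);
    new Thread(() -> {
        while (true) {
            jmsBatcher.processBatch();
        }
    }).start();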
You can then commit to the database in batches from the batched-up results. If anything fails, the JMS transaction is rolled back and the messages are redelivered, so the batch is retried.