How to handle order of messages in JMS?
I am reviewing a client-server application written in Java. The server receives JMS messages and processes them, but the messages can arrive in an unexpected order, and a cancel can arrive before an order message. How do you handle such a case? Do you do it in the MDB?
What are some strategies or patterns for this kind of scenario?
As far as I know, this is referred to as "out-of-order" delivery and is part of the quality of service (QoS) attributes of the JMS system. I don't think it's part of the JMS specification, but some providers may support it. That will depend on the particular JMS implementation you use.
Note, however, that JMS is meant to distribute messages to several consumers so as to spread the load. If messages have to be delivered in order, this is not possible: it basically leads to serialization of message delivery, and messages cannot be processed concurrently.
Wikipedia says it better than I can:
JMS queue A staging area that contains messages that have been sent and are waiting to be read. Note that, contrary to what the name queue suggests, messages don't have to be delivered in the order sent. If the message driven bean pool contains more than one instance then messages can be processed concurrently and thus it is possible that a later message is processed sooner than an earlier one. A JMS queue guarantees only that each message is processed only once.
An out-of-band cancel request is therefore not easy to achieve with JMS. Two ideas:
- Store a ticket corresponding to each message in a database; invalidating the ticket is then an easy way to cancel the message. When the message is delivered, the MDB checks whether the corresponding ticket is still valid. If it is, it proceeds; if not, it drops the message.
- Try setting the MDB pool size to one. In that case delivery may be ordered. Changing the pool size is app-server specific, but most servers support a per-bean pool size.
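The ticket idea from the first bullet can be sketched roughly as follows. This is only an illustration under assumptions: `TicketStore` and `TicketCheckingConsumer` are hypothetical names, the in-memory set stands in for the database table, and `onMessage` here is a plain method standing in for the MDB callback rather than the real JMS signature.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the ticket table; a real system would use a database.
class TicketStore {
    private final Set<String> validTickets = ConcurrentHashMap.newKeySet();

    // The sender issues a ticket before putting the message on the queue.
    void issue(String ticketId) { validTickets.add(ticketId); }

    // Cancelling only invalidates the ticket; the queued message itself is left alone.
    void cancel(String ticketId) { validTickets.remove(ticketId); }

    boolean isValid(String ticketId) { return validTickets.contains(ticketId); }
}

// Stand-in for the body of an MDB's onMessage(): check the ticket before doing any work.
class TicketCheckingConsumer {
    private final TicketStore store;

    TicketCheckingConsumer(TicketStore store) { this.store = store; }

    /** Returns true if the message was processed, false if it was dropped. */
    boolean onMessage(String ticketId, String payload) {
        if (!store.isValid(ticketId)) {
            return false; // ticket was cancelled: drop the message
        }
        // ... real processing of the payload would go here ...
        return true;
    }
}
```

With this shape the cancel never has to overtake anything on the queue: it only flips the ticket, and the order message is dropped whenever it eventually arrives.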
Otherwise, maybe have a look at the message store pattern. It's worth checking the EAI website anyway.
Your system will be a lot more flexible if it can cope with the out-of-order messages. The pattern I've used to solve this in the past is to use a delay queue (on a system that processed 8 million messages-per-day in the financial world).
In your example, if I received a delete for an order I hadn't received yet, I'd delay it for a period of time and retry. If I still knew nothing about the order I'm being asked to delete, I'd raise some sort of error (reply to the original sender, send a message to a special error queue, ...).
As for the implementation of the delay queue, this can be another JMS queue with a service that accepts messages to be delayed. It then periodically reads the delayed messages, checks whether the delay period has expired, and resubmits the message to the original destination queue.
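A minimal in-memory sketch of that delay-and-retry logic might look like this. Everything here is an assumption made for illustration: `DelayedCancelHandler` is a hypothetical name, the lists stand in for the delay queue and the error queue, and the current time is passed in explicitly so the behaviour is easy to follow. It is not thread-safe as written.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

class DelayedCancelHandler {
    // A cancel that arrived before its order, plus the time at which to retry it.
    private static final class Parked {
        final String orderId;
        final long retryAtMillis;
        final int attempts;

        Parked(String orderId, long retryAtMillis, int attempts) {
            this.orderId = orderId;
            this.retryAtMillis = retryAtMillis;
            this.attempts = attempts;
        }
    }

    private final Set<String> knownOrders = new HashSet<>();
    private final List<Parked> delayQueue = new ArrayList<>(); // stand-in for the second JMS queue
    final List<String> cancelled = new ArrayList<>();
    final List<String> errors = new ArrayList<>();             // stand-in for an error queue
    private final long delayMillis;
    private final int maxAttempts;

    DelayedCancelHandler(long delayMillis, int maxAttempts) {
        this.delayMillis = delayMillis;
        this.maxAttempts = maxAttempts;
    }

    void onOrder(String orderId) { knownOrders.add(orderId); }

    void onCancel(String orderId, long nowMillis) { handleCancel(orderId, nowMillis, 1); }

    // Called periodically: resubmits every parked cancel whose delay has expired.
    void redeliverDue(long nowMillis) {
        List<Parked> due = new ArrayList<>();
        for (Iterator<Parked> it = delayQueue.iterator(); it.hasNext();) {
            Parked p = it.next();
            if (p.retryAtMillis <= nowMillis) {
                it.remove();
                due.add(p);
            }
        }
        for (Parked p : due) {
            handleCancel(p.orderId, nowMillis, p.attempts + 1);
        }
    }

    private void handleCancel(String orderId, long nowMillis, int attempt) {
        if (knownOrders.remove(orderId)) {
            cancelled.add(orderId);                       // order is known: cancel it
        } else if (attempt < maxAttempts) {
            delayQueue.add(new Parked(orderId, nowMillis + delayMillis, attempt)); // park and retry later
        } else {
            errors.add(orderId);                          // still unknown after retries: report an error
        }
    }
}
```

With `maxAttempts` set to 2, a cancel gets its first attempt plus one retry before it is routed to the error list.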
I second the advice about checking the EAI site, and the book it is based on (a fantastic text on MOMs and MOM patterns).
Personally I'd investigate the Resequencer, though.
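For reference, here is a rough sketch of what a Resequencer does. It assumes every message carries a gap-free sequence number assigned by the sender, which is not something the original question guarantees; the class and method names are made up for illustration, and messages are assumed to be non-null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class Resequencer<T> {
    private final Map<Long, T> pending = new HashMap<>(); // messages that arrived too early
    private long nextExpected;

    Resequencer(long firstSequence) { this.nextExpected = firstSequence; }

    /** Accepts a message and returns those that can now be released in sequence order (possibly none). */
    synchronized List<T> accept(long sequence, T message) {
        List<T> released = new ArrayList<>();
        if (sequence < nextExpected) {
            return released; // already released: treat as a duplicate and ignore
        }
        pending.put(sequence, message);
        T next;
        // Release the longest run of consecutive messages starting at nextExpected.
        while ((next = pending.remove(nextExpected)) != null) {
            released.add(next);
            nextExpected++;
        }
        return released;
    }
}
```

A cancel with sequence 2 that arrives first is simply held back; once the order with sequence 1 arrives, both are released together in the right order. A production version would also need a timeout for sequence numbers that never show up.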
The question "How to assure the sequence of message received by mdb?" covers a similar topic on the server side; someone there indicates that ActiveMQ might have a solution that preserves order. I guess this makes it vendor specific, though.
A JMS queue should be treated as a FIFO queue in general.
The reasons for ordering being spoiled, according to the IBM MQ documentation, are:
- Multiple destinations
- Multiple producers
- Multiple consumers
- Publish and subscribe (implies multiple instances of a subscription)
ActiveMQ makes similar statements:
ActiveMQ will preserve the order of messages sent by a single producer to all consumers on a topic. If there is a single consumer on a queue then the order of messages sent by a single producer will be preserved as well. If you have multiple consumers on a single queue the consumers will compete for messages and ActiveMQ will load balance across them, so order will be lost.
To avoid reordering, messages of the same group must be processed sequentially by the same thread. Kafka provides partitioning of messages based on the message key. ActiveMQ has a notion of message groups, which uses a message header.
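The "same group, same thread" rule can be illustrated inside a single JVM with one single-threaded executor per partition. This is not how Kafka or ActiveMQ implement it; it is only a sketch of the principle, and `KeyedDispatcher` is a hypothetical name. It assumes `dispatch` is called from one thread in arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class KeyedDispatcher implements AutoCloseable {
    // One single-threaded executor per lane: tasks within a lane run strictly in submission order.
    private final List<ExecutorService> lanes = new ArrayList<>();

    KeyedDispatcher(int laneCount) {
        for (int i = 0; i < laneCount; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Routes the task to the lane owned by the key; equal keys always share a lane. */
    Future<?> dispatch(String key, Runnable task) {
        int lane = Math.floorMod(key.hashCode(), lanes.size());
        return lanes.get(lane).submit(task);
    }

    /** Stops accepting tasks and waits (up to 10 seconds per lane) for queued ones to finish. */
    @Override
    public void close() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}
```

Messages with different keys may still run in parallel on different lanes; only messages sharing a lane are serialized.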
If you can't use broker-side partitioning or message groups, consider partitioning in the consumer application using Java fair locks, as in the example below. Reading from the queue and deriving the partition should be synchronous; the actual processing can be parallelized.
// Body of each consumer thread's receive loop (hence the `continue` below).
String message;
String messageKey;
ReentrantLock messageKeyLock;
partitioningSupport.getFairLock().lock();
try {
    // use DUPS_OK_ACKNOWLEDGE with a deduplication service, which improves the performance of sequential reads
    message = (String) jmsTemplate.receiveAndConvert(QUEUE);
    if (message == null || deduplicationService.deduplicate(md5(message)))
        continue;
    messageKey = findByXPath(path, message);
    messageKeyLock = partitioningSupport.getPartitionLock(messageKey);
} finally {
    partitioningSupport.getFairLock().unlock();
}
// NOTE: another thread can reach lock() for the same key in the gap between the unlock() above
// and the lock() below; acquire the key lock before releasing the fair lock if strict ordering is required.
messageKeyLock.lock();
try {
    // parallel message processing
} finally {
    messageKeyLock.unlock();
}
With a key diversity (number of unique keys) of 10, 10 consumer threads and the default partition mask of 0xff (256 partitions), lock contention is noticeable.
With a key diversity of 1000 and everything else the same, contention is occasional and not noticeable (the probability of waiting is relatively small).
Implementation
import static org.apache.commons.lang3.RandomUtils.nextInt;
import static org.apache.commons.lang3.StringUtils.isBlank;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class PartitioningSupport {

    private final ConcurrentHashMap<Integer, ReentrantLock> locks = new ConcurrentHashMap<>();
    private final ReentrantLock fairLock = new ReentrantLock(true);
    // Used as a bit mask in partition(), so it should have the form 2^n - 1 (0xff gives partitions 0..255).
    private final int diversity;

    public PartitioningSupport() {
        this(0xff);
    }

    public PartitioningSupport(int diversity) {
        this.diversity = diversity;
    }

    public ReentrantLock getPartitionLock(String messageKey) {
        fairLock.lock();
        try {
            int partition = partition(messageKey);
            ReentrantLock lock = locks.get(partition);
            if (lock == null) {
                lock = new ReentrantLock(true);
                locks.put(partition, lock);
            }
            return lock;
        } finally {
            fairLock.unlock();
        }
    }

    // Blank keys are spread over random partitions; all other keys map to a stable partition.
    private int partition(String key) {
        return (isBlank(key) ? nextInt() : key.hashCode()) & diversity;
    }

    public ReentrantLock getFairLock() {
        return fairLock;
    }
}
Test
import static java.lang.Integer.parseInt;
import static java.lang.String.format;
import static java.lang.System.out;
import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.apache.commons.lang3.RandomUtils.nextInt;
import static org.apache.commons.lang3.RandomUtils.nextLong;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.jupiter.api.Test;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class PartitioningSupportTest {

    private BlockingQueue<String> queue = new LinkedBlockingDeque<>();
    private List<Future<?>> results = new ArrayList<>();
    private ExecutorService consumers = newFixedThreadPool(10, new ThreadFactoryBuilder().setNameFormat("consumer-%s").build());
    private PartitioningSupport partitioningSupport = new PartitioningSupport();
    private volatile ConcurrentHashMap<String, AtomicInteger> ids;
    private int repeatTest = 10;
    private int uniqueKeysCount = 1; // 100
    private int totalMessagesCount = 1000;

    @Test
    public void testProcessingOrder() throws InterruptedException, ExecutionException {
        for (int testIter = 0; testIter < repeatTest; testIter++) {
            ids = new ConcurrentHashMap<>();
            results = new ArrayList<>();
            for (int messageIter = 1; messageIter <= totalMessagesCount; messageIter++) {
                String messageKey = "message-" + nextInt(0, uniqueKeysCount);
                ids.putIfAbsent(messageKey, new AtomicInteger());
                queue.put(format("%s.%s", messageKey, messageIter));
            }
            for (int i = 0; i < totalMessagesCount; i++)
                results.add(consumers.submit(this::consume));
            for (Future<?> result : results)
                result.get();
        }
        consumers.shutdown();
    }

    private void consume() {
        try {
            String message;
            String messageKey;
            ReentrantLock messageKeyLock;
            partitioningSupport.getFairLock().lock();
            try {
                message = queue.take();
                messageKey = message.substring(0, message.indexOf('.'));
                messageKeyLock = partitioningSupport.getPartitionLock(messageKey);
            } finally {
                partitioningSupport.getFairLock().unlock();
            }
            messageKeyLock.lock();
            try {
                sleep(nextLong(1, 10));
                int ordinal = parseInt(message.substring(message.indexOf('.') + 1));
                int previous = ids.get(messageKey).getAndSet(ordinal);
                out.printf("processed: %s - %s%n", messageKey, ordinal);
                assertTrue(ordinal > previous, format("broken order %s [%s -> %s]", messageKey, previous, ordinal));
            } finally {
                messageKeyLock.unlock();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}