How to wrap a JMS to WebSphere MQ bridge in a synchronous call using the request-reply pattern?
I am dealing with a scenario that is new to me, but which I believe might be common to some :)
As per the requirements, I need to build a user experience that behaves like a synchronous online transaction for a web service call, which actually delegates the call to IBM MQ Series through an asynchronous JMS-MQ bridge.
The client calls the web service, and then the message should be published to a JMS queue on the app server, which will be delivered to WebSphere MQ. After processing, a response will be delivered back to the app server on a FIXED JMS queue endpoint.
The requirement is that this transaction must time out if WebSphere MQ does not deliver the response within a defined amount of time; the web service should then send a time-out signal to the client and ignore the transaction.
The sketch of the problem follows.
I need to block the request on the web service until the response arrives or time-out.
So I am looking for some open-source library to help me with this task. Or is the only solution to block a thread and keep polling for the response? Maybe I could implement some blocking with a listener that is notified when the response arrives?
A bit of discussion would be very helpful for me now to try to clear my ideas on this. Any suggestions?
I have a sketch that I hope will help clearing the picture ;)
Hey, thanks for posting your own solution!
Yep, receive() with timeout is the most elegant way to go in this case.
Beware of what happens with messages that aren't read because of the timeout. If your client accesses the same queue again, it might pick up a stale message.
Make sure the messages that time out are deleted in a timely manner (if for no other reason than to avoid filling up the queue with unprocessed messages).
You can do this easily either through code (setting time-to-live on the message producer) or on the WebSphere MQ server (using queues that expire messages automatically).
The latter is easier if you can't/don't want to modify the MQ side of the code. It's what I would do :)
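To make the stale-message problem concrete, here is a minimal plain-Java sketch of what message expiry does. It is only an in-memory simulation: `ExpiringReplyQueue` and its methods are made-up names, not JMS or MQ APIs, and the clock is passed in by hand so the behavior is easy to follow. In real JMS code the producer-side setting is `MessageProducer.setTimeToLive(long)`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical in-memory stand-in for a response queue; not a JMS or MQ API.
class ExpiringReplyQueue {

    // One queued reply together with the absolute time at which it expires.
    private static final class Reply {
        final String correlationId;
        final String text;
        final long expiresAtMillis;

        Reply(String correlationId, String text, long expiresAtMillis) {
            this.correlationId = correlationId;
            this.text = text;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final List<Reply> replies = new ArrayList<>();

    // Stores a reply that stays valid for ttlMillis after nowMillis.
    synchronized void put(String correlationId, String text, long nowMillis, long ttlMillis) {
        replies.add(new Reply(correlationId, text, nowMillis + ttlMillis));
    }

    // Returns the first live reply for the correlation id, or null if there is none.
    // Expired replies met during the scan are dropped instead of being delivered.
    synchronized String take(String correlationId, long nowMillis) {
        Iterator<Reply> it = replies.iterator();
        while (it.hasNext()) {
            Reply r = it.next();
            if (r.expiresAtMillis <= nowMillis) {
                it.remove(); // stale: discard
            } else if (r.correlationId.equals(correlationId)) {
                it.remove();
                return r.text;
            }
        }
        return null;
    }

    // Number of replies currently stored, expired or not.
    synchronized int size() {
        return replies.size();
    }
}
```

A reply stored at time 0 with a 10 second time-to-live is returned by `take` at 5 seconds but dropped at 20 seconds, so a later request cannot read it by accident.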
After a couple of days of coding I got to a solution for this. I am using standard EJB3 with JAX-WS annotations and standard JMS.
The code I have written so far to meet the requirements follows. It is a stateless session bean with bean-managed transactions (BMT), because using standard container-managed transactions (CMT) was causing some kind of hang. I believe that happened because both JMS interactions are in the same method and were therefore in the same transaction (a message sent inside a transaction is only delivered once that transaction commits, so the reply could never arrive while the same transaction was still waiting for it). Notice that I had to begin and commit a transaction for each interaction with the JMS queues. I am using WebLogic for this solution. I have also coded an MDB which consumes the message from the queue endpoint jms/Pergunta and places a response message on the jms/Resposta queue; I did this to mock the expected behavior on the MQ side of this problem. In a real scenario we would probably have some COBOL application on the mainframe, or even another Java application, dealing with the messages and placing the response on the response queue.
If someone needs to try this code, all you need is a Java EE 5 container with two queues configured under the JNDI names jms/Pergunta and jms/Resposta.
The EJB/Webservice code:
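import java.util.logging.Logger;
import javax.annotation.Resource;
import javax.ejb.SessionContext;
import javax.ejb.Stateless;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jws.WebMethod;
import javax.jws.WebService;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.transaction.UserTransaction;
@Stateless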
@TransactionManagement(TransactionManagementType.BEAN)
@WebService(name="DJOWebService")
public class DJOSessionBeanWS implements DJOSessionBeanWSLocal {
Logger log = Logger.getLogger(DJOSessionBeanWS.class.getName());
@Resource
SessionContext ejbContext;
// Defines the JMS connection factory.
public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";
// Defines request queue
public final static String QUEUE_PERG = "jms/Pergunta";
// Defines response queue
public final static String QUEUE_RESP = "jms/Resposta";
Context ctx;
QueueConnectionFactory qconFactory;
/**
* Default constructor.
*/
public DJOSessionBeanWS() {
log.info("Constructor DJOSessionBeanWS");
}
@WebMethod(operationName = "processaMensagem")
public String processaMensagem(String mensagemEntrada, String idUnica)
{
//gets UserTransaction reference as this is a BMT EJB.
UserTransaction ut = ejbContext.getUserTransaction();
try {
ctx = new InitialContext();
//get the factory before any transaction it is a weblogic resource.
qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
log.info("Got QueueConnectionFactory");
ut.begin();
QueueConnection qcon = qconFactory.createQueueConnection();
QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue qs = (Queue) ctx.lookup(QUEUE_PERG);
TextMessage message = qsession.createTextMessage("this is a request message");
message.setJMSCorrelationID(idUnica);
qsession.createSender(qs).send(message);
ut.commit();
qcon.close();
//had to commit and begin a new transaction; I also decided to get new references for all JMS related objects, not sure if this is REALLY required
ut.begin();
QueueConnection queuecon = qconFactory.createQueueConnection();
Queue qreceive = (Queue) ctx.lookup(QUEUE_RESP);
QueueSession queuesession = queuecon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
String messageSelector = "JMSCorrelationID = '" + idUnica + "'";
//creates the receiver and sets a message selector to get only the related message from the response queue.
QueueReceiver qr = queuesession.createReceiver(qreceive, messageSelector);
queuecon.start();
//sets the timeout to keep waiting for the response...
TextMessage tresposta = (TextMessage) qr.receive(10000);
if(tresposta != null)
{
//reads the payload before committing; toString() is provider specific and is not the message text
String resposta = tresposta.getText();
ut.commit();
queuecon.close();
return resposta;
}
else{
//commits anyway.. does not have a response though
ut.commit();
queuecon.close();
log.info("null reply, returned by timeout..");
return "Got no response message.";
}
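} catch (Exception e) {
log.severe("Unexpected error occurred ==>> " + e.getMessage());
e.printStackTrace();
try {
//rolls back instead of committing, so a failed exchange does not leave half-finished work behind
ut.rollback();
} catch (Exception ex) {
//no transaction was active (for example the failure happened before begin() or after commit())
ex.printStackTrace();
}
return "Error processing the message ==> " + e.getMessage();
}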
}
}
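One detail of the selector built above: `idUnica` comes straight from the web service caller, and a single quote inside it would break the selector string. Below is a small sketch of a helper that guards against this, assuming the JMS selector rule that a quote inside a string literal is written as two quotes. `CorrelationSelectors` is a hypothetical name, not part of the bean above.

```java
// Hypothetical helper, not part of the original bean.
final class CorrelationSelectors {

    private CorrelationSelectors() {
        // static helper only, never instantiated
    }

    // Builds "JMSCorrelationID = '<id>'" with embedded single quotes doubled,
    // which is how a quote is written inside a selector string literal.
    static String forCorrelationId(String correlationId) {
        if (correlationId == null || correlationId.isEmpty()) {
            throw new IllegalArgumentException("correlation id must not be empty");
        }
        return "JMSCorrelationID = '" + correlationId.replace("'", "''") + "'";
    }
}
```

In the bean, the result could be passed to `createReceiver` in place of the concatenated string.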
And this is the code for the MDB which mocks the MQ side of this problem. I had a Thread.sleep fragment during my tests to simulate and test the timeout on the client side to validate the solution but it is not present in this version.
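import java.util.logging.Logger;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**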
* Mock to get message from request queue and publish a new one on the response queue.
*/
@MessageDriven(
activationConfig = { @ActivationConfigProperty(
propertyName = "destinationType", propertyValue = "javax.jms.Queue"
) },
mappedName = "jms/Pergunta")
public class ConsomePerguntaPublicaRespostaMDB implements MessageListener {
Logger log = Logger.getLogger(ConsomePerguntaPublicaRespostaMDB.class.getName());
// Defines the JMS connection factory.
public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";
// Defines response queue
public final static String QUEUE_RESP = "jms/Resposta";
Context ctx;
QueueConnectionFactory qconFactory;
/**
* Default constructor.
*/
public ConsomePerguntaPublicaRespostaMDB() {
log.info("Executed constructor ConsomePerguntaPublicaRespostaMDB");
try {
ctx = new InitialContext();
} catch (NamingException e) {
e.printStackTrace();
}
}
/**
* @see MessageListener#onMessage(Message)
*/
public void onMessage(Message message) {
log.info("Got message from queue jms/Pergunta, running ConsomePerguntaPublicaRespostaMDB.onMessage");
TextMessage tm = (TextMessage) message;
try {
log.info("Message received in onMessage ==>> " + tm.getText());
//gets the correlation id of the request message so it can be set on the response message.
String idMensagem = tm.getJMSCorrelationID();
log.info("Message id that will be used in the response ==>> " + idMensagem);
qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
log.info("JNDI context initialized and WebLogic QueueConnectionFactory looked up successfully. Sending message");
QueueConnection qcon = qconFactory.createQueueConnection();
QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) (ctx.lookup("jms/Resposta"));
TextMessage tmessage = qsession.createTextMessage("JMS message to post on the response queue...");
tmessage.setJMSCorrelationID(idMensagem);
qsession.createSender(queue).send(tmessage);
//closes the connection so that one is not leaked for every consumed message
qcon.close();
} catch (JMSException e) {
log.severe("Error in onMessage ==>> " + e.getMessage());
e.printStackTrace();
} catch (NamingException e) {
log.severe("Error in lookup ==>> " + e.getMessage());
e.printStackTrace();
}
}
}
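The question also asked whether a listener could wake up the blocked caller instead of a blocking receive. Here is a rough sketch of that idea using only `java.util.concurrent`, with no JMS involved. `ReplyCorrelator` is a hypothetical class, `sendRequest` stands in for whatever puts the request on the queue, and `onReply` is what a listener such as an MDB's `onMessage` might call. It only works when the listener runs in the same JVM as the waiting caller, so treat it as an illustration rather than a replacement for the receive-with-timeout code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical correlator: the caller blocks on a future, the listener side completes it.
class ReplyCorrelator {

    private final ConcurrentMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a waiter, runs sendRequest, then blocks until the reply or the timeout.
    // Returns the reply text, or null when the timeout elapsed first.
    String request(String correlationId, Runnable sendRequest, long timeoutMillis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Register BEFORE sending so that a very fast reply still finds its waiter.
        if (pending.putIfAbsent(correlationId, future) != null) {
            throw new IllegalStateException("duplicate correlation id: " + correlationId);
        }
        try {
            sendRequest.run();
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            throw new IllegalStateException("interrupted while waiting for reply", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // After this, a late reply finds no waiter and is reported as stale.
            pending.remove(correlationId, future);
        }
    }

    // Called from the listener side with the reply data.
    // Returns false when no waiter is registered for the id, i.e. the reply is stale.
    boolean onReply(String correlationId, String text) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(text);
    }
}
```

A reply that shows up after the timeout makes `onReply` return false, which is the point where the listener could log and discard it.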
[]s