开发者

ActiveMQ and JMS : Basic steps for novice

Hi all please give some basic about Act开发者_如何学CiveMQ with JMS for novice. And configuration steps also.


We are going to create a console based application using multithreading. So create an java project for console application.

Now follow these steps..........

  1. Add javax.jms.jar, activemq-all-5.3.0.jar, log4j-1.2.15.jar to your project library. (You can download all of above jar files from http://www.jarfinder.com/ .

  2. create a file naming jndi.properties and paste these following texts .. ( Deatils for jndi.properties just Google it)


# START SNIPPET: jndi

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory

# use the following property to configure the default connector
java.naming.provider.url = tcp://localhost:61616

# use the following property to specify the JNDI name the connection factory
# should appear as.
#connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry
connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.MyQueue = example.MyQueue


# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.MyTopic = example.MyTopic

# END SNIPPET: jndi

Add JMSConsumer.java


import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class JMSConsumer implements Runnable{
    private static final Log LOG = LogFactory.getLog(JMSConsumer.class);

    public void run() {
        Context jndiContext = null;
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;
        Destination destination = null;
        String sourceName = null;
        final int numMsgs; 
        sourceName= "MyQueue";
        numMsgs = 1;
        LOG.info("Source name is " + sourceName);
        /*
         * Create a JNDI API InitialContext object
         */
        try {
            jndiContext = new InitialContext();
        } catch (NamingException e) {
            LOG.info("Could not create JNDI API context: " + e.toString());
            System.exit(1);
        }

        /*
         * Look up connection factory and destination.
         */
        try {
            connectionFactory = (ConnectionFactory)jndiContext.lookup("queueConnectionFactory");
            destination = (Destination)jndiContext.lookup(sourceName);
        } catch (NamingException e) {
            LOG.info("JNDI API lookup failed: " + e);
            System.exit(1);
        }


        try {
            connection = connectionFactory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
            consumer = session.createConsumer(destination);
            connection.start();
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            MessageListener listener = new MyQueueMessageListener();
            consumer.setMessageListener(listener ); 
            //Let the thread run for some time so that the Consumer has suffcient time to consume the message
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } catch (JMSException e) {
            LOG.info("Exception occurred: " + e);
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                }
            }
        }
    }

    }

Add JMSProducer.java


import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


public class JMSProducer implements Runnable{
private static final Log LOG = LogFactory.getLog(JMSProducer.class);

public JMSProducer() {
}

//Run method implemented to run this as a thread.
public void run(){
Context jndiContext = null;
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Destination destination = null;
MessageProducer producer = null;
String destinationName = null;
final int numMsgs; 
destinationName = "MyQueue";
numMsgs = 5;
LOG.info("Destination name is " + destinationName);

/*
* Create a JNDI API InitialContext object
*/
try {
    jndiContext = new InitialContext();
} catch (NamingException e) {
    LOG.info("Could not create JNDI API context: " + e.toString());
    System.exit(1);
}

/*
* Look up connection factory and destination.
*/
try {
    connectionFactory = (ConnectionFactory)jndiContext.lookup("queueConnectionFactory");
    destination = (Destination)jndiContext.lookup(destinationName);
} catch (NamingException e) {
    LOG.info("JNDI API lookup failed: " + e);
    System.exit(1);
}

/*
* Create connection. Create session from connection; false means
* session is not transacted.create producer, set the text message, set the co-relation id and send the message.
*/
try {
    connection = connectionFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    producer = session.createProducer(destination);
    TextMessage message = session.createTextMessage();
    for (int i = 0; i 

Add MyQueueMessageListener.java


import java.io.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;


public class MyQueueMessageListener implements MessageListener {
    private static final Log LOG = LogFactory.getLog(MyQueueMessageListener.class);
    /**
    *
    */
    public MyQueueMessageListener() {
    // TODO Auto-generated constructor stub
    }

    /** (non-Javadoc)
    * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
    * This is called on receving of a text message.
    */
    public void onMessage(Message arg0) {
        LOG.info("onMessage() called!");
        if(arg0 instanceof TextMessage){
            try {
                //Print it out
                System.out.println("Recieved message in listener: " + ((TextMessage)arg0).getText());

                System.out.println("Co-Rel Id: " + ((TextMessage)arg0).getJMSCorrelationID());
                try {
                    //Log it to a file
                    BufferedWriter outFile = new BufferedWriter(new FileWriter("MyQueueConsumer.txt"));
                    outFile.write("Recieved message in listener: " + ((TextMessage)arg0).getText());
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }else{
            System.out.println("~~~~Listener : Error in message format~~~~");
        }

    }

    }

Add SimpleApp.java


public class SimpleApp {

    //Run the producer first, then the consumer
    public static void main(String[] args) throws Exception {
        runInNewthread(new JMSProducer());
        runInNewthread(new JMSConsumer()); 
    }

    public static void runInNewthread(Runnable runnable) {
        Thread brokerThread = new Thread(runnable);
        brokerThread.setDaemon(false);
        brokerThread.start();
    }

}

Now run SimpleApp.java class.

All da best. Happy coding.


Here it is a simple junit test for ActiveMQ and Apache Camel. This two technologies works very good together.

If you want more details about the code, you can find a post in my blog:

http://ignaciosuay.com/unit-testing-active-mq/

public class ActiveMQTest extends CamelTestSupport {

    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));

        return camelContext;
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {

            @Override
            public void configure() throws Exception {

            from("mina:tcp://localhost:6666?textline=true&sync=false")
             .to("activemq:processHL7");

            from("activemq:processHL7")
              .to("mock:end");
            }
        };
    }

    @Test
    public void testSendHL7Message() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:end");

        String m = "MSH|^~\\&|hl7Integration|hl7Integration|||||ADT^A01|||2.5|\r" +
                "EVN|A01|20130617154644\r" +
                "PID|1|465 306 5961||407623|Wood^Patrick^^^MR||19700101|1|\r" +
                "PV1|1||Location||||||||||||||||261938_6_201306171546|||||||||||||||||||||||||20130617134644|";

        mock.expectedBodiesReceived(m);

        template.sendBody("mina:tcp://localhost:6666?textline=true&sync=false", m);

        mock.assertIsSatisfied();
    }
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜