/java/

ActiveMQ - Java - Point to Point (p2p)

2016-07-14 20:06:54

A description of the installation and configuration of ActiveMQ can be found in my previous post. Here is a simple example: Shop and Storehouse applications that use a Queue and point-to-point (p2p) messaging to exchange data about products.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>pl.btbw</groupId>
    <artifactId>JMS_example_3</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.13.3</version>
        </dependency>
    </dependencies>
</project>

PropertiesUtil.java

package pl.btbw;

import javax.naming.Context;
import java.util.Properties;

/**
 * Supplies the JNDI environment used to reach the ActiveMQ broker without
 * relying on an external {@code jndi.properties} file.
 *
 * <p>This is a static-only utility class and cannot be instantiated.
 */
public final class PropertiesUtil {

    private PropertiesUtil() {
        // static utility class - no instances
    }

    /**
     * Builds a fresh JNDI environment on every call.
     *
     * <p>The returned properties contain the initial context factory, the
     * provider URL, the security principal and credentials, the connection
     * factory name {@code QueueCF}, and the JNDI bindings for the
     * {@code StorehouseRequestQ} and {@code StorehouseResponseQ} queues.
     *
     * @return a new, mutable {@link Properties} instance; never {@code null}
     */
    public static Properties getNoFileProperties() {

        Properties properties = new Properties();

        // setProperty (rather than the inherited Hashtable.put) keeps every key and value a String.
        properties.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        properties.setProperty(Context.PROVIDER_URL, "tcp://localhost:61616");

        // NOTE(review): broker address and credentials are hard-coded here;
        // externalize them before using this outside of a local demo.
        properties.setProperty(Context.SECURITY_PRINCIPAL, "system");
        properties.setProperty(Context.SECURITY_CREDENTIALS, "manager");

        // Name under which the connection factory is bound, plus the
        // "queue.<jndiName>" -> physical queue name bindings.
        properties.setProperty("connectionFactoryNames", "QueueCF");
        properties.setProperty("queue.StorehouseRequestQ", "jms.StorehouseRequestQ");
        properties.setProperty("queue.StorehouseResponseQ", "jms.StorehouseResponseQ");

        return properties;
    }

}

There is no rocket science in the Shop class, but one thing is interesting: setJMSReplyTo and createReceiver. With these we can receive a callback from the Storehouse through a Queue.

Shop.java

package pl.btbw;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;

public class Shop {

    private QueueConnection queueConnection;
    private QueueSession queueSession;
    private Queue requestQueue;
    private Queue responseQueue;

    public Shop(String queueCF, String requestQueue, String responseQueue) {
        try {

            Context ctx = new InitialContext(
                    PropertiesUtil.getNoFileProperties()
            );

            QueueConnectionFactory qFactory = (QueueConnectionFactory) ctx.lookup(queueCF);

            queueConnection = qFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            this.requestQueue = (Queue) ctx.lookup(requestQueue);
            this.responseQueue = (Queue) ctx.lookup(responseQueue);

            queueConnection.start();

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    private void askAboutProduct(String productName) {
        try {

            TextMessage question = queueSession.createTextMessage();
            question.setText(productName);
            question.setJMSReplyTo(responseQueue);

            QueueSender queueSender = queueSession.createSender(requestQueue);
            queueSender.send(question);

            String filter = "JMSCorrelationID = '" + question.getJMSMessageID() + "'";

            QueueReceiver queueReceiver = queueSession.createReceiver(responseQueue, filter);
            TextMessage answer = (TextMessage) queueReceiver.receive(30000);
            if (answer == null) {
                System.out.println("Storehouse - problem with connection");
            } else {
                System.out.println("Storehouse answer: " + answer.getText());
            }

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    private void exit() {
        try {
            queueConnection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    public static void main(String argv[]) {

        Shop shop = new Shop("QueueCF", "StorehouseRequestQ", "StorehouseResponseQ");

        try {

            BufferedReader bufferedReader = new BufferedReader(
                    new InputStreamReader(System.in)
            );

            System.out.println("Shop Application Started");
            System.out.println("Press enter product name");

            while (true) {
                System.out.print("&gt; ");

                String productName = bufferedReader.readLine();

                if (productName == null || productName.trim().length() &lt;= 0) {
                    shop.exit();
                }

                shop.askAboutProduct(productName);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

Here (in Storehouse) note how the incomingMessage and outgoingMessage messages are processed.

Storehouse.java

package pl.btbw;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

public class Storehouse implements MessageListener {

    private QueueConnection queueConnection;
    private QueueSession queueSession;
    private Queue requestQueue;

    private Map

<string, integer=""> storage = new HashMap<string, integer="">() {{
        put("book", 10);
        put("PC", 5);
        put("apple", 50);
    }};

    public Storehouse(String queueCF, String requestQueue) {

        try {

            Context ctx = new InitialContext(
                    PropertiesUtil.getNoFileProperties()
            );

            QueueConnectionFactory qFactory = (QueueConnectionFactory) ctx.lookup(queueCF);

            queueConnection = qFactory.createQueueConnection();

            queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

            this.requestQueue = (Queue) ctx.lookup(requestQueue);

            queueConnection.start();

            QueueReceiver qReceiver = queueSession.createReceiver(this.requestQueue);
            qReceiver.setMessageListener(this);

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void onMessage(Message incomingMessage) {
        try {

            TextMessage incomingTextMessage = (TextMessage) incomingMessage;

            String productName = incomingTextMessage.getText();

            TextMessage outgoingMessage = queueSession.createTextMessage();
            outgoingMessage.setText(
                    storage.containsKey(productName)
                            ? "in stock: " + productName + ": " + storage.get(productName)
                            : "lack"
            );
            outgoingMessage.setJMSCorrelationID(incomingMessage.getJMSMessageID());

            QueueSender queueSender = queueSession.createSender((Queue) incomingMessage.getJMSReplyTo());
            queueSender.send(outgoingMessage);

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    private void exit() {
        try {
            queueConnection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(1);
    }

    public static void main(String argv[]) {

        System.out.println("Storehouse application started");
        System.out.println("Press enter to quit application");

        Storehouse storehouse = new Storehouse("QueueCF", "StorehouseRequestQ");

        try {
            BufferedReader bufferedReader = new BufferedReader(
                    new InputStreamReader(System.in)
            );
            bufferedReader.readLine();
            storehouse.exit();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

GIT