All IT Courses 50% Off
JAVA Tutorials

How to Configure a Topic/Queue on ActiveMQ

Configure Topic

A topic is used when a publisher sends a message that more than one consumer may be interested in. The publisher broadcasts the message to a JMS destination called a topic, and every subscriber to that topic receives it.

Step 1: Create dependencies in pom.xml

<!-- Maven POM for the JMS example project; activemq-all 5.12.0 is the only declared dependency. -->
<!-- The namespace identifiers are fixed http:// URIs; only the XSD download location uses https. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.jms</groupId>
    <artifactId>springJmsQueue</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.12.0</version>
        </dependency>
    </dependencies>
</project>

Step 2: Create a topic in a class named JmsTopicExample

import java.net.URI;
import java.net.URISyntaxException; 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic; 
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService; 
public class JmsTopicExample {
    public static void main(String arg[]) throws Exception {
        BrokerService broker = BrokerFactory.createBroker(new URI(
                "broker:(tcp://localhost:61616)"));
        broker.start();
        Connection connection = null;
        try {
            // Producer
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://localhost:61616");
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("customerTopic");                  
            // Consumer1 subscribes to customerTopic
            MessageConsumer consumer1 = session.createConsumer(topic);
            consumer1.setMessageListener(new ConsumerMessageListener("Consumer1"));             
            // Consumer2 subscribes to customerTopic
            MessageConsumer consumer2 = session.createConsumer(topic);
            consumer2.setMessageListener(new ConsumerMessageListener("Consumer2"));             
            connection.start();                  
            // Publish
            String payload = "Important Task";
            Message msg = session.createTextMessage(payload);
            MessageProducer producer = session.createProducer(topic);
            System.out.println("Sending text '" + payload + "'");
            producer.send(msg);             
            Thread.sleep(3000);
            session.close();
        } finally {
            if (connection != null) {
                connection.close();
            }
            broker.stop();
        }
    }
}

Step 3: Consume messages from the topic using a MessageListener named ConsumerMessageListener

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage; 
public class ConsumerMessageListener implements MessageListener {
    private String consumerName;
    public ConsumerMessageListener(String consumerName) {
        this.consumerName = consumerName;
    } 
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println(consumerName + " received " + textMessage.getText());
        } catch (JMSException e) {          
            e.printStackTrace();
        }
    } 
}

Configure Queue

A JMS message queue is a destination to which producers send messages. A consumer connects to the broker to receive the messages sitting in the queue. Queues are used in point-to-point messaging.

Step 1: Create dependencies in pom.xml

<!-- Maven POM for the JMS example project; activemq-all 5.12.0 is the only declared dependency. -->
<!-- The namespace identifiers are fixed http:// URIs; only the XSD download location uses https. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.jms</groupId>
    <artifactId>springJmsQueue</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.12.0</version>
        </dependency>
    </dependencies>
</project>

Step 2: Create a Queue

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(“customerQueue”);

Step 3: Send message to a queue

// Create a producer for the queue and send the message through it.
MessageProducer producer = session.createProducer(queue);
producer.send(msg);

Step 4: Receive the message from the queue in a class named JmsMessageQueueExample

import java.net.URI;
import java.net.URISyntaxException; 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage; 
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService; 
public class JmsMessageQueueExample {
public static void main(String arg[]) throws Exception {
        BrokerService broker = BrokerFactory.createBroker(new URI(
                "broker:(tcp://localhost:61616)"));
        broker.start();
        Connection connection = null;
        try {
            // Producer
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://localhost:61616");
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("customerQueue");
            String payload = "Important Task";
            Message msg = session.createTextMessage(payload);
            MessageProducer producer = session.createProducer(queue);
            System.out.println("Sending text '" + payload + "'");
            producer.send(msg); 
            // Consumer
            MessageConsumer consumer = session.createConsumer(queue);
            connection.start();
            TextMessage textMsg = (TextMessage) consumer.receive();
            System.out.println(textMsg);
            System.out.println("Received: " + textMsg.getText());
            session.close();
        } finally {
            if (connection != null) {
                connection.close();
            }
                        broker.stop();
        }
    }

Facebook Comments
Tags

Related Articles

Back to top button
Close
Close

Get Java Training Online
Course Now at 50%OFF!

Offer valid for 1st 20 seats only, Hurry up!!

You have successfully subscribed to the newsletter

There was an error while trying to send your request. Please try again.

H2kinfosys will use this information to be in touch with you for providing updates.

Get Python Course
worth 499$ for FREE!

Offer valid for 1st 20 seats only, Hurry up!!

You have successfully subscribed to the newsletter

There was an error while trying to send your request. Please try again.

H2kinfosys Blog will use the information you provide on this form to be in touch with you and to provide updates and marketing.