Configure RabbitMQ AMQP Transport With WSO2 Enterprise Integrator
is open-source message-broker software that originally implemented the Advanced Message Queuing Protocol (AMQP). The RabbitMQ AMQP transport is implemented using the RabbitMQ Java Client. It allows you to send or receive AMQP messages by directly calling an AMQP broker (RabbitMQ).
ESB can act as a message provider to the queue and a consumer from the queue. The following diagram shows provider and consumer operations.
As in the image, a proxy service in the ESB Profile of WSO2 EI listens to Q1, and when a message becomes available on the queue, the proxy service consumes it and publishes it to Q2.
In this post, I am going to discuss how to configure RabbitMQ with WSO2 EI 6.6.0 for Message provider and consumer operations.
✔️ Installing RabbitMQ
The easiest way of Downloading and Installing RabbitMQ is using Docker images. You can find the relevant docker images from this link and use the below command to pull the latest image.
docker pull rabbitmq
Run the docker image by using the below command.
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Then you can access the console from this URL and the default username and password is guest.
http://localhost:15672
✔️ Configure RabbitMQ Transport in ESB
First, we need to add the below configurations to the axis2.xml file to enable the RabbitMQ Transport.
Open axis2.xml (located in EI_HOME/conf/axis2 folder) and add the following under the RabbitMQ listener.
<transportReceiver name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQListener">
<parameter name="AMQPConnectionFactory" locked="false">
<parameter name="rabbitmq.server.host.name" locked="false">localhost</parameter>
<parameter name="rabbitmq.server.port" locked="false">5672</parameter>
<parameter name="rabbitmq.queue.name" locked="false">Rabbitqueue</parameter>
<parameter name="rabbitmq.server.user.name" locked="false">guest</parameter>
<parameter name="rabbitmq.server.password" locked="false">guest</parameter>
<parameter name="rabbitmq.connection.retry.interval" locked="false">10000</parameter>
<parameter name="rabbitmq.connection.retry.count" locked="false">5</parameter>
</parameter>
</transportReceiver>
💠queue.name=Rabbitqueue user.name=guest password=guest
In case of a network failure or broker shutdown, connection.retry.interval and connection.retry.count parameters are applied to enable connection recovery in RabbitMQ.
In the Transport Sender section, add the following RabbitMQ sender
<transportSender name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQSender"/>
After that restart the server to apply the above changes.
✔️ Create Proxy Service to Publish message to a RabbitMQ message queue
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
name="RabbitProxy"
transports="http https"
startOnLoad="true">
<description/>
<target>
<inSequence>
<property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
<call>
<endpoint>
<address uri="rabbitmq:/?rabbitmq.server.host.name=localhost&rabbitmq.server.user.name=guest&rabbitmq.server.password=guest&rabbitmq.server.port=5672&rabbitmq.queue.name=Rabbitqueue&rabbitmq.queue.route.key=route&rabbitmq.exchange.name=exchange"/>
</endpoint>
</call>
<respond/>
</inSequence>
</target>
</proxy>
Invoke the above service with an input payload and after that check the rabbitmq management console to view the message.
✔️ Multiple Connection Factories
Further, you can create multiple connection factories if you want this listener to connect to multiple brokers or sender to send messages into different queues.
<transportSender name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQSender"><!--First connection factory--><parameter name="AMQPConnectionFactory" locked="false">
<parameter name="rabbitmq.server.host.name" locked="false">localhost</parameter>
<parameter name="rabbitmq.server.port" locked="false">5672</parameter>
<!--parameter name="rabbitmq.queue.name" locked="false">Rabbitqueue</parameter-->
<parameter name="rabbitmq.server.user.name" locked="false">guest</parameter>
<parameter name="rabbitmq.server.password" locked="false">guest</parameter>
<parameter name="rabbitmq.connection.retry.interval" locked="false">10000</parameter>
<parameter name="rabbitmq.connection.retry.count" locked="false">5</parameter>
</parameter><!--Second connection factory--><parameter name="NewFactory" locked="false">
<parameter name="rabbitmq.server.host.name" locked="false">localhost</parameter>
<parameter name="rabbitmq.server.port" locked="false">5672</parameter>
<!--parameter name="rabbitmq.queue.name" locked="false">Rabbitnewqueue</parameter-->
<parameter name="rabbitmq.server.user.name" locked="false">guest2</parameter>
<parameter name="rabbitmq.server.password" locked="false">guest2</parameter>
<parameter name="rabbitmq.connection.retry.interval" locked="false">10000</parameter>
<parameter name="rabbitmq.connection.retry.count" locked="false">5</parameter>
</parameter> </transportSender>
Then you can send the message to a queue as below using one of the connection factories.
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
name="RabbitProxy2"
transports="http https"
startOnLoad="true">
<description/>
<target>
<inSequence>
<property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
<call>
<endpoint>
<address uri="rabbitmq:/?rabbitmq.connection.factory=NewFactory&rabbitmq.queue.name=Rabbitnewqueue"/>
</endpoint>
</call>
<respond/>
</inSequence>
</target>
</proxy>
✔️ Create Proxy Service to consume messages from the queue
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
name="AMQPProxy"
transports="rabbitmq"
startOnLoad="true">
<description/>
<target>
<inSequence>
<log level="full"/>
</inSequence>
</target>
<parameter name="rabbitmq.queue.name">Rabbitqueue</parameter>
<parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>
💠rabbitmq.queue.name:queue on which the proxy service listens and consumes messages.
💠The rabbitmq.connection.factory parameter specifies the listener that listens on the queue and consumes messages.
That’s All! 😃 Hope this will be helpful to beginners.