Tag: RabbitMQ

Message Driven Microservices with Spring Cloud Stream and RabbitMQ (Publish and Subscribe messages) – using @StreamListener for header based routing – Part 3

In this article, I am not going to explain the basics of Spring Cloud Stream or the process of creating publishers and subscribers. Those have been clearly described in Part 1 and Part 2 of this article series.

It is possible to send messages with headers. On the receiving end (the consumer application), there can be multiple message handlers (@StreamListener-annotated methods) that accept messages based on the message headers.

A copy of the message is offered to every handler method, and a handler accepts the message only if it matches the given condition. The condition is a SpEL (Spring Expression Language) expression that performs checks on header values. A sample condition is given below.

e.g:-

@StreamListener(target = OrderSink.INPUT,condition = "headers['payment_mode']=='credit'")

(Please refer to the source code for the complete example.)

In that way, you can use headers to route messages among multiple message handlers. Here we will look at how to deliver messages to the correct recipient based on a header.
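To make the dispatch rule concrete, here is a minimal plain-Java sketch of the idea. It does not use Spring Cloud Stream or RabbitMQ: each handler is represented by a name and a predicate that stands in for the SpEL condition. The handler names and the "cash" condition are hypothetical and only serve as illustration; the article itself only shows the 'credit' condition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class HeaderRoutingSketch {

    // Handler name -> condition on the headers (a stand-in for the SpEL condition).
    // Both handler names and the "cash" rule are hypothetical.
    static final Map<String, Predicate<Map<String, String>>> HANDLERS = new LinkedHashMap<>();

    static {
        HANDLERS.put("creditHandler", headers -> "credit".equals(headers.get("payment_mode")));
        HANDLERS.put("cashHandler", headers -> "cash".equals(headers.get("payment_mode")));
    }

    // Offers the headers to every handler and returns the names of those that accept.
    static List<String> acceptingHandlers(Map<String, String> headers) {
        List<String> accepted = new ArrayList<>();
        for (Map.Entry<String, Predicate<Map<String, String>>> entry : HANDLERS.entrySet()) {
            if (entry.getValue().test(headers)) {
                accepted.add(entry.getKey());
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptingHandlers(Map.of("payment_mode", "credit")));
        System.out.println(acceptingHandlers(Map.of("payment_mode", "cheque")));
    }
}
```

In this sketch a message whose payment_mode matches no condition is accepted by no handler, which mirrors what happens when none of the conditions evaluates to true.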


Message Driven Microservices with Spring Cloud Stream and RabbitMQ (Publish and Subscribe messages) with custom bindings – Part 2

In the previous part, we tried the pre-built Spring Cloud Stream components, such as Sink, Source and Processor, for building message driven microservices.

In this part, we will look at how to create custom binding classes with custom channels for publishing and retrieving messages with RabbitMQ.

 

Setting up the publisher application

The publisher application is almost the same as in the previous article, except for the bindings and related configurations.

The previous article used the Source class (a built-in Spring Cloud Stream component) to configure the output message channel (@Output) for publishing messages. Here we are not going to use the built-in component; instead, we will develop a custom output binding class to build and configure the output message channel.


import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface OrderSource
{
    // Name of the custom output channel; also used in application.properties
    String OUTPUT = "orderPublishChannel";

    @Output(OUTPUT)
    MessageChannel create();
}

 

We have declared a custom source interface, OrderSource, with “orderPublishChannel” as the output message channel.

Now we need to bind this OrderSource interface in the OrderController.


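import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

// Order and OrderSource are the application's own classes (their imports are omitted here).
@Slf4j
@EnableBinding(OrderSource.class)
@RestController
public class OrderController
{
    @Autowired
    private OrderSource source;

    @PostMapping("/orders/publish")
    public String publishOrder(@RequestBody Order order)
    {
        // Wrap the order in a message and send it through the custom output channel
        source.create().send(MessageBuilder.withPayload(order).build());
        log.info(order.toString());
        return "order_published";
    }
}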

 

source.create() returns the output message channel named “orderPublishChannel”. Published messages are delegated to the RabbitMQ exchange through this channel.

We need to update application.properties so that the binding refers to the new channel name, as follows.

spring.cloud.stream.bindings.orderPublishChannel.destination=orders-exchange

 

Now we have completed the publisher application with a custom source binding for publishing messages. Let's move on to developing the consumer application.

 

Setting up the consumer application

The consumer application is almost the same as in the previous article, except for the bindings and related configurations.

The previous article used the Sink class (a built-in Spring Cloud Stream component) to configure the input message channel (@Input) for retrieving messages. Here we are not going to use the built-in component; instead, we will develop a custom input binding class to build and configure the input message channel.


Message Driven Microservices with Spring Cloud Stream and RabbitMQ (Publish and Subscribe messages) with Sink, Source and Processor bindings – Part 1

 

What is Spring Cloud Stream?

Spring Cloud Stream is a framework that helps in developing message driven or event driven microservices. Spring Cloud Stream uses an underlying message broker (such as RabbitMQ or Kafka) that is used to send and receive messages between services.

At the time of writing, there are two implementations of Spring Cloud Stream.

  1. Spring Cloud Stream implementation that uses RabbitMQ as the underlying message broker.
  2. Spring Cloud Stream implementation that uses Apache Kafka as the underlying message broker.

 

High Level Overview of Spring Cloud Stream

 

application-core.png

source:- https://ordina-jworks.github.io/img/spring-cloud-stream/application-core.png

An application defines Input and Output channels which are injected by Spring Cloud Stream at runtime. Through the use of so-called Binder implementations, the system connects these channels to external brokers.

The difficult parts are abstracted away by Spring, leaving it up to the developer to simply define the inputs and outputs of the application. How messages are transformed, directed, transported, received and ingested is all up to the binder implementation (e.g. RabbitMQ or Kafka).


Spring Cloud Config : Using Git Webhook to Auto Refresh the config changes with Spring Cloud Stream, Spring Cloud Bus and RabbitMQ (Part 3)

 

You can refer to the previous parts of this article as follows.

Click here for Part 1 

Click here for Part 2

 

The Problem

In the previous article (Part 2 of this series), we discussed how to use Spring Cloud Bus to broadcast the refresh event (/actuator/bus-refresh) across all the connected services. There, the refresh event has to be manually triggered on one of the services connected to the Spring Cloud Bus. (You can select any service you wish; the only requirement is that it is connected to the Spring Cloud Bus.)

The main problem here is that whenever the properties are changed, the refresh event has to be manually triggered. Even if it is for just one service, it is still a manual process. What will happen if the developer forgets to manually trigger the refresh event after updating the properties in the remote repository?

Wouldn’t it be nicer if there were a way to trigger this refresh event automatically whenever the remote repository changes? To achieve this, the config server needs to listen for events from the remote repository. This can be done with the webhook feature offered by remote repository providers.

 

 

The Solution

Here is the architecture of the proposed solution.

 

Untitled Diagram (10).png


Spring Cloud Config : Refreshing the config changes with Spring Cloud Bus (Part 2)

You can refer to Part 1 of this article as follows.

Click here for Part 1 

 

The Problem

The previous article (click here to visit it) described how to use Spring Cloud Config Server as a centralized location for keeping the configuration properties of the application services (microservices). The application services act as Config Clients that communicate with the Config Server to retrieve the properties related to them.

If any property is changed, the related service needs to be notified by triggering a refresh event with Spring Boot Actuator (/actuator/refresh). The user has to trigger this refresh event manually. Once the event is triggered, all the beans annotated with @RefreshScope are reloaded (their configuration is re-fetched from the Config Server).

In a real microservice environment, there will be a large number of independent application services. Therefore it is not practical for the user to manually trigger the refresh event for all the related services whenever a property is changed.


Spring Cloud Bus: Centralizing Message Broker (RabbitMQ or Kafka) connection properties with Spring Cloud Config Server

 

The Problem

In the previous article, we discussed how to use Spring Cloud Bus to broadcast configuration property changes (made in the Spring Cloud Config Server) across distributed services.

Spring Cloud Bus links the distributed services through a lightweight message broker such as Kafka or RabbitMQ. Whenever the refresh event is triggered in one service, Spring Cloud Bus broadcasts the refresh event across multiple services (known as Config Clients).

Therefore every Config Client has to connect to the underlying message broker of the Spring Cloud Bus (either RabbitMQ or Kafka) to listen for the broadcast refresh events. As a result, every Config Client has to maintain a connection to the message broker used by the Spring Cloud Bus.


Spring AMQP / RabbitMQ Topic Exchange Example Part 3 – Consumer Application (Automatically receiving messages with Listener)

In this article, we will be developing a RabbitMQ Consumer application for the Publisher application developed in an earlier article (Part 1).

Click here to view the Publisher Application related to this article.

We have already developed a consumer application to consume the messages published by the above publisher application. In that consumer application we focused on retrieving the published messages manually, in other words, pulling the messages from the RabbitMQ queue whenever the client application needs them. If you need to look at that article, please click here.

In this article, we will focus on receiving messages from queues with a listener. The consumer application will listen on one or more queues for incoming messages. Whenever an incoming message arrives in one of those queues, it is delivered to the consumer application through the listener (available in the consumer application).

This application is very similar to the Consumer application we developed for Part 2 of this article series. Therefore all the configuration classes are the same, and I am not going to explain them again here. If you want to know further details, please refer to the explanations in those articles.

The full source code of this application can be found at GitHub.

Click here to download the source code. 

Open the source code directory called consumer-listener. There you can find the source code related to this article.

I will explain the most critical component here. That is the ConsumerService.


package com.springbootdev.examples.consumer.service;

import com.springbootdev.examples.consumer.model.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = {"all_cars_queue", "nissan_cars_queue"})
public class ConsumerService
{
    private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

    // Invoked for every Car message delivered to either of the two queues
    @RabbitHandler
    public void receiveMessage(Car car)
    {
        logger.info(" receive message [" + car.toString() + "] ");
    }
}

ConsumerService will be listening to two queues, “all_cars_queue” and “nissan_cars_queue”. Whenever a message is delivered to either of those queues, it is passed on to this consumer service class.

 

 

 

Spring AMQP / RabbitMQ Topic Exchange Example Part 2 – Consumer Application (Manually retrieving/pulling messages from queues)

This is the second part of the article series, focused on retrieving RabbitMQ messages manually (simply pulling the messages from queues whenever the client application needs them).

Click here to view the first part of this article.

As you can see, in the first part we developed a Producer application. In this part, we will develop a Consumer application to retrieve messages from the queue manually (whenever we need them).

The full source code of this article series can be found at GitHub.

Click here to Download the Source Code. 

Open the source code directory called consumer-manual-pull. There you can find the source code related to this article.

 

Consumer Application

Here is the project structure.

Screen Shot 2017-11-12 at 8.13.40 PM.png

 

First, make sure that you have defined valid RabbitMQ server details in the application.properties file. This information is used by the ConnectionFactory to connect to the RabbitMQ server when the RabbitTemplate is used.


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

 

This is the RabbitMQ configuration class and we have configured the RabbitTemplate here.


package com.springbootdev.examples.consumer.config;

import com.springbootdev.examples.consumer.model.Car;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMqConfig {

    @Bean
    public MessageConverter messageConverter() {
        Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
        jsonMessageConverter.setClassMapper(classMapper());
        return jsonMessageConverter;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(messageConverter());
        return template;
    }

    @Bean
    public DefaultClassMapper classMapper() {
        DefaultClassMapper classMapper = new DefaultClassMapper();
        // Map the producer's type id (sent in the __TypeId__ header) to the consumer's own Car class
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("com.springbootdev.examples.producer.model.Car", Car.class);
        classMapper.setIdClassMapping(idClassMapping);
        return classMapper;
    }
}

 

As you are already aware, we have configured the RabbitTemplate, including its message converter. The only unfamiliar code here is the DefaultClassMapper bean. To understand the reason behind this bean, please look at the following message segment.

Screen Shot 2017-11-12 at 8.22.22 PM.png

In the MessageProperties, you can see that the __TypeId__ header is set to the following class name. This __TypeId__ information is added by the message converter defined in the Producer application when the POJO is converted to JSON.

 com.springbootdev.examples.producer.model.Car

When the producer converts the POJO message to JSON, the type information is added as a header in the message properties. When the consumer receives the message, the message converter tries to convert the JSON message into the type given in the header. The Consumer application has a POJO class called Car, but it is in a different package (not the package declared in the header). So we need to define a class mapping that tells the message converter where the relevant POJO for the message type is located.

idClassMapping.put("com.springbootdev.examples.producer.model.Car", Car.class);

Then the message converter can convert the incoming message to the relevant POJO format.
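The lookup idea behind this mapping can be sketched in plain Java. This is only an illustration of the concept, not Spring AMQP's actual implementation: the class name TypeIdMappingSketch, the nested Car stand-in and the resolve method are all hypothetical. The sketch first checks the id-to-class map and only falls back to loading a class by name when no mapping exists.

```java
import java.util.HashMap;
import java.util.Map;

class TypeIdMappingSketch {

    // Stand-in for the consumer's own model class (hypothetical, for illustration only).
    static class Car {
    }

    // Type id sent by the producer -> class the consumer should deserialize into.
    static final Map<String, Class<?>> ID_CLASS_MAPPING = new HashMap<>();

    static {
        ID_CLASS_MAPPING.put("com.springbootdev.examples.producer.model.Car", Car.class);
    }

    // Resolves a type id: an explicit mapping wins, otherwise try to load the class by name.
    static Class<?> resolve(String typeId) {
        Class<?> mapped = ID_CLASS_MAPPING.get(typeId);
        if (mapped != null) {
            return mapped;
        }
        try {
            return Class.forName(typeId);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("No class found for type id " + typeId, e);
        }
    }

    public static void main(String[] args) {
        // The producer's class name resolves to the consumer's local Car class.
        System.out.println(resolve("com.springbootdev.examples.producer.model.Car").getSimpleName());
    }
}
```

Without the mapping, the producer's class name would have to exist on the consumer's classpath, which is not the case in this example because the consumer keeps Car in its own package.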

 

Now let's look at our consumer service used for receiving messages.


package com.springbootdev.examples.consumer.service;

import com.springbootdev.examples.consumer.model.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
    private static final String QUEUE_NAME = "all_cars_queue";

    // Runs every 5 seconds (scheduling must be enabled with @EnableScheduling)
    @Scheduled(fixedRate = 5000)
    public void receive()
    {
        // Returns null when the queue has no message waiting
        Object object = rabbitTemplate.receiveAndConvert(QUEUE_NAME);
        if (object != null) {
            Car car = (Car) object;
            logger.info(" received the message [" + car.toString() + "] ");
        }
    }
}

 

For demonstration purposes, we will be receiving messages from “all_cars_queue”. We use a Spring scheduled method to retrieve a message from the queue every 5 seconds.

The receiveAndConvert method of the RabbitTemplate is used to pull/retrieve messages from the queue. Messages are returned from the queue in FIFO (first in, first out) order.

 Object object = rabbitTemplate.receiveAndConvert(QUEUE_NAME);

You can run the application and see the output with the following command. This application is developed to run as a command line application.

mvn spring-boot:run

 

When you run the application, you will see the following output in the terminal.

Screen Shot 2017-11-12 at 8.47.56 PM.png

Now we have developed the Consumer application that is used to retrieve/pull messages manually.

If you are looking for a way to listen to the queue and retrieve messages automatically (whenever the queue gets a message), please refer to Part 3 of this article series.

RabbitMQ Consumer Application (Part 3) – Listening for the message queues and automatically receiving the messages when they are delivered to queues.

 

Spring AMQP / RabbitMQ Topic Exchange Example Part 1 – Producer Application

As we are already aware, there are four types of RabbitMQ message exchanges. They are listed below.

  1. Direct Exchange
  2. Topic Exchange
  3. Fanout Exchange
  4. Headers Exchange

In this article, we are going to look at the Topic exchange.

What is Topic Exchange?

A topic exchange works like a direct exchange, but wildcards are allowed in the binding key. ‘#‘ matches zero or more dot-delimited words and ‘*‘ matches exactly one such word.

  • * (star) can substitute for exactly one word.
  • # (hash) can substitute for zero or more words.

Topic exchanges route messages to one or many queues based on matching between a message routing key and the pattern that was used to bind a queue to an exchange. The topic exchange type is often used to implement various publish/subscribe pattern variations. Topic exchanges are commonly used for the multicast routing of messages.

Here is what we are going to do

 

Screen Shot 2017-11-12 at 4.56.34 AM.png

 

In this example we are trying to sort the messages and direct each one into the right queue based on the manufacturer of the car. We will be receiving a large number of messages about cars registered in the city. The routing key pattern of every message is as follows.

manufacturer_name.cars.type

As you can see, we have created a Topic exchange called vehicle_exchange. In addition, we have created three queues and bound them to the Topic exchange (vehicle_exchange) with the following binding keys.

  • toyota_cars_queue  ( binding key – toyota.cars.* )

toyota_cars_queue is bound to the exchange with the wildcard binding key toyota.cars.* This means that any message whose routing key starts with toyota.cars and ends with exactly one more word is directed to this queue (toyota_cars_queue).

  • nissan_cars_queue  ( binding key – nissan.cars.* )

In the same way, any message whose routing key starts with nissan.cars and ends with exactly one more word is directed to the nissan_cars_queue.

  • all_cars_queue  ( binding key – *.cars.#)

According to this binding key, the routing key must start with exactly one word (any word), followed by the cars keyword. It can then end with zero or more further words.

e.g:-  The following routing keys are all valid for this binding pattern.

nissan.cars ,  toyota.cars  , any.cars  , anything.cars.everything , anything.cars.anything.and.everything

The following table will show you a list of routing keys and their destination queue(s)

 

routing key                     | delivered to queue(s)
nissan.cars.japan               | nissan_cars_queue and all_cars_queue
nissan.cars                     | all_cars_queue
toyota.cars.japan.manufactured  | all_cars_queue
japan.toyota.cars               | No matching queue. The message will be discarded
import.nissan.cars.from.japan   | No matching queue. The message will be discarded
toyota.cars.manufatured         | toyota_cars_queue and all_cars_queue
no.latest.cars.toyota           | No matching queue. The message will be discarded

Now I believe that you have a clear understanding of how these wildcard binding keys work with a topic exchange.
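The matching rules can also be checked with a small plain-Java sketch. It does not talk to RabbitMQ; it only re-implements the wildcard rules described above (‘*’ is exactly one word, ‘#’ is zero or more words) so the routing table can be reproduced locally. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopicMatchSketch {

    // Queue name -> binding key, as listed in this article.
    static final Map<String, String> BINDINGS = new LinkedHashMap<>();

    static {
        BINDINGS.put("toyota_cars_queue", "toyota.cars.*");
        BINDINGS.put("nissan_cars_queue", "nissan.cars.*");
        BINDINGS.put("all_cars_queue", "*.cars.#");
    }

    // True if the routing key matches the binding key under the topic wildcard rules.
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length; // pattern is used up, so the key must be used up too
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible number of words
            for (int next = j; next <= key.length; next++) {
                if (matches(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false; // the pattern still needs a word but the key has none left
        }
        boolean wordMatches = pattern[i].equals("*") || pattern[i].equals(key[j]);
        return wordMatches && matches(pattern, i + 1, key, j + 1);
    }

    // Names of all queues whose binding key matches the routing key.
    static List<String> destinations(String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, String> binding : BINDINGS.entrySet()) {
            if (matches(binding.getValue(), routingKey)) {
                queues.add(binding.getKey());
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        String[] routingKeys = {
            "nissan.cars.japan", "nissan.cars", "toyota.cars.japan.manufactured",
            "japan.toyota.cars", "import.nissan.cars.from.japan",
            "toyota.cars.manufatured", "no.latest.cars.toyota"
        };
        for (String routingKey : routingKeys) {
            System.out.println(routingKey + " -> " + destinations(routingKey));
        }
    }
}
```

An empty list for a routing key corresponds to the "no matching queue" rows of the table.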

 

Topic exchange : More to consider

Topic exchange is powerful and can behave like other exchanges.

When a queue is bound with the “#” (hash) binding key, it receives all the messages regardless of the routing key, just like with a fanout exchange.

When special characters “*” (star) and “#” (hash) aren’t used in bindings, the topic exchange will behave just like a direct one.

 

Setting up the RabbitMQ Server

 

  • Creating the exchange

Screen Shot 2017-11-12 at 2.07.48 AM.png

 

  • Creating the toyota_cars_queue and binding it to the vehicle_exchange.

Screen Shot 2017-11-12 at 2.33.41 AM.png

 

Screen Shot 2017-11-12 at 2.35.05 AM.png

 

  • Creating the nissan_cars_queue and binding it to the vehicle_exchange.

 

Screen Shot 2017-11-12 at 2.37.40 AM.png

 

Screen Shot 2017-11-12 at 2.38.41 AM.png

 

  • Creating the all_cars_queue and binding it to the vehicle_exchange

 

Screen Shot 2017-11-12 at 2.39.41 AM.png

 

Screen Shot 2017-11-12 at 2.40.13 AM.png

 

Now we have created all three queues and bound them to the topic exchange that we created. It is time to develop publisher and subscriber applications that publish messages to, and receive messages from, this RabbitMQ server setup.

 

Producer and Consumer Application

Before moving forward, I should mention that the full source code for the Producer and Consumer applications can be found at GitHub. You may clone the source code and follow along with the article.

click here to download the source code of the application

 

Producer Application

Here is the project structure of the Producer Application.

Screen Shot 2017-11-12 at 2.14.26 PM.png

 

First, make sure that you have defined valid RabbitMQ server details in the application.properties file. This information is used by the ConnectionFactory to connect to the RabbitMQ server when the RabbitTemplate is used.

 


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

 

Now check whether you have declared/configured the required beans to use Spring AMQP feature with RabbitMQ.


package com.springbootdev.examples.producer.config;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig
{
    @Bean
    public MessageConverter messageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(messageConverter());
        return template;
    }
}

 

As I mentioned earlier, the ConnectionFactory uses the RabbitMQ server details declared in application.properties to connect to the RabbitMQ server when the RabbitTemplate is used.

We will be sending a POJO instance (a Car object) as the message body. We cannot send it as a plain Java object, so we need to convert it into a common exchange format. We have chosen JSON as that format, and the RabbitTemplate will use the Jackson2JsonMessageConverter as the message converter for converting the POJO into JSON.

 

Now we will look at the ProducerService


package com.springbootdev.examples.producer.service;

import com.springbootdev.examples.producer.model.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final String EXCHANGE_NAME = "vehicle_exchange";
    private static final Logger logger = LoggerFactory.getLogger(ProducerService.class);

    // The seven routing keys from the routing table
    private String[] routeKeys = new String[]{
            "nissan.cars.japan",
            "nissan.cars",
            "toyota.cars.japan.manufactured",
            "japan.toyota.cars",
            "import.nissan.cars.from.japan",
            "toyota.cars.manufatured",
            "no.latest.cars.toyota"
    };

    public void produce() {
        for (String routingKey : routeKeys) {
            logger.info(" sending the message with routing key {}", routingKey);
            Car car = new Car(routingKey);
            // The Car is converted to JSON by the configured message converter
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, car);
        }
    }
}

 

As per the table above, we have seven routing keys, and we are going to publish messages with those routing keys to the exchange called “vehicle_exchange”. When you run the Producer application, the output will look something like the following.

mvn spring-boot:run

 

Screen Shot 2017-11-12 at 6.03.53 PM.png

Now all the messages are published to the RabbitMQ server. So we can check whether the messages are delivered to the relevant queues.

Screen Shot 2017-11-12 at 6.05.41 PM.png

Now we are done with the Producer application for publishing messages to a RabbitMQ topic exchange. The full source code (both producer and consumer) of this article can be found at GitHub. If you need further explanation of writing a RabbitMQ consumer application, it is highly recommended to look at one of the articles below.

Spring AMQP / RabbitMQ : Manually pull message(s) from RabbitMQ queue

In the previous article (click here to view the article), the consumer application listens to the queue using a listener. Whenever the queue receives a message, the listener is notified, so the Consumer application gets the message automatically with the help of the listener.

In this article, the Consumer application will be developed to pull messages from the queue manually. That means the message(s) will be pulled whenever the client application needs them. (To emphasize it again: we are not going to use any queue listener here.)

The source code of the Producer application from the previous article (click here to go to the previous article) can be used as it is, but the Consumer application needs a minor modification. The Consumer.java class should be changed as follows to manually retrieve/pull messages from the defined queue.

 


package com.springbootdev.samples.rabbitmq.consumer.listener;

import com.springbootdev.samples.rabbitmq.consumer.model.Order;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Consumer
{
    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
    private static final String QUEUE_NAME = "orders_queue";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void pullMessage()
    {
        LOGGER.info("receiving the message ");
        // Pull a single message; the result is null when the queue is empty
        Object message = rabbitTemplate.receiveAndConvert(QUEUE_NAME);
        if (message != null) {
            Order order = (Order) message;
            LOGGER.info(" received message [" + order.toString() + "] ");
        }
    }
}


 

You can see that the Consumer does not listen to the queue and receive messages automatically. Instead, it uses the receiveAndConvert() method of the RabbitTemplate to receive messages from the queue. We pass the queue name, and messages are pulled from that queue. If no message is waiting, the method returns null, which is why the code checks for null before casting. receiveAndConvert() uses the message converter set on the RabbitTemplate to convert the received message into the desired POJO (Java object). Messages are retrieved from the orders queue in FIFO order.
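Since each call pulls at most one message, a client that wants everything currently waiting has to call the method repeatedly until it gets null. The following plain-Java sketch shows such a loop. It is an assumption-laden illustration rather than code from the sample project: the drain helper and its maxMessages limit are hypothetical, and an in-memory queue stands in for RabbitMQ. In the real application the supplier could be () -> rabbitTemplate.receiveAndConvert(QUEUE_NAME).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

class DrainQueueSketch {

    // Calls pullOne until it returns null (queue empty) or maxMessages have been collected.
    static List<Object> drain(Supplier<Object> pullOne, int maxMessages) {
        List<Object> received = new ArrayList<>();
        while (received.size() < maxMessages) {
            Object message = pullOne.get();
            if (message == null) {
                break; // nothing left to pull right now
            }
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the broker queue; poll() returns null when it is empty.
        Queue<Object> fakeQueue = new ArrayDeque<>(List.of("order-1", "order-2", "order-3"));
        System.out.println(drain(fakeQueue::poll, 10));
    }
}
```

The upper limit keeps a single pull from running indefinitely when producers keep publishing while the consumer is draining.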

The complete source code of this article can be found at GitHub.

You can download the source and just build and run the application.