Abstraction des brokers de messages avec Spring Cloud Stream

Mise en place de Spring cloud stream

Introduction

La documentation de Spring commence par :

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

En français, ça nous donne la définition suivante :

Spring Cloud Stream est un framework qui aide à la création de micro-services event-driven hautement scalables connectés à des systèmes de messages partagés.

C’est bien joli tout ça, mais bon, il y a beaucoup de mots qu’on ne comprend pas ! Pas de panique, cet article explique tout !

Pour que tout soit clair comme de l’eau de roche, on s’appuiera, tout au long de cet article, sur une application fictive de clinique vétérinaire que j’ai réalisée.

Les utilisateurs peuvent :

  • Se créer un compte
  • Passer des commandes
  • Prendre des rendez-vous
Figure 1 : Front de l’application
Figure 1 : Front de l’application - La clinique vétérinaire de Laulau !

Ce projet est un monolithe avec une architecture 3-tiers illustrée figure 2.

En très bref :

  1. Notre application front appelle notre API via le protocole HTTP.
  2. Une fois dans l’API, la couche Controller est sollicitée en premier. Elle appelle ensuite les Services adéquats qui exécutent le code “métier”. Si on a un besoin de persister des données, ce sont les DAO qui prennent le relais.

Figure 2 : Monolithe avec une archi 3-tiers
Figure 2 : Monolithe avec une archi 3-tiers

Qu’est ce qu’une architecture micro-service ?

Martin Fowler, un consultant britannique dans la conception logicielle la définit comme :

“Une approche qui consiste à développer une application unique sous la forme d'une suite de petits services, chacun fonctionnant dans son propre processus et communiquant à l'aide de mécanismes légers. Ces services peuvent être déployés indépendamment. Chaque service peut être écrit dans son langage de programmation et peut utiliser différentes technologies de stockage de données.”

Pour plus de détails : Microservices

Appliquons cette architecture à notre monolithe.

On crée un micro-service (MS) par domaine :

  • l’Account MS qui s’occupe de toutes les actions relatives aux comptes : création, mise à jour, suppression…
  • le Purchase MS qui gère et suit toutes les commandes
  • l’Appointment MS qui est responsable de la partie rendez-vous.

On transforme notre API en un simple point d’entrée pour le front de notre application (une petite gateway). Elle a pour but de gérer les requêtes clients et de les transmettre aux autres services.

Comment notre API transmet les informations aux services ?

Il existe plusieurs moyens de communication entre les services comme les appels HTTP, les requêtes GraphQL, l’utilisation du RCP et beaucoup d’autres !

Je vous laisse aller les explorer par vous même. Dans notre cas, nous utilisons l’event-driven.

Mais qu’est ce que l’event-driven ?

L’architecture event-driven (EDA) se base sur des systèmes de messages partagés, ou brokers de messages, pour faire communiquer ses micro-services.

L’EDA permet de déléguer les responsabilités pour créer un système scalable et faiblement couplé.

Le principe est simple : un publisher envoie un message sur le broker. Des subscribers souscrivent au broker et réceptionnent tous les messages qui leur sont adressés.

Figure 3 : Schéma simplifié d’un broker de message
Figure 3 : Schéma simplifié d’un broker de message

Qu’est ce qu’un broker de messages ?

Pour faire très simple, il faut comparer un broker de messages au service de La Poste.

Les services de notre API vont publier un message avec une RoutingKey sur un Exchange. Le broker va rediriger le message en fonction de la RoutingKey et de l’Exchange vers la bonne Queue.

Si on imagine que notre broker est La Poste :

  • Adresse = RoutingKey
  • La poste = Broker de messages
  • Exchange = Boite aux lettres de la Poste
  • Queue = Facteur + boite aux lettres personnelle
Figure 4 : Explication détaillée du broker de message avec l’exemple de La Poste
Figure 4 : Explication détaillée du broker de messages avec l’exemple de La Poste

Paul (un service de l’API) écrit l’adresse (RoutingKey) sur son enveloppe et la dépose dans une boite aux lettres précise (Exchange) de la Poste (Broker de messages). La Poste récupère tous les messages de la boîte et les redirige vers le bon facteur en fonction de l’adresse (RoutingKey). Le facteur dépose ensuite le message dans la bonne boite aux lettres (Queue). Julie (le micro-service concerné par le message) récupère (souscrit) le message dans sa boite aux lettres.

Appliquons un broker de messages à notre application.

Si on reprend un flux qui part du front, le début est commun à l’architecture n-tiers, l’application front appelle l’API via le protocole HTTP et les controllers redirigent le flux vers les bons services.

Cette fois, les services notifient le micro-service impacté via un envoi de messages sur un broker de messages. A la réception du message, le MS effectue sa logique métier.

Figure 5 : Schéma simplifié de l’intégration d’un broker de messages à notre application

Si on prend RabbitMQ comme broker de messages pour notre application, on en revient au schéma figure 6.

Figure 6 : Schéma de l’architecture event-driven de notre projet
Figure 6 : Schéma de l’architecture event-driven de notre projet

Vous avez maintenant la nouvelle architecture event-driven de notre projet.

Comment ça se passe niveau code pour intégrer le broker à notre architecture ?

Spring Boot permet d’intégrer facilement l’envoi de messages à des brokers notamment via le Starter AMQP. Cependant, le starter couple fortement le code au broker. Pour l’utiliser avec RabbitMQ, il faut manier des classes comme RabbitMessage ou RabbitTemplate. Si demain une migration de broker est décidée, il faut changer toutes les configurations et les appels au broker.

Et c’est ici que Spring Cloud Stream entre en jeu et permet de résoudre ce problème. Il nous aide à abstraire le broker de messages utilisé dans notre code. On remplace toutes les implémentations liées à RabbitMQ par des implémentations génériques. Un peu comme si on postait un colis sans savoir si c’est via La Poste ou Mondial Relay !

Et si on entrait dans le vif du sujet ?

Les différents brokers

La dépendance Spring Cloud Stream développée par Spring supporte les brokers de messages suivants :

  • RabbitMQ
  • Apache Kafka
  • Kafka Streams
  • Amazon Kinesis

Il est possible d’utiliser Spring Cloud Stream avec d’autres brokers. Ces dépendances ne sont pas maintenues par Spring mais par les développeurs des systèmes de messages comme :

  • Google PubSub
  • Solace PubSub+
  • Azure Event Hubs
  • Azure Service Bus
  • AWS SQS
  • AWS SNS
  • Apache RocketMQ
  • et quelques autres …

Les notions à bien comprendre

Les trois notions importantes de Spring Cloud Stream :

  • Binder : le ou les brokers de messages que l’on abstrait.
    Ex : RabbitMQ, Azure ServiceBus
  • Binding : représentation de la connexion entre le micro-service et un chanel de messages qui définit la façon d’envoyer ou de recevoir les messages.
    Ex : un exchange pour Rabbit, une souscription pour Azure ServiceBus.
  • Message : structure utilisée pour les données échangées. Il s’agit de la classe org.springframework.messaging.Message.

Comment l’ajouter à son projet ?

Rien de plus simple, on ajoute dans un premier temps la dépendance suivante dans son pom.xml :

<dependency>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-stream</artifactId>
</dependency>

Pour la version, vous pouvez vous référer au Maven Repository.

Maven Repository: org.springframework.cloud » spring-cloud-stream

Il faut ensuite ajouter la dépendance du broker choisi. Pour le cas de RabbitMQ :

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

Maven Repository: org.springframework.cloud » spring-cloud-stream-binder-rabbit

Pour utiliser Azure ServiceBus, il faut ajouter :

<dependency>
	<groupId>com.azure.spring</groupId>
  <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Maven Repository: com.azure.spring » spring-cloud-azure-stream-binder-servicebus

Le but ultime de Spring Cloud Stream est d’abstraire toute notion de brokers de messages dans le code métier. C’est pourquoi il faut tout mettre dans les configurations ! Comme tout module spring, on passe par le fichier application.yml ou application.properties.

Si je reprends notre exemple précédent de clinique vétérinaire, nous permettons aux utilisateurs de passer des commandes. Lorsque le client valide sa commande, notre API doit notifier le MS Purchase.

Regardons comment envoyer un message avec Spring Cloud Stream.

Figure 7 : Spring Cloud Stream lors d’un envoi de message
Figure 7 : Spring Cloud Stream lors d’un envoi de message

Envoi de message

Avec Spring Cloud Stream “tout est config” ! C’est à dire que l’essentiel se passe dans le fichier application.yml.

Pour l’implémentation, nous choisissons d’utiliser RabbitMQ. Il faut, dans un premier temps, configurer la connexion à notre broker.

Dans un second temps, on définit tout ce qui concerne les connexions sous la balise spring.cloud.stream :

  1. On précise le broker sous la balise binders
  2. On précise le nom de la fonction responsable de l’envoi de mon message. Ex : sendPurchaseToPurchaseMS sous la balise function
  3. On ajoute les définitions de nos connexions (noms d’exchange et de queue) sous la balise bindings. Pour cela :
    1. On reprend le nom de la fonction sendPurchaseToPurchaseMS
    2. On lui ajoute le suffixe out car on veux envoyer un message
    3. On lui rajoute le suffixe 0 car on ne veux gérer qu’un flux de messages à la fois
    4. Derrière destination, on met le nom de l’exchange Rabbit. Derrière binder on donne le nom du broker définit précédemment
  4. Pour préciser la RoutingKey de l’exchange, on doit ajouter une balise rabbit qui définit la RoutingKey du producer pour la fonction sendPurchaseToPurchaseMS

Tadaaam le plus beau des yaml :

application.yml

# ex de config rabbit
spring:
  rabbitmq:
    addresses: localhost:5672
    username: guest
    password: guest
    listener:
      simple:
        default-requeue-rejected: false
        concurrency: 3
        max-concurrency: 7

  cloud:
    stream:
			# etape 1
      binders:
        mysuperbinderrabbit:
          type: rabbit
          default-candidate: true
			# etape 2
      function:
        definition: sendPurchaseToPurchaseMS
      bindings:
				# etape 3
        sendPurchaseToPurchaseMS-out-0:
          destination: purchasesExchange
          binder: mysuperbinderrabbit
			# etape 4
      rabbit:
        bindings:
          sendPurchaseToPurchaseMS-out-0:
            producer:
              routing-key-expression: '''api.SEND.PURCHASE'''

Figure 8 : Les conventions de nommage pour les bindings
Figure 8 : Les conventions de nommage pour les bindings

Un petit conseil : il est judicieux de séparer le fichier de configuration en deux parties :

  1. la connexion au broker
  2. les définitions des bindings
Figure 9 : Séparation du fichier de configuration en deux fichiers : un pour les connexions, un pour les bindings
Figure 9 : Séparation du fichier de configuration en deux fichiers : un pour les connexions, un pour les bindings

Et maintenant, on peut commencer à coder notre service. Il faut seulement implémenter un Bean Spring qui représente notre fonction sendPurchaseToPurchaseMS définie dans notre bindings-config.yml.

Notre méthode est un Supplier, opérateur qui ne prend aucune valeur en entrée mais qui retourne une valeur en sortie, puisqu’on cherche à envoyer un message.

	@Bean
	public Supplier<Message<PurchasePayload>> sendPurchaseToPurchaseMS() {
		return () -> {
			List<PurchaseItems> orderedItems = List.of(
					PurchaseItems.builder()
							.id("Catfood123")
							.name("Dry kitten food")
							.quantity(1)
							.build(),
					PurchaseItems.builder()
							.id("Toy123")
							.name("Cat tree")
							.quantity(1)
							.build());
			PurchasePayload purchasePayload = PurchasePayload.builder()
					.customerId("123456789")
					.purchaseId("abcdef")
					.orders(orderedItems)
					.build();
			log.info("Sending message");
			return MessageBuilder.withPayload(purchasePayload).build();
		};
	}

Et voilà, rien de plus à faire, nos messages s’envoient. La preuve en image :

Figure 10 : Capture de l’écran de monitoring de la queue api.send.purchase
Figure 10 : Capture de l’écran de monitoring de la queue api.send.purchase
Figure 10 : Logs de l’api
Figure 11 : Logs de l’api

Oups, je crois qu’on a un petit souci : ce qu’on a codé ici, envoie toutes les secondes le même message.

Dans notre cas, lorsqu’on reçoit une demande de commande depuis le front, on veut que notre API crée un message à partir des données reçues et qu’elle l’envoie une seule fois.

Seulement, on est un peu coincé parce qu’on n’a aucun moyen d’appeler notre @Bean Supplier explicitement…

Alors comment faire un envoi unique de message sans passer par une interface fonctionnelle ?

Spring Cloud Stream a été crée dans une optique de gestion de flux avec des bindings implicites permis par la programmation fonctionnelle.

Il est cependant possible de créer des bindings explicites permettant des envois directs de messages, il faut alors utiliser StreamBridge. On essaie ?

Dans notre API, on crée un Endpoint : POST /purchase qui prend en entrée la commande du client qu’on publie directement sur RabbitMQ pour l’envoyer au PurchaseMS.

Pour envoyer le message avec StreamBridge, il suffit de l’injecter dans notre controller et d’utiliser la méthode send qui prend en paramètre un binding et un body.

On lui donne notre binding “sendPurchaseToPurchaseMS-out-0” et la commande reçue depuis notre front.

@RestController
@Log4j2
public class PurchaseController {
	
    @Autowired
    private StreamBridge streamBridge;

    @PostMapping("/purchase")
    public void getPurchaseFromFrontAndSendItToMS(@RequestBody PurchasePayload purchase) {
			log.info("Purchase request received");
	    streamBridge.send("sendPurchaseToPurchaseMS-out-0", purchase);
			log.info("Purchase sent"); 
    }

}

Si on appelle notre controller via Postman, on a bien un envoi unique du message !

Figure 11 : Payload envoyé sur le PurchaseController
Figure 12 : Payload envoyé sur le PurchaseController
Figure 12 : Logs de l’api
Figure 13 : Logs de l’api
Figure 13 : Capture de l’écran de monitoring de la queue api.send.purchase
Figure 14 : Capture de l’écran de monitoring de la queue api.send.purchase

Super, mission accomplie, on a transmis la demande de commande à notre MS !

Mais lui de son coté, comment il écoute ?

Figure 14 : Spring Cloud Stream lors d’une réception de message
Figure 15 : Spring Cloud Stream lors d’une réception de messages

Réception de messages

Essayons de logger tous les messages reçus par notre MS.

On retourne dans la partie configuration, le fichier bindings-config.yml du MS Purchase. Cette fois ci, dans les bindings, on utilise le in car on reçoit et on ajoute un group qui correspond à notre queue RabbitMQ.

Attention, par défaut Spring génère les queues automatiquement, il faut rajouter une balise rabbit avec les paramètres du consumer pour lui indiquer de ne pas les ajouter.

binding-config.yml

spring:
  cloud:
    stream:
      binders:
        mysuperbinderrabbit:
          type: rabbit
          default-candidate: true
      # notre fonction n'est plus la même
      function:
        definition: receivePurchaseFromApi
      bindings:
        # définition des bindings, cette fois on utilise le in car on reçoit un message
        receivePurchaseFromApi-in-0:
          destination: purchasesExchange
          group: purchasesFromApiQueue
          binder: mysuperbinderrabbit
				# à ajouter si on veut utiliser ses propres queues.
			rabbit:
        bindings:
          receivePurchaseFromApi-in-0:
            consumer:
              bindQueue: false
              declareExchange: false
              queueNameGroupOnly: true

Pour la partie code, on utilise un Consumer car on reçoit un message. Il s’agit d’un opérateur qui prend une valeur en entrée mais qui ne retourne rien.

@Bean
public Consumer<Message<PurchasePayload>> receivePurchaseFromApi() {
    return message -> {
        log.info("New message received: '{}'", message.getPayload());
        log.info("The headers of the message: '{}'", message.getHeaders());
    };
}

Comme une image vaut mille mot, je vous montre que je ne raconte pas de salades.

En publiant mon message :

Figure 15 : Envoi de message sur la queue purchasesFromApiQueue
Figure 16 : Envoi de message sur la queue purchasesFromApiQueue
Figure 16 : Logs du micro-service Purchase
Figure 17 : Logs du micro-service Purchase

Il est bien reçu !

Mais… Nous ne sommes pas fabricant de croquettes ! A chaque fois qu’un client va passer une commande, il va falloir qu’on la transmette à notre partenaire SuperCroquettes&Co.

A chaque fois qu’il reçoit un message de notre API, notre micro-service Purchase doit d’abord transformer le message, par exemple ne garder que les items de type DRY_FOOD, et le renvoyer à notre partenaire.

Nous allons voir comment envoyer un message au moment de la réception d’un autre message.

Figure 17 : Spring Cloud Stream pour un envoi au moment d’une réception de message
Figure 18 : Spring Cloud Stream pour un envoi au moment d’une réception de messages

Envoi au moment de la réception

Comme pour les cas précédents, on va définir notre fonction dans la balise function. Comme on envoie et on reçoit via la même fonction, dans les bindings, on va utiliser deux fois le même functionName que l’on va suffixer de in pour préciser les connexions de la réception et de out pour préciser les connexions de l’envoi.

Dans notre cas, nous recevons le message sur l’exchange interne purchasesExchange et nous renvoyons un message sur l’exchange purchasesExchangeWithCroquettesAndCo.

binding-config.yml

spring:
  cloud:
    stream:
      binders:
        mysuperbinderrabbit:
          type: rabbit
          default-candidate: true
			# définition de notre fonction 
      function:
        definition: receiveFromMsAndSendToSuperCroquettesAndCo
			# on reçoit et on envoie via la même fonction. On la bind 2 fois, une fois avec in
				# et une fois avec out
      bindings:
        receiveFromMsAndSendToSuperCroquettesAndCo-in-0:
          destination: purchasesExchange
          group: purchasesFromApiQueue
          binder: mysuperbinderrabbit
        receiveFromMsAndSendToSuperCroquettesAndCo-out-0:
          destination: purchasesExchangeWithCroquettesAndCo
          binder: mysuperbinderrabbit

On n’oublie pas de préciser la RoutingKey du producer et la non création des queues du consumer.

binding-config.yml

spring: 
	cloud: 
		stream: 
			rabbit:
        bindings:
          receiveFromMsAndSendToSuperCroquettesAndCo-out-0:
            producer:
              routing-key-expression: '''PURCHASE.REQUESTS'''
					receiveFromMsAndSendToSuperCroquettesAndCo-in-0:
						consumer:
              bindQueue: false
              declareExchange: false
              queueNameGroupOnly: true

Au niveau du code, on utilise une Function, opérateur qui prend une valeur en entrée et qui retourne une autre valeur

@Bean
  public Function<Message<PurchasePayload>, Message<PurchaseForSuperCroquettesPayload>> receiveFromMsAndSendToSuperCroquetteAndCo() {
      return message -> {
          log.info("New message received: '{}'", message.getPayload());
          PurchasePayload messagePayloadIn = message.getPayload();
          List<PurchaseItems> items = messagePayloadIn.getOrders();

					// on ne garde que les items qui sont de la nourriture  
          List<DryFood> itemsToSendToSuperCroquettes = items.stream()
                  .filter(order -> order.getType().equals(ItemType.DRY_FOOD))
                  .map(order -> DryFood.builder()
                          .id(order.getId())
                          .quantity(order.getQuantity())
                          .build()).toList();

          PurchaseForSuperCroquettesPayload messagePayloadOut =
                  PurchaseForSuperCroquettesPayload.builder()
                  .purchaseId(messagePayloadIn.getPurchaseId())
                  .dryFoods(itemsToSendToSuperCroquettes)
                  .build();

          Message<PurchaseForSuperCroquettesPayload> messageOut =
                  MessageBuilder.withPayload(messagePayloadOut).build();
          log.info("Message to send: '{}'", messageOut);

          return messageOut;
      };
  }
Figure 18 : Envoi de message sur la queue purchasesFromApiQueue
Figure 19 : Envoi de message sur la queue purchasesFromApiQueue
Figure 19 : Logs du micro-service Purchase
Figure 20 : Logs du micro-service Purchase
Figure 20 : Capture de l’écran de monitoring de la queue purchaseCroquetteAndCoQueue
Figure 21 : Capture de l’écran de monitoring de la queue purchaseCroquetteAndCoQueue

Et si on en a marre de Rabbit ?

Et oui ! N’oublions pas que l’atout majeur de Spring Cloud Stream c’est d’abstraire le broker de messages utilisé.

Si demain, notre partenaire décide de changer de broker, pour passer de RabbitMQ à ServiceBus de Azure par exemple, il suffira de modifier la config.

Notre partie code Java restera la même. On n’oublie pas d’ajouter la dépendance à ServiceBus dans le pom ! La problématique initiale est donc résolue !

spring: 	
	cloud:
    stream:
      binders:
        mysuperbinderrabbit:
          type: rabbit
          default-candidate: true
			# on ajoute le nouveau binder
			mysuperbinderservicebus:
          type: servicebus
          default-candidate: false
			# définition de notre fonction 
      function:
        definition: receiveFromMsAndSendToSuperCroquettesAndCo
			# on reçoit et on envoie via la même fonction. On la bind 2 fois, une fois avec in
				# et une fois avec out
      bindings:
        receiveFromMsAndSendToSuperCroquettesAndCo-in-0:
          destination: purchasesExchange
          group: purchasesFromApiQueue
          binder: mysuperbinderrabbit
        receiveFromMsAndSendToSuperCroquettesAndCo-out-0:
          destination: purchasesExchangeWithCroquettesAndCo
					# on précise le binder
          binder: mysuperbinderservicebus

Conclusion

Avantages :

Spring Cloud Stream peut être très intéressant lorsqu’il sert de passerelle avec des brokers extérieurs à notre système. On peut ainsi recevoir la donnée, la transformer et la réinjecter dans notre système. Si les brokers externes changent, il suffit de modifier nos configurations, la partie Java n’a pas besoin d’être modifiée.

Inconvénients :

La phase d’apprentissage pour configurer les bindings est intense. Une grosse partie du code se fait dans les fichiers de configs ce qui sort un peu de l’ordinaire. Il faut aussi être à l’aise avec la programmation fonctionnelle, qui est moins courante.

Mes trois sous sur la mise en place Spring Cloud Stream ?

La mise en place de Spring Cloud Stream dans ma mission est arrivée lors du besoin d’injection dans notre système des données provenant de l’extérieur. Un partenaire nous envoie des messages sur un Azure ServiceBus. Nous avons crée un micro-service qui récupère le message ServiceBus, valide le payload et le publie dans notre système de messages interne (RabbitMQ) sur un exchange particulier. D’autres micro-services de notre SI écoutent cet exchange et récupèrent les informations qui les intéressent. La mise en place de ce micro-service m’a plongée dans ce module de Spring. C’était un sujet challengeant, d’autant plus que la documentation est peu fournie https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/ (surtout celle concernant Azure ServiceBus)

Cependant, j’ai adoré l’expérience de recherche et de découverte d’une nouvelle techno et je me suis dit que le partage de mon expérience pouvait donner envie à d’autres d’essayer Spring Cloud Stream !

Attention toutefois, ne l’utilisez pas pour toutes vos interactions avec un broker de messages. Sa force réside dans l’abstraction des brokers de messages et permet d’avoir un code découplé du système de messaging. Notre cas métier était parfait pour tester Spring Cloud Stream.

Lorsqu’on se penche un peu plus sur les brokers de messages, il existe tout un tas d’autres challenges notamment autour de la gestion des transactions distribuées. Mais aussi que se passe-t-il si l’un des messages n’a pas pu être lu ? Comment gérer un rollback ? Qui sait ? Ce sera peut être le sujet de mon prochain article.

J’espère que cet article vous a plu, et qu’il vous donnera envie d’implémenter Spring Cloud Stream dans vos projets, si vous en avez le besoin.

N'hésitez pas à me contacter si vous avez des questions et des remarques 🙂

Par Laurine Lenet

Admin Takiblog

Admin Takiblog