Dans cet article, je vous propose de vous présenter Spring Cloud Stream, un projet de Spring Cloud qui permet de simplifier la mise en place de flux de données entre applications.

Principe de fonctionnement

Spring cloud stream se base sur un principe de paire binder / bindings permettant de créer un channel spring integration sur lequel transite les messages sur lesquels on peut appliquer des traitements.

Overview

Spring cloud stream s’inscrit comme une alternative à Kafka Stream avec une approche plus abstraite car non liée à Kafka. De plus, ce module de spring cloud peut tout à fait être utilisées pour des applications ne faisant que produire ou consommer des messages.

C’est notamment sur ce genre de usecase que je l’ai utilisé pour des projets chez des clients ; car il permet de ne pas se focaliser sur la technologie de messaging utilisée et de se concentrer sur le traitement des messages.

Exemple d’utilisation

Création d’un projet

Nous allons nous baser sur le projet très simple suivant: “un émetteur envoie un message à un destinataire”.

Sur ce projet, on écrira un traitement qui filtre les messages n’ayant ni destinataire ni contenu. Cela nous permettra d’exploiter les 3 cas d’utilisation de Spring Cloud Stream :

  • publication de message,
  • consommation de message,
  • traitement de message.

Le projet

Vous pouvez retrouver le projet exemple sur github.

Les dépendances

Pour spring cloud stream, nous avons


<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        <exclusions>
            <exclusion>
                <artifactId>commons-logging</artifactId>
                <groupId>commons-logging</groupId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

Nous avons ici les dépendances pour les 2 binders les plus utilisés : Kafka et RabbitMQ.

NOTE : nous avons exclus commons-logging car il empêche la compilation native du projet avec GraalVM.

La configuration

Pour configurer Spring Cloud Stream, nous avons paramétré les propriétés suivantes dans le fichier application.yml :

schema:
  registry:
    url: http://localhost:8081

spring:
  application:
    name: introduction-spring-cloud-stream

  rabbitmq:
    addresses: localhost:5672
    username: guest
    password: guest
  cloud:
    function:
      definition: processor;consumer
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          configuration:
            security:
              protocol: PLAINTEXT
          producer-properties:
            schema.registry.url: ${schema.registry.url}
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
          consumer-properties:
            schema.registry.url: ${schema.registry.url}
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
      output-bindings: producer
      bindings:
        processor-in-0:
          destination: my-process-topic
          group: consumer
          contentType: application/*+avro
          consumer:
            use-native-decoding: true
        processor-out-0:
          destination: my-result-topic
          group: consumer
          content-type: application/json
          binder: rabbit
        consumer-in-0:
          destination: my-result-topic
          group: consumer
          contentType: application/json
          binder: rabbit
        producer:
          destination: my-process-topic
          contentType: application/*+avro
          producer:
            use-native-encoding: true

La configuration permet de définir le pipeline de traitement suivant :

  • processor : le traitement qui filtre les messages
  • producer : l’émetteur de message
  • consumer : le consommateur de message

Pipeline

Cette exemple établit un endpoint REST qui permet d’envoyer un message à un récepteur de message. Le message est ensuite traité par le traitement processor qui filtre les messages n’ayant ni destinataire ni contenu. Enfin le résultat du traitement est envoyé à un consommateur qui l’affiche dans la console.

Ce qui est intéressant dans cette configuration, c’est que le traitement processor est totalement indépendant du binder utilisé. Il est donc possible de changer de système de messaging sans avoir à modifier le traitement.

On ne s’intéresse donc plus qu’au traitement des messages et non plus à la technologie de messaging.

Le code

D’un point de vue code, on va déclarer plusieurs composants.

Le traitement du message

On définit un composant MessageProcessor qui va traiter le message et le filtrer si besoin:

class MessageProcessor {
    fun process(message: Message): Result? {
        LOGGER.info("Processing messages: {}", message)
        val hasNoContent = message.content.isNullOrBlank()
        if (hasNoContent) {
            LOGGER.debug("Message {} is filter out because it does not have any content", message.id)
            return null
        }
        val hasNoReceiver = message.receiver.isNullOrBlank()
        if (hasNoReceiver) {
            LOGGER.debug("Message {} is filter out because it does not have any receiver", message.id)
            return null
        }
        return Result(
            id = message.id,
            timestamp = message.timestamp,
            content = message.content,
            sender = message.sender,
            receiver = message.receiver
        )
    }

    private companion object {
        private val LOGGER = org.slf4j.LoggerFactory.getLogger(MessageProcessor::class.java)
    }
}
L’émetteur de message

On définit un composant MessageProducer qui va envoyer un message sur le channel nommé producer :

class MessageProducer(private val streamOperations: StreamOperations) {
    fun produce(message: Message) {
        LOGGER.info("Producing message: {}", message)
        streamOperations.send("producer", message)
    }

    private companion object {
        private val LOGGER = org.slf4j.LoggerFactory.getLogger(MessageProducer::class.java)
    }
}
Le consommateur de message

On définit un composant ResultConsumer qui va consommer le message résultant du traitement sur le channel nommé consumer-in-0 :

class ResultConsumer(private val store: Queue<Result>) {
    fun consume(result: Result) {
        LOGGER.info("Consuming result: {}", result)
        store.add(result)
    }

    private companion object {
        private val LOGGER = getLogger(ResultConsumer::class.java)
    }
}
Le controller REST

On définit un composant MessageController qui va exposer un endpoint REST permettant d’envoyer un message à un récepteur de message :

@RestController
class MessageController(
    private val idGenerator: Supplier<String>,
    private val messageProducer: MessageProducer,
    private val clock: Clock
) {

    @ResponseStatus(code = NO_CONTENT)
    @PostMapping("/{sender}/message")
    fun postMessage(
        @PathVariable sender: String,
        @Valid @RequestBody messageRequestBody: MessageRequestBody
    ) {
        LOGGER.info("Requesting message: {}", messageRequestBody)
        val message = Message.newBuilder()
            .setId(idGenerator.get())
            .setTimestamp(clock.instant().toEpochMilli())
            .setContent(messageRequestBody.content)
            .setSender(sender)
            .setReceiver(messageRequestBody.receiver)
            .build()
        LOGGER.info("Sending message: {}", message)
        messageProducer.produce(message)
    }


    private companion object {
        private val LOGGER = getLogger(MessageController::class.java)
    }

}

Lancement du projet

Pour lancer le projet, il faut lancer le serveur de schema registry :

./mvnw spring-boot:run

A partir de là, on peut envoyer un message à un récepteur de message en appelant l’ endpoint REST suivant :

curl -X POST \
  http://localhost:8080/sender1/message \
  -H 'Content-Type: application/json' \
  -d '{
    "content": "Hello World",
    "receiver": "receiver1"
}'

Et voilà, vous venez de développer votre première petite application spring cloud stream. 🚀

Les tests

Un avantage de spring cloud stream est qu’il abstrait le système de messaging utilisé. Cela permet donc à spring cloud stream d’introduire un binder in memory qui permet de tester facilement les composants.

Dépendances

Pour cela, il suffit d’ajouter la dépendance suivante :


<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <scope>test</scope>
</dependency>
Configuration

Pour dans la partie testing, on reprendra la configuration de notre application tout en prenant soin de ne pas placer le binder Kafka ou RabbitMQ :

spring:
  application:
    name: introduction-spring-cloud-stream

  cloud:
    function:
      definition: processor;consumer
    stream:
      output-bindings: producer
      bindings:
        processor-in-0:
          destination: my-process-topic
          group: consumer
          contentType: application/*+avro
          consumer:
            use-native-decoding: true
        processor-out-0:
          destination: my-result-topic
          group: consumer
          content-type: application/json
        consumer-in-0:
          destination: my-result-topic
          group: consumer
          contentType: application/json
        producer:
          destination: my-process-topic
          contentType: application/*+avro
          producer:
            use-native-encoding: true
Le code de test

On peut ensuite tester notre application en utilisant le binder in memory :

@AutoConfigureMockMvc
@SpringBootTest
class IntroductionSpringCloudStreamApplicationTests {

    @Autowired
    private lateinit var mockMvc: MockMvc

    @Autowired
    private lateinit var store: Queue<Result>

    @Test
    fun `should consume message with content from sender to receiver`() {
        this.mockMvc.perform {
            post("/sender/message")
                .contentType("application/json")
                .content(
                    """
    {
      "content": "Hello World!",
      "receiver": "receiver"
    }
                    """.trimIndent()
                )
                .buildRequest(it)
        }.andExpect { status().isOk }

        val result = this.store.peek()
        assertThat(result).isNotNull
        assertThat(result.id).isNotNull
        assertThat(result.content).isEqualTo("Hello World!")
    }
}

Compilation native

Une petite particularité de Spring Cloud Stream est qu’il est possible de compiler le projet en binaire natif avec GraalVM.

Pour cela, il faut ajouter la dépendance suivante :

<plugins>
    <plugin>
        <groupId>org.graalvm.buildtools</groupId>
        <artifactId>native-maven-plugin</artifactId>
    </plugin>
</plugins>

Si vous êtes sous MacOS comme moi et que vous utilisez les processors ARM (Mx) alors il faut changer le builder buildpacks pour utiliser le builder dashaun/builder:tiny :

<plugins>
    <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
            <image>
                <builder>dashaun/builder:tiny</builder>
            </image>
        </configuration>
    </plugin>
</plugins>

Une fois tout cela effectué, vous pouvez construire votre image native avec la commande suivante :

./mvnw spring-boot:build-image

Ou alors compiler votre projet pour obtenir un livrable executable avec la commande suivante :

./mvnw -Pnative native:compile

Une fois le livrable généré, vous pouvez lancer votre application avec la commande suivante :

./target/introduction-spring-cloud-stream

Malheureusement dans mon cas, j’ai eu une erreur lors du lancement de l’application :

2024-01-07T15:35:55.791+01:00 ERROR 9016 --- [introduction-spring-cloud-stream] [           main] o.s.cloud.stream.binding.BindingService  : Failed to create consumer binding; retrying in 30 seconds

org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder checking the topic (my-process-topic):
        at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:684) ~[introduction-spring-cloud-stream:4.1.0]

Pour l’instant, j’ai l’impression que la compilation native ne semble pas encore fonctionnelle avec spring cloud stream ; donc faites comme moi et stay tuned ! 🤞

Conclusion

Spring Cloud Stream est un projet très intéressant qui permet de simplifier la mise en place de flux de données entre applications.

Il permet de se concentrer sur le traitement des messages et non plus sur la technologie de messaging utilisée.

Un autre avantage d’utiliser spring cloud stream, c’est qu’il nous permet de très facilement tester notre système sans se préoccuper de devoir installer un système de message qui est souvent lourd à faire lorsque l’on souhaite lancer nos tests automatisés.

Resources