How to Set Up Kafka Integration Test – Grape Up

Josephine J. Romero


Do you consider device screening as not enough remedy for trying to keep the application’s dependability and steadiness? Are you scared that in some way or someplace there is a likely bug hiding in the assumption that unit checks need to deal with all situations? And also is mocking Kafka not ample for task necessities? If even just one answer is  ‘yes’, then welcome to a pleasant and quick guideline on how to set up Integration Checks for Kafka using TestContainers and Embedded Kafka for Spring!

What is TestContainers?

TestContainers is an open-supply Java library specialized in offering all necessary options for the integration and testing of external sources. It means that we are able to mimic an real databases, web server, or even an function bus surroundings and address that as a reliable spot to exam app functionality. All these fancy features are hooked into docker photos, described as containers. Do we need to check the databases layer with precise MongoDB? No anxieties, we have a exam container for that. We can not also ignore about UI exams – Selenium Container will do nearly anything that we really require.
In our situation, we will focus on Kafka Testcontainer.

What is Embedded Kafka?

As the name implies, we are going to offer with an in-memory Kafka occasion, ready to be made use of as a usual broker with entire operation. It permits us to do the job with producers and consumers, as normal, creating our integration assessments lightweight. 

Prior to we start

The notion for our check is uncomplicated – I would like to test Kafka buyer and producer applying two unique approaches and test how we can employ them in true situations. 

Kafka Messages are serialized using Avro schemas.

Embedded Kafka – Producer Exam

The strategy is straightforward – let’s develop a uncomplicated job with the controller, which invokes a provider approach to push a Kafka Avro serialized concept.


implementation "org.apache.avro:avro:1.10.1"
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.kafka:spring-kafka'

implementation 'org.projectlombok:lombok:1.18.16'

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation(' a look at-assist:3.1.1')
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'

Also well worth mentioning fantastic plugin for Avro. Below plugins segment:

id 'org.springframework.boot' variation '2.6.8'
id 'io.spring.dependency-management' model '1..11.RELEASE'
id 'java'
id "com.github.davidmc24.gradle.plugin.avro" model "1.3."

Avro Plugin supports schema auto-producing. This is a must-have.

Website link to plugin:

Now let’s define the Avro schema:

  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "style": "document",
  "name": "RegisterRequest",
  "fields": [
    "name": "id", "type": "long",
    "name": "address", "type": "string", "": "String"


Our ProducerService will be centered only on sending messages to Kafka working with a template, practically nothing remarkable about that section. Principal operation can be accomplished just working with this line:

ListenableFuture> long term = this.kafkaTemplate.ship("register-request", kafkaMessage)

We just cannot neglect about examination attributes:

    permit-bean-definition-overriding: real
      group-id: team_id
      car-offset-reset: earliest
      crucial-deserializer: org.apache.kafka.widespread.serialization.StringDeserializer
      worth-deserializer: com.grapeup.myawesome.myawesomeconsumer.frequent.CustomKafkaAvroDeserializer
      automobile.sign up.schemas: accurate
      key-serializer: org.apache.kafka.frequent.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.prevalent.CustomKafkaAvroSerializer
      unique.avro.reader: real

As we see in the stated examination homes, we declare a custom deserializer/serializer for KafkaMessages. It is hugely recommended to use Kafka with Avro – do not enable JSONs preserve item construction, let’s use civilized mapper and item definition like Avro.


public course CustomKafkaAvroSerializer extends KafkaAvroSerializer
    community CustomKafkaAvroSerializer()
        super.schemaRegistry = new MockSchemaRegistryClient()

    public CustomKafkaAvroSerializer(SchemaRegistryClient customer)
        super(new MockSchemaRegistryClient())

    public CustomKafkaAvroSerializer(SchemaRegistryClient customer, Map props)
        super(new MockSchemaRegistryClient(), props)


general public class CustomKafkaAvroSerializer extends KafkaAvroSerializer
    community CustomKafkaAvroSerializer()
        super.schemaRegistry = new MockSchemaRegistryClient()

    general public CustomKafkaAvroSerializer(SchemaRegistryClient consumer)
        tremendous(new MockSchemaRegistryClient())

    community CustomKafkaAvroSerializer(SchemaRegistryClient client, Map props)
        tremendous(new MockSchemaRegistryClient(), props)

And we have almost everything to get started crafting our exam.

@TestInstance(TestInstance.Lifecycle.For every_Course)
@ActiveProfiles("take a look at")
@EmbeddedKafka(partitions = 1, subject areas = "register-request")
class ProducerControllerTest {

All we want to do is include @EmbeddedKafka annotation with stated topics and partitions. Software Context will boot Kafka Broker with supplied configuration just like that. Continue to keep in intellect that @TestInstance should be utilised with special thought. Lifecycle.For each_Course will stay away from building the identical objects/context for every test process. Worthy of checking if checks are far too time-consuming.

Client consumerServiceTest
void Setup()
DefaultKafkaConsumerFactory purchaser = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()

consumerServiceTest = purchaser.createConsumer()

Right here we can declare the exam consumer, centered on the Avro schema return type. All Kafka qualities are now delivered in the .yml file. That shopper will be applied as a verify if the producer in fact pushed a concept.

In this article is the actual examination approach:

void whenValidInput_therReturns200() throws Exception
        RegisterRequestDto ask for = RegisterRequestDto.builder()
                .deal with("tempAddress")

                      .content material(objectMapper.writeValueAsBytes(request)))

      ConsumerRecord consumedRegisterRequest =  KafkaTestUtils.getSingleRecord(consumerServiceTest, Matter_Name)

        RegisterRequest valueReceived = consumedRegisterRequest.worth()

        assertEquals(12, valueReceived.getId())
        assertEquals("tempAddress", valueReceived.getAddress())

1st of all, we use MockMvc to carry out an motion on our endpoint. That endpoint employs ProducerService to push messages to Kafka. KafkaConsumer is used to confirm if the producer labored as envisioned. And that’s it – we have a fully doing the job exam with embedded Kafka.

Take a look at Containers – Client Exam

TestContainers are nothing at all else like independent docker pictures completely ready for being dockerized. The subsequent take a look at circumstance will be improved by a MongoDB impression. Why not hold our data in the databases appropriate just after anything occurred in Kafka circulation?

Dependencies are not a lot diverse than in the preceding instance. The next techniques are needed for test containers:

testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'

established('testcontainersVersion', "1.17.1")

mavenBom "org.testcontainers:testcontainers-bom:$testcontainersVersion"

Let’s concentrate now on the Purchaser section. The examination situation will be simple – one buyer provider will be accountable for having the Kafka information and storing the parsed payload in the MongoDB collection. All that we need to have to know about KafkaListeners, for now, is that annotation:

@KafkaListener(matters = "register-request")

By the features of the annotation processor, KafkaListenerContainerFactory will be accountable to create a listener on our method. From this minute our method will react to any upcoming Kafka concept with the talked about matter.

Avro serializer and deserializer configs are the same as in the former exam.

Concerning TestContainer, we need to start off with the adhering to annotations:

general public course AbstractIntegrationTest {

All through startup, all configured TestContainers modules will be activated. It means that we will get entry to the entire working surroundings of the selected resource. As case in point:

personal KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry

community static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))

static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017)

As a result of booting the take a look at, we can assume two docker containers to start out with the provided configuration.

What is genuinely crucial for the mongo container – it provides us full access to the database making use of just a straightforward connection uri. With this kind of a attribute, we are able to just take a seem what is the latest point out in our collections, even for the duration of debug method and prepared breakpoints.
Consider a glance also at the Ryuk container – it works like overwatch and checks if our containers have started properly.

And in this article is the previous element of the configuration:

static void dataSourceProperties(DynamicPropertyRegistry registry)
   registry.include("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers)
   registry.insert("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers)
   registry.include("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers)
   registry.incorporate("", mongoDBContainer::getReplicaSetUrl)

   kafkaContainer.start off()
   mongoDBContainer.start off()

   mongoDBContainer.waitingFor(Hold out.forListeningPort()

public void beforeTest()

           messageListenerContainer ->
                       .waitForAssignment(messageListenerContainer, 1)


static void tearDown()

DynamicPropertySource offers us the alternative to established all wanted setting variables for the duration of the take a look at lifecycle. Strongly necessary for any config applications for TestContainers. Also, beforeTestClass kafkaListenerEndpointRegistry waits for every single listener to get envisioned partitions all through container startup.

And the previous element of the Kafka examination containers journey – the most important entire body of the check:

community void containerStartsAndPublicPortIsAvailable() throws Exception
   writeToTopic("register-ask for", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").develop())

   //Wait for KafkaListener
   Assertions.assertEquals(1, taxiRepository.findAll().size())

personal KafkaProducer createProducer()
   return new KafkaProducer<>(kafkaProperties.buildProducerProperties())

private void writeToTopic(String topicName, RegisterRequest... registerRequests)

   check out (KafkaProducer producer = createProducer())
               .forEach(registerRequest ->
                           ProducerRecord report = new ProducerRecord<>(topicName, registerRequest)


The personalized producer is responsible for creating our concept to KafkaBroker. Also, it is advisable to give some time for buyers to deal with messages effectively. As we see, the concept was not just consumed by the listener, but also saved in the MongoDB collection.


As we can see, current solutions for integration tests are rather simple to employ and preserve in jobs. There is no level in holding just device exams and counting on all traces covered as a sign of code/logic quality. Now the issue is, need to we use an Embedded alternative or TestContainers? I recommend to start with of all concentrating on the word “Embedded”. As a great integration exam, we want to get an just about suitable copy of the creation surroundings with all properties/features integrated. In-memory alternatives are superior, but mainly, not plenty of for large business enterprise jobs. Absolutely, the gain of Embedded companies is the uncomplicated way to apply this sort of tests and preserve configuration, just when just about anything comes about in memory.
TestContainers at the initial sight may search like overkill, but they give us the most essential element, which is a different natural environment. We really don’t have to even rely on present docker visuals – if we want we can use personalized ones. This is a large improvement for prospective examination situations.
What about Jenkins? There is no motive to be worried also to use TestContainers in Jenkins. I firmly advise examining TestContainers documentation on how simply we can set up the configuration for Jenkins brokers.
To sum up – if there is no blocker or any unwanted problem for making use of TestContainers, then never wait. It is often superior to hold all expert services managed and secured with integration check contracts.


Source link

Next Post

Social media and misinformation: How can we use these platforms for good?

[ad_1] Social media can get a actually negative rap, with misinformation and loathe speech at all-time highs. On the other hand, social media can also be made use of for great. In purchase to fight misinformation, influencers, corporations and organisations will need to harness the optimistic abilities of these platforms. […]