A Real-World Kafka Client Implementation

This post is a continuation of my previous post on building a Kafka client: https://www.jobinesh.com/2021/10/how-to-get-your-apache-kafka-client.html. If you have not yet read it, I would encourage you to take a look before proceeding with this post.

In this post, I will share a simple real-life Kafka client implementation. If you have not yet built a production-quality Kafka client, please read this article first: https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/. The client implementation used in this example is heavily inspired by the concepts discussed in that article.

Some features that you may find interesting in the Kafka client example shared along with this post are:

  • Listens for Kafka rebalancing events and handles them appropriately
  • Provides an error handler for asynchronous Kafka message processing tasks
  • Rate-limits incoming messages by waiting for the currently running Kafka record processing threads to finish
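To make the rate-limiting idea concrete, here is a minimal, pure-JDK sketch of one way to bound in-flight record processing: a Semaphore blocks the polling thread once a fixed number of worker tasks are still running. The class and method names (BoundedRecordProcessor, submit) are hypothetical and the actual project may implement this differently, but the mechanism is the same: consumption slows down until workers catch up.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: bounds how many records are processed concurrently.
// acquireUninterruptibly() blocks the calling (poll-loop) thread once
// maxInFlight tasks are running, effectively rate-limiting consumption.
public class BoundedRecordProcessor {
    private final ExecutorService workers;
    private final Semaphore inFlight;

    public BoundedRecordProcessor(int maxInFlight) {
        this.workers = Executors.newFixedThreadPool(maxInFlight);
        this.inFlight = new Semaphore(maxInFlight);
    }

    // Called from the consumer's poll loop for each record value.
    public void submit(String recordValue) {
        inFlight.acquireUninterruptibly(); // blocks when all slots are busy
        workers.submit(() -> {
            try {
                process(recordValue);
            } finally {
                inFlight.release(); // free a slot for the poll loop
            }
        });
    }

    private void process(String recordValue) {
        // Placeholder for real work, e.g. calling a third-party API.
        System.out.println("Processed: " + recordValue);
    }

    // Returns true if all in-flight tasks finished before the timeout.
    public boolean shutdown() {
        workers.shutdown();
        try {
            return workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BoundedRecordProcessor p = new BoundedRecordProcessor(2);
        for (String v : List.of("a", "b", "c", "d")) {
            p.submit(v);
        }
        p.shutdown();
    }
}
```

The same effect can also be achieved with Kafka's pause()/resume() APIs on the consumer itself; the semaphore approach simply keeps the back-pressure logic independent of the Kafka client.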

About this example
Here is a quick overview of the core classes that you may find in the Kafka-client project:
  • ConsumerManager: The bootstrapper class for starting the multi-threaded Kafka client
  • MultithreadedConsumer: This class implements the core multi-threaded Kafka consumer
  • ConsumerRecordProcessor: The class responsible for processing messages asynchronously
  • ConsumerErrorHandler: The error handler class for asynchronous Kafka message processing. It is useful when an async job processes messages in a thread other than the Kafka record processor thread. For instance, if you need to process a Kafka message by interacting with a third-party API, you would typically do that in a separate thread. If processing fails during this phase, the error handler republishes the message to the same Kafka topic, and this continues until the configured maximum number of attempts is exceeded.
  • SimpleMessage: A custom class representing the Kafka message payload used in this example
  • SimpleMessageSerializer: A custom Kafka serializer implementation that converts a SimpleMessage object into a stream of bytes for transmission
  • SimpleMessageDeserializer: A custom Kafka deserializer implementation that converts a byte array back into a SimpleMessage object
  • consumer.properties: This file holds all Kafka consumer properties, located inside kafka-client/src/main/resources/consumer.properties
  • producer.properties: This file holds all Kafka producer properties, located inside kafka-client/src/main/resources/producer.properties
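To illustrate what a custom serializer/deserializer pair does, here is a hedged sketch of the byte-level round trip using only the JDK. It omits Kafka's Serializer/Deserializer interfaces, and the field names and wire layout (an 8-byte id, a 4-byte length, then UTF-8 text) are assumptions for illustration, not the repository's actual format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec for a SimpleMessage with a numeric id and a text
// payload. The real project's classes implement Kafka's Serializer and
// Deserializer interfaces; this sketch shows only the conversion logic.
public class SimpleMessageCodec {

    public static class SimpleMessage {
        public final long id;
        public final String text;

        public SimpleMessage(long id, String text) {
            this.id = id;
            this.text = text;
        }
    }

    // Serialize: 8-byte id, 4-byte text length, then the UTF-8 bytes.
    public static byte[] serialize(SimpleMessage msg) {
        byte[] textBytes = msg.text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + textBytes.length);
        buf.putLong(msg.id);
        buf.putInt(textBytes.length);
        buf.put(textBytes);
        return buf.array();
    }

    // Deserialize: read the fields back in the same order they were written.
    public static SimpleMessage deserialize(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        long id = buf.getLong();
        int len = buf.getInt();
        byte[] textBytes = new byte[len];
        buf.get(textBytes);
        return new SimpleMessage(id, new String(textBytes, StandardCharsets.UTF_8));
    }
}
```

Writing the length prefix before the text is what lets the deserializer know exactly how many bytes to read back, which matters once messages carry more than one variable-length field.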
How to run this example?

Let's see how to run the example from the following git repository: https://github.com/jobinesh/kafka-tools


Make sure you have the following installed:

  • JDK 11 or higher
  • Maven
  • Docker

Here are the steps:

  • Clone the git repository
  • Navigate to kafka-docker directory
    • cd kafka-docker
  • Start the Kafka cluster for this demo
    • docker-compose -f docker-compose.yml up
  • Navigate to kafka-client directory
    • cd kafka-client
  • Start the Kafka Consumer client that listens on the demo topic
    • mvn install exec:java -Dexec.mainClass="com.jobinesh.kafka.client.ConsumerManager"
  • Publish a message to the demo topic using the Kafka Producer client
    • mvn install exec:java -Dexec.mainClass="com.jobinesh.kafka.client.SimpleProducer"  -Dexec.cleanupDaemonThreads=false
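For reference, the consumer.properties file mentioned in the steps above might look something like the following. These values are illustrative only; in particular, the value deserializer class name is an assumption based on the class list earlier in this post, so check the repository's actual file at kafka-client/src/main/resources/consumer.properties.

```properties
# Illustrative values; see the repository for the real configuration
bootstrap.servers=localhost:9092
group.id=demo-consumer-group
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.jobinesh.kafka.client.SimpleMessageDeserializer
```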