A Real-World Kafka Client Implementation
This post is the continuation of my previous post on building Kafka client: https://www.jobinesh.com/2021/10/how-to-get-your-apache-kafka-client.html. In case you have not yet read it, would encourage you to take a look at it before proceeding with this post.
In this post, I will share a simple real-life Kafka client implementation. In case you have not yet built a Kafka client of production quality, please read this article https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/. The client implementation used in this example is heavily inspired by the concepts discussed in the above-mentioned article.
Some features that you may find interesting in the Kafka client example shared along with this post are:
- Listen to Kafka rebalancing and handle it as appropriate
- Error handler for async Kafka message processing task
- Rate limiting the incoming messages by waiting for the currently running Kafka record processing threads to finish.
- ConsumerManager: The bootstrapper class for starting the multi-threaded Kafka client
- MultithreadedConsumer: This class implements the core multi-threaded Kafka client
- ConsumerRecordProcessor: The class responsible for processing the message asynchronously
- ConsumerErrorHandler: This is the error handler class for async Kafka message processing. It makes sense when you have an async job that processes messages in a different thread other than the Kafka record processor thread. For instance, if you need to process the Kakf message by interacting with a third-party API, typically you would do that in a different thread. If the processing fails during this phase, the error handler republished the message to the same Kafka topic and it continues till that configured maximum attempt exceeds.
- SimpleMessage: A custom class used for shipping the Kafka message used in this example
- SimpleMessageSerializer: Custom Kafka serialization implementation that is used for converting a SimpleMessage object into a stream of bytes that are used for transmission.
- SimpleMessageDeserializer: Custom Kafka deserialization implementation that is used for converting bytes of arrays into the SimpleMessage data type
- consumer.properties: This file holds all Kafka consumer properties, located inside kafka-client/src/main/resources/consumer.properties
- producer.properties: This file holds all Kafka producer properties, located inside kafka-client/src/main/resources/producer.properties
Let's see how to run the example from the following git repository: https://github.com/jobinesh/kafka-tools
Make sure you have the following installed:
- JDK 11 or higher
Here are the steps:
- Clone the git repository
- Navigate to kafka-docker directory
- cd kafka-docker
- Start the Kafka cluster for this demo
- docker-compose -f docker-compose.yml up
- Navigate to kafka-client directory
- cd kafka-client
- Start the Kafka Consumer client that listens on demo topic
- mvn install exec:java -Dexec.mainClass="com.jobinesh.kafka.client.ConsumerManager"
- Publish the message to demo topic using Kafka Producer client
- mvn install exec:java -Dexec.mainClass="com.jobinesh.kafka.client.SimpleProducer" -Dexec.cleanupDaemonThreads=false