A Real-World Kafka Client Implementation

This post is a continuation of my previous post on building a Kafka client: https://www.jobinesh.com/2021/10/how-to-get-your-apache-kafka-client.html. If you have not read it yet, I would encourage you to take a look before proceeding with this post.

In this post, I will share a simple real-life Kafka client implementation. If you have not yet built a production-quality Kafka client, please read this article: https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/. The client implementation used in this example is heavily inspired by the concepts discussed in that article.

Some features that you may find interesting in the Kafka client example shared along with this post are:

  • Listening to Kafka rebalancing events and handling them as appropriate
  • An error handler for the async Kafka message processing task
  • Rate limiting the incoming messages by waiting for the currently running Kafka record processing threads to finish
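
To make the rate-limiting idea concrete, here is a minimal sketch that uses only the JDK. It is not the code from the repository: the class name BoundedRecordDispatcher and its methods are hypothetical, and it assumes that "waiting for the running threads to finish" means blocking the dispatching thread until one of a fixed number of slots is free.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not a class from the kafka-tools repository.
class BoundedRecordDispatcher {
    private final ExecutorService workers;
    private final Semaphore freeSlots;                      // one permit per allowed in-flight task
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();
    private final AtomicInteger processed = new AtomicInteger();

    BoundedRecordDispatcher(int maxInFlight) {
        this.workers = Executors.newFixedThreadPool(maxInFlight);
        this.freeSlots = new Semaphore(maxInFlight);
    }

    // Blocks the calling thread until a processing slot is free, then hands the task to a worker.
    void dispatch(Runnable recordTask) {
        try {
            freeSlots.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a free slot", e);
        }
        try {
            workers.execute(() -> {
                peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    recordTask.run();
                } catch (RuntimeException e) {
                    // A real client would pass the failure to an error handler here.
                } finally {
                    running.decrementAndGet();
                    processed.incrementAndGet();
                    freeSlots.release();                    // frees the slot for the next record
                }
            });
        } catch (RuntimeException e) {
            freeSlots.release();                            // task was rejected, give the slot back
            throw e;
        }
    }

    // Stops accepting tasks and waits for the in-flight ones; returns false on timeout or interrupt.
    boolean shutdownAndWait(long timeoutMillis) {
        workers.shutdown();
        try {
            return workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int peakConcurrency() { return peak.get(); }
    int processedCount() { return processed.get(); }
}
```

With a limit of 2, a loop that calls dispatch ten times never has more than two tasks running at once. Keep in mind that a real consumer still has to call poll() within max.poll.interval.ms, so blocking the polling thread for long periods is not an option there; treat this only as an illustration of the back-pressure idea.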

About this example
Here is a quick overview of the core classes and files that you will find in the kafka-client project:
  • ConsumerManager: The bootstrapper class for starting the multi-threaded Kafka client
  • MultithreadedConsumer: This class implements the core multi-threaded Kafka client
  • ConsumerRecordProcessor: The class responsible for processing the message asynchronously
  • ConsumerErrorHandler: The error handler class for async Kafka message processing. It is useful when an async job processes messages in a thread other than the Kafka record processor thread. For instance, if you need to process the Kafka message by interacting with a third-party API, you would typically do that in a different thread. If the processing fails during this phase, the error handler republishes the message to the same Kafka topic, and this continues until the configured maximum number of attempts is exceeded.
  • SimpleMessage: A custom class that carries the Kafka message payload in this example
  • SimpleMessageSerializer: Custom Kafka serializer implementation that converts a SimpleMessage object into a byte array for transmission
  • SimpleMessageDeserializer: Custom Kafka deserializer implementation that converts a byte array back into the SimpleMessage data type
  • consumer.properties: This file holds all Kafka consumer properties, located inside kafka-client/src/main/resources/consumer.properties
  • producer.properties: This file holds all Kafka producer properties, located inside kafka-client/src/main/resources/producer.properties
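
The republish-until-exhausted behaviour described for ConsumerErrorHandler can be sketched as follows. This is a simplified, JDK-only illustration and not the class from the repository: RetryableMessage and RepublishingErrorHandler are hypothetical names, the republisher callback stands in for a Kafka producer, and I am assuming that the attempt counter travels with the message and that a message is processed at most maxAttempts times.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical message envelope: the attempt counter is carried along with the payload.
final class RetryableMessage {
    final String payload;
    final int attempt;   // 1 for the first delivery

    RetryableMessage(String payload, int attempt) {
        this.payload = payload;
        this.attempt = attempt;
    }

    RetryableMessage nextAttempt() {
        return new RetryableMessage(payload, attempt + 1);
    }
}

// Hypothetical error handler; the republisher stands in for sending to the same Kafka topic.
final class RepublishingErrorHandler {
    private final int maxAttempts;
    private final Consumer<RetryableMessage> republisher;
    // Thread-safe because failures are reported from async processing threads.
    private final List<RetryableMessage> exhausted = new CopyOnWriteArrayList<>();

    RepublishingErrorHandler(int maxAttempts, Consumer<RetryableMessage> republisher) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
        this.republisher = republisher;
    }

    // Called by the async task when processing a message fails.
    void onFailure(RetryableMessage failed, Exception cause) {
        if (failed.attempt < maxAttempts) {
            republisher.accept(failed.nextAttempt());   // retry by putting it back on the topic
        } else {
            exhausted.add(failed);                      // give up; a real client might log or dead-letter it
        }
    }

    List<RetryableMessage> exhausted() {
        return exhausted;
    }
}
```

For example, with maxAttempts set to 3 and a processor that always fails, a message is processed three times in total: the first two failures republish it with an incremented attempt counter, and the third failure moves it to the exhausted list.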
How to run this example?

Let's see how to run the example from the following git repository: https://github.com/jobinesh/kafka-tools

Prerequisites:

Make sure you have the following installed:

  • JDK 11 or higher
  • Maven
  • Docker

Here are the steps:

  • Clone the Git repository
  • Navigate to the kafka-docker directory
    • cd kafka-docker
  • Start the Kafka cluster for this demo
    • docker-compose -f docker-compose.yml up
  • Navigate to the kafka-client directory
    • cd kafka-client
  • Start the Kafka consumer client that listens on the demo topic
    • mvn install exec:java -Dexec.mainClass="com.jobinesh.kafka.client.ConsumerManager"
  • Publish a message to the demo topic using the Kafka producer client
    • mvn install exec:java -Dexec.mainClass="com.jobinesh.kafka.client.SimpleProducer" -Dexec.cleanupDaemonThreads=false

