Apache Kafka is a distributed streaming platform. It provides a unified, high-throughput, low-latency platform for handling real-time data feeds. Kafka can be used as a messaging system, like ActiveMQ or RabbitMQ. It achieves fault tolerance by replicating data across the brokers in the cluster. Kafka can also store data durably and feed stream processing that reads the data in near real time. Producers publish messages to topics, and consumers read messages from those topics.
The main building blocks of Kafka are brokers, topics, and replicas. Kafka supports clients in many languages, including Java, C++, Python, and more: https://cwiki.apache.org/confluence/display/KAFKA/Clients
Apache Kafka supports the following use cases across many domains, including finance, IoT, and more.
- Messaging
- Website Activity Tracking
- Metrics
- Log Aggregation
- Stream Processing
- Event Sourcing
- Commit Log
Apache Kafka uses ZooKeeper to manage the Kafka components in the cluster. ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. Distributed applications use all of these services in some form or another. ZooKeeper acts as a consistent, file-system-like store for configuration information.
Download the latest Kafka (2.5.0) from the following location
apache.org/dyn/closer.cgi?path=/kafka/2.5.0..
Download the latest ZooKeeper (e.g. 3.6.1) from the following location
https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
Download the latest released version of JDK 1.8.
Install JDK
Install OpenJDK and set the following environment variables in Windows.
User variable: JAVA_HOME=C:\java\openjdk\jdk-8.0.252.09-hotspot\
System variable: append %JAVA_HOME%\bin to the Path variable (e.g. PATH=%JAVA_HOME%\bin;).
Run the following command to confirm the Java installation.
C:\>java -version
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_252-b09)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.252-b09, mixed mode)
Install ZooKeeper
Extract ZooKeeper and create its configuration file.
Copy C:\software\apache-zookeeper-3.6.1-bin\conf\zoo_sample.cfg to zoo.cfg in the same directory.
Update the following property
Run the following command
C:\software\apache-zookeeper-3.6.1-bin\bin\zkServer.cmd
Make sure ZooKeeper starts successfully and listens on port 2181.
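One way to confirm that a service is listening on a port is to attempt a TCP connection to it. The following is a minimal sketch using only the JDK; the class name PortCheck, the method isListening, and the 2-second timeout are illustrative assumptions, not part of ZooKeeper or Kafka.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: reports whether something accepts TCP connections on host:port. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // 2181 is the ZooKeeper client port used in this article
        System.out.println("ZooKeeper listening on 2181: " + isListening("localhost", 2181, 2000));
    }
}
```

A successful connection only shows that some process accepts connections on the port; it does not prove that the process is ZooKeeper or that it is healthy.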
Install Kafka
Extract Kafka and open the configuration files. The default configuration shipped with the Kafka distribution is sufficient to run a single-node Kafka. broker.id must be unique within the cluster; the default stand-alone configuration uses a single broker only. Kafka uses the default listener on TCP port 9092. Kafka persists all messages to disk in the directory specified by the log.dirs configuration, so that directory must be writable.
Run the following command
C:\software\kafka_2.12-2.5.0\bin\windows>kafka-server-start.bat ..\..\config\server.properties
Make sure Kafka starts successfully.
Create topics and list topics
Kafka creates topics automatically when auto.create.topics.enable is set to true, which is the broker default. With that setting, a topic that does not exist yet is created in the following cases:
- When a producer starts writing messages to the topic
- When a consumer starts reading messages from the topic
- When any client requests metadata for the topic
# Create the topics
C:\software\kafka_2.12-2.5.0\bin\windows>kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.
# List the topics
C:\software\kafka_2.12-2.5.0\bin\windows>kafka-topics.bat --list --bootstrap-server localhost:9092
test
Produce and consume messages
The producer writes messages to the topic, and the consumer reads messages from the topic. Apache Kafka ships with built-in command-line clients that wrap the producer and consumer APIs. These utilities produce and consume messages without writing any code, which is helpful for testing a Kafka installation and inspecting Kafka components.
#produce the message
C:\software\kafka_2.12-2.5.0\bin\windows>kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
#consume the messages
C:\software\kafka_2.12-2.5.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
Apache Kafka in Spring Boot Application
The command-line tools help to understand the concepts. Enterprise applications use a high-level programming language to produce and consume messages through the client API. The following section uses Java in a Spring Boot application to produce and consume messages.
Producer
The Kafka producer offers flexible options for publishing messages in different use cases. For example, a credit card transaction system cannot tolerate duplicate or lost messages. The producer creates a ProducerRecord and publishes it using the producer API. The producer sets the key.serializer and value.serializer properties, and records are serialized according to them; the consumer sets the matching key.deserializer and value.deserializer properties. The Kafka client API provides a set of built-in serializers. It can also be used with formats such as Avro, Thrift, or Protobuf, or with custom serializers. If the system has to handle multiple versions of a schema and store those schemas, a common architecture pattern is to use a schema registry such as Confluent Schema Registry.
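Conceptually, a serializer turns a value into bytes and a deserializer turns the bytes back into the value, so both sides must agree on the format. The sketch below illustrates this for strings with plain JDK code. The class StringCodecSketch is a hypothetical stand-in that does not implement Kafka's Serializer or Deserializer interfaces; it only mirrors the UTF-8 round trip that the built-in string serializer and deserializer perform by default.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in showing the round trip a string serializer/deserializer pair performs. */
final class StringCodecSketch {

    /** Producer side: value to bytes (null stays null). */
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: bytes back to value (null stays null). */
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("{\"index\":1}");
        System.out.println("Bytes on the wire: " + wire.length);
        System.out.println("Decoded value: " + deserialize(wire));
    }
}
```

If the consumer used a different charset or format than the producer, the decoded value would not match what was sent, which is why the serializer and deserializer settings have to be chosen as a pair.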
The producer sets the acks parameter to control when a write is considered successful. It supports three modes: send without waiting for a reply (acks=0), wait until the leader has written the message (acks=1), and wait until all in-sync replicas have written the message (acks=all).
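The three acknowledgement modes can be sketched as a small helper that selects the acks value. The enum Ack and the method withAcks are hypothetical names introduced for illustration; only the property keys and the values "0", "1", and "all" come from the Kafka producer configuration.

```java
import java.util.Properties;

/** Hypothetical helper that maps a durability level to the producer's acks setting. */
final class AckModes {

    enum Ack {
        NONE("0"),    // do not wait for any broker acknowledgement
        LEADER("1"),  // wait until the partition leader has written the record
        ALL("all");   // wait until all in-sync replicas have written the record

        final String configValue;

        Ack(String configValue) {
            this.configValue = configValue;
        }
    }

    /** Builds a minimal producer configuration with the chosen acks value. */
    static Properties withAcks(String bootstrapServers, Ack ack) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", ack.configValue);
        return props;
    }

    public static void main(String[] args) {
        for (Ack ack : Ack.values()) {
            System.out.println(ack + " -> acks=" + withAcks("localhost:9092", ack).getProperty("acks"));
        }
    }
}
```

Lower acks values trade durability for latency: with NONE the producer cannot tell whether a record was lost, while ALL waits for every in-sync replica.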
The following code sets the properties for the producer.
import java.util.Properties;

// Builds the producer configuration (a static method of the SimpleKafkaUtility helper class).
public static Properties getProducerProperties(String kafkaBootstrapServers) {
    Properties producerProperties = new Properties();
    producerProperties.put("bootstrap.servers", kafkaBootstrapServers);
    producerProperties.put("acks", "all");            // wait for all in-sync replicas
    producerProperties.put("retries", 0);             // do not retry failed sends
    producerProperties.put("batch.size", 16384);      // maximum batch size in bytes
    producerProperties.put("linger.ms", 1);           // wait up to 1 ms to fill a batch
    producerProperties.put("buffer.memory", 33554432); // 32 MB buffer for unsent records
    producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return producerProperties;
}
Kafka writes each record to the appropriate partition based on the record details (an explicit partition if one is set, otherwise the key). If the message is written successfully, the producer receives a RecordMetadata object that includes the topic, partition, and offset. Otherwise, the send fails with an error. The producer can send messages in three ways: fire-and-forget, synchronous, and asynchronous.
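import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

private void sendMessagesToKafka(String kafkaBootstrapServers) {
    // try-with-resources closes the producer, which waits for buffered records to be sent
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(
            SimpleKafkaUtility.getProducerProperties(kafkaBootstrapServers))) {
        for (int index = 0; index < 10; index++) {
            JSONObject jsonObject = new JSONObject();
            try {
                jsonObject.put("index", index);
                jsonObject.put("message", "The index is now: " + index);
            } catch (JSONException e) {
                System.out.println(e.getMessage());
            }
            // Fire-and-forget: the Future returned by send() is ignored.
            // Call get() on it for a synchronous send, or pass a Callback for an asynchronous one.
            producer.send(new ProducerRecord<>(theTechCheckTopicName, "indexMessge", jsonObject.toString()));
            System.out.println("Send Message to Kafka:" + jsonObject.toString());
        }
    }
}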
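The loop above uses the same key for every record, so with the default partitioner all ten messages land on the same partition. The sketch below illustrates why: a keyed record's partition is derived from a hash of the key, so equal keys always map to the same partition. Note the assumption: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch substitutes String.hashCode() to stay dependency-free, so the actual partition numbers will differ from Kafka's. The class and method names are hypothetical.

```java
/** Hypothetical illustration of key-based partitioning (not Kafka's actual hash function). */
final class KeyPartitionSketch {

    /** Maps a key to a partition in [0, numPartitions); equal keys give equal results. */
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        String[] keys = {"indexMessge", "indexMessge", "order-1", "order-2"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Using a varying key, such as a customer or order identifier, spreads records across partitions while keeping all records for one key in order on a single partition.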
Consumer
The Kafka consumer provides many options for consuming messages. Each call to poll returns a ConsumerRecords object, and the application iterates over the records it contains. The consumer also sets bootstrap.servers, along with key.deserializer and value.deserializer. Apart from that, it uses a similar set of properties to the producer, plus the consumer group property group.id. A consumer group is a set of consumers that share the work of reading a topic.
The following code sets the properties for the consumer.
import java.util.Properties;

// Builds the consumer configuration. Despite its name, zookeeperGroupId is the Kafka consumer group id.
public static Properties getConsumerProperties(String kafkaBootstrapServers, String zookeeperGroupId) {
    Properties consumerProperties = new Properties();
    consumerProperties.put("bootstrap.servers", kafkaBootstrapServers);
    consumerProperties.put("group.id", zookeeperGroupId);
    // Offsets are committed manually in the poll loop, so auto-commit is turned off.
    // The legacy settings zookeeper.*, auto.commit.enable, and consumer.timeout.ms belong to the
    // old ZooKeeper-based consumer and are not recognized by KafkaConsumer, so they are omitted.
    consumerProperties.put("enable.auto.commit", "false");
    consumerProperties.put("max.poll.records", "1");   // return at most one record per poll
    consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return consumerProperties;
}
The consumer consumes messages from one or more topics, and it can subscribe with a regular expression to match a set of topics. For example, suppose the producer publishes messages to topic T with partitions P1, P2, and P3. If the consumer group contains three consumers, each partition is assigned to a separate consumer. A single slow consumer may not keep up with the rate at which data flows into a topic, so adding more consumers to the group (up to the number of partitions) increases throughput. When consumers join the group, or leave it because of a shutdown or failure, the group reassigns partitions among the remaining consumers. The process of moving partition ownership from one consumer to another is called a rebalance.
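The partition-to-consumer mapping and the effect of a rebalance can be sketched with plain collections. This is a simplified round-robin illustration under stated assumptions, not the assignment strategy Kafka actually runs; the class GroupAssignmentSketch and the names C1 to C3 are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical round-robin illustration of partition ownership within a consumer group. */
final class GroupAssignmentSketch {

    /** Spreads partitions over consumers in order; every consumer gets an entry, possibly empty. */
    static Map<String, List<String>> assign(List<String> partitions, List<String> consumers) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int i = 0; i < partitions.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            assignment.get(owner).add(partitions.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("P1", "P2", "P3");

        // Three consumers: one partition each
        System.out.println(assign(partitions, Arrays.asList("C1", "C2", "C3")));
        // prints {C1=[P1], C2=[P2], C3=[P3]}

        // C3 fails: recomputing the assignment moves P3 to a surviving consumer (a rebalance)
        System.out.println(assign(partitions, Arrays.asList("C1", "C2")));
        // prints {C1=[P1, P3], C2=[P2]}
    }
}
```

With more consumers than partitions, the extra consumers in this sketch receive an empty list, which mirrors the point that adding consumers beyond the partition count does not add throughput.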
// Start consumer thread to read messages
Thread kafkaConsumerThread = new Thread(() -> {
    System.out.println("Starting Kafka consumer thread.");
    SimpleKafkaConsumer simpleKafkaConsumer = new SimpleKafkaConsumer(theTechCheckTopicName,
            SimpleKafkaUtility.getConsumerProperties(kafkaBootstrapServers, zookeeperGroupId));
    simpleKafkaConsumer.runSingleWorker();
});
kafkaConsumerThread.start();
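import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SimpleKafkaConsumer {
    private final KafkaConsumer<String, String> kafkaConsumer;

    public SimpleKafkaConsumer(String theTechCheckTopicName, Properties consumerProperties) {
        kafkaConsumer = new KafkaConsumer<>(consumerProperties);
        kafkaConsumer.subscribe(Collections.singletonList(theTechCheckTopicName));
    }

    // Polls forever, printing each record and committing its offset synchronously
    public void runSingleWorker() {
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String message = record.value();
                System.out.println("Received message: " + message);
                // Commit the offset of the next record to read (last processed offset + 1)
                Map<TopicPartition, OffsetAndMetadata> commitMessage = new HashMap<>();
                commitMessage.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
                kafkaConsumer.commitSync(commitMessage);
                System.out.println("Offset committed to Kafka.");
            }
        }
    }
}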
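The consumer above commits record.offset() + 1 because a committed offset marks the next record to read, not the last one processed. The following dependency-free sketch simulates that rule with a list standing in for a partition. The class OffsetCommitSketch, its methods, and the message names are hypothetical and only illustrate the arithmetic.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical simulation of how a committed offset determines where a consumer resumes. */
final class OffsetCommitSketch {

    /** The offset to commit after processing a record: the position of the next record. */
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    /** Records a restarted consumer would read, given the committed offset for the partition. */
    static List<String> resumeFrom(List<String> partitionLog, long committedOffset) {
        return partitionLog.subList((int) committedOffset, partitionLog.size());
    }

    public static void main(String[] args) {
        // A partition with five records at offsets 0..4
        List<String> partitionLog = Arrays.asList("m0", "m1", "m2", "m3", "m4");

        // The consumer processed offsets 0, 1, and 2, then stopped
        long committed = offsetToCommit(2);
        System.out.println("Committed offset: " + committed);
        // prints Committed offset: 3

        System.out.println("After restart: " + resumeFrom(partitionLog, committed));
        // prints After restart: [m3, m4]
    }
}
```

Committing the processed offset itself, without adding one, would cause the last processed record to be delivered again after a restart.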
Reference
docs.confluent.io/current/index.html