In this section, we look at how to consume messages from Kafka topics. Consuming messages is a fundamental operation in Kafka, allowing applications to read and process data streams. We will cover the following key concepts:
- Introduction to Consumers
- Consumer Groups
- Consumer Configuration
- Consuming Messages with Java
- Practical Exercises
Introduction to Consumers
A Kafka consumer is an application that reads records from Kafka topics. Consumers subscribe to one or more topics and process the records in the order they are stored in the partitions.
Key Concepts:
- Consumer: An application that reads data from Kafka topics.
- Subscription: The process of a consumer registering interest in one or more topics.
- Polling: The method by which a consumer fetches data from Kafka.
Consumer Groups
Consumers in Kafka are organized into consumer groups. Each consumer in a group reads data from a subset of the partitions in the topic(s) they subscribe to. This allows for parallel processing and load balancing.
Key Concepts:
- Consumer Group: A group of consumers that work together to consume data from a set of topics.
- Partition Assignment: The process of assigning partitions to consumers within a group.
- Rebalancing: The process of redistributing partitions among consumers when the group membership changes.
Example:
- If a topic has 4 partitions and there are 2 consumers in a group, each consumer will read from 2 partitions.
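The example above can be sketched in code. The snippet below simulates a round-robin partition assignment to show how 4 partitions split across 2 consumers, and how the split changes when a third consumer joins (a rebalance). This is illustrative only; in a real cluster the group coordinator performs the assignment, and the exact strategy depends on the configured assignor.

```java
import java.util.ArrayList;
import java.util.List;

public class AssignmentDemo {

    // Assign numPartitions partitions round-robin across numConsumers.
    // Each inner list holds the partition numbers owned by one consumer.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(p % numConsumers).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // 4 partitions, 2 consumers: each consumer owns 2 partitions.
        System.out.println(assign(4, 2)); // [[0, 2], [1, 3]]
        // A third consumer joins, triggering a rebalance: partitions are redistributed.
        System.out.println(assign(4, 3)); // [[0, 3], [1], [2]]
    }
}
```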
Consumer Configuration
Configuring a Kafka consumer involves setting various properties that control its behavior. Some important configurations include:
| Configuration Property | Description |
|---|---|
| bootstrap.servers | A list of host/port pairs used to establish the initial connection to the Kafka cluster. |
| group.id | A unique string that identifies the consumer group this consumer belongs to. |
| key.deserializer | The class name of the deserializer for record keys. |
| value.deserializer | The class name of the deserializer for record values. |
| auto.offset.reset | What to do when there is no initial offset in Kafka or the current offset no longer exists. Options are earliest, latest, and none. |
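The settings in the table can be collected into a Properties object before constructing a consumer. A minimal sketch follows; the broker address and group id are placeholders, and auto.offset.reset is set to earliest so a new group starts from the beginning of the topic.

```java
import java.util.Properties;

public class ConsumerConfigDemo {

    // Build the consumer configuration from the properties described above.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // With no committed offset, start from the earliest available record.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("auto.offset.reset")); // earliest
    }
}
```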
Consuming Messages with Java
Let's look at a practical example of consuming messages using the Kafka Java client.
Example Code:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                // Fetch a batch of records, waiting up to 100 ms.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // Release resources and leave the consumer group cleanly.
            consumer.close();
        }
    }
}
```
Explanation:
- Properties: Configures the consumer with necessary properties.
- KafkaConsumer: Creates a new consumer instance.
- subscribe: Subscribes to the specified topic.
- poll: Fetches data from the topic.
- ConsumerRecords: Represents the records fetched from Kafka.
- ConsumerRecord: Represents an individual record.
Practical Exercises
Exercise 1: Basic Consumer
Write a Kafka consumer that subscribes to a topic named my-topic and prints the messages to the console.
Solution:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```
Exercise 2: Handling Rebalancing
Modify the consumer to handle rebalancing events by implementing a ConsumerRebalanceListener.
Solution:
```java
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class RebalanceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "rebalance-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // The listener is invoked during a rebalance: first with the partitions
        // being taken away, then with the new assignment.
        consumer.subscribe(Collections.singletonList("rebalance-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Partitions revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Partitions assigned: " + partitions);
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```
Common Mistakes:
- Not closing the consumer: Always close the consumer to release resources.
- Incorrect deserializer configuration: Ensure the deserializer matches the data type of the messages.
- Ignoring rebalancing: Handle rebalancing to ensure smooth operation of the consumer group.
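On the first mistake: KafkaConsumer implements Closeable, so the explicit try/finally in the examples above can be replaced with try-with-resources, which guarantees close() runs even if the poll loop throws. The sketch below demonstrates the pattern with a stand-in class (FakeConsumer, invented here) so it can run without a broker.

```java
public class CloseDemo {

    // Stand-in for KafkaConsumer: records whether close() was called.
    static class FakeConsumer implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    static boolean demo() {
        FakeConsumer consumer = new FakeConsumer();
        // try-with-resources calls close() automatically on exit,
        // whether the block completes normally or throws.
        try (FakeConsumer c = consumer) {
            // poll loop would go here
        }
        return consumer.closed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true
    }
}
```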
Conclusion
In this section, we covered the basics of consuming messages in Kafka, including consumer groups, configuration, and practical examples using Java. Understanding these concepts is crucial for building robust Kafka consumers that can efficiently process data streams. In the next section, we will explore Kafka Connect, a tool for integrating Kafka with other data systems.