Consumer Overview
A Kafka consumer reads records from one or more topic partitions and tracks how far it has progressed using offsets. Unlike a traditional message queue that pushes messages and deletes them on delivery, Kafka is a pull-based, durable log: the broker keeps records around for a retention period, and each consumer is responsible for pulling batches and remembering its own position. Understanding this model — the subscribe, poll, process, commit cycle — is the foundation for everything else you do on the consumer side, from scaling with consumer groups to recovering from failures without losing or double-processing data.
The consumer model
A consumer’s job follows a simple, repeated rhythm:
- Subscribe to a set of topics (or assign specific partitions manually).
- Poll the broker for a batch of records.
- Process the records your application received.
- Commit offsets so that on restart you resume from the right place.
The consumer maintains long-lived TCP connections to the brokers in the cluster, joins a consumer group, and is assigned a subset of partitions. Committed offsets are stored in an internal Kafka topic called __consumer_offsets, keyed by group, topic, and partition. Because the position is server-side state tied to the group, a restarted or replaced consumer resumes from the group's last committed offset for each partition it is assigned.
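To make the keying concrete, here is a minimal sketch of a committed-offset store as an in-memory map. It is a toy model and not the Kafka client API: the class OffsetStoreSketch, its methods, and the "audit" group are hypothetical names used only for illustration. It follows Kafka's convention that the committed value is the offset of the next record to read.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Toy in-memory stand-in for the committed-offset store. This is NOT the Kafka
// client API; every name here is hypothetical and exists only for illustration.
class OffsetStoreSketch {

    // Composite key: committed offsets are tracked per (group, topic, partition).
    static final class Key {
        final String group;
        final String topic;
        final int partition;

        Key(String group, String topic, int partition) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return partition == k.partition && group.equals(k.group) && topic.equals(k.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(group, topic, partition);
        }
    }

    private final Map<Key, Long> committed = new HashMap<>();

    // Record the offset of the NEXT record the group should read.
    void commit(String group, String topic, int partition, long nextOffset) {
        committed.put(new Key(group, topic, partition), nextOffset);
    }

    // Where a new or restarted member of the group would resume; -1 if nothing is committed.
    long resumePosition(String group, String topic, int partition) {
        return committed.getOrDefault(new Key(group, topic, partition), -1L);
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        // The group has processed offsets 0..12 of partition 0, so it commits 13.
        store.commit("order-processors", "orders", 0, 13L);
        // A replacement consumer in the same group resumes at 13.
        System.out.println(store.resumePosition("order-processors", "orders", 0)); // 13
        // A different group has its own, independent position.
        System.out.println(store.resumePosition("audit", "orders", 0)); // -1
    }
}
```

Because the group name is part of the key, two groups reading the same topic never interfere with each other's progress.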
The poll loop
poll(Duration) is the heart of the consumer. A single call does far more than fetch records: it drives group membership (joining the group and completing any pending rebalance), fetches data from the partitions assigned to this instance, and returns whatever records are buffered. In current Java clients, heartbeats are sent from a background thread, so they keep flowing even while your code is busy processing. That is why a separate check exists: you must call poll() regularly. If you go longer than max.poll.interval.ms between calls, the consumer is treated as failed, it leaves the group, and its partitions are reassigned to other members.
Important: A KafkaConsumer instance is not thread-safe. All calls must happen on the single thread that owns it. The only method safe to call from another thread is wakeup(), which is used to break out of a blocking poll() for a clean shutdown.
This “one thread per consumer” rule is deliberate. If you need more parallelism, run more consumer instances in the same group rather than sharing one consumer across threads. The Duration argument to poll() is the maximum time to block waiting for data; the call returns sooner if records arrive.
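The effect of adding group members can be sketched with a simplified round-robin split of partitions. This is only an illustration of the arithmetic: Kafka's real assignors are pluggable and more involved, and the class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified round-robin split of partitions across group members.
// Not Kafka's actual assignor; it only illustrates how work is divided.
class AssignmentSketch {

    // Returns, for each member index, the list of partition numbers it owns.
    static List<List<Integer>> assign(int partitionCount, int memberCount) {
        if (memberCount <= 0) {
            throw new IllegalArgumentException("memberCount must be positive");
        }
        List<List<Integer>> owned = new ArrayList<>();
        for (int m = 0; m < memberCount; m++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            owned.get(p % memberCount).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // Six partitions, three members: two partitions each.
        System.out.println(assign(6, 3)); // [[0, 3], [1, 4], [2, 5]]
        // Two partitions, three members: the third member sits idle.
        System.out.println(assign(2, 3)); // [[0], [1], []]
    }
}
```

The second call shows the practical ceiling: a partition is owned by at most one member of a group at a time, so members beyond the partition count receive nothing to read.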
Fetching behavior
Records are fetched in batches per partition. Three configuration keys shape how much data each poll() returns and how long it waits:
| Property | Default | Meaning |
|---|---|---|
| fetch.min.bytes | 1 | Broker waits until this many bytes are available before responding. |
| fetch.max.wait.ms | 500 | Max time (ms) the broker waits to satisfy fetch.min.bytes. |
| max.poll.records | 500 | Upper bound on records returned by a single poll() call. |
Tuning these trades latency for throughput: raising fetch.min.bytes reduces request overhead at the cost of slightly higher latency, while lowering max.poll.records keeps each processing batch small, which makes it less likely that processing one batch will push you past max.poll.interval.ms.
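As a rough sketch of how these settings interact, the following builds the fetch-related properties and estimates whether a full batch fits inside max.poll.interval.ms. The values 1024 and 200 and the per-record processing costs are assumptions chosen for illustration, and the helper names are hypothetical; 300000 ms is the client's default for max.poll.interval.ms.

```java
import java.util.Properties;

// Sketch: fetch-related settings as plain Properties, plus a rough check that a
// full batch can be processed before max.poll.interval.ms runs out.
class FetchTuningSketch {

    static Properties fetchProps() {
        Properties props = new Properties();
        props.put("fetch.min.bytes", "1024");        // illustrative: wait for at least 1 KiB
        props.put("fetch.max.wait.ms", "500");       // but never longer than 500 ms
        props.put("max.poll.records", "200");        // illustrative: smaller batches than the default 500
        props.put("max.poll.interval.ms", "300000"); // client default of 5 minutes
        return props;
    }

    // Worst-case time to process one full batch, given an assumed per-record cost.
    static long worstCaseBatchMillis(Properties props, long perRecordMillis) {
        long maxRecords = Long.parseLong(props.getProperty("max.poll.records"));
        return maxRecords * perRecordMillis;
    }

    // True when a full batch finishes before the poll interval expires.
    static boolean fitsPollInterval(Properties props, long perRecordMillis) {
        long interval = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        return worstCaseBatchMillis(props, perRecordMillis) < interval;
    }

    public static void main(String[] args) {
        Properties props = fetchProps();
        System.out.println(fitsPollInterval(props, 1_000)); // 200 s of work vs 300 s budget: true
        System.out.println(fitsPollInterval(props, 2_000)); // 400 s of work vs 300 s budget: false
    }
}
```

When the check fails, the options are the ones the text describes: lower max.poll.records, raise max.poll.interval.ms, or move slow work off the polling thread.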
A complete consumer example
The following plain-client consumer subscribes to a topic, polls in a loop, processes each record, and commits offsets synchronously after each batch. It uses wakeup() from a shutdown hook for a graceful exit.
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class OrderConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "order-processors");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");   // commit manually, after processing
        props.put("auto.offset.reset", "earliest"); // used only when the group has no committed offset

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        Thread mainThread = Thread.currentThread();

        // wakeup() is the only consumer method that is safe to call from another thread.
        // The hook then waits for the main thread so close() can finish before the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            consumer.subscribe(List.of("orders"));
            while (true) {
                // Block for up to one second waiting for records.
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(),
                            record.key(), record.value());
                }
                // Commit only after the whole batch has been processed.
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        } catch (WakeupException e) {
            // Expected on shutdown: wakeup() interrupted the loop, nothing to handle.
        } finally {
            consumer.close();
            System.out.println("Consumer closed cleanly.");
        }
    }
}
```
Output:

```text
partition=0 offset=12 key=ord-1001 value={"item":"keyboard","qty":2}
partition=0 offset=13 key=ord-1002 value={"item":"mouse","qty":1}
partition=2 offset=7 key=ord-1003 value={"item":"monitor","qty":1}
Consumer closed cleanly.
```
Notice the order of operations: process first, then commit. Committing after successful processing gives you at-least-once delivery — if the application crashes between processing and committing, those records are simply re-delivered on restart rather than lost.
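The re-delivery behavior can be demonstrated without a broker. The following toy simulation uses a list as the log and a plain field as the committed offset; it does not use any Kafka classes, and all names in it are hypothetical. A "crash" is modeled simply by skipping the commit step.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation of process-then-commit. No Kafka classes are used: the "log"
// is a list and the committed offset is a plain field.
class AtLeastOnceSketch {
    private final List<String> log;
    private long committed = 0;                       // next offset to read after a restart
    final List<String> processed = new ArrayList<>(); // everything the application handled

    AtLeastOnceSketch(List<String> log) {
        this.log = log;
    }

    // Process one batch starting at the committed offset.
    // If crashBeforeCommit is true, the commit step is skipped to mimic a crash.
    void runBatch(int batchSize, boolean crashBeforeCommit) {
        int start = (int) committed;
        int end = Math.min(start + batchSize, log.size());
        for (int i = start; i < end; i++) {
            processed.add(log.get(i));                // "process" the record
        }
        if (!crashBeforeCommit) {
            committed = end;                          // commit only after processing succeeded
        }
    }

    long committedOffset() {
        return committed;
    }

    public static void main(String[] args) {
        AtLeastOnceSketch consumer = new AtLeastOnceSketch(List.of("a", "b", "c"));
        consumer.runBatch(2, true);   // processes a and b, then crashes before committing
        consumer.runBatch(2, false);  // restart: a and b are delivered again, then committed
        consumer.runBatch(2, false);  // processes c
        System.out.println(consumer.processed);         // [a, b, a, b, c]
        System.out.println(consumer.committedOffset()); // 3
    }
}
```

Nothing is lost, but a and b are handled twice, which is why at-least-once consumers generally need idempotent processing or some form of deduplication downstream.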
Lifecycle and clean shutdown
A consumer transitions through a predictable lifecycle: it joins the group, participates in partition assignment, runs its poll loop, and finally leaves the group on close(). Calling close() is important: it commits any pending offsets (when auto-commit is on), sends a LeaveGroup request, and lets the group rebalance immediately instead of waiting for the session timeout to expire. On an orderly JVM shutdown, the wakeup() pattern above breaks the poll loop so that close() can run; for that to be reliable, the shutdown hook should also wait for the polling thread to finish. It cannot help when the process is killed abruptly, in which case the group falls back to the session timeout.
Best Practices
- Keep one KafkaConsumer per thread; scale out with more group members, never by sharing an instance.
- Always call poll() frequently enough to stay under max.poll.interval.ms; offload slow work or reduce max.poll.records if processing is heavy.
- Process records before committing to guarantee at-least-once semantics and avoid silent data loss.
- Use wakeup() from a shutdown hook and always close() the consumer for fast, clean rebalances.
- Disable enable.auto.commit when you need precise control over when offsets advance.
- Tune fetch.min.bytes and fetch.max.wait.ms to balance throughput against latency for your workload.