Skip to content
Apache Kafka kf consumers 4 min read

Consumer Overview

A Kafka consumer reads records from one or more topic partitions and tracks how far it has progressed using offsets. Unlike a traditional message queue that pushes messages and deletes them on delivery, Kafka is a pull-based, durable log: the broker keeps records around for a retention period, and each consumer is responsible for pulling batches and remembering its own position. Understanding this model — the subscribe, poll, process, commit cycle — is the foundation for everything else you do on the consumer side, from scaling with consumer groups to recovering from failures without losing or double-processing data.

The consumer model

A consumer’s job follows a simple, repeated rhythm:

  1. Subscribe to a set of topics (or assign specific partitions manually).
  2. Poll the broker for a batch of records.
  3. Process the records your application received.
  4. Commit offsets so that on restart you resume from the right place.

The consumer maintains a long-lived TCP connection to the cluster, joins a consumer group, and is assigned a subset of partitions. Offsets are stored in an internal Kafka topic called __consumer_offsets, keyed by group, topic, and partition. Because the position is server-side state tied to the group, a restarted or replaced consumer can pick up exactly where the group left off.

The poll loop

poll(Duration) is the heart of the consumer. A single call does far more than fetch records: it sends and receives heartbeats, triggers any pending group rebalances, fetches data from the partitions assigned to this instance, and returns whatever records are buffered. You must call poll() regularly — if you stop calling it for longer than max.poll.interval.ms, the broker assumes your consumer is dead and reassigns its partitions to other members.

Important: A KafkaConsumer instance is not thread-safe. All calls must happen on the single thread that owns it. The only method safe to call from another thread is wakeup(), which is used to break out of a blocking poll() for a clean shutdown.

This “one thread per consumer” rule is deliberate. If you need more parallelism, run more consumer instances in the same group rather than sharing one consumer across threads. The poll loop’s duration argument is the maximum time to block waiting for data; it returns sooner if records arrive.

Fetching behavior

Records are fetched in batches per partition. Three configuration keys shape how much data each poll() returns and how long it waits:

PropertyDefaultMeaning
fetch.min.bytes1Broker waits until this many bytes are available before responding.
fetch.max.wait.ms500Max time the broker waits to satisfy fetch.min.bytes.
max.poll.records500Upper bound on records returned by a single poll() call.

Tuning these trades latency for throughput: raising fetch.min.bytes reduces request overhead at the cost of slightly higher latency, while lowering max.poll.records keeps each processing batch small so you never exceed max.poll.interval.ms.

A complete consumer example

The following plain-client consumer subscribes to a topic, polls in a loop, processes each record, and commits offsets synchronously after each batch. It uses wakeup() from a shutdown hook for a graceful exit.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class OrderConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "order-processors");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        Thread main = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

        try {
            consumer.subscribe(List.of("orders"));
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(),
                            record.key(), record.value());
                }
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown — ignore
        } finally {
            consumer.close();
            System.out.println("Consumer closed cleanly.");
        }
    }
}

Output:

partition=0 offset=12 key=ord-1001 value={"item":"keyboard","qty":2}
partition=0 offset=13 key=ord-1002 value={"item":"mouse","qty":1}
partition=2 offset=7  key=ord-1003 value={"item":"monitor","qty":1}
Consumer closed cleanly.

Notice the order of operations: process first, then commit. Committing after successful processing gives you at-least-once delivery — if the application crashes between processing and committing, those records are simply re-delivered on restart rather than lost.

Lifecycle and clean shutdown

A consumer transitions through a predictable lifecycle: it joins the group, participates in partition assignment, runs its poll loop, and finally leaves the group on close(). Calling close() is important — it commits any pending offsets (when auto-commit is on), sends a LeaveGroup request, and lets the group rebalance immediately instead of waiting for the session timeout to expire. The wakeup() pattern above ensures close() always runs even when the JVM is shutting down.

Best Practices

  • Keep one KafkaConsumer per thread; scale out with more group members, never by sharing an instance.
  • Always call poll() frequently enough to stay under max.poll.interval.ms; offload slow work or reduce max.poll.records if processing is heavy.
  • Process records before committing to guarantee at-least-once semantics and avoid silent data loss.
  • Use wakeup() from a shutdown hook and always close() the consumer for fast, clean rebalances.
  • Disable enable.auto.commit when you need precise control over when offsets advance.
  • Tune fetch.min.bytes and fetch.max.wait.ms to balance throughput against latency for your workload.
Last updated June 1, 2026
Was this helpful?