Consuming with @KafkaListener
The @KafkaListener annotation is the cornerstone of message consumption in Spring for Apache Kafka. You annotate a method on a Spring-managed bean, and the framework wires up a polling loop, deserialization, threading, and offset management behind the scenes — letting you focus on business logic instead of consumer plumbing. In production this is where the bulk of your event-processing code lives, so understanding how topics, consumer groups, and payload binding work together is essential to building reliable, horizontally scalable consumers.
How @KafkaListener works
When Spring detects @KafkaListener on a bean method, it registers a MessageListenerContainer for that method. The container owns a Kafka consumer that polls the broker, deserializes each record, and invokes your method once per message (or once per batch). The annotated bean must be a Spring component so the container factory can find it during startup.
At minimum you specify the topic(s) and a consumer group. The groupId determines how partitions are shared: every consumer instance using the same group splits the topic’s partitions between them, while consumers in different groups each receive a full copy of the stream.
@Component
public class OrderListener {
@KafkaListener(topics = "orders", groupId = "order-processing")
public void onMessage(OrderEvent event) {
// Spring deserializes the value into OrderEvent automatically
System.out.println("Received order: " + event.orderId());
}
}
Payload and header binding
Spring binds the deserialized record value to the method parameter automatically. For finer control — or when you need Kafka metadata such as the topic, partition, offset, or message key — annotate parameters with @Payload and @Header. The constants in KafkaHeaders cover every standard piece of metadata.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class OrderListener {
@KafkaListener(topics = "orders", groupId = "order-processing")
public void onMessage(
@Payload OrderEvent event,
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
System.out.printf(
"key=%s partition=%d offset=%d order=%s amount=%.2f%n",
key, partition, offset, event.orderId(), event.amount());
}
}
Assuming the OrderEvent record produced earlier:
public record OrderEvent(String orderId, String customerId, double amount) {}
Output:
key=ORD-1001 partition=0 offset=42 order=ORD-1001 amount=149.99
key=ORD-1002 partition=1 offset=17 order=ORD-1002 amount=89.50
Use
KafkaHeaders.RECEIVED_KEY(notKafkaHeaders.KEY) on the consumer side. TheRECEIVED_*constants represent metadata read from an inbound record; the non-prefixed ones are for outbound messages.
Receiving the full ConsumerRecord
When you need everything about a record in one object — including timestamps and the raw header set — declare a parameter of type ConsumerRecord<K, V>. Spring passes the native record straight through, giving you direct access to the underlying Kafka client API.
import org.apache.kafka.clients.consumer.ConsumerRecord;
@Component
public class OrderAuditListener {
@KafkaListener(topics = "orders", groupId = "order-audit")
public void audit(ConsumerRecord<String, OrderEvent> record) {
System.out.printf("topic=%s ts=%d key=%s value=%s%n",
record.topic(), record.timestamp(), record.key(), record.value());
}
}
Listening to multiple topics
A single listener can subscribe to several topics by passing an array, or you can match a family of topics with topicPattern (a regex evaluated against broker metadata at refresh time). Pattern matching is handy for multi-tenant or sharded topic naming schemes.
@KafkaListener(topics = {"orders", "orders-priority"}, groupId = "order-processing")
public void onMessage(OrderEvent event) { /* ... */ }
@KafkaListener(topicPattern = "orders\\..*", groupId = "order-processing")
public void onTenantMessage(OrderEvent event) { /* ... */ }
id and containerFactory
Two attributes give you operational control over the listener. id names the container so you can pause, resume, or inspect it via the KafkaListenerEndpointRegistry. containerFactory selects a non-default ConcurrentKafkaListenerContainerFactory bean — useful when one application consumes value types that need different deserializers or concurrency settings.
| Attribute | Purpose | Default |
|---|---|---|
topics | Explicit topic names to subscribe to | none (required unless topicPattern) |
topicPattern | Regex matched against existing topics | none |
groupId | Consumer group; overrides factory-level config | factory group.id |
id | Unique container id for runtime lookup/control | generated |
containerFactory | Bean name of the container factory to use | kafkaListenerContainerFactory |
concurrency | Number of consumer threads (partitions permitting) | factory concurrency |
@KafkaListener(
id = "order-processor",
topics = "orders",
groupId = "order-processing",
containerFactory = "orderListenerContainerFactory",
concurrency = "3")
public void onMessage(OrderEvent event) { /* ... */ }
Setting
concurrencyhigher than the topic’s partition count wastes threads — extra consumers in the group sit idle because partitions are the unit of parallelism. Size concurrency to your partitions, not the other way around.
Best Practices
- Always set an explicit
groupIdper logical consumer so rebalancing and offset tracking behave predictably across deployments and scaling events. - Prefer typed payloads (records) plus
@Headerparameters over parsing the rawConsumerRecordunless you genuinely need broker metadata or the native record. - Keep listener methods fast and non-blocking; offload slow I/O so you do not stall the poll loop and trigger a rebalance from
max.poll.interval.msexpiry. - Give long-lived or operationally important listeners a stable
idso you can pause/resume them through theKafkaListenerEndpointRegistry. - Use a dedicated
containerFactorywhen different listeners need different deserializers, error handlers, or concurrency rather than overloading one factory. - Match
concurrencyto partition count, and remember that throwing from a listener triggers the configured error handler — design for retries and dead-letter routing explicitly.