Consumer Configuration (Spring)
Every @KafkaListener in a Spring Boot application is backed by a listener container, and that container is produced by a ConcurrentKafkaListenerContainerFactory. Getting this factory right is the single most impactful tuning decision for a consumer service: it governs how many threads poll the broker, how offsets are committed, how long polls block, and how raw bytes become typed objects. This page shows how to configure the consumer side end to end — both with plain application.yml and with explicit @Bean definitions when you need finer control.
How the container factory backs @KafkaListener
When Spring Kafka boots, it auto-configures a ConsumerFactory<Object, Object> and a ConcurrentKafkaListenerContainerFactory<Object, Object> from your properties. Each method annotated with @KafkaListener is wrapped in a MessageListenerContainer created by that factory. The factory holds the ConsumerFactory (which produces the underlying org.apache.kafka.clients.consumer.KafkaConsumer) and a ContainerProperties object that controls polling, acknowledgment, and threading.
@KafkaListener -> ConcurrentKafkaListenerContainerFactory
|-- ConsumerFactory -> KafkaConsumer (one per concurrency thread)
|-- ContainerProperties (ackMode, pollTimeout, ...)
If you only need one configuration, the auto-configured beans are enough and you tune them entirely through properties. Define your own beans only when you need multiple factories (for example, one JSON factory and one batch factory) or programmatic logic.
Configuring via application.yml
The fastest path is spring.kafka.consumer.*. These properties feed the auto-configured ConsumerFactory, while spring.kafka.listener.* feeds the container.
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: orders-service
auto-offset-reset: earliest
max-poll-records: 500
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "com.devcraftly.orders.events"
spring.json.value.default.type: "com.devcraftly.orders.events.OrderPlaced"
listener:
concurrency: 3
ack-mode: manual_immediate
poll-timeout: 3000
Tip: Set
enable-auto-commit: false(the default in Spring Kafka) and let the container manage commits. Auto-commit can acknowledge records before your listener finishes processing them, silently dropping messages on a crash.
Key consumer properties
| Property | Purpose | Typical value |
|---|---|---|
group-id | Consumer group; partitions are shared across members of the same group | service name |
auto-offset-reset | Where to start when no committed offset exists | earliest or latest |
max-poll-records | Max records returned per poll() | 500 |
enable-auto-commit | Let the client auto-commit offsets | false |
concurrency | Listener threads per container (≤ partitions) | 3 |
ack-mode | When/how offsets are committed | BATCH, RECORD, MANUAL_IMMEDIATE |
poll-timeout | How long poll() blocks waiting for records (ms) | 3000 |
Defining the factory as a @Bean
For explicit control — multiple deserializers, error handlers, or per-factory tuning — declare the beans yourself. Constructor-inject KafkaProperties to reuse the resolved bootstrap-servers and other shared settings.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
private final KafkaProperties properties;
public KafkaConsumerConfig(KafkaProperties properties) {
this.properties = properties;
}
@Bean
public ConsumerFactory<String, OrderPlaced> orderConsumerFactory() {
Map<String, Object> props = new HashMap<>(
properties.buildConsumerProperties(null));
props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-service");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
JsonDeserializer<OrderPlaced> valueDeserializer =
new JsonDeserializer<>(OrderPlaced.class);
valueDeserializer.addTrustedPackages("com.devcraftly.orders.events");
return new DefaultKafkaConsumerFactory<>(
props, new StringDeserializer(), valueDeserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlaced> orderKafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderPlaced>();
factory.setConsumerFactory(orderConsumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
}
The DTO is a plain Java record:
package com.devcraftly.orders.events;
public record OrderPlaced(String orderId, String customerId, long amountCents) {}
A listener references a named factory via the containerFactory attribute. Omit it to use the auto-configured default.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class OrderListener {
@KafkaListener(
topics = "orders",
groupId = "orders-service",
containerFactory = "orderKafkaListenerContainerFactory")
public void onOrder(OrderPlaced order, Acknowledgment ack) {
// process the order, then commit the offset
System.out.println("Received order " + order.orderId());
ack.acknowledge();
}
}
Concurrency and ack mode
concurrency is the number of consumer threads (and thus KafkaConsumer instances) the container creates. Kafka assigns partitions to threads, so concurrency above the partition count leaves threads idle — set it to a value that divides evenly into your partition count for balanced load.
AckMode decides when committed offsets advance:
| AckMode | Behavior |
|---|---|
BATCH (default) | Commit offsets for the whole poll batch after the listener returns |
RECORD | Commit after each record is processed |
MANUAL | You call ack.acknowledge(); committed on the next poll |
MANUAL_IMMEDIATE | You call ack.acknowledge(); commits immediately |
TIME / COUNT / COUNT_TIME | Commit on an interval, a record count, or whichever comes first |
Warning: Manual ack modes require an
Acknowledgmentparameter in the listener. WithMANUALorMANUAL_IMMEDIATE, forgetting to callacknowledge()means offsets never advance and records are redelivered after a rebalance or restart.
Best Practices
- Keep
enable-auto-commitoff and choose an explicitAckModeso commits reflect successful processing, not just delivery. - Match
concurrencyto a divisor of the topic’s partition count; over-provisioning wastes threads, under-provisioning limits throughput. - Always set
auto-offset-resetdeliberately —earliestfor replayable pipelines,latestfor live-only consumers. - Restrict
spring.json.trusted.packagesto your event packages rather than*to avoid deserialization of untrusted types. - Tune
max-poll-recordsandpoll-timeouttogether so a single batch finishes well withinmax.poll.interval.ms, preventing rebalances. - Reuse
KafkaProperties.buildConsumerProperties(null)in custom factories so shared settings like security and bootstrap servers stay in one place.