Producing with KafkaTemplate
KafkaTemplate is the central component for publishing records from a Spring Boot application. It wraps a thread-safe producer, applies the serializers and producer properties you configured, and exposes a small but expressive API for sending records to a topic. Because the underlying producer batches and sends asynchronously, understanding the difference between fire-and-forget, asynchronous result handling, and blocking sends is essential for building reliable, high-throughput producers in production.
Obtaining a KafkaTemplate
When you add spring-kafka and define producer properties, Spring Boot auto-configures a ProducerFactory and a KafkaTemplate<Object, Object> bean. You inject it directly via constructor injection. For type safety, declare the template with the key and value types your application uses.
package com.devcraftly.kafka;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
}
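Use a Java record for the payload so it is immutable and concise. When the producer's value serializer is set to Spring Kafka's JsonSerializer (Spring Boot defaults to StringSerializer), the record is written as a JSON message body without extra mapping code.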
package com.devcraftly.kafka;

import java.math.BigDecimal;
import java.time.Instant;

public record OrderEvent(
        String orderId,
        String customerId,
        BigDecimal amount,
        Instant occurredAt) {
}
Sending with a key
The most common call is send(topic, key, value). The key determines the partition (records with the same key land on the same partition, preserving per-key ordering), so choosing a meaningful key such as the order or customer ID is a deliberate design decision.
public void publish(OrderEvent event) {
    kafkaTemplate.send("orders", event.orderId(), event);
}
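To make the key-to-partition relationship concrete, here is a minimal, dependency-free sketch. It is not Kafka's implementation: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch substitutes `Arrays.hashCode` purely for illustration. The class name, method name, and partition count are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified stand-in for key-based partitioning. Arrays.hashCode replaces
// Kafka's murmur2 hash only to keep the sketch free of dependencies.
class KeyPartitioningSketch {

    // Maps a key to a partition in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Math.floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6; // hypothetical partition count for the "orders" topic
        for (String orderId : new String[] {"ORD-1042", "ORD-1043", "ORD-1042"}) {
            System.out.printf("%s -> partition %d%n", orderId, partitionFor(orderId, partitions));
        }
    }
}
```

Both occurrences of ORD-1042 map to the same partition because the mapping depends only on the key bytes and the partition count. The same property means that changing the partition count can move a key to a different partition.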
Every send overload returns a CompletableFuture<SendResult<K, V>> (in Spring for Apache Kafka 3.0 and later; earlier versions returned a ListenableFuture). If you ignore it, the call is effectively fire-and-forget: fast, but you will not observe broker failures. That is acceptable only for non-critical telemetry.
Handling the send result asynchronously
For events that matter, attach a whenComplete callback to react to success or failure without blocking the calling thread. The SendResult exposes the RecordMetadata (assigned partition and offset), which is invaluable for logging and tracing.
import org.springframework.kafka.support.SendResult;

public void publishTracked(OrderEvent event) {
    kafkaTemplate.send("orders", event.orderId(), event)
        .whenComplete((SendResult<String, OrderEvent> result, Throwable ex) -> {
            if (ex == null) {
                var md = result.getRecordMetadata();
                System.out.printf("Sent %s to %s-%d@%d%n",
                    event.orderId(), md.topic(), md.partition(), md.offset());
            } else {
                System.err.printf("Failed to send %s: %s%n",
                    event.orderId(), ex.getMessage());
            }
        });
}
Output:
Sent ORD-1042 to orders-2@517
The whenComplete callback runs on the producer's I/O thread, not your caller thread. Keep it short and never perform blocking work there; offload heavy handling to another executor.
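One way to offload the callback is `whenCompleteAsync` with an explicit executor. The sketch below uses a plain `CompletableFuture` instead of a real KafkaTemplate so it runs without a broker. The thread named "io-thread" only stands in for the producer's I/O thread, and all class, method, and thread names are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CallbackOffloadSketch {

    // Completes a future from "io-thread" and returns the name of the
    // thread that actually executed the callback.
    static String callbackThreadName() {
        ExecutorService handlers = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "handler-thread"));
        CompletableFuture<String> sendFuture = new CompletableFuture<>();
        CompletableFuture<String> ranOn = new CompletableFuture<>();

        // whenCompleteAsync schedules the callback on the supplied executor
        // instead of running it on the thread that completes the future.
        sendFuture.whenCompleteAsync(
                (result, ex) -> ranOn.complete(Thread.currentThread().getName()),
                handlers);

        // Simulate the acknowledgement arriving on the I/O thread.
        new Thread(() -> sendFuture.complete("orders-2@517"), "io-thread").start();

        try {
            return ranOn.orTimeout(5, TimeUnit.SECONDS).join();
        } finally {
            handlers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Callback ran on: " + callbackThreadName());
        // prints "Callback ran on: handler-thread"
    }
}
```

In a real service the executor would typically be a shared, bounded pool rather than one created per call.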
Targeting a specific partition
Sometimes you need to bypass key-based partitioning and write to an explicit partition. The full overload accepts a partition and an optional timestamp.
int partition = 0;
Long timestamp = System.currentTimeMillis();
kafkaTemplate.send("orders", partition, timestamp, event.orderId(), event);
Use this sparingly. Hard-coding partitions couples your producer to the topic’s partition count and can create skew if the count changes.
Sending with headers using ProducerRecord
When you need metadata that should travel with the message — a schema version, correlation ID, or event type — build a ProducerRecord and add RecordHeader entries. Spring forwards the record unchanged.
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.nio.charset.StandardCharsets;

public void publishWithHeaders(OrderEvent event) {
    ProducerRecord<String, OrderEvent> record =
        new ProducerRecord<>("orders", event.orderId(), event);
    record.headers()
        .add(new RecordHeader("event-type", "order.created".getBytes(StandardCharsets.UTF_8)))
        .add(new RecordHeader("schema-version", "1".getBytes(StandardCharsets.UTF_8)));
    kafkaTemplate.send(record);
}
Header values are raw byte[], so encode strings explicitly. Consumers read them back through the Headers API or @Header parameters in a listener.
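The encode and decode steps on either side of the wire can be sketched without any Kafka classes. In the example below a plain map stands in for Kafka's Headers collection, and the helper names `encode` and `decode` are hypothetical. The point is only that producer and consumer must agree on the charset.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

class HeaderEncodingSketch {

    // Producer side: turn a header value into bytes with an explicit charset.
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: decode with the same charset the producer used.
    static String decode(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A plain map stands in for the record's header collection.
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("event-type", encode("order.created"));
        headers.put("schema-version", encode("1"));

        headers.forEach((name, raw) -> System.out.println(name + " = " + decode(raw)));
        // prints:
        // event-type = order.created
        // schema-version = 1
    }
}
```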
Synchronous (blocking) send
When you must guarantee the record was acknowledged before continuing — for example, inside a request handler that returns the assigned offset — block on the future with get(). This trades throughput for certainty.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public long publishSync(OrderEvent event) {
    try {
        SendResult<String, OrderEvent> result =
            kafkaTemplate.send("orders", event.orderId(), event)
                .get(10, TimeUnit.SECONDS);
        return result.getRecordMetadata().offset();
    } catch (ExecutionException e) {
        throw new IllegalStateException("Kafka send failed", e.getCause());
    } catch (TimeoutException e) {
        throw new IllegalStateException("Kafka send timed out", e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while sending", e);
    }
}
Always pass a timeout to get(). An unbounded get() can hang a request thread indefinitely if the broker is unreachable and delivery.timeout.ms is large.
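The effect of the timeout can be seen with a plain `CompletableFuture`, no broker required. This is a sketch under stated assumptions: the never-completed future stands in for a send that is never acknowledged, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedGetSketch {

    // Waits at most timeoutMillis for the future and reports the outcome.
    static String await(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return "acked: " + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Never completed: simulates a send that is never acknowledged.
        System.out.println(await(new CompletableFuture<>(), 200));
        // prints "timed out"
        System.out.println(await(CompletableFuture.completedFuture("orders-2@517"), 200));
        // prints "acked: orders-2@517"
        System.out.println(await(CompletableFuture.failedFuture(
                new IllegalStateException("broker unreachable")), 200));
        // prints "failed: broker unreachable"
    }
}
```

Without the timeout argument, the first call would never return.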
Send modes compared
| Mode | Call | Blocks caller | Observes failures | Use when |
|---|---|---|---|---|
| Fire-and-forget | send(...) (ignore future) | No | No | Lossy telemetry, metrics |
| Async with callback | send(...).whenComplete(...) | No | Yes | High-throughput domain events |
| Synchronous | send(...).get(timeout) | Yes | Yes | Must confirm before responding |
Complete @Service example
package com.devcraftly.kafka;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

@Service
public class OrderEventService {

    private static final String TOPIC = "orders";

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public OrderEventService(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void emit(OrderEvent event) {
        kafkaTemplate.send(TOPIC, event.orderId(), event)
            .whenComplete((SendResult<String, OrderEvent> result, Throwable ex) -> {
                if (ex != null) {
                    // Throwing here would be lost, because no caller observes this callback.
                    // Hand off to a retry/dead-letter strategy rather than swallowing.
                    handleFailure(new KafkaPublishException(event.orderId(), ex));
                }
            });
    }

    // Placeholder hand-off point: replace the logging with your retry or dead-letter logic.
    private void handleFailure(KafkaPublishException failure) {
        System.err.println(failure.getMessage() + ": " + failure.getCause().getMessage());
    }

    static final class KafkaPublishException extends RuntimeException {
        KafkaPublishException(String orderId, Throwable cause) {
            super("Failed to publish event for order " + orderId, cause);
        }
    }
}
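One subtlety with callback-based error handling is that an exception thrown inside a `whenComplete` callback does not reach the code that called send. It only affects the future returned by `whenComplete`, which is usually discarded. The sketch below demonstrates this with a plain `CompletableFuture`, no Kafka involved. The class and method names are hypothetical, and the `AtomicReference` merely stands in for a retry or dead-letter component.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class CallbackFailureSketch {

    // Returns true only if an exception thrown in the callback reaches the caller.
    static boolean callerSeesThrow() {
        CompletableFuture<String> sendFuture = new CompletableFuture<>();
        try {
            sendFuture.whenComplete((result, ex) -> {
                if (ex != null) {
                    throw new IllegalStateException("publish failed", ex);
                }
            });
            // Simulate the send failing; the callback runs during this call.
            sendFuture.completeExceptionally(new RuntimeException("broker unreachable"));
            return false; // reached: the throw was captured, not propagated
        } catch (RuntimeException e) {
            return true;
        }
    }

    // Passes the failure to a handler explicitly instead of throwing.
    static String handedOffFailure() {
        AtomicReference<String> deadLetter = new AtomicReference<>();
        CompletableFuture<String> sendFuture = new CompletableFuture<>();
        sendFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                deadLetter.set(ex.getMessage());
            }
        });
        sendFuture.completeExceptionally(new RuntimeException("broker unreachable"));
        return deadLetter.get();
    }

    public static void main(String[] args) {
        System.out.println("caller saw the throw: " + callerSeesThrow());
        // prints "caller saw the throw: false"
        System.out.println("handed-off failure: " + handedOffFailure());
        // prints "handed-off failure: broker unreachable"
    }
}
```

For that reason a failure branch should log, enqueue, or otherwise hand off the error explicitly rather than rely on a thrown exception being noticed.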
Best Practices
- Prefer the asynchronous whenComplete form for domain events; reserve blocking get() for cases where the caller genuinely needs the offset before responding.
- Always supply a timeout to get() so a slow or unreachable broker cannot stall request threads.
- Choose a stable, meaningful message key to control partitioning and preserve per-key ordering instead of pinning explicit partition numbers.
- Set acks=all and enable idempotence in producer config for events that must not be lost or duplicated.
- Keep whenComplete callbacks non-blocking; they execute on the producer I/O thread.
- Encode header values explicitly with a known charset and document each header your messages carry.
- Reuse the auto-configured KafkaTemplate bean; it is thread-safe and meant to be shared across the application.
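As a rough illustration of the acks and idempotence recommendation, the sketch below builds the corresponding producer settings as a plain map. The keys `acks` and `enable.idempotence` are Kafka producer configuration names; the class and method names are hypothetical. In a Spring Boot application the same values would more likely be set through `spring.kafka.producer.acks` and `spring.kafka.producer.properties.*` than in code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DurableProducerConfigSketch {

    // Producer settings for events that must not be lost or duplicated.
    static Map<String, Object> durableProducerProps() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("acks", "all");              // wait for all in-sync replicas to acknowledge
        props.put("enable.idempotence", true); // let the broker discard duplicates caused by retries
        return props;
    }

    public static void main(String[] args) {
        System.out.println(durableProducerProps());
        // prints "{acks=all, enable.idempotence=true}"
    }
}
```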