
Request-Reply Messaging

Kafka is built for fire-and-forget, one-way streaming, but plenty of real systems need a response to a message: validate a payment, look up a price, or score a transaction and wait for the answer. Spring for Apache Kafka offers ReplyingKafkaTemplate, which sends a record to a request topic and returns a Future that completes when the matching reply lands on a separate reply topic. It is a clean abstraction, but it stitches together two asynchronous flows over a broker, so it carries trade-offs you must understand before reaching for it in production.

How request-reply works over Kafka

There is no native concept of a reply in Kafka. Spring layers one on using two ingredients: a correlation ID and a reply topic. When you send a request, ReplyingKafkaTemplate generates a unique correlation ID and writes it into the KafkaHeaders.CORRELATION_ID header along with a KafkaHeaders.REPLY_TOPIC header telling the server where to send the answer. The server processes the request and publishes a response carrying the same correlation ID. A dedicated listener container on the client side consumes the reply topic, matches the correlation ID to the pending Future, and completes it.

Client                          Broker                         Server
  |  send(request)                 |                              |
  |  CORRELATION_ID=abc123 ------> [requests topic] -----------> @KafkaListener
  |  REPLY_TOPIC=replies           |                              |  @SendTo
  |                                |                              |  process + reply
  |  <----- reply container <----- [replies topic] <------------ CORRELATION_ID=abc123
  |  Future completes              |                              |
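
The matching step in the diagram can be modeled without any Kafka dependency. The following is a minimal sketch, not Spring's implementation: the class `CorrelationSketch`, its `Pending` record, and the methods `send` and `onReply` are hypothetical names chosen for illustration, and plain strings stand in for request and reply payloads.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, framework-free model of correlation-ID matching.
final class CorrelationSketch {

    // What the caller gets back from a "send": the generated ID and a future for the reply.
    record Pending(String correlationId, CompletableFuture<String> future) {}

    // Requests still waiting for a reply, keyed by correlation ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Client side: generate a unique correlation ID and remember the future under it.
    Pending send() {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return new Pending(correlationId, future);
    }

    // Reply container: complete the future whose ID matches; report whether one was found.
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // no pending request with this ID, so the reply is ignored
        }
        future.complete(payload);
        return true;
    }

    public static void main(String[] args) {
        CorrelationSketch client = new CorrelationSketch();
        Pending request = client.send();
        client.onReply("unknown-id", "ignored");          // does not match anything
        client.onReply(request.correlationId(), "59.97"); // completes the pending future
        System.out.println(request.future().join());      // prints 59.97
    }
}
```

Removing the entry on a match keeps the map from growing and means a second reply with the same ID finds nothing to complete.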

Client side: the ReplyingKafkaTemplate

ReplyingKafkaTemplate wraps a normal producer factory plus a listener container that owns the reply topic. The container must be subscribed to the reply topic and run independently so it can correlate responses as they arrive.

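import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;

@Configuration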
public class ReplyConfig {

    @Bean
    public ConcurrentMessageListenerContainer<String, PriceResponse> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, PriceResponse> factory) {
        ConcurrentMessageListenerContainer<String, PriceResponse> container =
                factory.createContainer("price-replies");
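        // With a shared reply topic, use a different group.id for each client instance
        // so that every instance receives all replies.
        container.getContainerProperties().setGroupId("price-client");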
        container.setAutoStartup(false); // started by the template
        return container;
    }

    @Bean
    public ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> replyingKafkaTemplate(
            ProducerFactory<String, PriceRequest> pf,
            ConcurrentMessageListenerContainer<String, PriceResponse> repliesContainer) {
        ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> template =
                new ReplyingKafkaTemplate<>(pf, repliesContainer);
        template.setDefaultReplyTimeout(Duration.ofSeconds(5));
        template.setSharedReplyTopic(true);
        return template;
    }
}

The DTOs are plain records serialized as JSON (see the JSON serialization page for the producer/consumer factory setup):

import java.math.BigDecimal;

public record PriceRequest(String sku, int quantity) {}
public record PriceResponse(String sku, BigDecimal totalPrice) {}

Sending a request and blocking for the answer:

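import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;

@Service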
public class PriceClient {

    private final ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> template;

    public PriceClient(ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> template) {
        this.template = template;
    }

    public PriceResponse quote(String sku, int quantity)
            throws ExecutionException, InterruptedException, TimeoutException {
        ProducerRecord<String, PriceRequest> record =
                new ProducerRecord<>("price-requests", new PriceRequest(sku, quantity));
        record.headers().add(new RecordHeader(
                KafkaHeaders.REPLY_TOPIC, "price-replies".getBytes(StandardCharsets.UTF_8)));

        RequestReplyFuture<String, PriceRequest, PriceResponse> future =
                template.sendAndReceive(record);

        ConsumerRecord<String, PriceResponse> reply = future.get(5, TimeUnit.SECONDS);
        return reply.value();
    }
}

sendAndReceive returns a RequestReplyFuture. Calling get() with a timeout blocks the caller until the reply arrives or the timeout fires — that is what gives the synchronous feel.

Server side: @KafkaListener with @SendTo

On the responding service, the listener returns a value and @SendTo routes it back. When the request carries a REPLY_TOPIC header, Spring sends the response there and copies the correlation ID automatically — you do not manage it by hand.

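import java.math.BigDecimal;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component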
public class PriceServer {

    @KafkaListener(topics = "price-requests", groupId = "price-service")
    @SendTo // honors the REPLY_TOPIC header from the request
    public PriceResponse handle(PriceRequest request) {
        BigDecimal unit = lookupUnitPrice(request.sku());
        BigDecimal total = unit.multiply(BigDecimal.valueOf(request.quantity()));
        return new PriceResponse(request.sku(), total);
    }

    // Stub for the example: a real service would look the price up by SKU.
    private BigDecimal lookupUnitPrice(String sku) {
        return new BigDecimal("19.99");
    }
}

Output:

PriceClient.quote("SKU-42", 3) -> PriceResponse[sku=SKU-42, totalPrice=59.97]

The REPLY_TOPIC header is honored only when @SendTo has no value. If you specify a literal topic, as in @SendTo("price-replies"), every reply goes to that topic and the header is ignored. Leave @SendTo empty when different clients need to use their own reply topics against one server.

Correlation and topic strategies

Two reply-topic strategies cover most cases.

Strategy | How it works | When to use
--- | --- | ---
Shared reply topic | All client instances share one reply topic; each instance discards replies whose correlation ID it did not issue, and setSharedReplyTopic(true) lowers the log level for those unmatched replies | Few instances, simple topology
Per-instance reply topic | Each instance owns a unique reply topic (e.g. suffixed with the hostname) | Many instances; avoids cross-instance traffic

With a shared topic, every instance consumes every reply and discards those whose correlation ID it did not issue. This only works if each instance's reply container uses its own group.id; with a common group, Kafka divides the reply partitions among the instances, and a reply can land on an instance that never sent the request. With many instances the fan-out wastes broker and network traffic, so prefer a unique reply topic per instance for horizontal scale.
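
One way to derive per-instance names is sketched below. The class `ReplyTopicNames` and its methods are hypothetical helpers, and the instance ID (a hostname, pod name, or similar) is assumed to be supplied by the deployment. The only Kafka fact relied on is that topic names may contain ASCII letters, digits, '.', '_' and '-'.

```java
// Hypothetical helper for naming per-instance reply topics and consumer groups.
final class ReplyTopicNames {

    // Replace every character that is not legal in a Kafka topic name with '-'.
    static String sanitize(String instanceId) {
        return instanceId.replaceAll("[^A-Za-z0-9._-]", "-");
    }

    // Per-instance reply topic: the base topic suffixed with the instance ID.
    static String replyTopicFor(String baseTopic, String instanceId) {
        return baseTopic + "-" + sanitize(instanceId);
    }

    // Unique consumer group for the reply container of one instance.
    static String groupIdFor(String baseGroup, String instanceId) {
        return baseGroup + "-" + sanitize(instanceId);
    }

    public static void main(String[] args) {
        System.out.println(replyTopicFor("price-replies", "client 1")); // prints price-replies-client-1
        System.out.println(groupIdFor("price-client", "client 1"));     // prints price-client-client-1
    }
}
```

The resulting topic name would be passed to the reply container and the group name to its container properties. Each such topic still has to exist before the instance starts.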

Caveats: this is not truly synchronous

The blocking get() hides a fully asynchronous round trip. Keep these realities in mind:

  • A timed-out request does not cancel server work. The server may still process and reply after your Future has already failed; that late reply is simply unmatched and dropped.
  • Reply topics need partitions and retention like any topic. Short retention is fine because replies are consumed immediately, but the topics themselves must exist before the template starts.
  • Throughput is bounded by request latency because callers block a thread per call. Under load this can exhaust thread pools — size them deliberately.
  • Ordering and delivery guarantees still follow Kafka semantics; a redelivered request can produce a duplicate reply with the same correlation ID.
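
The first caveat can be illustrated with a framework-free sketch. It is a model under stated assumptions rather than Spring's code: `LateReplySketch`, `await`, and `onReply` are hypothetical names, and strings stand in for the reply payload. A caller waits with a bound, gives up, and the reply that arrives afterwards finds nothing to complete.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of a bounded wait followed by a late, unmatched reply.
final class LateReplySketch {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Block for at most `timeout`; an empty result means no reply arrived in time.
    Optional<String> await(String correlationId, Duration timeout) {
        CompletableFuture<String> future =
                pending.computeIfAbsent(correlationId, id -> new CompletableFuture<>());
        try {
            return Optional.ofNullable(future.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(correlationId); // after this point a reply can no longer match
        }
    }

    // Reply container: returns false when the request already timed out or was never sent.
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(payload);
    }

    public static void main(String[] args) {
        LateReplySketch client = new LateReplySketch();
        System.out.println(client.await("abc123", Duration.ofMillis(50)).isPresent()); // prints false
        System.out.println(client.onReply("abc123", "59.97"));                         // prints false
    }
}
```

Nothing in this flow reaches the server, which is why the timeout cannot cancel work that is already in progress there.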

Request-reply is a convenience over an event log, not an RPC framework. If you need strict, low-latency synchronous calls, a direct REST/gRPC call is usually the better tool. Reach for Kafka request-reply when you want a reply and the durability, replay, and decoupling of the log.

Best practices

  • Always set a sane defaultReplyTimeout and pass an explicit timeout to future.get(...); never block unbounded.
  • Make request handlers idempotent so redelivered requests do not corrupt state or send conflicting replies.
  • Use a unique reply topic per client instance for scale-out; reserve the shared reply topic for small deployments.
  • Pre-create reply topics with appropriate partitions and short retention rather than relying on auto-creation.
  • Handle TimeoutException explicitly and surface a meaningful error to the caller instead of hanging the request thread.
  • Prefer JSON records with type-safe deserialization for requests and responses, and validate inputs on the server side.
  • For high-throughput, latency-sensitive synchronous needs, evaluate REST/gRPC instead of forcing request-reply onto Kafka.
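
The idempotency recommendation can be sketched as follows. This is an illustration under assumptions, not a production design: `IdempotentPriceHandler` is a hypothetical class, the `requestId` is assumed to be a stable identifier carried by the request (the correlation ID could serve), the unit price is the same stub value used in the server example, and the in-memory map would need bounding or a persistent store in a real service.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical idempotent handler: a redelivered request gets the same reply
// without repeating the work.
final class IdempotentPriceHandler {

    // Replies already produced, keyed by request ID (unbounded here for brevity).
    private final Map<String, BigDecimal> repliesByRequestId = new ConcurrentHashMap<>();

    // Counts how often the price was actually computed, to make the effect visible.
    private final AtomicInteger computations = new AtomicInteger();

    BigDecimal handle(String requestId, String sku, int quantity) {
        return repliesByRequestId.computeIfAbsent(requestId, id -> {
            computations.incrementAndGet();
            BigDecimal unitPrice = new BigDecimal("19.99"); // stubbed lookup for the sketch
            return unitPrice.multiply(BigDecimal.valueOf(quantity));
        });
    }

    int computations() {
        return computations.get();
    }

    public static void main(String[] args) {
        IdempotentPriceHandler handler = new IdempotentPriceHandler();
        System.out.println(handler.handle("req-1", "SKU-42", 3)); // prints 59.97
        System.out.println(handler.handle("req-1", "SKU-42", 3)); // prints 59.97 (redelivery)
        System.out.println(handler.computations());               // prints 1
    }
}
```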
Last updated June 1, 2026