
Kafka Streams in Spring

Kafka Streams is the stream-processing library bundled with Kafka for building stateful, fault-tolerant applications that read, transform, aggregate, and write topics — all as a normal JVM application with no separate cluster to operate. Spring for Apache Kafka makes this almost effortless: with a single annotation it wires up the streams configuration, manages the KafkaStreams lifecycle alongside the Spring context, and lets you declare your topology as ordinary beans. This page shows how to enable Kafka Streams, configure it, define a topology, and control its lifecycle in production.

Enabling Kafka Streams

Add the kafka-streams dependency alongside spring-kafka, then annotate a @Configuration class with @EnableKafkaStreams. The annotation imports KafkaStreamsDefaultConfiguration, which registers a StreamsBuilderFactoryBean. That factory bean owns the single KafkaStreams instance and exposes the StreamsBuilder you inject when wiring topologies.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>

The streams machinery is driven by a single bean of type KafkaStreamsConfiguration registered under the well-known name defaultKafkaStreamsConfig. Spring Boot creates this for you from spring.kafka.streams.* properties, but defining it explicitly gives you full control.

package com.devcraftly.kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

The application.id is the most important setting. It names the consumer group, prefixes internal changelog and repartition topics, and identifies state stores. Each distinct streams application must have a unique, stable application.id — changing it strands existing state and offsets.
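To make the naming role of application.id concrete, here is a minimal sketch of how internal topic names are derived from it. InternalTopicNames and its two methods are hypothetical helpers written for this page, not Kafka Streams APIs. They only mirror the documented <application.id>-<name>-changelog and <application.id>-<name>-repartition patterns, and the operator name passed for the repartition topic is an assumed placeholder.

```java
// Hypothetical helper for illustration only; Kafka Streams builds these
// names internally and does not expose a class like this.
class InternalTopicNames {

    // Changelog topics back state stores: <application.id>-<store name>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Repartition topics: <application.id>-<operator name>-repartition
    static String repartitionTopic(String applicationId, String operatorName) {
        return applicationId + "-" + operatorName + "-repartition";
    }

    public static void main(String[] args) {
        // "word-counts" as the operator name is an assumption made for this example.
        System.out.println(changelogTopic("word-count-app", "word-counts"));
        System.out.println(repartitionTopic("word-count-app", "word-counts"));
    }
}
```

Because the application.id is baked into every one of these names, a renamed application starts with fresh, empty internal topics and no link to the old ones.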

If you prefer external configuration, the same values map cleanly to application.yml and Spring Boot builds the bean automatically.

spring:
  kafka:
    bootstrap-servers: localhost:9092
    streams:
      application-id: word-count-app
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

Defining a topology

A topology is the directed graph of processing steps. With Spring you express it in a @Bean method that takes the injected StreamsBuilder and returns a KStream. The topology is defined by the calls made on the builder inside that method; the factory bean then builds it and starts the stream once the context is ready. The classic word-count example reads lines from one topic, splits them into words, counts each word, and writes the running totals to an output topic.

package com.devcraftly.kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.Locale;

@Configuration
public class WordCountTopology {

    @Bean
    KStream<String, String> wordCountStream(StreamsBuilder builder) {
        // Read each record of the "lines" topic as a String key and String value.
        KStream<String, String> source =
                builder.stream("lines", Consumed.with(Serdes.String(), Serdes.String()));

        // Split lines into words, re-key by word, and keep a running count per word.
        KTable<String, Long> counts = source
                .flatMapValues(line -> Arrays.asList(line.toLowerCase(Locale.ROOT).split("\\W+")))
                .filter((key, word) -> !word.isBlank())
                .map((key, word) -> new KeyValue<>(word, word))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .count(Materialized.as("word-counts"));

        counts.toStream()
                .peek((word, count) -> System.out.printf("%s -> %d%n", word, count))
                .to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));

        return source;
    }
}

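Producing the lines "kafka streams in spring" and then "spring kafka rocks" yields incremental counts on word-count-output. With record caching enabled (the default), consecutive updates to the same word may be coalesced before they are forwarded, so you may see fewer intermediate records than shown here: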

Output:

kafka -> 1
streams -> 1
in -> 1
spring -> 1
spring -> 2
kafka -> 2
rocks -> 1
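The running totals above can be reproduced without a broker. The following is a plain-Java simulation of the same split, filter, and count steps, not Kafka Streams code: WordCountSimulation is a hypothetical class, the HashMap stands in for the word-counts state store, and every update is emitted immediately, which corresponds to a topology with record caching effectively disabled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the word-count topology; no Kafka classes involved.
class WordCountSimulation {

    // Returns one "word -> count" update per word occurrence, in arrival order.
    static List<String> updates(List<String> lines) {
        Map<String, Long> counts = new HashMap<>(); // plays the role of the state store
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.toLowerCase(Locale.ROOT).split("\\W+")) {
                if (word.isBlank()) {
                    continue; // same guard as the filter step in the topology
                }
                long updated = counts.merge(word, 1L, Long::sum);
                emitted.add(word + " -> " + updated);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        updates(List.of("kafka streams in spring", "spring kafka rocks"))
                .forEach(System.out::println);
    }
}
```

Each emitted entry is an update to the latest value for a key rather than an independent event, which is why spring and kafka appear twice with increasing counts.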

For a simpler, stateless case you can transform and forward without aggregation — for example, uppercasing values:

@Bean
KStream<String, String> uppercaseStream(StreamsBuilder builder) {
    KStream<String, String> source = builder.stream("events");
    source.mapValues(v -> v.toUpperCase(Locale.ROOT)).to("events-upper");
    return source;
}

Managing the lifecycle

The StreamsBuilderFactoryBean is the lifecycle owner. By default it auto-starts after the context refreshes and stops on shutdown, so you rarely manage it manually. When you need to start streams on demand, inspect state, or register listeners, inject the factory bean directly.

package com.devcraftly.kafka;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.stereotype.Component;

@Component
public class StreamsLifecycle {

    private final StreamsBuilderFactoryBean factoryBean;

    public StreamsLifecycle(StreamsBuilderFactoryBean factoryBean) {
        this.factoryBean = factoryBean;
    }

    // Returns null until the factory bean has created the KafkaStreams instance.
    public KafkaStreams.State currentState() {
        KafkaStreams streams = factoryBean.getKafkaStreams();
        return streams != null ? streams.state() : null;
    }

    // Call before the factory bean starts; the handler is applied when KafkaStreams is created.
    public void setUncaughtExceptionHandler() {
        factoryBean.setStreamsUncaughtExceptionHandler(
                ex -> StreamThreadExceptionResponse.REPLACE_THREAD);
    }
}

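Set factoryBean.setAutoStartup(false) if you want to defer startup, then call factoryBean.start() yourself once dependencies are ready. To react to lifecycle transitions, register a state listener with factoryBean.setStateListener(...) or supply a KafkaStreamsCustomizer, in both cases before the factory bean starts. KafkaStreams accepts a state listener only while it is still in the CREATED state, so calling setStateListener(...) on an already running instance throws an IllegalStateException.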

Key configuration options

| Property | Purpose | Typical value |
|---|---|---|
| application.id | Group ID and state/topic namespace | unique per app |
| bootstrap.servers | Broker addresses | localhost:9092 |
| default.key.serde / default.value.serde | Fallback serializers | Serdes$StringSerde |
| processing.guarantee | Delivery semantics | exactly_once_v2 |
| num.stream.threads | Parallel processing threads | 1–partition count |
| state.dir | Local RocksDB state location | /var/lib/kafka-streams |
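The upper bound on num.stream.threads follows from the task model: Kafka Streams creates roughly one task per input partition of each sub-topology and spreads those tasks over all stream threads of all running instances, so threads beyond the task count sit idle. The sketch below is plain arithmetic under that assumption; StreamThreadSizing and its method names are hypothetical and not part of any Kafka API.

```java
// Hypothetical sizing helper; assumes tasks are the unit of parallelism
// and that each task is assigned to exactly one stream thread.
class StreamThreadSizing {

    // Threads across the whole deployment that can receive at least one task.
    static int busyThreads(int taskCount, int instances, int threadsPerInstance) {
        return Math.min(taskCount, instances * threadsPerInstance);
    }

    // Threads that will have nothing assigned to them.
    static int idleThreads(int taskCount, int instances, int threadsPerInstance) {
        return instances * threadsPerInstance - busyThreads(taskCount, instances, threadsPerInstance);
    }

    public static void main(String[] args) {
        // 6 tasks, 2 instances, 4 threads each: 8 threads in total.
        System.out.println("busy=" + busyThreads(6, 2, 4) + " idle=" + idleThreads(6, 2, 4));
    }
}
```

With these example numbers two of the eight threads would be idle, so either fewer threads per instance or more input partitions would make better use of the deployment.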

This page only scratches the surface — windowing, joins, interactive queries, and exactly-once processing each warrant deeper treatment. See the dedicated Kafka Streams section for those topics.

Best Practices

  • Give every streams application a unique, stable application.id; treat it as part of your deployment contract since renaming it discards state and offsets.
  • Specify serdes explicitly on each operator (Consumed, Produced, Grouped, Materialized) rather than relying solely on defaults — it prevents subtle ClassCastExceptions at runtime.
  • Use processing.guarantee=exactly_once_v2 for stateful pipelines where duplicate or lost updates would corrupt results.
  • Always set a StreamsUncaughtExceptionHandler; the default behavior shuts the client down on the first unhandled error.
  • Size num.stream.threads against the input topic’s partition count, and provision durable state.dir storage so RocksDB state survives restarts.
  • Monitor the KafkaStreams.State (RUNNING, REBALANCING, ERROR) and expose it as a health indicator so orchestrators can detect a stalled topology.
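As a rough illustration of the last point, the sketch below maps a streams state to a coarse health status. It is deliberately free of Kafka and Spring dependencies: the State enum is a local copy of the KafkaStreams.State constant names, the status strings imitate common health-check vocabulary, and StreamsHealth itself is hypothetical. Treating REBALANCING as healthy is one possible policy, not a recommendation from the library.

```java
// Hypothetical mapping from a Kafka Streams state to a health status string.
class StreamsHealth {

    // Local mirror of the KafkaStreams.State constant names, for illustration.
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    static String status(State state) {
        if (state == null) {
            return "UNKNOWN"; // no KafkaStreams instance has been created yet
        }
        switch (state) {
            case RUNNING:
            case REBALANCING:
                return "UP";      // processing, or temporarily reassigning tasks
            case CREATED:
                return "UNKNOWN"; // built but not started
            default:
                return "DOWN";    // shutting down, stopped, or failed
        }
    }

    public static void main(String[] args) {
        for (State s : State.values()) {
            System.out.println(s + " -> " + status(s));
        }
    }
}
```

In a real application the same decision would sit behind whatever health endpoint the orchestrator polls, and a topology that stays in REBALANCING for a long time may deserve its own alert.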
Last updated June 1, 2026