
Streams Setup & Topology

Kafka Streams is a client library, not a separate cluster — your application is the stream processor. That means bootstrapping a Streams app comes down to three concerns: supplying configuration (most importantly the application.id), describing the processing topology with StreamsBuilder, and managing the KafkaStreams lifecycle so it starts cleanly and shuts down gracefully. Getting these foundations right is what makes a Streams application stable, scalable, and safe to redeploy in production.

Add the dependency

Kafka Streams ships as its own artifact, separate from kafka-clients. With Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.7.0</version>
</dependency>

Or with Gradle:

implementation 'org.apache.kafka:kafka-streams:3.7.0'

kafka-streams pulls in kafka-clients transitively, so you do not need to declare kafka-clients separately.

Configuring the application

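Configuration is passed as a Properties object (or Map) wrapped by StreamsConfig. application.id and bootstrap.servers have no defaults and are always required; the default serdes are required in practice unless every operator supplies its own.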

| Property | Constant | Purpose |
|---|---|---|
| application.id | StreamsConfig.APPLICATION_ID_CONFIG | Unique identity for the app. Used as the consumer group ID, the prefix for internal topics, and the state-store directory name. |
| bootstrap.servers | StreamsConfig.BOOTSTRAP_SERVERS_CONFIG | One or more broker addresses used to discover the cluster. |
| default.key.serde / default.value.serde | StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG / StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG | Serdes used whenever an operator does not specify its own. |
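The constants are only names for plain string keys, so the same settings can live in an external file. Below is a minimal sketch that loads them from properties-file text and lists any always-required key that is missing. StreamsProps, fromText, and missingRequired are hypothetical names; StreamsConfig performs its own validation regardless, so this check only fails earlier.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: loads Kafka Streams settings written in .properties syntax.
final class StreamsProps {
    // Plain string keys behind StreamsConfig.APPLICATION_ID_CONFIG and BOOTSTRAP_SERVERS_CONFIG.
    static final List<String> REQUIRED = List.of("application.id", "bootstrap.servers");

    static Properties fromText(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // StringReader does no real I/O, but Properties.load declares IOException.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the required keys that are absent or blank, in declaration order.
    static List<String> missingRequired(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.isBlank()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Serde settings can be given as class names in the same file, for example default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde, and the resulting Properties can be passed to new KafkaStreams(topology, props) unchanged.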

The application.id is the single most important setting. Every instance of your deployment that shares the same application.id joins the same consumer group, and the instances cooperatively split the input partitions among themselves; that is how Kafka Streams scales horizontally. Changing it creates a brand-new application with no committed offsets, which starts reading each input topic from the position set by auto.offset.reset (earliest by default in Kafka Streams).
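To picture how instances that share an application.id divide the input, here is a toy round-robin spread of partitions over instances. It is an illustration, not Kafka's algorithm: Kafka Streams assigns tasks (one per input partition for a single-topic sub-topology) with a sticky, state-aware assignor. PartitionSpread is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: the real Kafka Streams assignor is sticky and considers local state.
final class PartitionSpread {
    // Returns, for each instance index, the partitions it would own under round-robin.
    static List<List<Integer>> spread(int partitions, int instances) {
        if (partitions < 0 || instances < 1) {
            throw new IllegalArgumentException("need partitions >= 0 and instances >= 1");
        }
        List<List<Integer>> owned = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % instances).add(p);
        }
        return owned;
    }
}
```

With six partitions and three instances, each instance owns two partitions; with two partitions and three instances the third list is empty, which is why running more instances than there are tasks leaves some of them without active work.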

Warning: Treat application.id as immutable for a given deployment. Renaming it orphans the previous app’s committed offsets, internal changelog topics, and local state, which usually means full reprocessing and possible duplicate output.
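The warning is easier to see once you spell out what is named after application.id. This sketch follows the Kafka Streams conventions for the consumer group, changelog topics (<application.id>-<store>-changelog; repartition topics use the same prefix), and the per-application directory under state.dir. The class AppResources and the store name counts are hypothetical; the stateless uppercase example has no store at all.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Sketch of the resources Kafka Streams keys by application.id.
final class AppResources {
    final String consumerGroup;
    final List<String> changelogTopics;
    final Path stateDir;

    private AppResources(String consumerGroup, List<String> changelogTopics, Path stateDir) {
        this.consumerGroup = consumerGroup;
        this.changelogTopics = changelogTopics;
        this.stateDir = stateDir;
    }

    static AppResources of(String applicationId, Path stateDirRoot, List<String> storeNames) {
        List<String> changelogs = new ArrayList<>();
        for (String store : storeNames) {
            // Changelog topics are prefixed with application.id.
            changelogs.add(applicationId + "-" + store + "-changelog");
        }
        // Local state lives in a subdirectory named after application.id; per-task folders sit beneath it.
        return new AppResources(applicationId, List.copyOf(changelogs), stateDirRoot.resolve(applicationId));
    }
}
```

Renaming orders-v1 to orders-v2 yields a different consumer group, changelog topic, and state directory, so nothing the old name owned is picked up by the new one.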

Building the topology

A topology is the directed graph of processing steps: sources, operators, and sinks. You declare it fluently with StreamsBuilder, then call build() to produce an immutable Topology that the KafkaStreams engine executes.

The example below reads text lines, uppercases each value, and writes the result to an output topic — a complete stateless transform.

package com.devcraftly.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class UppercaseApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                Serdes.String().getClass());

        // Build the topology
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> lines = builder.stream("input-text");
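        // Pass tombstones (null values) through instead of throwing a NullPointerException
        lines.mapValues(value -> value == null ? null : value.toUpperCase())
             .to("output-text");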

        Topology topology = builder.build();
        System.out.println(topology.describe());

        KafkaStreams streams = new KafkaStreams(topology, props);

        // Coordinate a clean shutdown
        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            latch.countDown();
        }));

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            e.printStackTrace(); // report why startup or waiting failed before exiting
            System.exit(1);
        }
        System.exit(0);
    }
}

topology.describe() returns a TopologyDescription whose printed form lists every source, processor, and sink along with the edges between them, which is invaluable for understanding and debugging what was wired up.

Output:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input-text])
      --> KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
      --> KSTREAM-SINK-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Sink: KSTREAM-SINK-0000000002 (topic: output-text)
      <-- KSTREAM-MAPVALUES-0000000001
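In code review or a unit test it can help to check this wiring programmatically. A rough sketch, assuming the text layout shown above: the hypothetical DescribeScan pulls source and sink topic names out of the describe() string with regular expressions. The printed format is meant for humans and may change between versions, so the structured TopologyDescription API is the sturdier basis for real assertions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that scans the human-readable output of topology.describe().
final class DescribeScan {
    // Matches lines like: Source: KSTREAM-SOURCE-0000000000 (topics: [input-text])
    private static final Pattern SOURCE = Pattern.compile("Source: \\S+ \\(topics: \\[([^\\]]*)\\]\\)");
    // Matches lines like: Sink: KSTREAM-SINK-0000000002 (topic: output-text)
    private static final Pattern SINK = Pattern.compile("Sink: \\S+ \\(topic: ([^)]+)\\)");

    static List<String> sourceTopics(String description) {
        List<String> topics = new ArrayList<>();
        Matcher m = SOURCE.matcher(description);
        while (m.find()) {
            // A source may subscribe to several topics, printed as [a, b].
            for (String topic : m.group(1).split(",")) {
                if (!topic.isBlank()) {
                    topics.add(topic.trim());
                }
            }
        }
        return topics;
    }

    static List<String> sinkTopics(String description) {
        List<String> topics = new ArrayList<>();
        Matcher m = SINK.matcher(description);
        while (m.find()) {
            topics.add(m.group(1).trim());
        }
        return topics;
    }
}
```

Usage would look like DescribeScan.sinkTopics(topology.describe().toString()), compared against the topics you expect the application to write.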

Starting and stopping the engine

KafkaStreams does nothing until you call start(), which spins up the stream threads and begins consuming. Because start() returns immediately, the example blocks the main thread on a CountDownLatch until the shutdown hook releases it, rather than busy-waiting.

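The shutdown hook is not optional in production. When the JVM receives SIGTERM (for instance during a Kubernetes rolling update), streams.close() stops the stream threads in an orderly way: it commits offsets for the records already processed, flushes and closes state stores, and writes checkpoints so local state can be reused after a restart. Without it the process dies mid-flight, records processed since the last commit are processed again, and a restarted instance may have to rebuild local state from its changelog topics.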
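One gap remains in the example: if the engine stops on its own (for instance after an unhandled error moves it to ERROR), the shutdown hook never runs and main waits on the latch forever. A minimal sketch of one way to close that gap, assuming the state listener forwards newState.name(); ShutdownCoordinator and its methods are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: releases main() when the engine reaches a terminal state.
final class ShutdownCoordinator {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile boolean failed;

    // Call from the state listener with newState.name().
    void onStateChange(String newState) {
        if ("ERROR".equals(newState)) {
            failed = true;          // set before releasing so exitCode() sees it
            done.countDown();
        } else if ("NOT_RUNNING".equals(newState)) {
            done.countDown();       // reached after a normal close()
        }
    }

    // Blocks until a terminal state is reported; returns false if interrupted.
    boolean awaitTermination() {
        try {
            done.await();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status for the caller
            return false;
        }
    }

    // Timed variant; returns false on timeout or interruption.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int exitCode() {
        return failed ? 1 : 0;
    }
}
```

Wired into the example, the listener (registered before start()) would call coordinator.onStateChange(newState.name()), the shutdown hook would keep calling streams.close(), and main would end with System.exit(coordinator.exitCode()) once awaitTermination() returns.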

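You can also observe lifecycle transitions to wire up readiness probes. Register the listener before calling start(); KafkaStreams rejects it once the instance has left the CREATED state: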

streams.setStateListener((newState, oldState) -> {
    System.out.printf("State change: %s -> %s%n", oldState, newState);
    // Expose RUNNING/REBALANCING for health checks
});
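Here is one way to turn those transitions into probe answers. A sketch under assumptions: StreamsHealth is hypothetical, it takes newState.name() so it compiles without Kafka on the classpath, it reports ready for RUNNING and REBALANCING (following the comment above), and it reports not live once the engine heads into PENDING_ERROR or ERROR. Whether REBALANCING should count as ready is a policy choice for your platform.

```java
import java.util.Set;

// Hypothetical health tracker fed by KafkaStreams state transitions.
final class StreamsHealth {
    private static final Set<String> READY = Set.of("RUNNING", "REBALANCING");
    private static final Set<String> FAILED = Set.of("PENDING_ERROR", "ERROR");

    // Written by the listener (often on a stream thread), read by the probe handler.
    private volatile String state = "CREATED";

    void onChange(String newState) {
        state = newState;
    }

    boolean ready() {
        return READY.contains(state);
    }

    boolean live() {
        return !FAILED.contains(state);
    }

    String state() {
        return state;
    }
}
```

Registration would be streams.setStateListener((newState, oldState) -> health.onChange(newState.name())), with an HTTP readiness endpoint returning health.ready() and a liveness endpoint returning health.live().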

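Create the input-text and output-text topics before starting the app; Kafka Streams creates only its own internal topics and will not start processing while a source topic is missing. Then run the app and produce a record to verify the round trip: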

echo "hello kafka" | kafka-console-producer.sh \
  --bootstrap-server localhost:9092 --topic input-text

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic output-text --from-beginning

Output:

HELLO KAFKA
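The console round trip needs a running broker. The transformation itself can be checked without one if the lambda is pulled into a plain method; the sketch below does that with a hypothetical UppercaseLogic.upper, which passes null values (tombstones) through and uses Locale.ROOT so results do not depend on the JVM's default locale (a choice made here, not in the original example). For exercising the whole topology without a broker, Kafka publishes TopologyTestDriver in the kafka-streams-test-utils artifact.

```java
import java.util.Locale;

// Hypothetical home for the value transform, so it can be unit-tested in isolation.
final class UppercaseLogic {
    // Same transform as the mapValues lambda; null tombstones are returned unchanged.
    static String upper(String value) {
        return value == null ? null : value.toUpperCase(Locale.ROOT);
    }
}
```

In the topology, the call becomes lines.mapValues(UppercaseLogic::upper).to("output-text").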

Best Practices

  • Set a stable, descriptive application.id and never change it for an existing deployment — it anchors group membership, internal topics, and local state.
  • Always register a JVM shutdown hook that calls streams.close() (optionally with a timeout) so instances commit their progress and close state stores cleanly during deploys.
  • Prefer explicit per-operator serdes over relying solely on defaults; they make type expectations clear and avoid ClassCastException surprises at runtime.
  • Use topology.describe() in development and code review to confirm the graph (and sub-topology boundaries) matches your intent.
  • Attach a StateListener and a StreamsUncaughtExceptionHandler so health checks and alerting reflect the real engine state instead of just “process is up.”
  • Size num.stream.threads to each instance's share of stream tasks (threads beyond the task count sit idle), and scale out by running more instances with the same application.id.
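The thread-sizing bullet reduces to a ceiling division. A sketch, assuming a single sub-topology whose task count equals its input partition count (real topologies can have several sub-topologies, each contributing its own tasks) and an even spread of tasks; ThreadSizing is a hypothetical name.

```java
// Hypothetical sizing helper for num.stream.threads.
final class ThreadSizing {
    // Threads per instance so every task can have its own thread: ceil(tasks / instances), at least 1.
    static int threadsPerInstance(int tasks, int instances) {
        if (tasks < 0 || instances < 1) {
            throw new IllegalArgumentException("need tasks >= 0 and instances >= 1");
        }
        return Math.max(1, (tasks + instances - 1) / instances);
    }

    // Threads across the whole deployment that would have no task to run.
    static int idleThreads(int tasks, int instances, int threadsPerInstance) {
        return Math.max(0, instances * threadsPerInstance - tasks);
    }
}
```

For example, 12 tasks over 3 instances gives 4 threads each; 10 tasks over the same 3 instances still needs 4 threads per instance, leaving 2 threads idle across the deployment.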
Last updated June 1, 2026