Stateful Operations
Stateless operators like filter and mapValues look at one record and forget it. Stateful operators are different: to compute a count, a running sum, or a join, Kafka Streams must remember what it has already seen. That memory lives in a state store, and the store is made durable and fault-tolerant by a changelog topic in Kafka. Understanding how state is built, partitioned, and restored is what separates a toy topology from one you can confidently run in production across restarts, rebalances, and instance failures.
What makes an operation stateful
An operation is stateful when its output for a given record depends on previously processed records. The three families you will use most are aggregations, joins, and windowed variants of both.
- Aggregations: count, reduce, and aggregate accumulate a value per key.
- Joins: KStream–KTable and KTable–KTable joins must keep one or both sides materialized to look up the matching record.
- Windowing: time-bounded aggregations keep state per window and accept updates until stream time passes the window's end plus its grace period; closed windows then stay in the store until its retention period expires.
Each of these is backed by a state store. Stateless operators carry no store and need no changelog, which is why they are cheap and easy to scale. Stateful operators trade that simplicity for the ability to compute over history.
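The windowing rule can be made concrete with a plain-Java model of a tumbling-window count with a grace period. This is not the Kafka Streams API: the class and its methods are hypothetical. It models only the late-record check (a window accepts updates while stream time is below its end plus grace) and ignores the window store's retention period. Sizes and timestamps are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of a tumbling-window count with a grace period.
// It models only the late-record check, not window-store retention.
public class TumblingWindowSketch {
    final long sizeMs;
    final long graceMs;
    long streamTime = Long.MIN_VALUE;                      // highest timestamp seen so far
    final Map<Long, Long> countsByWindowStart = new TreeMap<>();
    final List<Long> dropped = new ArrayList<>();          // timestamps rejected as too late

    TumblingWindowSketch(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    void process(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);    // stream time only moves forward
        long windowStart = timestampMs - (timestampMs % sizeMs); // assumes timestamps >= 0
        long windowEnd = windowStart + sizeMs;
        // The window still accepts updates while stream time is below windowEnd + grace.
        if (windowEnd + graceMs > streamTime) {
            countsByWindowStart.merge(windowStart, 1L, Long::sum);
        } else {
            dropped.add(timestampMs);
        }
    }

    public static void main(String[] args) {
        TumblingWindowSketch w = new TumblingWindowSketch(60_000, 10_000);
        w.process(5_000);    // window [0, 60000)
        w.process(65_000);   // window [60000, 120000); stream time -> 65000
        w.process(55_000);   // out of order, but 60000 + 10000 > 65000: accepted
        w.process(125_000);  // stream time -> 125000
        w.process(30_000);   // window [0, 60000) closed at 70000: dropped
        System.out.println(w.countsByWindowStart + " dropped=" + w.dropped);
    }
}
```

The record at 55,000 ms is out of order but still inside the grace period, so it updates window [0, 60000). The record at 30,000 ms arrives after stream time has passed 70,000 ms and is dropped.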
State stores and changelog topics
A state store is a local key-value store on the instance running a task. By default it is a RocksDB instance on local disk; an in-memory alternative is available through Stores.inMemoryKeyValueStore(...). Reads and writes are fast because they are local. But local disk is not durable: if the instance dies, that state must be reconstructable somewhere else.
That somewhere is the changelog topic. Every update to a state store is also written to a compacted Kafka topic named <application.id>-<store-name>-changelog. Because the topic is log-compacted, Kafka retains at least the latest value for every key, so replaying the topic rebuilds the store's complete contents. The store is the fast local copy; the changelog is the durable source of truth.
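The relationship between store and changelog can be sketched in plain Java. This is a hypothetical model, not Kafka code:

- a HashMap stands in for RocksDB;
- a List stands in for the changelog partition;
- a null value models a tombstone (delete);
- compact() approximates what log compaction eventually keeps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a state store backed by a compacted changelog.
// HashMap stands in for RocksDB; a List stands in for the changelog partition.
public class ChangelogStoreSketch {
    static final class Entry {
        final String key;
        final Long value;                              // null models a tombstone (delete)
        Entry(String key, Long value) { this.key = key; this.value = value; }
    }

    final Map<String, Long> store = new HashMap<>();   // fast local copy
    final List<Entry> changelog = new ArrayList<>();   // durable source of truth

    void put(String key, Long value) {
        if (value == null) store.remove(key); else store.put(key, value);
        changelog.add(new Entry(key, value));          // every write is also logged
    }

    // Approximates log compaction: keep only the latest record per key and drop keys
    // whose latest record is a tombstone (real Kafka keeps tombstones for a while first).
    static List<Entry> compact(List<Entry> log) {
        Map<String, Entry> latest = new LinkedHashMap<>();
        for (Entry e : log) latest.put(e.key, e);
        List<Entry> kept = new ArrayList<>();
        for (Entry e : latest.values()) if (e.value != null) kept.add(e);
        return kept;
    }

    // Restoring means replaying the changelog into an empty store.
    static Map<String, Long> restore(List<Entry> log) {
        Map<String, Long> rebuilt = new HashMap<>();
        for (Entry e : log) {
            if (e.value == null) rebuilt.remove(e.key); else rebuilt.put(e.key, e.value);
        }
        return rebuilt;
    }

    public static void main(String[] args) {
        ChangelogStoreSketch s = new ChangelogStoreSketch();
        s.put("alice", 1L);
        s.put("bob", 1L);
        s.put("alice", 2L);
        s.put("bob", null);                            // delete bob
        List<Entry> compacted = compact(s.changelog);
        System.out.println(s.changelog.size() + " records, " + compacted.size()
                + " after compaction, restore matches store: "
                + restore(compacted).equals(s.store));
    }
}
```

Even after compaction discards the superseded updates, replaying what remains yields the same map as the live store.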
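```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

// Order and orderSerde (a Serde<Order>) are assumed to be defined elsewhere.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders =
    builder.stream("orders", Consumed.with(Serdes.String(), orderSerde));
// Re-key by customer, then count into an explicitly named store.
KTable<String, Long> ordersPerCustomer = orders
    .groupBy((key, order) -> order.customerId(),
             Grouped.with(Serdes.String(), orderSerde))
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("orders-per-customer")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));
// Emit each updated count downstream.
ordersPerCustomer.toStream().to("order-counts",
    Produced.with(Serdes.String(), Serdes.Long()));
```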
Assuming application.id is my-app, the store named orders-per-customer produces a changelog topic named my-app-orders-per-customer-changelog. Naming the store explicitly keeps that topic, its metrics, and any interactive queries stable across redeploys.
Always name your state stores via Materialized.as(...). An unnamed store gets an internally generated name that can change when you edit the topology, orphaning the old changelog and silently dropping accumulated state.
How state is partitioned
State in Kafka Streams is partitioned rather than global. The exception is global stores such as a GlobalKTable, which every instance replicates in full. Each stream task owns one partition of each input topic, together with the slice of state that belongs to it. Because aggregations and joins operate per key, all records for a given key must be routed to the same task; otherwise that key's running total would be split across instances.
This co-location guarantee is what makes stateful operations scalable. Parallelism is bounded by the number of tasks, which equals the input partition count. Each task handles a disjoint subset of keys with its own independent store and changelog partition. Threads or instances beyond the task count receive no active tasks.
```text
input topic "orders"          tasks (state per partition)
--------------------          ---------------------------------------
partition 0  ──────────────▶  task 0 ▶ store[p0] ▶ changelog p0
partition 1  ──────────────▶  task 1 ▶ store[p1] ▶ changelog p1
partition 2  ──────────────▶  task 2 ▶ store[p2] ▶ changelog p2
```
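Key-to-partition routing is deterministic, which is what makes the co-location guarantee work. The sketch below reimplements the murmur2 hash that Kafka's default partitioner applies to serialized keys (Utils.murmur2 and Utils.toPositive in kafka-clients), so it runs on the JDK alone. The class name is hypothetical. It assumes keys are serialized as UTF-8 strings, as Serdes.String() does by default.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical JDK-only copy of the murmur2 routing Kafka's default partitioner
// uses for keyed records: toPositive(murmur2(keyBytes)) % numPartitions.
public class KeyPartitioning {
    static int murmur2(byte[] data) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int length = data.length;
        int h = 0x9747b28c ^ length;            // fixed seed mixed with the length
        for (int i = 0; i < length / 4; i++) {  // process 4 bytes at a time, little-endian
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        switch (length % 4) {                   // tail bytes; cases fall through on purpose
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions; // toPositive(...) % n
    }

    public static void main(String[] args) {
        for (String customer : new String[] {"alice", "bob", "carol", "alice"}) {
            System.out.println(customer + " -> partition " + partitionFor(customer, 3));
        }
    }
}
```

Both occurrences of "alice" map to the same partition. Every record for that key therefore reaches the same task and the same store.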
Repartitioning: groupByKey vs groupBy
For per-key state to be correct, records must be partitioned by the aggregation key.

groupByKey() is used when the stream is already keyed correctly. It adds no shuffle of its own, although Kafka Streams still repartitions if an upstream operation such as selectKey() or map() changed the key.

groupBy(...) selects a new key, so a record may now belong on a different partition from the one it was read from. Kafka Streams handles this by transparently writing the re-keyed records to an internal repartition topic and reading them back. That way every record for a new key lands on the same task. It does this for every groupBy(...), because it cannot tell whether the selector actually changed the key.
```java
// No repartition: the stream is already keyed by the grouping key
KTable<String, Long> byKey = orders
    .groupByKey(Grouped.with(Serdes.String(), orderSerde))
    .count();

// Triggers a repartition topic: the new key differs from the record key
KTable<String, Long> byProduct = orders
    .groupBy((key, order) -> order.productId(),
             Grouped.with(Serdes.String(), orderSerde))
    .count(Materialized.as("count-by-product"));
```
| Operator | Re-keys? | Repartition topic | When to use |
|---|---|---|---|
| groupByKey() | No | None, unless an upstream operation changed the key | Record key already equals the aggregation key |
| groupBy(selector) | Yes | Always created | Aggregating by a field other than the current key |
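Repartition topics cost an extra round trip through the broker, so avoid needless re-keying. Passing a name to Grouped, for example Grouped.with(name, keySerde, valueSerde), names the topic <application.id>-<name>-repartition and sets its serdes explicitly. For an explicit KStream#repartition() call, Repartitioned plays the same role.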
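A small plain-Java simulation shows why a repartition step is needed. It is an illustration only: partition() uses String.hashCode() as a simplified stand-in for Kafka's murmur2 partitioner, and the customer and product IDs are hypothetical sample data. Orders start on partitions chosen by their customer key. Regrouping by product scatters one product's records across partitions until they are rewritten under the new key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration (not Kafka Streams code) of why groupBy(...) needs a
// repartition topic. partition() is a hashCode-based stand-in for murmur2.
public class RepartitionSketch {
    static int partition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // orders[i] = {customerId, productId}; the input topic is keyed by customerId.
    // Returns, for each productId, the set of partitions that hold its records.
    static Map<String, Set<Integer>> productPartitions(String[][] orders, int n, boolean rekeyed) {
        Map<String, Set<Integer>> result = new HashMap<>();
        for (String[] order : orders) {
            String customerId = order[0];
            String productId = order[1];
            // Without repartitioning, a record stays where its customer key put it;
            // after passing through a topic keyed by productId, the new key decides.
            int p = rekeyed ? partition(productId, n) : partition(customerId, n);
            result.computeIfAbsent(productId, k -> new TreeSet<>()).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        String[][] orders = {{"a", "p1"}, {"b", "p1"}, {"c", "p2"}};
        System.out.println("before: " + productPartitions(orders, 3, false));
        System.out.println("after:  " + productPartitions(orders, 3, true));
    }
}
```

With three partitions, product p1 is spread over two partitions before re-keying and lands on exactly one afterwards. That is the property the repartition topic restores for groupBy(...).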
Restoration after failure or rebalance
When a task starts on a new instance, whether after a crash, a scale-out, or a rebalance, its local store may be empty or stale. Before processing resumes, Kafka Streams restores the store by replaying its changelog partition. It starts from the last checkpointed offset if usable local state exists, and otherwise from the beginning.

For large stores this can take a long time, which is why standby replicas (num.standby.replicas) are valuable. They keep warm copies of each store on other instances by continuously applying the changelog. A failover then only needs to replay the small tail the standby has not yet applied.
```properties
# Keep one hot standby copy of each store on another instance
num.standby.replicas=1
```
Sample log output while a store restores:

```text
INFO Restoration in progress for 1 partitions. Total records to restore: 482931
INFO Restoration complete for partition orders-per-customer-changelog-2
INFO State transition from REBALANCING to RUNNING
```
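The cost difference between a cold restore and a standby takeover can be modeled in plain Java. This is a hypothetical sketch rather than Kafka Streams internals:

- a List stands in for one changelog partition;
- checkpoint holds the next offset a copy still needs to apply;
- compaction is ignored, although in practice it shrinks a cold restore.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of changelog restoration; not Kafka Streams internals.
// A List stands in for one changelog partition; compaction is ignored.
public class RestoreSketch {
    static final class Update {
        final String key;
        final long value;
        Update(String key, long value) { this.key = key; this.value = value; }
    }

    final Map<String, Long> store = new HashMap<>();
    int checkpoint = 0;   // next changelog offset this copy still needs to apply

    // Replays changelog[checkpoint, end) and returns how many records were applied.
    int restore(List<Update> changelog) {
        int applied = 0;
        for (int offset = checkpoint; offset < changelog.size(); offset++) {
            Update u = changelog.get(offset);
            store.put(u.key, u.value);
            applied++;
        }
        checkpoint = changelog.size();
        return applied;
    }

    // Appends updates for ten sample keys, standing in for an active task's writes.
    static void appendUpdates(List<Update> changelog, int from, int to) {
        for (int i = from; i < to; i++) changelog.add(new Update("customer-" + (i % 10), i));
    }

    public static void main(String[] args) {
        List<Update> changelog = new ArrayList<>();
        appendUpdates(changelog, 0, 1_000);
        RestoreSketch standby = new RestoreSketch();
        standby.restore(changelog);               // the standby keeps up while the active task runs
        appendUpdates(changelog, 1_000, 1_005);   // writes the standby has not applied yet

        // The active task fails: a cold instance replays everything, the standby only the tail.
        RestoreSketch cold = new RestoreSketch();
        int coldReplayed = cold.restore(changelog);
        int standbyReplayed = standby.restore(changelog);
        System.out.println("cold: " + coldReplayed + ", standby: " + standbyReplayed
                + ", same state: " + cold.store.equals(standby.store));
    }
}
```

Here a fresh instance replays all 1,005 records. The standby has already applied the first 1,000, so it replays only the 5 new ones, and both end up with the same state.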
Best Practices
- Name every state store with Materialized.as(...) so changelog topics and queries stay stable across topology changes.
- Prefer groupByKey() over groupBy() when the record is already keyed correctly, to avoid an unnecessary repartition topic.
- Configure num.standby.replicas >= 1 for stateful apps to minimise restoration time after failures and rebalances.
- Set serdes explicitly via Consumed, Grouped, and Materialized rather than relying on defaults. For count(), any value serde you supply must be a Long serde; if you omit it, Kafka Streams uses Serdes.Long().
- Keep state-store keys bounded. Unbounded key growth (e.g. raw event IDs) bloats RocksDB and the changelog, so use windowing or expiry for high-cardinality data.
- Keep changelog topics adequately replicated and place the local state directory (state.dir) on fast disk. Monitor restore time and store size as first-class operational metrics.