State Stores
Every stateful operation in Kafka Streams, whether a count, a join, or a windowed aggregation, needs somewhere to remember what it has already seen. That memory lives in a state store: a fast, local key-value store sitting next to the processing thread. The store serves reads and writes locally with no network round trip, while a changelog topic in Kafka makes it durable enough to survive crashes, rebalances, and full instance loss. Knowing how stores are backed, recovered, and replicated is what lets you run stateful topologies in production without losing data or stalling on restart.
RocksDB vs in-memory stores
By default, Kafka Streams materializes state in a persistent RocksDB store on local disk. RocksDB is an embedded LSM-tree key-value engine that spills to disk, so it can hold state far larger than the JVM heap and survives a process restart by reusing the on-disk files. This is the right default for almost everything: large aggregations, joins, and long windows.
The alternative is an in-memory store. It keeps all entries on the JVM heap, so it is faster but bounded by heap size and lost entirely on restart (it must be fully rebuilt from the changelog). Use it only for small, hot state where you have heap to spare.
| Aspect | Persistent (RocksDB) | In-memory |
|---|---|---|
| Backing | Local disk (LSM tree) | JVM heap |
| Capacity | Larger than RAM | Bounded by heap |
| Restart behaviour | Restores from local files + changelog tail | Full rebuild from changelog |
| Typical use | Default for most state | Small, hot state |
You choose the implementation through the Stores factory when building a Materialized or StoreBuilder:
```java
// Persistent (default), shown explicitly for clarity
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> persistent =
    Materialized.as(Stores.persistentKeyValueStore("orders-per-customer"));

// In-memory variant
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> inMemory =
    Materialized.as(Stores.inMemoryKeyValueStore("orders-per-customer"));
```
Changelog topics and fault tolerance
The local store is fast but not durable on its own: disks fail and instances move. To compensate, every write to a state store is also sent to a changelog topic named <application.id>-<store-name>-changelog. This topic is log-compacted, meaning Kafka retains at least the latest value for every key, so replaying it from the start always reproduces the store's current contents.
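To make compaction concrete, here is a small, self-contained model in plain Java. It is not Kafka's log cleaner, and ChangelogCompactionSketch and its methods are hypothetical names. It builds the changelog topic name from an application.id and a store name, keeps only the latest record per key as compaction would, and checks that replaying the compacted log rebuilds the same store as replaying the full log. A null value stands for a tombstone (a delete); a real broker eventually removes tombstones as well, after delete.retention.ms, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual model of a compacted changelog (plain JDK); not Kafka's log cleaner.
final class ChangelogCompactionSketch {

    // One changelog record; a null value is a tombstone meaning "key deleted".
    static final class Change {
        final String key;
        final Long value;

        Change(String key, Long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Name pattern from the text: <application.id>-<store-name>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Keep only the latest record per key, ordered by where that latest record appeared.
    static List<Change> compact(List<Change> log) {
        Map<String, Change> latest = new LinkedHashMap<>();
        for (Change c : log) {
            latest.remove(c.key); // re-insert so the survivor takes its newest position
            latest.put(c.key, c);
        }
        return new ArrayList<>(latest.values());
    }

    // Rebuild a store by replaying a log from its first record.
    static Map<String, Long> replay(List<Change> log) {
        Map<String, Long> store = new LinkedHashMap<>();
        for (Change c : log) {
            if (c.value == null) {
                store.remove(c.key);
            } else {
                store.put(c.key, c.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Change> log = List.of(
            new Change("customer-1", 1L),
            new Change("customer-2", 1L),
            new Change("customer-1", 2L),
            new Change("customer-2", null));

        List<Change> compacted = compact(log);
        System.out.println(changelogTopic("order-service", "orders-per-customer"));
        // prints order-service-orders-per-customer-changelog
        System.out.println(log.size() + " records -> " + compacted.size() + " after compaction");
        // prints 4 records -> 2 after compaction
        System.out.println(replay(log).equals(replay(compacted)));
        // prints true
    }
}
```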
When a task moves to another instance after a crash or rebalance, Streams reconstructs its store by replaying the changelog from the beginning until it catches up to the end of the log. During this restore the task processes no new records, which is why restore time is a real production concern. For persistent (RocksDB) stores, Streams also keeps a local checkpoint file recording the changelog offset that the on-disk state already reflects, so a clean restart on the same state directory replays only the short tail written after that offset instead of the whole topic.
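The difference between a cold restore and a checkpointed restart can be modeled in a few lines of plain Java. This is a simplified sketch under stated assumptions: offsets are list indices, a HashMap stands in for the store, and the "checkpoint" is simply the next offset to replay. CheckpointRestoreSketch is a hypothetical name, not a Kafka Streams class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of changelog restore with and without a local checkpoint (plain JDK).
final class CheckpointRestoreSketch {

    static final class Change {
        final String key;
        final long value;

        Change(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Replays records at offsets [fromOffset, changelog.size()) into the store
    // and returns how many records were replayed. Offsets are list indices here.
    static long restore(Map<String, Long> store, List<Change> changelog, int fromOffset) {
        long replayed = 0;
        for (int offset = fromOffset; offset < changelog.size(); offset++) {
            Change c = changelog.get(offset);
            store.put(c.key, c.value);
            replayed++;
        }
        return replayed;
    }

    public static void main(String[] args) {
        // Hypothetical changelog: 1,000 updates spread over 10 keys.
        List<Change> changelog = new ArrayList<>();
        for (int i = 0; i < 1_000; i++) {
            changelog.add(new Change("key-" + (i % 10), i));
        }

        // In-memory store after a restart: nothing survives locally, so replay everything.
        Map<String, Long> inMemory = new HashMap<>();
        long coldReplay = restore(inMemory, changelog, 0);

        // Persistent store: local files already reflect offsets 0..989 and the
        // checkpoint says to resume at 990. The first call stands in for the on-disk state.
        Map<String, Long> persistent = new HashMap<>();
        restore(persistent, changelog.subList(0, 990), 0);
        int checkpoint = 990;
        long tailReplay = restore(persistent, changelog, checkpoint);

        System.out.println("cold restore replayed " + coldReplay);           // 1000
        System.out.println("warm restore replayed " + tailReplay);           // 10
        System.out.println("same contents: " + inMemory.equals(persistent)); // true
    }
}
```

Both paths end with identical contents; the checkpointed path replays 10 records instead of 1,000, which is why keeping the state directory across restarts matters.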
Restore time scales with changelog size. A multi-gigabyte store with no warm standby can take minutes to rebuild, blocking partitions the whole time. Plan for this in your failover budget.
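As a rough worked estimate, restore time is about the volume of changelog to replay divided by the sustained restore throughput. The 4 GiB store and 40 MiB/s rate below are hypothetical figures chosen only for illustration; measure your own cluster's restore rate before budgeting.

```latex
t_{\text{restore}} \approx \frac{S_{\text{replay}}}{R_{\text{restore}}}
  = \frac{4\ \text{GiB}}{40\ \text{MiB/s}}
  = \frac{4096\ \text{MiB}}{40\ \text{MiB/s}}
  \approx 102\ \text{s} \approx 1.7\ \text{min}
```

Rebalance coordination time comes on top of this figure.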
Standby replicas for fast failover
To shrink that restore window, configure standby replicas. A standby is a passive copy of a task’s state, kept up to date on a different instance by continuously consuming the changelog. When the instance running the active task fails, Streams moves the task to an instance that already holds a warm standby, so the new active copy only has to replay the few records the standby had not yet applied instead of the whole changelog.
```properties
# Number of warm standby copies of each store kept on other instances
num.standby.replicas=1
```

```java
// The same setting applied programmatically
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-service");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
```
With num.standby.replicas=1 each store has one extra warm copy, roughly doubling its total disk footprint and changelog-read traffic in exchange for fast recovery. Streams never places a standby on the same instance as its active task, so you need at least num.standby.replicas + 1 running instances; on a single instance, no standbys are created at all.
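The placement constraint can be illustrated with a deliberately simplified model. StandbyPlacementSketch is hypothetical and far cruder than Kafka Streams' real task assignor: it places active tasks round-robin and ignores load balancing and standby lag. It encodes only the one rule that matters here, namely that a standby is never placed on the same instance as its active task.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Deliberately simplified model of standby placement; NOT Kafka Streams' task assignor.
final class StandbyPlacementSketch {

    // Returns task id -> instances (0-based) that hold a standby copy of that task.
    static Map<Integer, List<Integer>> placeStandbys(int tasks, int instances, int standbyReplicas) {
        Map<Integer, List<Integer>> standbys = new HashMap<>();
        for (int task = 0; task < tasks; task++) {
            int active = task % instances; // round-robin placement of active tasks
            List<Integer> holders = new ArrayList<>();
            // Walk the other instances; the active's own instance is never eligible.
            for (int step = 1; step < instances && holders.size() < standbyReplicas; step++) {
                holders.add((active + step) % instances);
            }
            standbys.put(task, holders);
        }
        return standbys;
    }

    static int totalStandbys(Map<Integer, List<Integer>> placement) {
        return placement.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        // 4 tasks with num.standby.replicas=1 or 2
        System.out.println(totalStandbys(placeStandbys(4, 1, 1))); // 0: nowhere else to go
        System.out.println(totalStandbys(placeStandbys(4, 2, 1))); // 4: one per task
        System.out.println(totalStandbys(placeStandbys(4, 2, 2))); // 4: only one other instance
        System.out.println(totalStandbys(placeStandbys(4, 3, 2))); // 8: replicas + 1 instances
    }
}
```

With one instance nothing can be placed, and asking for two replicas on two instances still yields only one standby per task, because the only other instance is already used.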
Adding and accessing a named store
For the Processor API (or a legacy transformer), you register a store explicitly with a StoreBuilder, connect it to the processor by name, retrieve it in init(), and use it in process().
```java
StreamsBuilder builder = new StreamsBuilder();

// Build a persistent key-value store named "visit-counts" and register it
StoreBuilder<KeyValueStore<String, Long>> countStore =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("visit-counts"),
        Serdes.String(),
        Serdes.Long());
builder.addStateStore(countStore);

// Connect the store to the processor by name
builder.stream("page-views", Consumed.with(Serdes.String(), Serdes.String()))
    .process(CountProcessor::new, "visit-counts");
```
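```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class CountProcessor implements Processor<String, String, String, Long> {
    private KeyValueStore<String, Long> store;

    @Override
    public void init(ProcessorContext<String, Long> context) {
        // Look up the store connected via .process(..., "visit-counts")
        this.store = context.getStateStore("visit-counts");
    }

    @Override
    public void process(Record<String, String> record) {
        if (record.key() == null) {
            return; // key-value stores reject null keys
        }
        Long current = store.get(record.key());
        store.put(record.key(), current == null ? 1L : current + 1L);
    }
}
```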
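Conceptually, the store that CountProcessor writes to is wrapped in a change-logging layer, so each put both updates local state and produces a changelog record. The sketch below models that with plain JDK collections; LoggedCountSketch and LoggedStore are hypothetical names, not Kafka Streams classes. It mirrors the processor's counting logic and skips null keys. Stores created with Stores.keyValueStoreBuilder have logging enabled and caching disabled unless you call withCachingEnabled(), which is why every put shows up in the model's changelog; with caching on, repeated updates to one key between commits can be collapsed into a single write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual model of a change-logging key-value store; hypothetical, not Kafka Streams classes.
final class LoggedCountSketch {

    static final class LoggedStore {
        final Map<String, Long> local = new TreeMap<>();   // stands in for the RocksDB store
        final List<String> changelog = new ArrayList<>();  // stands in for the changelog topic

        Long get(String key) {
            return local.get(key);
        }

        // Every write updates local state and is mirrored as a changelog record.
        void put(String key, Long value) {
            local.put(key, value);
            changelog.add(key + "=" + value);
        }
    }

    // Counting logic mirroring CountProcessor.process, skipping null keys.
    static void process(LoggedStore store, String key) {
        if (key == null) {
            return;
        }
        Long current = store.get(key);
        store.put(key, current == null ? 1L : current + 1L);
    }

    public static void main(String[] args) {
        LoggedStore store = new LoggedStore();
        for (String user : new String[] {"alice", "bob", "alice", null}) {
            process(store, user);
        }
        System.out.println(store.local);     // {alice=2, bob=1}
        System.out.println(store.changelog); // [alice=1, bob=1, alice=2]
    }
}
```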
Once the application has reached the RUNNING state, you can also read materialized stores from outside the topology with interactive queries. Each instance's view covers only the partitions it hosts locally:
```java
ReadOnlyKeyValueStore<String, Long> view = streams.store(
    StoreQueryParameters.fromNameAndType(
        "orders-per-customer", QueryableStoreTypes.keyValueStore()));

Long count = view.get("customer-42");
System.out.println("orders for customer-42 = " + count);
```

Output:

```
orders for customer-42 = 137
```
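Because each instance's view covers only its own partitions, a query for a key hosted elsewhere has to be forwarded. The model below sketches that routing decision in plain Java under loud assumptions: LocalQuerySketch is hypothetical, and it uses String.hashCode() as a stand-in partitioner, whereas Kafka's default partitioner hashes the serialized key with murmur2, so real partition numbers will differ. In a real application, KafkaStreams#queryMetadataForKey, together with the application.server setting, tells you which host owns a key.

```java
import java.util.Map;

// Simplified model of routing an interactive query to the instance that owns a key.
// ASSUMPTION: String.hashCode() is a stand-in; Kafka's default partitioner uses murmur2
// over the serialized key, so real partition numbers will differ.
final class LocalQuerySketch {

    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // partitionOwners[p] names the instance hosting the active task for partition p.
    static String ownerOf(String key, String[] partitionOwners) {
        return partitionOwners[partitionFor(key, partitionOwners.length)];
    }

    // Answer from the local store only if this instance owns the key's partition.
    static String query(String self, Map<String, Long> localStore, String key, String[] partitionOwners) {
        String owner = ownerOf(key, partitionOwners);
        if (!owner.equals(self)) {
            return "forward to " + owner;
        }
        Long value = localStore.get(key);
        return value == null ? "not found" : key + " = " + value;
    }

    public static void main(String[] args) {
        String[] owners = {"instance-a", "instance-b", "instance-c"};
        String key = "customer-42";
        String owner = ownerOf(key, owners);

        // On the owning instance the local view can answer directly.
        System.out.println(query(owner, Map.of(key, 137L), key, owners));
        // Any other instance has no local copy and must forward the request.
        String other = owner.equals("instance-a") ? "instance-b" : "instance-a";
        System.out.println(query(other, Map.of(), key, owners));
    }
}
```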
Best Practices
- Keep the default persistent RocksDB store unless your state is small and you can afford a full changelog rebuild on restart.
- Always name stores explicitly via Stores.persistentKeyValueStore("name") so the changelog topic, metrics, and interactive queries stay stable across redeploys.
- Set num.standby.replicas=1 (or more) for low-latency failover, and run at least num.standby.replicas + 1 instances, since a standby is never placed on the same instance as its active task.
- Mount RocksDB state directories (state.dir) on fast local SSDs and persistent volumes so clean restarts replay only the changelog tail, not the whole topic.
- Never disable changelogging on a store you cannot afford to lose; withLoggingDisabled() removes all fault tolerance.
- Tune RocksDB memory with a RocksDBConfigSetter when running many stores per instance to avoid off-heap memory blowups.
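To see why many stores can exhaust off-heap memory, here is a back-of-the-envelope estimate in plain Java. It assumes Kafka Streams' default per-RocksDB settings of a 50 MiB block cache and three 16 MiB memtables (verify these for your version), counts one RocksDB instance per store per hosted task, active or standby, and ignores index and filter blocks, which add more on top. RocksDbMemoryEstimate and the 10-task, 3-store workload are hypothetical.

```java
// Back-of-the-envelope estimate of RocksDB off-heap memory for one Kafka Streams instance.
// ASSUMPTION: default per-RocksDB settings of a 50 MiB block cache and three 16 MiB memtables;
// verify against your Kafka Streams version. Index and filter blocks are not counted.
final class RocksDbMemoryEstimate {
    static final long MIB = 1024L * 1024L;
    static final long BLOCK_CACHE_BYTES = 50 * MIB;
    static final long WRITE_BUFFER_BYTES = 16 * MIB;
    static final int MAX_WRITE_BUFFERS = 3;

    // One RocksDB instance per store per hosted task (active or standby);
    // segmented window/session stores open one instance per live segment.
    static long rocksDbInstances(int tasks, int storesPerTask, int segmentsPerStore) {
        return (long) tasks * storesPerTask * segmentsPerStore;
    }

    // Block cache plus a full set of memtables for every RocksDB instance.
    static long estimatedBytes(long rocksDbInstances) {
        long perInstance = BLOCK_CACHE_BYTES + MAX_WRITE_BUFFERS * WRITE_BUFFER_BYTES;
        return rocksDbInstances * perInstance;
    }

    public static void main(String[] args) {
        // Hypothetical instance hosting 10 tasks with 3 plain key-value stores each.
        long dbs = rocksDbInstances(10, 3, 1);
        System.out.println(dbs + " RocksDB instances, ~" + estimatedBytes(dbs) / MIB + " MiB off-heap");
        // prints 30 RocksDB instances, ~2940 MiB off-heap
    }
}
```

Nearly 3 GiB for a modest instance is the kind of surprise that a RocksDBConfigSetter sharing one bounded block cache and write-buffer manager across all stores is designed to prevent.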