Interactive Queries
Interactive Queries (IQ) let you read directly from the state stores that back your Kafka Streams application, turning your stream processor into a lightweight, queryable database. Instead of writing aggregation results out to an external store just so a web service can read them, you query the materialized store in place. The catch is that state is partitioned across instances, so any production-grade IQ layer must also know how to discover which instance owns a given key. This page covers reading local stores, discovering remote ones, and wrapping the whole thing in a REST API.
Why interactive queries
A typical analytics topology aggregates events into a KTable materialized in a RocksDB-backed state store. Without IQ you would sink that table to a topic and then to a database — adding latency, a moving part, and a consistency lag. With IQ the store is the read model. Each instance holds the partitions assigned to it, so reads are local, fast, and always reflect the latest processed offset.
The trade-off: there is no global view in a single instance. If you run three instances, key order-42 lives on exactly one of them. You must route the query to the right host.
Materializing a queryable store
Only stores given an explicit, durable name are queryable. Supply a Materialized with a name when you build the table.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.Long()))
.groupByKey()
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("clicks-by-user")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()));
The string "clicks-by-user" is the store name you will pass to every query below.
Querying the local store
Once the application reaches the RUNNING state, obtain a read-only handle via KafkaStreams.store(), passing a StoreQueryParameters and the appropriate QueryableStoreType.
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"clicks-by-user",
QueryableStoreTypes.keyValueStore()));
Long count = store.get("user-7"); // single key
KeyValueIterator<String, Long> all = store.all(); // full scan — remember to close()
Always call
streams.store(...)after the client isRUNNING, and be ready to catchInvalidStateStoreException. Stores can become temporarily unavailable during a rebalance; treat the exception as a transient “retry shortly” signal, not a fatal error.
For windowed aggregations use QueryableStoreTypes.windowStore() and the fetch(key, from, to) method, which returns the windowed values for a key over a time range.
Discovering which instance owns a key
store.get("user-7") only returns a value if user-7’s partition is hosted on this instance. To support a cluster you must first locate the owning host. Streams exposes routing metadata via queryMetadataForKey, which computes the partition for the key (using the store’s key serializer) and tells you the host that owns it.
To make this work, every instance must advertise a reachable address through the application.server config — a host:port that other instances can call.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-analytics");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "10.0.0.4:8080"); // this instance's REST address
Now resolve a key to its host:
KeyQueryMetadata metadata = streams.queryMetadataForKey(
"clicks-by-user", "user-7", Serdes.String().serializer());
HostInfo active = metadata.activeHost(); // host:port that owns the key
boolean isLocal = active.host().equals(myHost) && active.port() == myPort;
If isLocal is true, query the local store directly. Otherwise, forward an HTTP request to active.host():active.port().
| Metadata method | Returns |
|---|---|
queryMetadataForKey(store, key, serializer) | KeyQueryMetadata (active + standby hosts) for one key |
streamsMetadataForStore(store) | All hosts holding any partition of a store |
metadataForAllStreamsClients() | Full cluster view across every store |
KeyQueryMetadata.standbyHosts() | Replicas you can fall back to (requires standby replicas) |
A REST layer over the stores
A clean pattern: every instance exposes the same REST endpoint. If the request lands on a node that owns the key, it answers locally; otherwise it proxies to the owning node. Below is a Spring Boot 3.x controller.
public record CountResponse(String key, long count) {}
@RestController
@RequestMapping("/counts")
public class CountController {
private final KafkaStreams streams;
private final HostInfo self;
private final WebClient http = WebClient.create();
public CountController(KafkaStreams streams,
@Value("${app.host}") String host,
@Value("${app.port}") int port) {
this.streams = streams;
this.self = new HostInfo(host, port);
}
@GetMapping("/{user}")
public CountResponse get(@PathVariable String user) {
KeyQueryMetadata meta = streams.queryMetadataForKey(
"clicks-by-user", user, Serdes.String().serializer());
HostInfo owner = meta.activeHost();
if (owner.equals(self)) {
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"clicks-by-user", QueryableStoreTypes.keyValueStore()));
Long value = store.get(user);
return new CountResponse(user, value == null ? 0 : value);
}
// Remote partition — proxy to the owning instance.
return http.get()
.uri("http://{h}:{p}/counts/{u}", owner.host(), owner.port(), user)
.retrieve()
.bodyToMono(CountResponse.class)
.block();
}
}
Output:
$ curl http://10.0.0.4:8080/counts/user-7
{"key":"user-7","count":1432}
The caller never needs to know where the key lives — any instance can serve any request.
Best Practices
- Name every store you intend to query; unnamed/
Materialized.asstores generated internally are not reliably queryable. - Only call
streams.store()when the client isRUNNING, and wrap reads in retry logic forInvalidStateStoreExceptionduring rebalances. - Always advertise
application.server(StreamsConfig.APPLICATION_SERVER_CONFIG) soqueryMetadataForKeycan route correctly across the cluster. - Use the same serializer for
queryMetadataForKeythat the topology uses for the store’s key, or the partition calculation will be wrong. - Enable standby replicas (
num.standby.replicas) and fall back tostandbyHosts()to keep serving reads while an active instance recovers. - Close
KeyValueIteratorand range/scan iterators in a try-with-resources block to avoid leaking RocksDB resources. - Treat IQ as eventually consistent: reads reflect processed offsets, which may lag the source topic during catch-up.