Stream Processing Patterns
Most real-world event-driven systems do more than move messages from A to B — they transform, correlate, and summarize events as they flow. Kafka Streams and ksqlDB turn topics into continuously updated computations, letting you build enrichment pipelines, real-time metrics, and queryable materialized views without standing up a separate batch system. The patterns below recur in almost every streaming codebase; knowing when to reach for each one keeps your topology simple and your latency predictable.
Enrichment with a stream-table join
Enrichment is the single most common pattern: an incoming event carries an identifier (a userId, productId, accountId) but lacks the surrounding context a downstream consumer needs. You join the event stream against a KTable built from a compacted “reference” topic so every record is decorated in place.
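A KStream-KTable join is asymmetric and keyed: only stream-side records trigger output, and each one looks up the current value of the matching key in the table at the moment it is processed. Later table updates affect subsequent stream records but do not re-emit earlier results. The table is materialized in a local state store (RocksDB by default), so lookups are local and fast, with no external database call per event.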
    @Bean
    public KStream<String, EnrichedOrder> enrichOrders(StreamsBuilder builder) {
        // Reference data: latest Customer per customerId, read from a compacted topic.
        KTable<String, Customer> customers =
            builder.table("customers", Consumed.with(Serdes.String(), customerSerde));

        KStream<String, Order> orders =
            builder.stream("orders", Consumed.with(Serdes.String(), orderSerde));

        KStream<String, EnrichedOrder> enriched = orders
            // Rekey by customerId so the stream is co-partitioned with the table;
            // Kafka Streams repartitions the stream before the join.
            .selectKey((k, order) -> order.customerId())
            // Inner join: orders with no matching customer are not emitted.
            .join(customers,
                (order, customer) -> new EnrichedOrder(order, customer.name(), customer.tier()),
                Joined.with(Serdes.String(), orderSerde, customerSerde));

        enriched.to("orders-enriched", Produced.with(Serdes.String(), enrichedSerde));
        return enriched;
    }

    public record Order(String orderId, String customerId, BigDecimal amount) {}
    public record EnrichedOrder(Order order, String customerName, String tier) {}
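The reference topic must be log-compacted (cleanup.policy=compact) and keyed by the join key. A topic with ordinary time- or size-based retention eventually deletes the only record for keys that have not been updated recently, so a table rebuilt from it has no entry for them and an inner join silently emits nothing for the affected events.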
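To make the difference concrete, here is a small plain-Java model with no Kafka dependency. The `ReferenceLog` class and its methods are hypothetical names invented for this sketch; it treats a topic as a list of timestamped key/value records and contrasts the table you would rebuild from it under compaction with the one you would get under time-based retention. Tombstones and segment mechanics are deliberately left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a reference topic. All names are illustrative, not Kafka APIs. */
final class ReferenceLog {
    record Entry(long timestamp, String key, String value) {}

    private final List<Entry> entries = new ArrayList<>();

    void append(long timestamp, String key, String value) {
        entries.add(new Entry(timestamp, key, value));
    }

    /** Compaction keeps the latest value for every key, regardless of age. */
    Map<String, String> tableAfterCompaction() {
        Map<String, String> table = new LinkedHashMap<>();
        for (Entry e : entries) {
            table.put(e.key(), e.value()); // later records overwrite earlier ones
        }
        return table;
    }

    /** Time-based retention discards every record older than the cutoff. */
    Map<String, String> tableAfterRetention(long cutoffTimestamp) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Entry e : entries) {
            if (e.timestamp() >= cutoffTimestamp) {
                table.put(e.key(), e.value());
            }
        }
        return table;
    }

    /** Sample data: c-1 is written once early, c-2 is updated later. */
    static ReferenceLog sample() {
        ReferenceLog log = new ReferenceLog();
        log.append(100, "c-1", "Ada (gold)");
        log.append(200, "c-2", "Grace (silver)");
        log.append(900, "c-2", "Grace (gold)");
        return log;
    }
}
```

With a retention cutoff of 500, `c-1` vanishes from the rebuilt table even though that customer still exists, so an order for `c-1` would find no match. Under compaction both keys survive with their latest values.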
Filtering and content-based routing
Filtering removes records you don’t care about; routing fans a single stream into multiple topics based on content. Use filter for predicates and split() for branching — it is clearer and more efficient than chaining several filter/to pairs.
    Map<String, KStream<String, Payment>> branches = builder
        .stream("payments", Consumed.with(Serdes.String(), paymentSerde))
        .filter((k, p) -> p.amount().signum() > 0) // drop zero amounts and refunds here
        .split(Named.as("by-risk-"))
        // Branches are evaluated in order; each record goes to the first match.
        .branch((k, p) -> p.amount().compareTo(new BigDecimal("10000")) >= 0,
            Branched.as("high"))
        .branch((k, p) -> true, Branched.as("normal"))
        .noDefaultBranch();

    // Map keys are the split prefix plus the branch name.
    branches.get("by-risk-high").to("payments-review", Produced.with(Serdes.String(), paymentSerde));
    branches.get("by-risk-normal").to("payments-auto", Produced.with(Serdes.String(), paymentSerde));
Windowed aggregation and metrics
Aggregation collapses many events into a running summary. Pair it with a window when you want metrics “per minute” or “per 5-minute hopping interval” rather than for all time. The result is a windowed KTable you can push to a metrics topic or query.
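    KTable<Windowed<String>, Long> perMinute = builder
        .stream("page-views", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        // 1-minute tumbling windows; records arriving up to 10 seconds after a window ends still count.
        .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofSeconds(10)))
        .count(Materialized.as("views-per-minute"));

    perMinute.toStream()
        // Unwrapping the key discards the window bounds; keep wk.window().start()
        // in the key or value if consumers must tell windows apart.
        .map((wk, count) -> KeyValue.pair(wk.key(), count))
        .to("views-per-minute", Produced.with(Serdes.String(), Serdes.Long()));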
The equivalent aggregation in ksqlDB fits in a single statement (this version does not set an explicit grace period):
    CREATE TABLE views_per_minute AS
      SELECT page_id, COUNT(*) AS views
      FROM page_views
      WINDOW TUMBLING (SIZE 1 MINUTE)
      GROUP BY page_id
      EMIT CHANGES;
| Window type | Behavior | Typical use |
|---|---|---|
| Tumbling | Fixed, non-overlapping | Per-minute counts, billing buckets |
| Hopping | Fixed size, overlapping by an advance | Smoothed rolling metrics |
| Sliding | Window per record pair within a duration | Anomaly/correlation detection |
| Session | Gap-bounded by inactivity | User sessions, activity bursts |
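For the two fixed-size rows of the table, window membership is plain arithmetic on the record timestamp. The sketch below assumes windows are aligned to the epoch (timestamp 0), which is how Kafka Streams' `TimeWindows` behave; the `WindowMath` class and its method names are invented for illustration. Sliding and session windows are driven by the data rather than by a fixed grid, so they are not covered here.

```java
import java.util.ArrayList;
import java.util.List;

/** Window arithmetic for epoch-aligned windows. A plain-Java sketch, not Kafka code. */
final class WindowMath {
    /** Start of the tumbling window [start, start + sizeMs) that contains the timestamp. */
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Starts of every hopping window of length sizeMs, stepped by advanceMs, that contains the timestamp. */
    static List<Long> hoppingStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest grid-aligned start whose window still covers the timestamp (never below 0).
        long first = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    /** A late record is still accepted while stream time is before window end + grace. */
    static boolean accepted(long windowStartMs, long sizeMs, long graceMs, long streamTimeMs) {
        return streamTimeMs < windowStartMs + sizeMs + graceMs;
    }
}
```

For example, `tumblingStart(125_000, 60_000)` is `120_000`, and a 5-minute window hopping every minute places a record at `425_000` ms into five overlapping windows, starting at `180_000` through `420_000`. With a 10-second grace period, the window starting at `120_000` stops accepting records once stream time reaches `190_000`.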
Deduplication
At-least-once delivery means duplicates are normal. To deduplicate, key the stream by the business idempotency key and keep a state store of keys already seen within a time bound. Use transform (newer Kafka Streams releases deprecate it in favor of process and the Processor API) with a windowed store, so old entries expire and state stays bounded.
    public KeyValue<String, Event> transform(String key, Event value) {
        String idemKey = value.idempotencyKey();
        long now = context.timestamp();
        // The iterator holds store resources and must be closed, hence try-with-resources.
        try (WindowStoreIterator<Boolean> seen = seenStore.fetch(idemKey, now - WINDOW_MS, now)) {
            if (seen.hasNext()) {
                return null; // duplicate within window: drop
            }
        }
        seenStore.put(idemKey, true, now); // remember this key for WINDOW_MS
        return KeyValue.pair(key, value);
    }
Deduplication is windowed, not permanent. It absorbs retry storms but does not give you true business idempotency: a duplicate that arrives after the window has passed goes through. Keep consumers idempotent at the sink (e.g. upsert by key) as well.
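The "windowed, not permanent" behavior is easy to see in a plain-Java stand-in for the store. `WindowedDeduplicator` is a hypothetical class written for this sketch; it keeps state in a `HashMap` rather than a Kafka Streams window store and assumes timestamps arrive in non-decreasing order.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the windowed "seen" store. Names are illustrative. */
final class WindowedDeduplicator {
    private final long windowMs;
    private final Map<String, Long> firstSeenAt = new HashMap<>();

    WindowedDeduplicator(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Returns true if the event should be forwarded, false if it is a duplicate. */
    boolean firstSeen(String idempotencyKey, long timestampMs) {
        expire(timestampMs);
        Long previous = firstSeenAt.get(idempotencyKey);
        if (previous != null && timestampMs - previous <= windowMs) {
            return false; // seen within the window: drop
        }
        firstSeenAt.put(idempotencyKey, timestampMs);
        return true;
    }

    /** Forget keys older than the window so state stays bounded. */
    private void expire(long nowMs) {
        firstSeenAt.values().removeIf(seenAt -> nowMs - seenAt > windowMs);
    }

    /** Number of keys currently remembered. */
    int trackedKeys() {
        return firstSeenAt.size();
    }
}
```

With a 1,000 ms window, key `a` at time 0 is forwarded, the retry at 500 is dropped, and the same key at 2,000 is forwarded again because its earlier entry has expired. That last case is exactly the one the sink still has to handle idempotently.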
Materialized views with interactive queries
A materialized view is an aggregation or table you can query directly from your service — no separate read database. Materialize the store and expose it through Kafka Streams interactive queries.
    @Bean
    public KTable<String, Long> orderCounts(StreamsBuilder builder) {
        return builder.stream("orders", Consumed.with(Serdes.String(), orderSerde))
            .groupBy((k, o) -> o.customerId(), Grouped.with(Serdes.String(), orderSerde))
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("order-counts"));
    }

    @GetMapping("/customers/{id}/order-count")
    public long count(@PathVariable String id) {
        // `streams` is the running KafkaStreams instance; the store name matches Materialized.as above.
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType("order-counts", QueryableStoreTypes.keyValueStore()));
        Long v = store.get(id);
        return v == null ? 0 : v; // null means this instance has no count for the customer
    }
Example request and response:
    GET /customers/c-42/order-count
    17
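For multi-instance deployments, streams.queryMetadataForKey(...) tells you which instance owns a key so you can proxy the request. This gives you a horizontally scalable read model derived from the event log. It is eventually consistent: a store reflects only the records its instance has processed so far, and it can be briefly unavailable while partitions rebalance.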
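The proxying decision itself is small. The following plain-Java sketch models it without any Kafka classes: `OrderCountRouter`, `Host`, and the `ownerOf` function are all hypothetical stand-ins, with `ownerOf` playing the role of the metadata lookup and a `Map` playing the role of the local state store.

```java
import java.util.Map;
import java.util.function.Function;

/** Routing decision for a keyed lookup. Every type here is a stand-in, not a Kafka class. */
final class OrderCountRouter {
    record Host(String name, int port) {}

    private final Host self;                       // this instance's advertised address
    private final Function<String, Host> ownerOf;  // stand-in for the key-to-instance metadata lookup
    private final Map<String, Long> localStore;    // stand-in for the local "order-counts" store

    OrderCountRouter(Host self, Function<String, Host> ownerOf, Map<String, Long> localStore) {
        this.self = self;
        this.ownerOf = ownerOf;
        this.localStore = localStore;
    }

    /** True when this instance owns the key and can answer from its own store. */
    boolean isLocal(String customerId) {
        return self.equals(ownerOf.apply(customerId));
    }

    /** Count from the local store; only meaningful when isLocal(customerId) is true. */
    long localCount(String customerId) {
        return localStore.getOrDefault(customerId, 0L);
    }

    /** URL of the same endpoint on the owning instance, for proxying the request. */
    String forwardUrl(String customerId) {
        Host owner = ownerOf.apply(customerId);
        return "http://" + owner.name() + ":" + owner.port()
            + "/customers/" + customerId + "/order-count";
    }
}
```

A controller would call `isLocal` first, answer from `localCount` when it returns true, and otherwise issue an HTTP request to `forwardUrl`. In a real service the owner's address would come from the `application.server` setting each instance advertises.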
Best Practices
- Build reference tables from compacted topics keyed by the join key; never join against a retention-only topic.
- Always co-partition the inputs of a join (same key, same partition count) or rekey first with selectKey/repartition.
- Set explicit grace periods on windows so late-arriving events are handled deterministically instead of silently dropped.
- Provide typed Serdes explicitly; relying on default serdes hides serialization bugs until runtime.
- Treat windowed deduplication as a noise filter, not a correctness guarantee; keep sinks idempotent too.
- Name your state stores (Materialized.as(...)) so they are queryable, observable, and survive topology changes cleanly.
- Prefer ksqlDB for simple filter/aggregate/join logic and Kafka Streams when you need custom processors or fine-grained state control.
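The co-partitioning rule above follows from how keys map to partitions. The sketch below is a simplified model: `CoPartitioning` is a hypothetical class, and `String.hashCode` stands in for the hash of the serialized key that Kafka's default partitioner actually uses. The modulo step is the part that matters, because it is what makes the partition count part of the mapping.

```java
import java.util.List;

/** Shows why joined topics need the same partition count. Not Kafka's real partitioner. */
final class CoPartitioning {
    /** Assumption: String.hashCode stands in for Kafka's hash of the serialized key bytes. */
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    /** Counts keys that would land on different partition numbers in the two topics. */
    static long mismatches(List<String> keys, int leftPartitions, int rightPartitions) {
        return keys.stream()
            .filter(k -> partitionFor(k, leftPartitions) != partitionFor(k, rightPartitions))
            .count();
    }
}
```

With equal partition counts every key lands on the same partition number on both sides, so the task that reads partition N of the stream also holds partition N of the table. With 4 partitions on one side and 6 on the other, many keys diverge, and the join would look them up in a table shard that does not contain them.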