Stream Processing Patterns

Most real-world event-driven systems do more than move messages from A to B — they transform, correlate, and summarize events as they flow. Kafka Streams and ksqlDB turn topics into continuously updated computations, letting you build enrichment pipelines, real-time metrics, and queryable materialized views without standing up a separate batch system. The patterns below recur in almost every streaming codebase; knowing when to reach for each one keeps your topology simple and your latency predictable.

Enrichment with a stream-table join

Enrichment is the single most common pattern: an incoming event carries an identifier (a userId, productId, accountId) but lacks the surrounding context a downstream consumer needs. You join the event stream against a KTable built from a compacted “reference” topic so every record is decorated in place.

A KStream-KTable join is keyed and non-windowed: each stream record looks up the current value of the matching key in the table, and only stream-side records trigger output. The table is materialized in a local state store (RocksDB by default), so lookups are local and fast, with no external database call per event.

@Bean
public KStream<String, EnrichedOrder> enrichOrders(StreamsBuilder builder) {
    KTable<String, Customer> customers =
        builder.table("customers", Consumed.with(Serdes.String(), customerSerde));

    KStream<String, Order> orders =
        builder.stream("orders", Consumed.with(Serdes.String(), orderSerde));

    KStream<String, EnrichedOrder> enriched = orders
        .selectKey((k, order) -> order.customerId())
        .join(customers,
              (order, customer) -> new EnrichedOrder(order, customer.name(), customer.tier()),
              Joined.with(Serdes.String(), orderSerde, customerSerde));

    enriched.to("orders-enriched", Produced.with(Serdes.String(), enrichedSerde));
    return enriched;
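}

// Value types used above. Customer's shape is assumed from the accessors the joiner calls.
public record Customer(String name, String tier) {}
public record Order(String orderId, String customerId, BigDecimal amount) {}
public record EnrichedOrder(Order order, String customerName, String tier) {}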

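The reference topic must be log-compacted (cleanup.policy=compact) and keyed by the join key. A topic with delete-based retention eventually expires the records for older keys, so a table rebuilt from it no longer contains them, and the inner join above silently drops every order whose customer is missing.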
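To make the join semantics concrete, here is a minimal plain-Java model. It is not the Kafka Streams API: the table is simply a map holding the latest value per key, and each stream record does one lookup when it arrives. The class, record, and method names are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of stream-table join semantics; not the Kafka Streams API.
final class StreamTableJoinSketch {
    record Customer(String name, String tier) {}
    record Order(String orderId, String customerId) {}
    record EnrichedOrder(Order order, String customerName, String tier) {}

    // The "table": latest value per key, as a compacted topic would provide.
    private final Map<String, Customer> customers = new HashMap<>();

    // A table update overwrites the previous value; a null value acts as a tombstone.
    void onCustomer(String customerId, Customer customer) {
        if (customer == null) {
            customers.remove(customerId);
        } else {
            customers.put(customerId, customer);
        }
    }

    // Inner join: enrich with whatever the table holds right now,
    // or drop the order (empty result) if the key is missing.
    Optional<EnrichedOrder> onOrder(Order order) {
        Customer c = customers.get(order.customerId());
        return c == null
            ? Optional.empty()
            : Optional.of(new EnrichedOrder(order, c.name(), c.tier()));
    }
}
```

If dropping unmatched orders is not acceptable, KStream.leftJoin keeps the order and hands the joiner a null table-side value instead.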

Filtering and content-based routing

Filtering removes records you don’t care about; routing fans a single stream into multiple topics based on content. Use filter for predicates and split() for branching — it is clearer and more efficient than chaining several filter/to pairs.

Map<String, KStream<String, Payment>> branches = builder
    .stream("payments", Consumed.with(Serdes.String(), paymentSerde))
    .filter((k, p) -> p.amount().signum() > 0)               // drop zero/refunds here
    .split(Named.as("by-risk-"))
    .branch((k, p) -> p.amount().compareTo(new BigDecimal("10000")) >= 0,
            Branched.as("high"))
    .branch((k, p) -> true, Branched.as("normal"))
    .noDefaultBranch();

branches.get("by-risk-high").to("payments-review");
branches.get("by-risk-normal").to("payments-auto");

Windowed aggregation and metrics

Aggregation collapses many events into a running summary. Pair it with a window when you want metrics “per minute” or “per 5-minute hopping interval” rather than for all time. The result is a windowed KTable you can push to a metrics topic or query.

KTable<Windowed<String>, Long> perMinute = builder
    .stream("page-views", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofSeconds(10)))
    .count(Materialized.as("views-per-minute"));

perMinute.toStream()
    .map((wk, count) -> KeyValue.pair(wk.key(), count))
    .to("views-per-minute", Produced.with(Serdes.String(), Serdes.Long()));

The same aggregation in ksqlDB is a single statement (shown here without the explicit grace period):

CREATE TABLE views_per_minute AS
  SELECT page_id, COUNT(*) AS views
  FROM page_views
  WINDOW TUMBLING (SIZE 1 MINUTE)
  GROUP BY page_id
  EMIT CHANGES;

| Window type | Behavior | Typical use |
|---|---|---|
| Tumbling | Fixed, non-overlapping | Per-minute counts, billing buckets |
| Hopping | Fixed size, overlapping by an advance | Smoothed rolling metrics |
| Sliding | Window per record pair within a duration | Anomaly/correlation detection |
| Session | Gap-bounded by inactivity | User sessions, activity bursts |
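The arithmetic behind tumbling and hopping windows is small enough to sketch in plain Java. This is an illustration, not Kafka Streams code: the class and method names are hypothetical, timestamps are assumed to be non-negative epoch milliseconds, and the grace check assumes a window stops accepting records once stream time reaches window end plus grace.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java arithmetic behind time windows; not the Kafka Streams API.
final class WindowMathSketch {
    // Start of the single tumbling window [start, start + size) containing the timestamp.
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Start times of every hopping window [start, start + size) containing the timestamp.
    static List<Long> hoppingStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp (never below 0).
        long first = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // A late record is accepted while stream time has not reached window end + grace.
    static boolean accepted(long windowStartMs, long sizeMs, long graceMs, long streamTimeMs) {
        return windowStartMs + sizeMs + graceMs > streamTimeMs;
    }
}
```

A record maps to exactly one tumbling window but to up to size/advance hopping windows, which is why hopping aggregations write proportionally more state.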

Deduplication

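At-least-once delivery means duplicates are normal. To deduplicate, key the stream by the business idempotency key so that duplicates reach the same task, and keep a small state store of keys already seen within a time bound. Use transform (or, on recent Kafka Streams releases where transform is deprecated, a Processor API processor) with a windowed store so state stays bounded by the store's retention.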

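// Assumes seenStore is a WindowStore<String, Boolean>; needs java.time.Instant.
public KeyValue<String, Event> transform(String key, Event value) {
    String idemKey = value.idempotencyKey();
    long now = context.timestamp();
    // Store iterators hold native resources, so close them with try-with-resources.
    try (WindowStoreIterator<Boolean> seen = seenStore.fetch(
            idemKey, Instant.ofEpochMilli(now - WINDOW_MS), Instant.ofEpochMilli(now))) {
        if (seen.hasNext()) {
            return null; // duplicate within window: drop
        }
    }
    seenStore.put(idemKey, true, now); // remember the key at this record's timestamp
    return KeyValue.pair(key, value);
}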

Deduplication is windowed, not permanent. It absorbs retry storms but does not provide true business idempotency: a duplicate that arrives after the window has expired passes through. Keep consumers idempotent at the sink (e.g. upsert by key) as well.
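As a rough illustration of an idempotent sink, the sketch below records every applied idempotency key and ignores replays. It is an in-memory stand-in with hypothetical names (Payment, apply, balance); a real sink would typically enforce the same rule with a unique key or upsert in the database, inside one transaction.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of an idempotent sink: state is guarded by the event's idempotency key,
// so replaying the same event leaves the state unchanged.
final class IdempotentSinkSketch {
    record Payment(String idempotencyKey, String accountId, long amountCents) {}

    private final Map<String, Payment> applied = new HashMap<>();
    private final Map<String, Long> balances = new HashMap<>();

    // Returns true if the payment changed state, false if it was a replay.
    boolean apply(Payment p) {
        if (applied.putIfAbsent(p.idempotencyKey(), p) != null) {
            return false; // already applied: ignore the duplicate
        }
        balances.merge(p.accountId(), p.amountCents(), Long::sum);
        return true;
    }

    long balance(String accountId) {
        return balances.getOrDefault(accountId, 0L);
    }
}
```

Because the check lives at the sink, it still holds when a duplicate arrives long after the stream-side deduplication window has closed.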

Materialized views with interactive queries

A materialized view is an aggregation or table you can query directly from your service — no separate read database. Materialize the store and expose it through Kafka Streams interactive queries.

@Bean
public KTable<String, Long> orderCounts(StreamsBuilder builder) {
    return builder.stream("orders", Consumed.with(Serdes.String(), orderSerde))
        .groupBy((k, o) -> o.customerId(), Grouped.with(Serdes.String(), orderSerde))
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("order-counts"));
}

@GetMapping("/customers/{id}/order-count")
public long count(@PathVariable String id) {
    ReadOnlyKeyValueStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType("order-counts", QueryableStoreTypes.keyValueStore()));
    Long v = store.get(id);
    return v == null ? 0 : v;
}

Output:

GET /customers/c-42/order-count
17

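For multi-instance deployments, streams.queryMetadataForKey(...) tells you which instance owns a key so you can proxy the request to it. This gives you a horizontally scalable read model that is eventually consistent with the event log: reads trail the latest writes by the processing lag, and a store can be briefly unavailable while the application rebalances.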
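The local-or-proxy decision can be sketched independently of the web framework. In the sketch below everything is a hypothetical stand-in: ownerOf represents a metadata lookup such as streams.queryMetadataForKey(...), localRead represents a read from the local store, and RemoteRead represents an HTTP call to the owning instance.

```java
import java.util.function.Function;

// Sketch of the local-or-proxy decision for interactive queries.
// All collaborators are hypothetical hooks, injected so the routing logic stays testable.
final class QueryRouterSketch {
    record HostPort(String host, int port) {}

    interface RemoteRead {
        long fetch(HostPort owner, String key);
    }

    private final HostPort self;
    private final Function<String, HostPort> ownerOf;
    private final Function<String, Long> localRead;
    private final RemoteRead remoteRead;

    QueryRouterSketch(HostPort self,
                      Function<String, HostPort> ownerOf,
                      Function<String, Long> localRead,
                      RemoteRead remoteRead) {
        this.self = self;
        this.ownerOf = ownerOf;
        this.localRead = localRead;
        this.remoteRead = remoteRead;
    }

    long count(String key) {
        HostPort owner = ownerOf.apply(key);
        if (self.equals(owner)) {
            Long v = localRead.apply(key); // this instance hosts the key's partition
            return v == null ? 0L : v;
        }
        return remoteRead.fetch(owner, key); // another instance owns it: proxy the request
    }
}
```

Injecting the lookups keeps the routing rule easy to unit test; a production version would also need to handle the owner being unknown during a rebalance.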

Best Practices

  • Build reference tables from compacted topics keyed by the join key; never join against a retention-only topic.
  • Always co-partition the inputs of a join (same key, same partitioner, same partition count), or rekey first with selectKey/repartition.
  • Set explicit grace periods on windows so late-arriving events are handled deterministically instead of silently dropped.
  • Provide typed Serdes explicitly; relying on default serdes hides serialization bugs until runtime.
  • Treat windowed deduplication as a noise filter, not a correctness guarantee — keep sinks idempotent too.
  • Name your state stores (Materialized.as(...)) so they are queryable, observable, and survive topology changes cleanly.
  • Prefer ksqlDB for simple filter/aggregate/join logic and Kafka Streams when you need custom processors or fine-grained state control.
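The co-partitioning rule in the list above follows from how a partition is chosen: it depends on both the key and the partition count. The sketch below shows the effect with String.hashCode() as a stand-in hash; this is an assumption for illustration only, since Kafka's default partitioner hashes the serialized key bytes with a different function.

```java
// Why partition counts must match for a join: the partition number is derived
// from the key's hash and the topic's partition count.
final class CoPartitionSketch {
    // Stand-in partitioner; String.hashCode() is NOT the hash Kafka uses.
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // True when the same key lands on the same partition number in both topics.
    static boolean aligned(String key, int leftPartitions, int rightPartitions) {
        return partitionFor(key, leftPartitions) == partitionFor(key, rightPartitions);
    }
}
```

With equal partition counts every key is aligned by construction; with 4 and 6 partitions some keys end up on different partition numbers, so the task reading the stream partition would not hold the matching table rows.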
Last updated June 1, 2026