Stream Joins
Joins are how you correlate related events that arrive on separate topics — matching a click to the user who made it, an order to its payment, or a shipment to its line items. Kafka Streams gives you SQL-like joins that run continuously over unbounded data, but unlike a database join, the streaming runtime has to reason about time and data locality. Getting joins right in production hinges on understanding which join type to use, the windowing semantics, and the co-partitioning rule that most teams trip over at least once.
Join types at a glance
Kafka Streams offers four kinds of joins depending on whether each side is a record stream (KStream) or a changelog table (KTable/GlobalKTable). The choice determines whether the join is windowed, how nulls are handled, and what triggers output.
| Join | Left | Right | Windowed | Triggered by | Typical use |
|---|---|---|---|---|---|
| Stream-Stream | KStream | KStream | Yes (required) | Either side | Correlate two event flows in a time window |
| Stream-Table | KStream | KTable | No | Left (stream) only | Enrich an event with current reference data |
| Table-Table | KTable | KTable | No | Either side | Maintain a joined materialized view |
| Stream-GlobalTable | KStream | GlobalKTable | No | Left (stream) only | Enrich without co-partitioning |
Each comes in inner, left, and (for stream-stream and table-table) outer variants. Inner joins emit only when both sides match; left joins emit for every left record (right side null if no match); outer joins emit for unmatched records on either side.
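The difference between the three variants can be illustrated with a small plain-Java model. This is a simplified sketch, not the Kafka Streams API: it joins two in-memory maps as a single snapshot and ignores timing and windowing. The class name `JoinVariantsSketch` and the output format are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

final class JoinVariantsSketch {
    enum Variant { INNER, LEFT, OUTER }

    // Joins two keyed snapshots and returns "key=left|right" for every emitted result.
    static List<String> join(Map<String, String> left, Map<String, String> right, Variant variant) {
        // Visit the union of keys in sorted order so the output is deterministic.
        TreeSet<String> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());

        List<String> out = new ArrayList<>();
        for (String key : keys) {
            String l = left.get(key);
            String r = right.get(key);
            boolean emit;
            if (variant == Variant.INNER) {
                emit = l != null && r != null;   // both sides must match
            } else if (variant == Variant.LEFT) {
                emit = l != null;                // every left record, right may be null
            } else {
                emit = true;                     // unmatched records on either side
            }
            if (emit) {
                out.add(key + "=" + l + "|" + r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> orders = Map.of("a", "order-1", "b", "order-2");
        Map<String, String> customers = Map.of("a", "GOLD", "c", "SILVER");
        for (Variant v : Variant.values()) {
            System.out.println(v + " -> " + join(orders, customers, v));
        }
    }
}
```

With the sample data, only key `a` appears in the inner result, the left result adds `b` with a null right side, and the outer result additionally adds `c` with a null left side.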
Co-partitioning: the rule you cannot skip
For every join except GlobalKTable joins, both inputs must be co-partitioned. This means: the same number of partitions on both topics, the same partitioning strategy, and matching key types. Kafka Streams assigns each partition to a task, and a task can only see records from the partitions it owns — so a record on partition 3 of the left side can only ever match a record on partition 3 of the right side. If the keys that should match landed on different partitions, the join silently misses them.
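Co-partitioning failures can be insidious. Kafka Streams checks at startup that both sides have the same number of partitions and fails fast if they do not (a TopologyException in recent versions), but it cannot verify the partitioning strategy or how the keys were chosen. When those differ there is no error; the join just produces fewer results than expected. If your join “loses” records, check that both sides use the same partitioner and are keyed identically before suspecting anything else.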
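To see why the same key can end up in different tasks, consider the sketch below. It is plain Java and rests on an assumption: `String.hashCode()` stands in for the producer's partitioner (Kafka's default partitioner hashes the serialized key with murmur2, which is not reproduced here). The class and method names are hypothetical.

```java
final class CoPartitioningSketch {
    // Stand-in partitioner: hash the key, then take it modulo the partition count.
    // Assumption: String.hashCode() replaces Kafka's murmur2-based default partitioner.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // A task owns the same partition number on both sides, so a key is joinable
    // only if it maps to the same partition number on the left and the right topic.
    static boolean sameTask(String key, int leftPartitions, int rightPartitions) {
        return partitionFor(key, leftPartitions) == partitionFor(key, rightPartitions);
    }

    public static void main(String[] args) {
        String key = "c9";
        System.out.println("left topic, 6 partitions  -> partition " + partitionFor(key, 6)); // 0
        System.out.println("right topic, 8 partitions -> partition " + partitionFor(key, 8)); // 6
        System.out.println("seen by the same task: " + sameTask(key, 6, 8));                  // false
    }
}
```

The same effect occurs with equal partition counts when the two producers use different partitioners or serialize the key differently, because the hash input is no longer the same.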
If the topics are not co-partitioned, re-key the offending side with selectKey(...). A downstream join then triggers a repartition automatically, or you can call repartition() explicitly; either way Kafka Streams writes an internal topic with the correct partitioning.
KStream<String, Order> ordersByCustomer = orders
.selectKey((k, order) -> order.customerId()) // re-key to the join key
.repartition(Repartitioned.with(Serdes.String(), orderSerde));
Stream-table enrichment join
The most common production join enriches a flowing event stream with relatively static reference data held in a table. Below, an orders stream is keyed by customerId and joined against a customers table to attach the customer’s tier and region. The join is driven entirely by the stream side — each incoming order looks up the current table value.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
public record Order(String orderId, String customerId, double amount) {}
public record Customer(String customerId, String tier, String region) {}
public record EnrichedOrder(String orderId, double amount, String tier, String region) {}
StreamsBuilder builder = new StreamsBuilder();
KTable<String, Customer> customers = builder.table(
"customers",
Consumed.with(Serdes.String(), customerSerde));
KStream<String, Order> orders = builder.stream(
"orders",
Consumed.with(Serdes.String(), orderSerde));
KStream<String, EnrichedOrder> enriched = orders.leftJoin(
customers,
(order, customer) -> new EnrichedOrder(
order.orderId(),
order.amount(),
customer == null ? "UNKNOWN" : customer.tier(),
customer == null ? "UNKNOWN" : customer.region()),
Joined.with(Serdes.String(), orderSerde, customerSerde));
enriched.to("enriched-orders", Produced.with(Serdes.String(), enrichedSerde));
A leftJoin guarantees every order produces output even if the customer record has not arrived yet, which is usually the right call for enrichment — you do not want to drop orders. Use join (inner) only when an unmatched event is meaningless downstream.
Output:
order=A-100 amount=49.99 tier=GOLD region=EU
order=A-101 amount=12.00 tier=UNKNOWN region=UNKNOWN # customer not yet seen (leftJoin)
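The stream-driven behaviour described above can be modelled in a few lines of plain Java. This is a sketch under simplifying assumptions, not the Kafka Streams implementation: a `HashMap` stands in for the table's state store, records are processed strictly in arrival order, and the class `StreamTableTimingSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StreamTableTimingSketch {
    // Stand-in for the KTable state: latest tier per customerId.
    private final Map<String, String> tierByCustomer = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // A table-side record only updates state; it never produces join output.
    void onCustomer(String customerId, String tier) {
        tierByCustomer.put(customerId, tier);
    }

    // A stream-side record looks up the current table value and emits exactly
    // one result, falling back to UNKNOWN when no match exists (left join).
    void onOrder(String orderId, String customerId) {
        String tier = tierByCustomer.getOrDefault(customerId, "UNKNOWN");
        output.add(orderId + " tier=" + tier);
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        StreamTableTimingSketch join = new StreamTableTimingSketch();
        join.onCustomer("c1", "GOLD");
        join.onOrder("A-100", "c1");
        join.onOrder("A-101", "c2");      // c2 is not in the table yet
        join.onCustomer("c2", "SILVER");  // arrives later; A-101 is not re-emitted
        System.out.println(join.output()); // [A-100 tier=GOLD, A-101 tier=UNKNOWN]
    }
}
```

The late customer record changes what future orders see but does not revise results already emitted, which is why the order of arrival matters for enrichment joins.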
Stream-stream windowed join
When both sides are event streams, matches are only meaningful within a bounded time window — you cannot buffer two infinite streams forever. Define the window with JoinWindows, and Kafka Streams keeps both sides in a windowed state store for that duration plus a grace period.
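// Requires java.time.Duration in addition to the Kafka Streams imports shown earlier.
KStream<String, Click> clicks = builder.stream(
"clicks", Consumed.with(Serdes.String(), clickSerde));
KStream<String, Impression> impressions = builder.stream(
"impressions", Consumed.with(Serdes.String(), impressionSerde));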
KStream<String, Attribution> attributed = impressions.join(
clicks,
(impression, click) -> new Attribution(impression.adId(), click.userId()),
JoinWindows.ofTimeDifferenceAndGrace(
Duration.ofMinutes(5), Duration.ofMinutes(1)),
StreamJoined.with(Serdes.String(), impressionSerde, clickSerde));
This emits an attribution whenever a click and an impression for the same key occur within five minutes of each other, in either order. The grace period controls how long out-of-order records are still accepted after their window has ended; records that arrive later than that are dropped.
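The time predicate that the window adds to the key match can be written out directly. The sketch below is plain Java, not library code. It assumes a symmetric window whose bound is inclusive, and it deliberately leaves out grace-period handling. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

final class JoinWindowSketch {
    // Two records with the same key join only if their timestamps differ by at
    // most maxDifference, regardless of which side arrived first.
    static boolean withinWindow(Instant left, Instant right, Duration maxDifference) {
        Duration gap = Duration.between(left, right).abs();
        return gap.compareTo(maxDifference) <= 0;
    }

    public static void main(String[] args) {
        Instant impression = Instant.parse("2024-01-01T10:00:00Z");
        Duration window = Duration.ofMinutes(5);

        // Click four minutes after the impression: inside the window.
        System.out.println(withinWindow(impression, impression.plus(Duration.ofMinutes(4)), window));
        // Click three minutes before the impression: also inside, the window is symmetric.
        System.out.println(withinWindow(impression, impression.minus(Duration.ofMinutes(3)), window));
        // Click six minutes after the impression: outside the window, no join result.
        System.out.println(withinWindow(impression, impression.plus(Duration.ofMinutes(6)), window));
    }
}
```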
GlobalKTable join: enrichment without co-partitioning
A GlobalKTable is replicated in full to every application instance, so it does not need to be co-partitioned with the stream. This makes it ideal for enriching against small-to-medium reference data when the stream’s key differs from the table’s key — you supply a key-extractor to map each stream record to a table key.
GlobalKTable<String, Customer> globalCustomers = builder.globalTable(
"customers", Consumed.with(Serdes.String(), customerSerde));
KStream<String, EnrichedOrder> enriched = orders.leftJoin(
globalCustomers,
(orderKey, order) -> order.customerId(), // map stream record -> table key
(order, customer) -> new EnrichedOrder(
order.orderId(), order.amount(),
customer == null ? "UNKNOWN" : customer.tier(),
customer == null ? "UNKNOWN" : customer.region()));
Because the table is fully replicated, every instance carries the entire dataset in memory/RocksDB — great for lookups, costly if the table is large or high-churn.
Best Practices
- Verify co-partitioning (partition count, partitioner, key type) for every non-global join before deploying; re-key with selectKey + repartition when sides differ.
- Prefer leftJoin for enrichment so events are never silently dropped when reference data is missing or late.
- Size JoinWindows from real event-time skew, and always set a grace period to handle out-of-order arrivals deterministically.
- Reach for a GlobalKTable when the stream key differs from the table key or co-partitioning is impractical, but only for bounded, slow-changing data.
- Provide explicit Serdes via Joined/StreamJoined/Consumed rather than relying on defaults; mismatched serdes are a frequent cause of join failures.
- Stream-stream joins are stateful and disk-backed; provision storage and monitor the changelog topics for the windowed stores.