Windowing
Most real-time analytics questions are bounded by time: “how many orders in the last minute?”, “what was the rolling 5-minute average?”, “how long was this user’s session?”. Windowing lets Kafka Streams group records that share a key into finite time buckets and run aggregations over each bucket independently. Because streams are unbounded, windowing is also how you cap the amount of state you retain and how you reason about late-arriving events. Getting windows right is the difference between accurate, production-grade metrics and silently dropped data.
Why windows exist
A plain groupByKey().count() aggregates over all time and the result keeps growing forever. A windowed aggregation instead produces one result per key per window, so the output key becomes a Windowed<K> carrying both the original key and the window’s start/end timestamps. Window boundaries are driven by event time — the timestamp embedded in each record — not by wall-clock time, which is what makes results deterministic and replayable.
Window types
Kafka Streams offers four window types. Choose based on whether windows overlap and whether their size is fixed.
| Type | Overlap | Size | Typical use |
|---|---|---|---|
| Tumbling | None | Fixed | Per-minute counts, billing buckets |
| Hopping | Yes (advance < size) | Fixed | Smooth rolling metrics |
| Sliding | Yes (event-driven) | Fixed | Pairwise/proximity aggregations |
| Session | Activity-driven | Variable | User sessions, bursts of activity |
Tumbling windows
Tumbling windows are fixed-size, non-overlapping, and contiguous — every record belongs to exactly one window.
```text
size = 1 min, advance = 1 min

|---W1---|---W2---|---W3---|
0        1m       2m       3m
events e e e e e e
```
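The assignment rule is simple integer arithmetic, as the plain-Java sketch below shows. It is not Kafka Streams code. It assumes epoch-aligned, half-open windows [start, start + size) and non-negative millisecond timestamps, and TumblingWindowModel and its methods are hypothetical names used only for illustration. The "key@start" map key plays the role of Windowed<K>, so the count produces one result per key per window.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of tumbling-window assignment (not Kafka Streams internals).
// Assumes non-negative epoch-millisecond timestamps, windows aligned to the epoch,
// and half-open windows [start, start + size).
final class TumblingWindowModel {

    // With advance == size, each timestamp maps to exactly one window start.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Counts records per (key, window). The "key@start" string stands in for Windowed<K>.
    static Map<String, Long> countPerWindow(String[] keys, long[] timestampsMs, long sizeMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            long start = windowStart(timestampsMs[i], sizeMs);
            counts.merge(keys[i] + "@" + start, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        String[] keys = {"home", "home", "home", "about"};
        long[] ts = {5_000L, 59_999L, 60_000L, 30_000L};
        // prints {about@0=1, home@0=2, home@60000=1}
        System.out.println(countPerWindow(keys, ts, minute));
    }
}
```

A record at 59,999 ms still lands in the first window, while one at exactly 60,000 ms opens the second, because the end bound is exclusive in this model.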
Hopping windows
Hopping windows are fixed-size but advance by a smaller step, so they overlap and a record can land in multiple windows.
```text
size = 2 min, advance = 1 min

|------W1-------|
        |------W2-------|
                |------W3-------|
0       1m      2m      3m      4m
```
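Because hopping windows overlap, one timestamp can map to several windows. The following hypothetical plain-Java model (HoppingWindowModel is an illustrative name, not part of Kafka Streams) enumerates every window start that contains a given timestamp. It assumes epoch-aligned, half-open windows [start, start + size), non-negative timestamps, and 0 < advance <= size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of hopping-window assignment.
// Assumes epoch-aligned, half-open windows [start, start + size) and non-negative timestamps.
final class HoppingWindowModel {

    // Returns the start of every window that contains timestampMs.
    // When advanceMs == sizeMs this degenerates to a single tumbling window.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        if (advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException("require 0 < advance <= size");
        }
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window can still contain the timestamp
        // (clamped at 0 because no windows exist before the epoch here).
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // size = 2 min, advance = 1 min: a record at 1m30s falls in W1 and W2
        System.out.println(windowStartsFor(90_000L, 2 * minute, minute)); // prints [0, 60000]
    }
}
```

Setting advance equal to size makes the loop run once, which is why a tumbling window is the special case of a hopping window with no overlap.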
Sliding windows
Sliding windows are fixed-size, but they are aligned to record timestamps rather than to the epoch, and they exist only where records exist: two records fall in the same window if their timestamps differ by no more than the window size. They are used with windowedBy(...) for aggregations in which the windows are determined by the data itself rather than by a fixed grid.
```text
size = 1 min: a window per event pair within 1 min

e1 e2 e3 e4
[--w--] [----w----] [--w--]
```
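The membership rule can be illustrated with a simplified plain-Java sketch. It is not how Kafka Streams computes sliding windows: it only evaluates, for each record, the window [t - size, t] that ends at that record's timestamp, and it assumes distinct timestamps. SlidingWindowModel and its methods are hypothetical names. A two-pointer scan over sorted timestamps keeps this linear after sorting.

```java
import java.util.Arrays;

// Hypothetical, simplified model of sliding-window counting. For each record it
// evaluates only the window [t - size, t] ending at that record's timestamp
// (assuming distinct timestamps); Kafka Streams may emit additional windows.
final class SlidingWindowModel {

    // Two records share a sliding window when their timestamps differ by at most sizeMs.
    static boolean shareWindow(long t1, long t2, long sizeMs) {
        return Math.abs(t1 - t2) <= sizeMs;
    }

    // For each timestamp in sorted order, counts the records in [t - sizeMs, t].
    static long[] countsEndingAtEachRecord(long[] timestampsMs, long sizeMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        long[] counts = new long[sorted.length];
        int left = 0; // first index still inside the current window
        for (int right = 0; right < sorted.length; right++) {
            while (sorted[right] - sorted[left] > sizeMs) {
                left++; // drop records more than sizeMs older than the current one
            }
            counts[right] = right - left + 1;
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 30_000L, 100_000L, 130_000L}; // e1..e4
        System.out.println(Arrays.toString(countsEndingAtEachRecord(ts, 60_000L))); // prints [1, 2, 1, 2]
    }
}
```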
Session windows
Session windows have no fixed size. A window stays open while records keep arriving within an inactivity gap; a gap larger than the threshold closes the session and starts a new one. Adjacent sessions can merge when a late record bridges them.
```text
inactivity gap = 30s

 e  e  e    [gap > 30s]    e  e
|session A|               |session B|
```
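Session boundaries for a single key can be modeled by sorting timestamps and starting a new session whenever the gap to the previous record exceeds the inactivity gap. This is a hypothetical plain-Java sketch (SessionWindowModel is an illustrative name), assuming all records are known up front. Under that assumption merging is implicit: a late record that bridges two sessions simply ends up inside one combined session, mirroring the merge described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model of session windowing for one key.
// Assumes all timestamps are available up front; sessions are {start, end}
// pairs bounded by the first and last record timestamp in each session.
final class SessionWindowModel {

    static List<long[]> sessions(long[] timestampsMs, long inactivityGapMs) {
        List<long[]> result = new ArrayList<>();
        if (timestampsMs.length == 0) {
            return result;
        }
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        long start = sorted[0];
        long end = sorted[0];
        for (int i = 1; i < sorted.length; i++) {
            if (sorted[i] - end > inactivityGapMs) {
                // Gap exceeded: close the current session and open a new one.
                result.add(new long[] {start, end});
                start = sorted[i];
            }
            end = sorted[i];
        }
        result.add(new long[] {start, end});
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 10_000L, 20_000L, 70_000L, 80_000L};
        // prints "0 -> 20000" then "70000 -> 80000"
        for (long[] s : sessions(ts, 30_000L)) {
            System.out.println(s[0] + " -> " + s[1]);
        }
    }
}
```

With a 30-second gap, records at 0 s and 50 s form two sessions, but adding a record at 25 s bridges them into one.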
A worked example: count events per 1-minute tumbling window
The example below counts records per key in 1-minute tumbling windows, allows 10 seconds of grace for late events, and emits only the final result per window using suppress().
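```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;

StreamsBuilder builder = new StreamsBuilder();

// Page views keyed by page name, with explicit String serdes.
KStream<String, String> events = builder.stream(
        "page-views", Consumed.with(Serdes.String(), Serdes.String()));

KTable<Windowed<String>, Long> counts = events
        .groupByKey()
        // 1-minute tumbling windows; records up to 10 s late are still counted.
        .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofSeconds(10)))
        .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("views-per-minute"))
        // Hold back intermediate updates; emit one final count per window once it closes.
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

// Flatten Windowed<String> into "key@start->end" so the output topic has plain String keys.
counts.toStream()
        .map((windowedKey, count) -> KeyValue.pair(
                windowedKey.key()
                        + "@" + windowedKey.window().startTime()
                        + "->" + windowedKey.window().endTime(),
                count))
        .to("views-per-minute-out", Produced.with(Serdes.String(), Serdes.Long()));
```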
Reading the windowed output topic shows one record per key per closed window (key, then count):

```text
home@2026-06-01T10:00:00Z->2026-06-01T10:01:00Z 142
home@2026-06-01T10:01:00Z->2026-06-01T10:02:00Z 137
about@2026-06-01T10:00:00Z->2026-06-01T10:01:00Z 58
```
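For hopping windows, append .advanceBy(Duration.ofSeconds(30)) to the TimeWindows.ofSizeAndGrace(...) call in the windowedBy clause. For sliding windows, use SlidingWindows.ofTimeDifferenceAndGrace(...). For sessions, use SessionWindows.ofInactivityGapAndGrace(Duration.ofSeconds(30), Duration.ofSeconds(5)) and aggregate with count() or aggregate(...). Session aggregations are backed by a SessionStore<Bytes, byte[]> rather than a WindowStore, so the Materialized type parameter changes accordingly, and aggregate(...) additionally takes a merger that combines two sessions when they merge.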
Grace period and late events
Each window has a logical close time of window-end + grace, measured in stream time: the highest record timestamp observed so far on the partition. A record whose event time falls inside a window is folded into the aggregate, even if it arrives out of order, as long as stream time has not yet reached that window's close time. Once stream time reaches window-end + grace, further records for that window are dropped and counted in the dropped-records metric. The grace period is the single knob that trades latency for completeness: a longer grace tolerates more out-of-order data but delays final emission.
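The accept-or-drop decision can be traced with a small plain-Java model of one partition. This is a hypothetical sketch (GracePeriodModel is an illustrative name), not Kafka Streams source code. It assumes tumbling windows, stream time equal to the maximum timestamp seen including the current record, and acceptance only while streamTime < windowEnd + grace. The sample uses the worked example's settings of 1-minute windows and 10 seconds of grace.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the grace-period rule for tumbling windows on one partition.
// Assumptions: stream time = max timestamp observed so far (including the current
// record); a window accepts records only while streamTime < windowEnd + grace.
final class GracePeriodModel {

    // Classifies each record, in arrival order, as "accepted" or "dropped".
    static List<String> classify(long[] arrivalOrderTimestampsMs, long sizeMs, long graceMs) {
        List<String> outcomes = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : arrivalOrderTimestampsMs) {
            streamTime = Math.max(streamTime, ts); // stream time never moves backwards
            long windowEnd = ts - (ts % sizeMs) + sizeMs;
            boolean open = streamTime < windowEnd + graceMs;
            outcomes.add(open ? "accepted" : "dropped");
        }
        return outcomes;
    }

    public static void main(String[] args) {
        long[] arrivals = {5_000L, 65_000L, 50_000L, 75_000L, 55_000L};
        // prints [accepted, accepted, accepted, accepted, dropped]
        System.out.println(classify(arrivals, 60_000L, 10_000L));
    }
}
```

The record at 50 s arrives after stream time has reached 65 s but is still accepted, because its window closes at 70 s. The record at 55 s arrives after stream time has reached 75 s, so it is dropped.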
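Code written before Kafka 3.0 configured lateness separately, with until(...) or later Windows.grace(...), and windows defined without an explicit grace fell back to an effective default of about 24 hours. Prefer the ofSizeAndGrace/ofInactivityGapAndGrace factory methods: they make grace explicit, so an implicit day-long grace can no longer hold windows open and mask records that a tighter grace would drop.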
Final results with suppress()
By default a windowed aggregation forwards updated results downstream as records arrive (record caching may batch some of these updates, but it does not guarantee one result per window), so downstream consumers see many intermediate values per window. suppress(Suppressed.untilWindowCloses(...)) buffers those updates and emits only once per window, when the window closes (that is, when stream time reaches window-end + grace). This is ideal for alerts, dashboards, and downstream systems that should see a single authoritative value per window.
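suppress buffers its pending results in memory. With BufferConfig.unbounded(), a flood of distinct keys can exhaust the heap. In high-cardinality pipelines, bound the buffer with BufferConfig.maxBytes(...) or maxRecords(...) followed by shutDownWhenFull(), and size your JVM accordingly. untilWindowCloses accepts only such strict buffer configs; emitEarlyWhenFull() can be used only with untilTimeLimit(...), because emitting early would break the one-final-result guarantee.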
Note that suppressed final results are driven by stream time, which only advances as new records flow through. If a partition goes idle, its last window may not close until more data arrives — a common surprise in low-traffic topics.
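Both behaviors, one final value per window and a last window that stalls without new data, appear in the simplified plain-Java model below. It is a hypothetical sketch (SuppressModel is an illustrative name), not Kafka Streams source code. It assumes tumbling-window counts for a single key on a single partition, stream time equal to the maximum timestamp seen, a window that closes once streamTime >= windowEnd + grace, and dropped records for already-closed windows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of suppress(untilWindowCloses(...)) over tumbling-window
// counts for ONE key on ONE partition. Assumptions: stream time = max timestamp seen;
// a window closes once streamTime >= windowEnd + grace; records for closed windows are dropped.
final class SuppressModel {

    // Returns "windowStart:count" entries, emitted once per window when it closes.
    static List<String> finalResults(long[] arrivalOrderTimestampsMs, long sizeMs, long graceMs) {
        TreeMap<Long, Long> buffer = new TreeMap<>(); // window start -> running count
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : arrivalOrderTimestampsMs) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % sizeMs);
            if (streamTime < start + sizeMs + graceMs) {
                buffer.merge(start, 1L, Long::sum); // intermediate update, held back
            }
            // Equal-size windows close in start order, so flush from the front.
            while (!buffer.isEmpty() && streamTime >= buffer.firstKey() + sizeMs + graceMs) {
                Map.Entry<Long, Long> closed = buffer.pollFirstEntry();
                emitted.add(closed.getKey() + ":" + closed.getValue());
            }
        }
        // Anything still buffered is waiting: without newer records,
        // stream time never reaches its close time.
        return emitted;
    }

    public static void main(String[] args) {
        long[] arrivals = {5_000L, 20_000L, 65_000L, 50_000L, 71_000L, 90_000L};
        System.out.println(finalResults(arrivals, 60_000L, 10_000L)); // prints [0:3]
        long[] more = {5_000L, 20_000L, 65_000L, 50_000L, 71_000L, 90_000L, 131_000L};
        System.out.println(finalResults(more, 60_000L, 10_000L)); // prints [0:3, 60000:3]
    }
}
```

In the first run the window starting at 60 s already holds three records but is never emitted, because stream time stops at 90 s. Only the later record at 131 s pushes stream time past that window's close time of 130 s.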
Best Practices
- Always set an explicit grace period sized to your real-world clock skew and broker delay; never rely on defaults.
- Use tumbling windows for non-overlapping reporting buckets and hopping windows for smooth rolling metrics — don’t simulate one with the other.
- Reach for session windows when activity boundaries are data-driven (user sessions, IoT bursts) rather than fixed clocks.
- Add suppress(untilWindowCloses) only where downstream truly needs one final value per window; bound the buffer in high-cardinality cases.
- Provide correct event-time timestamps via a TimestampExtractor; windowing is only as accurate as the timestamps it reads.
- Monitor the dropped-records and record-lateness metrics to detect when grace is too tight or producers are lagging.
- Give windowed state stores meaningful names so changelog topics and Interactive Queries remain stable across deployments.