Skip to content
Apache Kafka kf streams 4 min read

Processor API

The Kafka Streams DSL (map, filter, aggregate, joins) covers the vast majority of streaming use cases, but it deliberately hides the underlying execution engine. When you need fine-grained control — emitting records on a timer, accessing multiple state stores from one node, or implementing logic the DSL cannot express — you drop down to the Processor API. It exposes the raw building blocks of a topology: processor nodes, state stores, and a ProcessorContext that lets you forward records and schedule work. In production, the Processor API is the escape hatch that turns “the DSL almost does what I want” into a clean, testable implementation.

Processor and ProcessorSupplier

A processor is a single node in the topology. You implement Processor<KIn, VIn, KOut, VOut>, which receives one record at a time through process(Record). Stateless setup happens in init(ProcessorContext), where you capture the context and look up any state stores by name. A ProcessorSupplier hands a fresh processor instance to each stream task, so never share mutable state across instances.

public class DedupeProcessor implements Processor<String, Order, String, Order> {

    private ProcessorContext<String, Order> context;
    private KeyValueStore<String, Long> seenStore;

    @Override
    public void init(ProcessorContext<String, Order> context) {
        this.context = context;
        this.seenStore = context.getStateStore("seen-orders");
    }

    @Override
    public void process(Record<String, Order> record) {
        String id = record.value().orderId();
        if (seenStore.get(id) == null) {
            seenStore.put(id, record.timestamp());
            context.forward(record);          // pass downstream only if new
        }
        // duplicates are silently dropped
    }
}

Record carries the key, value, timestamp, and headers. Note the generics: input types may differ from output types, which is how a processor can transform records, not just filter them.

ProcessorContext and forward()

ProcessorContext is your handle to the runtime. The most important method is forward(Record), which sends a record to downstream processors. Unlike the DSL — where one operator produces exactly one output stream — a processor can call forward zero, one, or many times per input record, enabling fan-out, flat-mapping, and conditional emission.

When a node has multiple children, route to a specific one with context.forward(record, "child-name"). The context also exposes metadata about the record currently being processed: recordMetadata() returns the source topic, partition, and offset (useful for logging and dead-letter routing).

Calling forward from a punctuator uses the timestamp of the triggering punctuation, not a source record. Set record.withTimestamp(...) explicitly when downtstream time semantics matter.

Scheduling punctuators

Punctuators let a processor run code on a schedule rather than only when records arrive — essential for emitting aggregates periodically, expiring stale state, or flushing buffers. Schedule one in init with context.schedule(interval, type, punctuator), which returns a Cancellable you can stop later.

Punctuation typeClock usedFires whenTypical use
PunctuationType.STREAM_TIMEEvent timestamps in the dataStream time advances past the intervalTime-driven aggregation aligned to event time
PunctuationType.WALL_CLOCK_TIMESystem wall-clockReal elapsed time passes, even with no inputHeartbeats, idle-stream flushing, TTL cleanup
@Override
public void init(ProcessorContext<String, Long> context) {
    this.context = context;
    this.counts = context.getStateStore("counts");
    context.schedule(Duration.ofMinutes(1),
                     PunctuationType.WALL_CLOCK_TIME,
                     this::emitAndReset);
}

private void emitAndReset(long timestamp) {
    try (var iter = counts.all()) {
        while (iter.hasNext()) {
            var entry = iter.next();
            context.forward(new Record<>(entry.key, entry.value, timestamp));
            counts.delete(entry.key);
        }
    }
}

Stream-time punctuation only advances when records flow, so a quiet partition will not fire it. Wall-clock punctuation fires regardless, which is why TTL/cleanup logic should use it.

Direct state store access

The Processor API gives unrestricted, read-write access to any state store the node was wired to. Register the store on the topology (or as a StoreBuilder in the DSL), then fetch it by name in init. You can hold several stores in one processor — something the DSL cannot do in a single operator — and iterate with range() or all() for batch logic inside a punctuator. Remember to close iterators to release resources, as shown above with try-with-resources.

Mixing with the DSL via process()

You rarely build an entire topology by hand. Instead, stay in the DSL for the easy parts and splice in a processor exactly where you need it. KStream.process(...) attaches a ProcessorSupplier (terminal — no return value), while processValues(...) preserves the key and returns a new KStream for further DSL operations. Declare the stores the processor needs as the trailing argument so Streams wires them up.

StoreBuilder<KeyValueStore<String, Long>> store =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("seen-orders"),
        Serdes.String(), Serdes.Long());

builder.addStateStore(store);

builder.stream("orders", Consumed.with(Serdes.String(), orderSerde()))
       .process(DedupeProcessor::new, "seen-orders")  // custom node
       .to("orders-deduped");                          // back in the DSL

This hybrid style keeps your topology readable while letting the Processor API handle the one piece the DSL can’t.

Best Practices

  • Capture ProcessorContext and resolve state stores in init, never in the constructor — the context isn’t available until the task starts.
  • Always wrap store iterators in try-with-resources; leaked iterators pin RocksDB resources and cause memory growth.
  • Use WALL_CLOCK_TIME for TTL/cleanup and idle flushing; use STREAM_TIME for event-time-aligned emission.
  • Prefer processValues over process when you stay key-partitioned and want to continue with DSL operators downstream.
  • Keep processor instances stateless beyond their state stores so each task gets an independent, thread-safe copy from the supplier.
  • Set explicit timestamps with record.withTimestamp(...) when forwarding from punctuators to keep downstream windowing correct.
Last updated June 1, 2026
Was this helpful?