Transactions in Depth
Kafka transactions are the machinery behind exactly-once writes: they let a producer commit records to many partitions — and its consumer offsets — as a single atomic unit. From the outside the API is just beginTransaction, send, commitTransaction. Underneath sits a small but precise distributed protocol involving a dedicated broker role, an internal log, and special control records written into your partitions. Understanding that protocol is what lets you reason about latency, failure recovery, and the dreaded “stuck consumer” when a transaction hangs open in production.
The transactional.id and why it is special
A transaction begins with a stable transactional.id set on the producer. Unlike the auto-assigned Producer ID (PID) used by plain idempotence, transactional.id is chosen by you and must survive restarts. It is the identity the cluster uses to recover a producer’s transactional state and, critically, to fence out zombies — older incarnations of the same logical producer that came back from a network partition or a slow GC pause.
When a producer calls initTransactions(), the broker maps its transactional.id to a PID and an epoch. On every reinitialization the epoch is bumped, so a stale producer holding an old epoch is rejected with ProducerFencedException the moment it tries to write or commit.
Properties p = new Properties();
p.put("bootstrap.servers", "broker:9092");
p.put("transactional.id", "payments-processor-7"); // stable across restarts
p.put("enable.idempotence", "true");
p.put("acks", "all");
p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(p);
producer.initTransactions(); // registers transactional.id, fetches PID + epoch
The transaction coordinator
Every transactional.id is owned by exactly one transaction coordinator — a broker module, not a separate process. The coordinator is selected by hashing the transactional.id to a partition of the internal __transaction_state topic; whichever broker leads that partition is your coordinator. This is the two-phase-commit manager for your transaction.
The coordinator persists state to that internal topic, the transaction log. Each transaction’s lifecycle (Empty → Ongoing → PrepareCommit/PrepareAbort → CompleteCommit/CompleteAbort) and the full set of partitions touched are durably recorded there. If the coordinator broker dies, a new leader for that __transaction_state partition replays the log and resumes any in-flight transactions — completing or aborting them so no partition is left in limbo.
The protocol, step by step
When you run a read-process-write loop, the producer and coordinator exchange a fixed sequence of requests. The producer’s first write to any new partition triggers an AddPartitionsToTxn so the coordinator knows which partitions must receive markers at commit time.
Producer Coordinator (__transaction_state) Topic partitions
| | |
|-- initTransactions -------->| assign PID, bump epoch |
| | |
|-- beginTransaction (local) -- |
|-- AddPartitionsToTxn ------>| log: Ongoing {p0, p1} |
|-- send(records) -----------------------------------------------> | append data (PID, epoch, seq)
|-- AddOffsetsToTxn --------->| log: include __consumer_offsets |
|-- sendOffsetsToTransaction ------------------------------------> | write offsets
| | |
|-- commitTransaction ------->| log: PrepareCommit |
| |-- WriteTxnMarkers (COMMIT) ------> | append COMMIT marker to each
| | log: CompleteCommit |
|<-- commit OK ---------------| |
The key insight: the producer does not write the markers. The coordinator does, via WriteTxnMarkers, to every partition that participated. Only after all markers are durably written does the coordinator log CompleteCommit and acknowledge the producer.
Commit and abort markers in the log
Data records and control records share the same partition log. A transaction marker (also called a control batch) is a special record the coordinator appends to mark a transaction’s boundary in that partition. It carries the PID, the epoch, and whether the transaction committed or aborted.
partition 0: [data PID=42 seq=0..3] ... [data PID=42 seq=4..7] [CONTROL: COMMIT PID=42]
partition 1: [data PID=42 seq=0..1] ......................... [CONTROL: ABORT PID=42]
Markers are how a consumer later decides whether the preceding data from that PID is valid. They are invisible to your application — the client library filters them out — but they occupy offsets, which is why transactional topics show small offset gaps.
read_committed and the last stable offset
A read_committed consumer must never expose uncommitted or aborted data. It enforces this with the last stable offset (LSO): the highest offset such that all transactions below it are resolved (committed or aborted). The consumer will not return any record at or beyond the LSO, even if those records are physically present, because their fate is still unknown.
| Aspect | read_uncommitted (default) | read_committed |
|---|---|---|
| Sees ongoing/uncommitted records | Yes | No |
| Returns aborted records | Yes | No (filtered via aborted-txn index) |
| Reads up to | High watermark | Last stable offset (LSO) |
| Latency | Lowest | Adds commit-latency to visibility |
To skip aborted records efficiently, the broker maintains an aborted transaction index per segment. On fetch, the broker ships the relevant aborted-PID ranges alongside the data, and the consumer drops any record whose PID is in that set up to the matching abort marker.
Gotcha: a single long-running or hung transaction pins the LSO. Every
read_committedconsumer on that partition stalls behind it — not just the slow producer’s own output. Always settransaction.timeout.msso the coordinator force-aborts abandoned transactions and unblocks readers.
Zombie fencing via epochs
The epoch is the linchpin of correctness across restarts. Suppose payments-processor-7 stalls, a replacement starts and calls initTransactions() (epoch → 6), then the original wakes up still holding epoch 5. Any write, offset commit, or commit attempt from the old instance is rejected:
Old producer --send(PID=42, epoch=5)--> broker
broker --ProducerFencedException--> old producer (epoch 5 < current 6)
This prevents two processes that believe they own the same transactional.id from both writing — the classic zombie problem in consume-transform-produce pipelines. In Spring for Apache Kafka, the framework derives a per-instance transactional.id from the configured prefix and fences automatically; you rarely set epochs by hand.
Best Practices
- Use a stable, unique
transactional.idper logical producer so the coordinator can fence zombies after restarts and recovery is deterministic. - Keep transactions short: a hung transaction pins the LSO and stalls every
read_committedconsumer on the partition, not only your own. - Set
transaction.timeout.msbelow the broker’stransaction.max.timeout.ms, and above your worst-case processing time, so abandoned transactions auto-abort without false aborts. - Batch a sensible number of records per transaction to amortize the marker and two-phase-commit cost — but don’t make them so large that recovery replays huge spans.
- Always pair transactional producers with
isolation.level=read_committedconsumers; otherwise markers and the LSO buy you nothing downstream. - Provision enough partitions for
__transaction_stateand monitor coordinator load — it is on the hot path of every commit. - Treat
ProducerFencedExceptionas fatal for that producer instance: close it and let the surviving instance own the work.