Skip to content
Apache Kafka kf producers 4 min read

Producer Transactions

The idempotent producer protects you against duplicate writes caused by retries, but it only guarantees exactly-once delivery to a single partition within a single producer session. Real pipelines often need more: writing to several topics atomically, or consuming a record, processing it, and producing a result as one indivisible unit. Kafka transactions extend the idempotent producer to give you atomic, all-or-nothing writes across multiple partitions and topics — the foundation of the exactly-once read-process-write pattern that powers stream processing.

What a transaction guarantees

A Kafka transaction lets a producer group multiple send calls (across any number of partitions and topics) into a single atomic commit. Either every record in the transaction becomes visible to read_committed consumers, or none of them do. If the producer crashes mid-transaction, the broker aborts it and downstream consumers never see the partial output.

Transactions build directly on idempotence, so enabling them turns on idempotence automatically and forces acks=all. You get ordering and deduplication plus atomicity.

The transactional.id

The key to transactions is the transactional.id config. It is a stable, unique identifier that ties a producer to its transactional state on the broker. When a producer with a given transactional.id restarts and calls initTransactions(), the broker fences off any older producer instance using the same ID and rolls back its in-flight transaction. This zombie fencing is what makes exactly-once safe across crashes.

PropertyValue / meaning
transactional.idStable unique string per logical producer; enables transactions and fencing
enable.idempotenceForced to true when transactional.id is set
acksForced to all
transaction.timeout.msMax time a transaction may stay open before the broker aborts it (default 60000)
isolation.level (consumer)Set to read_committed to hide aborted/in-flight records

The transactional.id must be stable across restarts but unique per producer instance. Two live producers sharing one transactional.id will fence each other in a loop. For partitioned read-process-write apps, derive it deterministically from the input partition assignment.

The transaction lifecycle

A transactional producer follows a strict sequence of API calls:

  • initTransactions() — called once after construction. Registers the transactional.id, fences zombies, and recovers state.
  • beginTransaction() — starts a new transaction.
  • send(...) — one or more sends enrolled in the current transaction.
  • sendOffsetsToTransaction(...) — (read-process-write only) commit consumer offsets as part of the transaction.
  • commitTransaction() — atomically commits all sends and offsets.
  • abortTransaction() — discards everything in the transaction on error.

Atomic writes across partitions

The simplest use is writing to multiple topics atomically — for example, an order and its audit event must both land or neither must.

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-writer-1");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.initTransactions();
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("orders", "order-42", "{\"id\":42}"));
        producer.send(new ProducerRecord<>("audit", "order-42", "created"));
        producer.commitTransaction();
        System.out.println("Committed both records atomically");
    } catch (KafkaException e) {
        producer.abortTransaction();
        System.out.println("Aborted: " + e.getMessage());
    }
}

Output:

Committed both records atomically

Read-process-write for exactly-once

The flagship use case is consuming, transforming, and producing in one atomic step. The trick is sendOffsetsToTransaction: instead of committing consumer offsets separately, you fold them into the producer transaction. The output records and the input offsets commit together, so a record is never reprocessed and its output is never duplicated.

Properties cProps = new Properties();
cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "etl-group");
cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
cProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);          // offsets committed via the transaction
cProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");  // never read aborted records

Properties pProps = new Properties();
pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "etl-processor-1");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
     KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {

    consumer.subscribe(List.of("input"));
    producer.initTransactions();

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        if (records.isEmpty()) continue;

        producer.beginTransaction();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> rec : records) {
                producer.send(new ProducerRecord<>("output", rec.key(), rec.value().toUpperCase()));
                offsets.put(
                    new TopicPartition(rec.topic(), rec.partition()),
                    new OffsetAndMetadata(rec.offset() + 1));
            }
            // Commit input offsets as part of the same transaction
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (KafkaException e) {
            producer.abortTransaction();   // offsets are NOT advanced; records are reprocessed
        }
    }
}

Always pass consumer.groupMetadata() (not just the group ID) so the broker can enforce fencing against rebalances. Offsets advance only on commit, so an abort safely replays the same input batch.

Why consumers need read_committed

By default consumers use isolation.level=read_uncommitted and will see records from open and aborted transactions. To get exactly-once semantics end to end, downstream consumers must set read_committed. Those consumers read only up to the Last Stable Offset (LSO) — the offset before the earliest open transaction — so a long-running transaction can delay visibility for everyone reading that partition.

Best Practices

  • Assign each logical producer a stable, unique transactional.id; never share one across concurrent instances.
  • Set downstream consumers to isolation.level=read_committed or you lose the atomicity guarantee on the read side.
  • Keep transactions short — long transactions stall read_committed consumers at the LSO and risk hitting transaction.timeout.ms.
  • For read-process-write, always use sendOffsetsToTransaction with consumer.groupMetadata() and disable auto-commit.
  • Treat ProducerFencedException, OutOfOrderSequenceException, and AuthorizationException as fatal — close the producer and exit; abort only on retriable KafkaExceptions.
  • Prefer Spring Kafka’s KafkaTransactionManager / @Transactional or Kafka Streams (processing.guarantee=exactly_once_v2) over hand-rolled loops when possible.
Last updated June 1, 2026
Was this helpful?