Change Data Capture with Debezium

Change Data Capture (CDC) turns a database’s write-ahead log into a stream of events: every insert, update, and delete that hits a table is captured and published to Kafka, in commit order, with almost no load on the source. Debezium is the de-facto open-source CDC platform, shipping as a family of Kafka Connect source connectors for MySQL, PostgreSQL, MongoDB, SQL Server, Oracle, and more. Unlike a JDBC source that polls and re-queries tables, Debezium reads the database’s own transaction log — MySQL’s binlog or Postgres’s WAL — so it sees deletes, never misses a change between polls, and captures the exact before/after state of every row. This page explains how log-based CDC works, the shape of a Debezium change event, snapshotting, and how to configure the MySQL and Postgres connectors.

Why log-based CDC beats polling

A polling source connector issues a SELECT ... WHERE updated_at > ? on a schedule. That approach can’t see hard deletes (the row is simply gone), misses intermediate updates that happen between two polls, and adds query load that grows with table size. Log-based CDC sidesteps all of this by tailing the structure the database already maintains for replication and crash recovery.

| Aspect | JDBC polling source | Debezium (log-based CDC) |
| --- | --- | --- |
| Captures deletes | No | Yes |
| Captures every intermediate update | No (only latest at poll time) | Yes (every committed change) |
| Source load | Repeated SELECT queries | Reads the log, near-zero query load |
| Ordering | Per-poll, best-effort | Strict commit order |
| Requires schema column | Needs a monotonic/timestamp column | No application schema requirement |
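
The difference can be made concrete with a toy model. The sketch below is not Debezium code: the class PollingVsLog and its Write record are hypothetical, and the poller is assumed to run after every N writes and to see only rows that still exist. It replays the same sequence of writes through both approaches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model (not Debezium code): what a poller and a log reader each observe. */
final class PollingVsLog {
    /** One committed write; a null status models a hard DELETE. */
    record Write(int id, String status) {}

    /** Log reader: one event per committed write, in commit order. */
    static List<String> logEvents(List<Write> writes) {
        List<String> events = new ArrayList<>();
        for (Write w : writes) {
            events.add(w.status() == null
                    ? "delete " + w.id()
                    : "upsert " + w.id() + "=" + w.status());
        }
        return events;
    }

    /** Poller: runs after every pollEvery writes and sees only rows that still exist. */
    static List<String> pollEvents(List<Write> writes, int pollEvery) {
        Map<Integer, String> table = new TreeMap<>();
        Set<Integer> changedSinceLastPoll = new TreeSet<>();
        List<String> events = new ArrayList<>();
        for (int i = 0; i < writes.size(); i++) {
            Write w = writes.get(i);
            if (w.status() == null) {
                table.remove(w.id());
            } else {
                table.put(w.id(), w.status());
            }
            changedSinceLastPoll.add(w.id());
            boolean pollNow = (i + 1) % pollEvery == 0 || i == writes.size() - 1;
            if (pollNow) {
                for (int id : changedSinceLastPoll) {
                    if (table.containsKey(id)) {
                        events.add("upsert " + id + "=" + table.get(id));
                    }
                }
                changedSinceLastPoll.clear();
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<Write> writes = List.of(
                new Write(1, "ACTIVE"), new Write(1, "SUSPENDED"),
                new Write(2, "ACTIVE"), new Write(2, null));
        System.out.println("log:  " + logEvents(writes));
        System.out.println("poll: " + pollEvents(writes, 2));
    }
}
```

With a poll every two writes, the poller reports only the final state of row 1 and never learns that row 2 existed, while the log reader reports all four changes.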

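Log-based CDC requires database-side configuration. MySQL needs binary logging enabled with binlog_format=ROW and binlog_row_image=FULL; Postgres needs wal_level=logical. Without these the connector cannot read row-level changes and will fail to start. On Postgres, the before image of updates and deletes also depends on the table's REPLICA IDENTITY: with the default setting it is limited to the primary-key columns at most, and REPLICA IDENTITY FULL is needed to get the complete old row.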

The Debezium change-event envelope

Every captured change becomes a Kafka record whose value is a structured envelope. The key carries the row’s primary key; the value carries the before state, the after state, an op code, and source metadata describing where the change came from in the log.

{
  "before": null,
  "after": {
    "id": 1001,
    "email": "user@example.com",
    "status": "ACTIVE"
  },
  "source": {
    "db": "shop",
    "table": "customers",
    "ts_ms": 1717200000000,
    "pos": 154892,
    "snapshot": "false"
  },
  "op": "c",
  "ts_ms": 1717200000123
}

The op field is the most important part of the envelope:

| op | Meaning | before | after |
| --- | --- | --- | --- |
| c | Create (insert) | null | new row |
| u | Update | old row | new row |
| d | Delete | old row | null |
| r | Read (snapshot row) | null | row image |

A delete also emits a follow-up tombstone record — the same key with a null value — so log-compacted topics can physically remove the deleted key. You can disable tombstones with tombstones.on.delete=false when you don’t compact.
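
As a minimal sketch of how a consumer might act on the op code and on tombstones, the class below keeps an in-memory copy of a table in sync. It assumes the record value has already been deserialized into a Map; EnvelopeApplier and its envelope helper are hypothetical names, and op codes other than the four in the table above are rejected rather than handled.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: keeps an in-memory copy of a table in sync from Debezium-style records. */
final class EnvelopeApplier {
    /** Builds an envelope with only the fields this sketch reads (HashMap allows nulls). */
    static Map<String, Object> envelope(String op, Map<String, Object> before,
                                        Map<String, Object> after) {
        Map<String, Object> e = new HashMap<>();
        e.put("op", op);
        e.put("before", before);
        e.put("after", after);
        return e;
    }

    /** Applies one record to the view; a null value is the tombstone that follows a delete. */
    @SuppressWarnings("unchecked")
    static void apply(Map<Object, Map<String, Object>> view, Object key,
                      Map<String, Object> value) {
        if (value == null) {
            // Tombstone: normally a no-op here, because the "d" event arrived first.
            view.remove(key);
            return;
        }
        String op = (String) value.get("op");
        switch (op) {
            case "c", "u", "r" -> view.put(key, (Map<String, Object>) value.get("after"));
            case "d" -> view.remove(key);
            default -> throw new IllegalArgumentException("op not handled in this sketch: " + op);
        }
    }

    public static void main(String[] args) {
        Map<Object, Map<String, Object>> view = new HashMap<>();
        apply(view, 1001, envelope("c", null, Map.of("id", 1001, "status", "ACTIVE")));
        apply(view, 1001, envelope("d", Map.of("id", 1001, "status", "ACTIVE"), null));
        apply(view, 1001, null); // tombstone
        System.out.println(view);
    }
}
```

Treating snapshot reads ("r") the same as creates keeps the consumer idempotent if a snapshot is ever replayed.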

Snapshotting: capturing existing data

When a connector starts for the first time it has no log offset to resume from, and the historical rows that existed before CDC began are not in the log. Debezium solves this with a snapshot: it reads the current contents of the configured tables and emits them as op: "r" (“read”) events, then seamlessly switches to streaming live changes from the log position captured at snapshot start. The snapshot.mode setting controls this behaviour:

| snapshot.mode | Behaviour |
| --- | --- |
| initial (default) | Snapshot tables on first start, then stream |
| initial_only | Snapshot once, then stop |
| no_data (schema_only) | Capture schema only, stream changes from now on |
| when_needed | Snapshot only if no valid offset/log position exists |
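
The table can be read as a small decision function. The sketch below is a simplified model of it, not Debezium's implementation: SnapshotPlanner, Plan, and the two boolean inputs are hypothetical, and real connectors take more state into account.

```java
/** Simplified decision table for snapshot.mode; real connectors consider more state. */
final class SnapshotPlanner {
    enum Mode { INITIAL, INITIAL_ONLY, NO_DATA, WHEN_NEEDED }

    /** What the connector does on start-up in this simplified model. */
    record Plan(boolean snapshotData, boolean streamAfterwards) {}

    /**
     * @param hasStoredOffset  true if the connector has run before and saved a log position
     * @param offsetStillInLog true if that saved position is still available in the log
     */
    static Plan plan(Mode mode, boolean hasStoredOffset, boolean offsetStillInLog) {
        return switch (mode) {
            case INITIAL -> new Plan(!hasStoredOffset, true);
            case INITIAL_ONLY -> new Plan(!hasStoredOffset, false);
            case NO_DATA -> new Plan(false, true);
            case WHEN_NEEDED -> new Plan(!hasStoredOffset || !offsetStillInLog, true);
        };
    }

    public static void main(String[] args) {
        System.out.println(plan(Mode.INITIAL, false, false));
        System.out.println(plan(Mode.WHEN_NEEDED, true, false));
    }
}
```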

Configuring a Debezium MySQL connector

You register the connector by POSTing JSON to the Connect REST API. Debezium needs unique topic.prefix (the logical server name) and database.server.id values, plus a Kafka topic to store schema-change history.

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "shop-mysql-cdc",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz-secret",
      "database.server.id": "184054",
      "topic.prefix": "shop",
      "database.include.list": "shop",
      "table.include.list": "shop.customers,shop.orders",
      "snapshot.mode": "initial",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.topic": "schema-changes.shop",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false"
    }
  }'

Records for the customers table land on the topic shop.shop.customers (<topic.prefix>.<database>.<table>).
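
A consumer often needs the list of topics it should subscribe to. The helper below is a small sketch of the naming rule; TopicNames is a hypothetical class, and it assumes table.include.list holds literal database.table names rather than regular expressions.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the <topic.prefix>.<database>.<table> naming rule. */
final class TopicNames {
    /** Maps each "database.table" entry of an include list to its topic name. */
    static List<String> topicsFor(String topicPrefix, String tableIncludeList) {
        List<String> topics = new ArrayList<>();
        for (String entry : tableIncludeList.split(",")) {
            // Assumes a literal name; a regex entry would need expanding first.
            topics.add(topicPrefix + "." + entry.trim());
        }
        return topics;
    }

    public static void main(String[] args) {
        System.out.println(topicsFor("shop", "shop.customers,shop.orders"));
    }
}
```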

Configuring a Debezium PostgreSQL connector

The Postgres connector reads logical decoding output from the WAL. The usual plugin choice is pgoutput, which is built into Postgres 10 and later and needs no extra extension; set plugin.name explicitly, because Debezium's own default has historically been decoderbufs. The connector also requires a replication slot, which persists its WAL position on the server.

{
  "name": "shop-postgres-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz-secret",
    "database.dbname": "shop",
    "topic.prefix": "shop",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_shop",
    "publication.autocreate.mode": "filtered",
    "table.include.list": "public.customers,public.orders",
    "snapshot.mode": "initial"
  }
}

The replication slot retains WAL until Debezium consumes it. If the connector is stopped for a long time the WAL accumulates and can fill the disk — monitor pg_replication_slots and drop unused slots, or the database can grind to a halt.
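
One way to quantify that risk is to compare the server's current WAL position with the slot's restart_lsn, both of which Postgres reports as LSN strings (for example from pg_current_wal_lsn() and the restart_lsn column of pg_replication_slots). The sketch below only does the arithmetic; SlotLag is a hypothetical helper, and fetching the two values is left to whatever database client you use.

```java
/** Sketch: computes how many bytes of WAL a replication slot is holding back. */
final class SlotLag {
    /** Parses a Postgres LSN such as "16/B374D848" into a 64-bit byte position. */
    static long parseLsn(String lsn) {
        String[] parts = lsn.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("not an LSN: " + lsn);
        }
        long high = Long.parseLong(parts[0], 16); // upper 32 bits
        long low = Long.parseLong(parts[1], 16);  // lower 32 bits
        return (high << 32) | low;
    }

    /** Bytes between the slot's restart position and the current end of the WAL. */
    static long retainedBytes(String currentWalLsn, String slotRestartLsn) {
        return parseLsn(currentWalLsn) - parseLsn(slotRestartLsn);
    }

    public static void main(String[] args) {
        System.out.println(retainedBytes("1/10", "0/FFFFFFF0") + " bytes retained");
    }
}
```

Alerting when this number crosses a threshold sized to your disk gives warning well before the volume fills.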

Reading change events

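The envelope is verbose. Most downstream consumers only want the after state, so Debezium ships the ExtractNewRecordState single-message transform (SMT), which flattens the envelope down to the new row. With delete.handling.mode=rewrite, a delete is not dropped: it is emitted as the row's last known state plus a __deleted field set to "true". Add the following keys to the connector config (shown in properties form; in the JSON payload each becomes a "key": "value" pair inside config):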

transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
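
To show what the flattened value looks like, the sketch below approximates the transform's effect on an already-deserialized envelope. It is an illustration, not the SMT's implementation: Unwrap is a hypothetical class, and the __deleted field being the string "true" or "false" is how the rewrite mode is understood to behave.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Approximates ExtractNewRecordState with delete.handling.mode=rewrite. */
final class Unwrap {
    /** Returns the flattened row, or null for a tombstone (drop.tombstones=false). */
    @SuppressWarnings("unchecked")
    static Map<String, Object> flatten(Map<String, Object> envelope) {
        if (envelope == null) {
            return null; // tombstones pass through unchanged
        }
        boolean deleted = "d".equals(envelope.get("op"));
        // Deletes have no "after", so the last known state comes from "before".
        Map<String, Object> row =
                (Map<String, Object>) envelope.get(deleted ? "before" : "after");
        Map<String, Object> flat = row == null ? new LinkedHashMap<>() : new LinkedHashMap<>(row);
        flat.put("__deleted", Boolean.toString(deleted));
        return flat;
    }

    public static void main(String[] args) {
        Map<String, Object> delete = new HashMap<>();
        delete.put("op", "d");
        delete.put("before", Map.of("id", 1001, "status", "SUSPENDED"));
        delete.put("after", null);
        System.out.println(flatten(delete));
    }
}
```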

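A Spring @KafkaListener can then consume the flattened record as a plain DTO, assuming a JSON value deserializer is configured for the type. Because drop.tombstones=false lets tombstones through, the listener must also accept a null payload:

public record CustomerChange(Long id, String email, String status) {}

// @Payload is org.springframework.messaging.handler.annotation.Payload
@KafkaListener(topics = "shop.shop.customers", groupId = "cdc-consumer")
public void onCustomerChange(@Payload(required = false) CustomerChange change) {
    if (change == null) {
        return; // tombstone (null value) that follows a delete
    }
    log.info("Customer {} now {}", change.id(), change.status());
}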

Output:

Customer 1001 now ACTIVE
Customer 1001 now SUSPENDED

Best Practices

  • Configure the database first: binlog_format=ROW/binlog_row_image=FULL for MySQL, wal_level=logical for Postgres, before registering the connector.
  • Give the CDC user only the privileges it needs rather than full admin: on MySQL that is SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT; on Postgres, the REPLICATION role attribute plus SELECT on the captured tables.
  • Use table.include.list to capture only the tables you need — narrower scope means smaller snapshots and less log traffic.
  • Monitor Postgres replication-slot lag and disk usage; a stalled connector silently grows the WAL until the disk fills.
  • Run one task per connector (tasks.max=1) — log readers are single-threaded by design; scale by adding connectors, not tasks.
  • Use the ExtractNewRecordState SMT to flatten the envelope for consumers that don’t need before/after history, but keep the full envelope when you need auditability.
  • Store schema-change history (MySQL) on a dedicated, replicated internal topic and never delete it, or the connector loses its ability to interpret older log positions.
Last updated June 1, 2026