Project: CDC to Search Index

Keeping a search index in sync with a relational database is a classic source of bugs: dual writes drift, batch reindex jobs run hours behind, and dropped updates leave stale results. Change data capture (CDC) solves this by reading the database’s transaction log and emitting every insert, update, and delete as an event. In this project you will wire a Debezium source connector to capture Postgres changes into Kafka, then an Elasticsearch sink connector to index them in near real time — all running on Kafka Connect with zero application code on the write path.

Architecture overview

Debezium tails the Postgres write-ahead log (WAL) through a logical replication slot, so it sees committed row changes in order without polling tables. Each change becomes a Kafka record on a per-table topic. The Elasticsearch sink connector consumes those topics and applies the corresponding index operations, turning row-level CRUD into document upserts and deletes.

[Postgres WAL] --> [Debezium source connector] --> Kafka topics (one per table)
                                                          |
                                                          v
                                       [Elasticsearch sink connector]
                                                          |
                                                          v
                                              Elasticsearch index (search)

Preparing Postgres

Logical decoding must be enabled for Debezium to read the WAL. Set wal_level to logical in postgresql.conf and restart the database; a change to wal_level only takes effect after a restart:

wal_level = logical
max_wal_senders = 4
max_replication_slots = 4

Create a table and a user with replication rights:

CREATE TABLE products (
    id          BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    name        TEXT NOT NULL,
    description TEXT,
    price       NUMERIC(10,2),
    updated_at  TIMESTAMPTZ DEFAULT now()
);

CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'dbz';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;

Configuring the Debezium source connector

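Connectors are registered against the Kafka Connect REST API as JSON. The source connector points at Postgres, names the replication slot, and routes captured tables into Kafka. The pgoutput plugin is built into Postgres 10 and later, so no external decoder is needed. With publication.autocreate.mode set to filtered, Debezium creates a publication (named dbz_publication by default) that covers only the included tables; this requires a role that is allowed to create publications for those tables, so create the publication ahead of time if the debezium role is not. Setting decimal.handling.mode to double makes NUMERIC columns such as price arrive as plain JSON numbers; the default, precise, encodes them as base64 bytes under JsonConverter. Use string instead of double if exact decimal precision matters.

{
  "name": "products-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.dbname": "shop",
    "topic.prefix": "shop",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_products",
    "publication.autocreate.mode": "filtered",
    "table.include.list": "public.products",
    "decimal.handling.mode": "double",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}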

Register it:

curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @products-source.json

Debezium first runs a consistent snapshot of existing rows, then switches to streaming WAL changes. Captured records land on the topic shop.public.products.
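A POST to /connectors fails if the connector name is already registered, which makes re-running a setup script awkward. The Connect REST API also accepts PUT /connectors/{name}/config, which creates the connector or updates it in place. The following is a minimal sketch of that approach using only the Python standard library; the function names are hypothetical, and the base URL is assumed to be the same endpoint the curl command uses.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: same Connect endpoint as the curl example


def build_upsert_request(definition: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build a PUT /connectors/{name}/config request from a {"name", "config"} definition."""
    name = definition["name"]
    # The PUT endpoint takes only the inner "config" object, not the wrapper.
    body = json.dumps(definition["config"]).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


def register(path: str, base_url: str = CONNECT_URL) -> int:
    """Send the definition stored at `path` and return the HTTP status code."""
    with open(path, encoding="utf-8") as fh:
        definition = json.load(fh)
    request = build_upsert_request(definition, base_url)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Calling register("products-source.json") against a running Connect worker would then register or update the source connector from the same file used above.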

Reshaping events with a transform

A raw Debezium event is an envelope with before, after, op, and source fields. Elasticsearch wants a flat document. The ExtractNewRecordState single-message transform (SMT) unwraps the envelope, leaving just the row’s new state, and propagates deletes as tombstones so the sink can remove documents. Add it to the source config:

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "drop",
"transforms.unwrap.drop.tombstones": "false"
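To make the unwrapping concrete, here is a rough Python model of what the transform does to a stream of records. It is only an illustration, not Debezium's implementation: it assumes delete events are dropped while tombstones are kept (delete.handling.mode=drop with drop.tombstones=false), and the function name and sample rows are hypothetical.

```python
from typing import Iterable, Iterator, Optional, Tuple

Record = Tuple[dict, Optional[dict]]  # (key, value); a None value is a tombstone


def unwrap_stream(records: Iterable[Record]) -> Iterator[Record]:
    """Flatten Debezium envelopes to the row's new state, keeping tombstones."""
    for key, value in records:
        if value is None:
            yield key, None            # tombstone passes through so the sink can delete
        elif value["op"] == "d":
            continue                   # delete envelope is dropped; the tombstone follows it
        else:
            yield key, value["after"]  # create, update, and snapshot reads carry the new row


events = [
    ({"id": 1}, {"op": "c", "before": None, "after": {"id": 1, "name": "Mechanical Keyboard"}}),
    ({"id": 1}, {"op": "d", "before": {"id": 1, "name": "Mechanical Keyboard"}, "after": None}),
    ({"id": 1}, None),
]
flattened = list(unwrap_stream(events))
```

In this model the three input records become two: the flat row for the insert, and a null-valued record for the delete.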

Gotcha: the document _id in Elasticsearch must come from the row’s primary key, otherwise updates create duplicate documents instead of replacing them. The sink connector below derives the id from the Kafka record key, which Debezium sets to a struct of the table’s primary-key columns ({"id": 1} for this table). The key therefore has to reach the sink intact: never disable keys, drop them in a transform, or re-key these topics.

Configuring the Elasticsearch sink connector

The sink consumes the table topic and writes upserts. Setting key.ignore to false makes the connector use the record key as the document id, and behavior.on.null.values: delete turns tombstones into delete operations. The connector only accepts primitive keys (strings and integers) as document ids, while Debezium emits the key as a struct, so an ExtractField$Key transform pulls the id column out of the key first.

{
  "name": "products-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "shop.public.products",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "false",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete",
    "write.method": "upsert",
    "transforms": "key",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}

Option                    Purpose
key.ignore                When false, uses the Kafka key as the ES _id, enabling true upserts
schema.ignore             Lets ES infer field mappings instead of requiring a Connect schema
behavior.on.null.values   delete removes the doc on a tombstone, keeping the index in sync
write.method              upsert merges partial updates rather than overwriting
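The combined effect of these options can be modelled in a few lines of Python. This is a sketch only: a plain dict stands in for the Elasticsearch index, and apply_record is a hypothetical helper, not part of the connector. It shows why keying documents by the primary key makes replays harmless.

```python
from typing import Optional


def apply_record(index: dict, key: dict, value: Optional[dict], key_field: str = "id") -> None:
    """Apply one unwrapped change record to an in-memory stand-in for the index."""
    doc_id = str(key[key_field])  # document id derived from the primary key
    if value is None:
        index.pop(doc_id, None)   # tombstone: delete, and tolerate an already-missing doc
    else:
        index.setdefault(doc_id, {}).update(value)  # upsert: merge fields into the doc


index: dict = {}
changes = [
    ({"id": 1}, {"id": 1, "name": "Mechanical Keyboard", "price": 129.99}),
    ({"id": 1}, {"id": 1, "name": "Mechanical Keyboard", "price": 119.99}),
]
# Replaying the same changes twice models at-least-once redelivery after a restart.
for key, value in changes + changes:
    apply_record(index, key, value)
```

After the replay the dict still holds a single document for id 1 with the latest price, and applying a tombstone for the same key removes it.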

Verifying the pipeline

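Insert a row in Postgres, then confirm it appears in Elasticsearch. Expect a delay of a second or more: the sink batches writes, and Elasticsearch by default refreshes an index about once per second before new documents become searchable.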

INSERT INTO products (name, description, price)
VALUES ('Mechanical Keyboard', 'Hot-swappable, RGB', 129.99);

curl -s 'http://localhost:9200/shop.public.products/_search?q=keyboard'

Output (abridged; the full response also includes search metadata and the updated_at field):

{
  "hits": {
    "total": { "value": 1 },
    "hits": [
      {
        "_index": "shop.public.products",
        "_id": "1",
        "_source": {
          "id": 1,
          "name": "Mechanical Keyboard",
          "description": "Hot-swappable, RGB",
          "price": 129.99
        }
      }
    ]
  }
}

Updating the price re-indexes the same _id, and a DELETE in Postgres removes the document. Check connector health at any time:

curl -s http://localhost:8083/connectors/products-sink/status
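The status response reports the connector's state separately from the state of each task, and a connector can show RUNNING while one of its tasks has FAILED. The sketch below checks both; it assumes the response has the usual shape with a "connector" object and a "tasks" list, and the function name and sample payload are hypothetical.

```python
def unhealthy_parts(status: dict) -> list:
    """Return a description of every connector or task state that is not RUNNING."""
    problems = []
    connector_state = status["connector"]["state"]
    if connector_state != "RUNNING":
        problems.append(f"connector: {connector_state}")
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']}: {task['state']}")
    return problems


# Hypothetical payload: the connector looks healthy but its only task has failed.
sample_status = {
    "name": "products-sink",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "connect:8083", "trace": "..."}],
    "type": "sink",
}
problems = unhealthy_parts(sample_status)
```

A health check that looks only at the connector state would miss this case, which is why the task list is worth inspecting too.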

Best Practices

  • Run one replication slot per source connector and monitor its confirmed_flush_lsn; an abandoned slot pins WAL and can fill the disk.
  • Always use ExtractNewRecordState for sinks that expect flat documents, and keep tombstones enabled so deletes propagate.
  • Derive the Elasticsearch _id from the primary key (key.ignore: false) to guarantee idempotent upserts and correct deletes.
  • Size the sink connector’s tasks.max to the topic partition count; tasks beyond the number of partitions sit idle. The Debezium Postgres source always runs a single task.
  • Pin Debezium and connector plugin versions and test schema changes in staging, since DDL can alter event structure.
  • Add a dead-letter queue (errors.tolerance: all with errors.deadletterqueue.topic.name) so a single bad record never stalls the sink.
  • Monitor end-to-end lag from WAL position to index refresh; growing lag signals the sink can’t keep up with write volume.
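For the replication-slot check, Postgres exposes confirmed_flush_lsn in the pg_replication_slots view and the current write position through pg_current_wal_lsn(). Both are printed as two hexadecimal numbers separated by a slash. The sketch below turns that text form into a byte offset so the slot's lag can be compared against a threshold; the helper names and sample values are hypothetical, and on the server side pg_wal_lsn_diff() performs the same subtraction.

```python
def parse_lsn(lsn: str) -> int:
    """Convert a Postgres LSN such as '0/16B3748' into an absolute byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def slot_lag_bytes(current_wal_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL the slot's consumer has not yet confirmed."""
    return parse_lsn(current_wal_lsn) - parse_lsn(confirmed_flush_lsn)


# Hypothetical values, as they might be read from pg_current_wal_lsn()
# and pg_replication_slots.confirmed_flush_lsn.
lag = slot_lag_bytes("0/16B3900", "0/16B3748")
```

A lag that keeps growing while the database is being written to suggests the connector has stopped confirming progress, which is the situation that pins WAL on disk.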
Last updated June 1, 2026