Project: CDC to Search Index
Keeping a search index in sync with a relational database is a classic source of bugs: dual writes drift, batch reindex jobs run hours behind, and dropped updates leave stale results. Change data capture (CDC) solves this by reading the database’s transaction log and emitting every insert, update, and delete as an event. In this project you will wire a Debezium source connector to capture Postgres changes into Kafka, then an Elasticsearch sink connector to index them in near real time — all running on Kafka Connect with zero application code on the write path.
Architecture overview
Debezium tails the Postgres write-ahead log (WAL) through a logical replication slot, so it sees committed row changes in order without polling tables. Each change becomes a Kafka record on a per-table topic. The Elasticsearch sink connector consumes those topics and applies the corresponding index operations, turning row-level CRUD into document upserts and deletes.
[Postgres WAL] --> [Debezium source connector] --> Kafka topics (one per table)
                                                         |
                                                         v
                                          [Elasticsearch sink connector]
                                                         |
                                                         v
                                           Elasticsearch index (search)
Preparing Postgres
Logical decoding must be enabled for Debezium to read the WAL. Set the WAL level in postgresql.conf and restart the database, since a change to wal_level only takes effect after a restart:
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
Create a table and a user with replication rights:
CREATE TABLE products (
    id          BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    name        TEXT NOT NULL,
    description TEXT,
    price       NUMERIC(10,2),
    updated_at  TIMESTAMPTZ DEFAULT now()
);
CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'dbz';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
Configuring the Debezium source connector
Connectors are registered against the Kafka Connect REST API as JSON. The source connector points at Postgres, names the replication slot, creates a publication limited to the captured tables, and routes those tables into Kafka. The pgoutput plugin is built into Postgres 10 and later, so no external decoder is needed. Setting decimal.handling.mode to double emits the NUMERIC price column as a JSON number instead of Debezium’s default binary decimal encoding, at the cost of exact precision.
{
  "name": "products-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.dbname": "shop",
    "topic.prefix": "shop",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_products",
    "publication.autocreate.mode": "filtered",
    "table.include.list": "public.products",
    "decimal.handling.mode": "double",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}
Register it:
curl -s -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @products-source.json
Debezium first runs a consistent snapshot of existing rows, then switches to streaming WAL changes. Captured records land on the topic shop.public.products.
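The registration call and the topic naming rule can also be scripted. The following is a minimal sketch using only the Python standard library; the helper names (topic_name, build_register_request, register_connector) are hypothetical, and the Connect URL and file name are the ones assumed by the curl command above.

```python
import json
import urllib.request


def topic_name(prefix: str, schema: str, table: str) -> str:
    """Default Debezium Postgres topic name: <topic.prefix>.<schema>.<table>."""
    return f"{prefix}.{schema}.{table}"


def build_register_request(connector: dict,
                           connect_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build (but do not send) the POST request that registers a connector."""
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_connector(path: str, connect_url: str = "http://localhost:8083") -> int:
    """Read a connector definition from `path`, POST it, and return the HTTP status."""
    with open(path) as fh:
        request = build_register_request(json.load(fh), connect_url)
    with urllib.request.urlopen(request) as response:
        return response.status
```

Calling register_connector("products-source.json") performs the same request as the curl command. urlopen raises urllib.error.HTTPError on a 4xx or 5xx response, for example when a connector with that name is already registered.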
Reshaping events with a transform
A raw Debezium event is an envelope with before, after, op, and source fields, but Elasticsearch wants a flat document. The ExtractNewRecordState single-message transform (SMT) unwraps the envelope, leaving just the row’s new state. For deletes, delete.handling.mode: drop discards the delete envelope, which carries no new state, and drop.tombstones: false keeps the null-valued tombstone that Debezium emits after it, so the sink can remove the document. Add it to the source config:
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "drop",
"transforms.unwrap.drop.tombstones": "false"
Gotcha: the document _id in Elasticsearch must come from the row’s primary key; otherwise updates create duplicate documents instead of replacing them. The sink connector below derives the id from the Kafka record key, which Debezium sets to the table’s primary key, so never disable keys or repartition these topics.
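To make the unwrap step concrete, here is a small Python model of what the transform does to a stream of records. It is a sketch of the idea rather than Debezium code: the function name unwrap and the abridged sample events are made up for illustration, and it models only the case where delete envelopes are dropped and tombstones are optionally kept.

```python
from typing import Iterable, Iterator, Optional, Tuple

# (key, value); the value is None for a tombstone.
Record = Tuple[dict, Optional[dict]]


def unwrap(records: Iterable[Record], drop_tombstones: bool = False) -> Iterator[Record]:
    """Flatten change-event envelopes to the row's new state (simplified model)."""
    for key, value in records:
        if value is None:
            # Tombstone: forwarded unchanged unless tombstones are dropped.
            if not drop_tombstones:
                yield key, None
        elif value["op"] == "d":
            # Delete envelope: there is no new row state, so this model drops it
            # and relies on the tombstone that follows to signal the delete.
            continue
        else:
            # c (create), u (update), r (snapshot read): keep only "after".
            yield key, value["after"]


# Abridged, hypothetical events for one row: create, delete, tombstone.
events = [
    ({"id": 1}, {"op": "c", "before": None,
                 "after": {"id": 1, "name": "Mechanical Keyboard"}}),
    ({"id": 1}, {"op": "d", "before": {"id": 1, "name": "Mechanical Keyboard"},
                 "after": None}),
    ({"id": 1}, None),
]
flattened = list(unwrap(events))
```

Here flattened holds two records: the flat row from the create, then the tombstone. The record key is untouched throughout, which is what lets a downstream consumer match the delete to the document it indexed earlier.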
Configuring the Elasticsearch sink connector
The sink consumes the table topic and writes upserts. Setting key.ignore to false makes the connector use the record key as the document id, and behavior.on.null.values: delete turns tombstones into delete operations. With schemaless JSON the key arrives as an object such as {"id": 1}, which cannot serve as a document id directly, so an ExtractField$Key transform reduces it to the bare primary key value.
{
  "name": "products-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "shop.public.products",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "false",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete",
    "write.method": "upsert",
    "transforms": "extractKey",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "id",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}
| Option | Purpose |
|---|---|
| key.ignore | When false, uses the Kafka key as the ES _id, enabling true upserts |
| schema.ignore | Lets ES infer field mappings instead of requiring a Connect schema |
| behavior.on.null.values | delete removes the doc on a tombstone, keeping the index in sync |
| write.method | upsert merges partial updates rather than overwriting |
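The interplay of these options is easier to see in a toy model. The sketch below uses a plain dict in place of an Elasticsearch index; it is not the connector's implementation, the function names are hypothetical, and the merge is a shallow approximation of an Elasticsearch partial update. The position-based id follows the connector's documented topic+partition+offset scheme for key.ignore: true.

```python
from typing import Optional


def document_id(key, key_ignore: bool, topic: str, partition: int, offset: int) -> str:
    """Pick the _id for a record (simplified model of the key.ignore choice)."""
    if key_ignore:
        # Position-based id: every record, even an update, gets a new document.
        return f"{topic}+{partition}+{offset}"
    if key is None:
        raise ValueError("key.ignore is false but the record has no key")
    return str(key)


def apply_record(index: dict, doc_id: str, value: Optional[dict],
                 write_method: str = "upsert", on_null: str = "delete") -> None:
    """Apply one record to `index`, a dict standing in for an Elasticsearch index."""
    if value is None:
        if on_null == "delete":
            index.pop(doc_id, None)   # tombstone removes the document
        elif on_null == "fail":
            raise ValueError(f"null value for document {doc_id}")
        return                        # "ignore": skip the record
    if write_method == "upsert":
        merged = dict(index.get(doc_id, {}))
        merged.update(value)          # shallow merge of the supplied fields
        index[doc_id] = merged
    else:                             # "insert": replace the whole document
        index[doc_id] = dict(value)


index: dict = {}
topic = "shop.public.products"
apply_record(index, document_id(1, False, topic, 0, 0),
             {"id": 1, "name": "Mechanical Keyboard", "price": 129.99})
apply_record(index, document_id(1, False, topic, 0, 1), {"price": 119.99})
snapshot_after_update = dict(index)   # one document, price updated in place
apply_record(index, document_id(1, False, topic, 0, 2), None)  # tombstone
```

With key-based ids the three records touch a single document, which ends up deleted. Passing key_ignore=True instead would give each record its own id, so the update would create a second document and the tombstone would match nothing.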
Verifying the pipeline
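Insert a row in Postgres, then confirm it appears in Elasticsearch, typically within a second or two (new documents become searchable on the next index refresh, which runs every second by default):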
INSERT INTO products (name, description, price)
VALUES ('Mechanical Keyboard', 'Hot-swappable, RGB', 129.99);
curl -s 'http://localhost:9200/shop.public.products/_search?q=keyboard'
Output (abridged):
{
  "hits": {
    "total": { "value": 1 },
    "hits": [
      {
        "_index": "shop.public.products",
        "_id": "1",
        "_source": {
          "id": 1,
          "name": "Mechanical Keyboard",
          "description": "Hot-swappable, RGB",
          "price": 129.99
        }
      }
    ]
  }
}
Updating the price re-indexes the same _id, and a DELETE in Postgres removes the document. Check connector health at any time:
curl -s http://localhost:8083/connectors/products-sink/status
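The status endpoint reports a state for the connector and for each of its tasks, and a failed task can hide behind a RUNNING connector. As a minimal sketch of an automated check (the function names are hypothetical and the URL is the one assumed above), the response can be reduced to a list of problems:

```python
import json
import urllib.request


def unhealthy_parts(status: dict) -> list:
    """Return a description of every connector or task that is not RUNNING."""
    problems = []
    connector_state = status["connector"]["state"]
    if connector_state != "RUNNING":
        problems.append(f"connector is {connector_state}")
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']} is {task['state']}")
    return problems


def fetch_status(name: str, connect_url: str = "http://localhost:8083") -> dict:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{connect_url}/connectors/{name}/status") as response:
        return json.load(response)


# Hand-written sample response in the shape returned by the status endpoint.
sample = {
    "name": "products-sink",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "connect:8083"}],
    "type": "sink",
}
problems = unhealthy_parts(sample)
```

An empty list means everything is running; unhealthy_parts(fetch_status("products-sink")) runs the same check against a live worker.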
Best practices
- Run one replication slot per source connector and monitor its confirmed_flush_lsn; an abandoned slot pins WAL and can fill the disk.
- Always use ExtractNewRecordState for sinks that expect flat documents, and keep tombstones enabled so deletes propagate.
- Derive the Elasticsearch _id from the primary key (key.ignore: false) to guarantee idempotent upserts and correct deletes.
- Size connector tasks to the topic partition count; sink tasks beyond the number of partitions sit idle.
- Pin Debezium and connector plugin versions and test schema changes in staging, since DDL can alter event structure.
- Add a dead-letter queue (errors.tolerance: all with errors.deadletterqueue.topic.name) so a single bad record never stalls the sink.
- Monitor end-to-end lag from WAL position to index refresh; growing lag signals the sink can’t keep up with write volume.
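For the replication-slot advice, the amount of WAL a slot is holding back can be estimated from two LSNs: the server's current write position and the slot's confirmed_flush_lsn. The sketch below assumes both values have already been read from Postgres as text (for example from pg_current_wal_lsn() and the pg_replication_slots view); the function names and the sample LSNs are illustrative.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a Postgres LSN such as '0/16B3748' to a byte position.

    An LSN is printed as two hexadecimal numbers: the high and low
    32 bits of a 64-bit WAL position.
    """
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def retained_wal_bytes(current_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL written since the slot last confirmed a flush."""
    return lsn_to_int(current_lsn) - lsn_to_int(confirmed_flush_lsn)


# Hypothetical readings: the slot is 32 bytes behind the current position.
lag = retained_wal_bytes("1/10", "0/FFFFFFF0")
```

A value that keeps growing while the connector reports RUNNING suggests the slot is not advancing and is worth alerting on before the disk fills. Postgres can compute the same difference server-side with pg_wal_lsn_diff.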