Sink Connectors
Sink connectors are the half of Kafka Connect that moves data out of Kafka and into external systems — relational databases, search indexes, object stores, data warehouses, and more. Instead of writing and operating a bespoke consumer application for every downstream system, you declare a connector with a JSON config and let the Connect runtime handle polling, batching, offset commits, retries, and scaling. In production this is how Kafka topics get materialized into queryable, durable stores without a fleet of hand-rolled consumers to babysit.
How a sink connector works
A sink connector consumes records from one or more topics using the standard consumer protocol, then writes them in batches to the target system. The Connect framework assigns topic partitions to tasks. Each task is essentially a managed consumer in a consumer group named connect-<connector name> by default, and its put(Collection<SinkRecord>) method (defined on SinkTask) receives the batches to write. Periodically (every offset.flush.interval.ms, 60 seconds by default) Connect asks each task to flush. Once the buffered records are durably persisted downstream, Connect commits the corresponding consumer offsets, so progress is tracked in Kafka's __consumer_offsets topic rather than in the external system.
Because offset management is decoupled from the write, the runtime can rebalance partitions across tasks, recover after a crash, and resume from the last committed offset. That resume point may replay records that were already written but not yet committed. Your job is to choose the right connector, point it at topics, and tune batching and error handling.
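To make the write-then-commit ordering concrete, here is a minimal Python simulation, not Connect's actual code. `Record`, `ToySinkTask` and its `put`/`flush` methods are hypothetical stand-ins loosely modeled on SinkRecord and SinkTask; the "downstream" store is a dict. The committed offset per partition follows Kafka's convention of storing the next offset to consume (last written offset + 1).

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Record:
    """Hypothetical stand-in for a SinkRecord."""
    topic: str
    partition: int
    offset: int
    key: str
    value: str


@dataclass
class ToySinkTask:
    """Hypothetical sink task: buffers in put(), persists in flush()."""
    downstream: dict = field(default_factory=dict)  # simulated external store
    buffer: list = field(default_factory=list)

    def put(self, records):
        # Connect hands the task a batch; this toy only buffers it.
        self.buffer.extend(records)

    def flush(self):
        # Persist everything buffered, then report which offsets are safe to commit.
        offsets = {}
        for r in self.buffer:
            self.downstream[r.key] = r.value  # keyed write (an upsert)
            tp = (r.topic, r.partition)
            # Kafka commits the *next* offset to consume, hence +1.
            offsets[tp] = max(offsets.get(tp, 0), r.offset + 1)
        self.buffer.clear()
        return offsets


committed = {}  # simulated consumer-group offsets
task = ToySinkTask()
task.put([Record("orders", 0, 0, "o-1", "new"),
          Record("orders", 0, 1, "o-2", "new"),
          Record("orders", 1, 0, "o-3", "new")])
# Offsets are committed only after flush() has persisted the batch.
committed.update(task.flush())
print(committed)  # {('orders', 0): 2, ('orders', 1): 1}
```

If the process died after `flush()` wrote to `downstream` but before `committed.update(...)` ran, a restarted task would start again from the old offsets and rewrite the same records; that gap is what the delivery-semantics discussion later in this page is about.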
Configuring a sink via the REST API
Connectors are created by POSTing JSON to the Connect REST API. The request below sends the contents of jdbc-sink.json, which defines a JDBC sink that upserts records into a Postgres table:
```bash
curl -i -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @jdbc-sink.json
```
```json
{
  "name": "orders-jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "3",
    "topics": "orders",
    "connection.url": "jdbc:postgresql://db:5432/sales",
    "connection.user": "connect",
    "connection.password": "${file:/secrets/db.properties:password}",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "order_id",
    "auto.create": "true",
    "auto.evolve": "true",
    "batch.size": "3000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
```
Output:

```
HTTP/1.1 201 Created
{"name":"orders-jdbc-sink","config":{...},"tasks":[],"type":"sink"}
```
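The same request can be issued from Python with only the standard library. This is a sketch assuming the worker listens on http://localhost:8083 as in the curl example; `build_create_request` is a hypothetical helper name. It only builds the `urllib.request.Request`, so it can be inspected without a running cluster; passing it to `urllib.request.urlopen` actually sends it.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed worker address, as in the curl example


def build_create_request(connector: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for a connector definition."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


connector = {
    "name": "orders-jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "3",
        "topics": "orders",
        # ... remaining keys from jdbc-sink.json ...
    },
}
req = build_create_request(connector)
# To send it against a live worker:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status)  # 201 on success
```

For repeatable deployments, PUT /connectors/<name>/config (with just the config object as the body) creates the connector or updates it in place, whereas a second POST with an existing name is rejected.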
Never embed plaintext credentials in connector configs. Use a ConfigProvider such as the FileConfigProvider shown above (${file:...}) or an external secrets store, so the password is resolved at runtime rather than stored in Connect's config topic.
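The ${file:...} placeholder only resolves if the worker has the provider enabled, typically with config.providers=file and config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider in the worker properties. To show what the substitution does, here is a toy resolver in Python. `resolve_file_placeholders` and `load_properties` are hypothetical helpers that only mimic the ${file:<path>:<key>} form; this is not Kafka's implementation, and the simplified parser ignores properties-file escapes and line continuations.

```python
import re
import tempfile
from pathlib import Path

# Matches ${file:<path>:<key>}; assumes the path itself contains no ':'.
PLACEHOLDER = re.compile(r"\$\{file:([^:}]+):([^}]+)\}")


def load_properties(path: str) -> dict:
    """Minimal key=value parser (no escapes or multi-line values)."""
    props = {}
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def resolve_file_placeholders(config: dict) -> dict:
    """Return a copy of config with ${file:path:key} values substituted."""
    cache = {}

    def lookup(match):
        path, key = match.group(1), match.group(2)
        if path not in cache:
            cache[path] = load_properties(path)
        return cache[path][key]

    return {k: PLACEHOLDER.sub(lookup, v) if isinstance(v, str) else v
            for k, v in config.items()}


# Demo: write a throwaway properties file and resolve against it.
with tempfile.TemporaryDirectory() as tmp:
    secrets = Path(tmp) / "db.properties"
    secrets.write_text("# credentials\npassword=s3cret\n")
    raw = {"connection.user": "connect",
           "connection.password": f"${{file:{secrets}:password}}"}
    resolved = resolve_file_placeholders(raw)
print(resolved["connection.password"])  # s3cret
```

The stored config (`raw`) keeps only the placeholder; the secret exists only in the resolved copy, which is the property that keeps it out of Connect's config topic.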
Common sink connectors
| Connector | Target | Key configs | Typical use |
|---|---|---|---|
| JDBC Sink | RDBMS (Postgres, MySQL, etc.) | insert.mode, pk.mode, auto.evolve | Materialize topics into tables |
| Elasticsearch Sink | Elasticsearch / OpenSearch | key.ignore, behavior.on.null.values, write.method | Full-text search, log analytics |
| S3 Sink | Amazon S3 (or compatible) | flush.size, partitioner.class, format.class | Long-term archival, data lake |
An Elasticsearch sink snippet illustrates how each connector adds its own keys on top of the shared ones:
```json
{
  "name": "logs-es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "2",
    "topics": "app-logs",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "true",
    "schema.ignore": "true",
    "behavior.on.null.values": "ignore",
    "batch.size": "2000",
    "max.in.flight.requests": "5"
  }
}
```
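With key.ignore=true the connector does not use the record key as the document ID; it derives one from the record's Kafka coordinates (topic, partition, offset), so a replayed record overwrites its earlier copy instead of creating a new document. The sketch below illustrates that idea. The exact "topic+partition+offset" string format is an assumption about the connector's internals, and `document_id` is a hypothetical helper.

```python
def document_id(topic: str, partition: int, offset: int, key=None, key_ignore=True) -> str:
    """Hypothetical ID scheme: Kafka coordinates when key.ignore=true, else the record key."""
    if key_ignore:
        return f"{topic}+{partition}+{offset}"
    if key is None:
        raise ValueError("key.ignore=false requires a non-null record key")
    return str(key)


index = {}  # simulated Elasticsearch index: _id -> document
batch = [("app-logs", 0, 41, {"msg": "started"}),
         ("app-logs", 0, 42, {"msg": "ready"})]
for _ in range(2):  # the second pass simulates a replay after a restart
    for topic, partition, offset, doc in batch:
        index[document_id(topic, partition, offset)] = doc
print(len(index))  # 2: the replay overwrote, it did not duplicate
```

Coordinate-based IDs make replays harmless but also mean a newer record never replaces an older one for the same business key; when you need "latest state per key", set key.ignore=false so the key becomes the ID.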
Key configuration concepts
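Topic selection. Use topics for an explicit comma-separated list, or topics.regex to subscribe by pattern (e.g. the regex orders\..*, which must be written "orders\\..*" inside a JSON string because JSON escapes backslashes). Exactly one of the two must be set.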
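The sketch below decodes the JSON-encoded pattern and checks which topic names it would select. It uses `re.fullmatch` as an approximation of Kafka's Java pattern matching, which must match the whole topic name; the candidate topic names and the `validate_topic_selection` helper are hypothetical.

```python
import json
import re

# The value exactly as it would appear inside connector JSON.
raw = json.loads('{"topics.regex": "orders\\\\..*"}')
pattern = raw["topics.regex"]  # the decoded regex: orders\..*

candidates = ["orders.us", "orders.eu", "orders", "ordersXus", "archive.orders.us"]
selected = [t for t in candidates if re.fullmatch(pattern, t)]
print(selected)  # ['orders.us', 'orders.eu']


def validate_topic_selection(config: dict) -> None:
    """Hypothetical check for the rule that exactly one of topics / topics.regex is set."""
    has_topics = bool(config.get("topics"))
    has_regex = bool(config.get("topics.regex"))
    if has_topics == has_regex:
        raise ValueError("set exactly one of 'topics' or 'topics.regex'")
```

Note that "ordersXus" is rejected only because the dot is escaped; an unescaped orders..* would match it too.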
Batching. Sinks buffer records before flushing. The worker-level consumer.max.poll.records (default 500) caps how many records a task fetches per poll, and therefore how many it receives per put() call, while connector-specific keys like JDBC's batch.size or S3's flush.size control how many records go into each downstream write (a JDBC statement batch, an S3 object). Larger batches improve throughput at the cost of latency and memory.
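The interaction between the two limits is easy to underestimate. The sketch below assumes a connector that, like the JDBC sink, also writes whatever it has buffered at the end of every put() call, so a downstream batch can never be larger than one poll. `downstream_batches` is a hypothetical helper; the numbers mirror the earlier JDBC example (batch.size=3000) against the consumer default of 500 records per poll.

```python
def downstream_batches(total_records: int, max_poll_records: int, batch_size: int) -> list:
    """Sizes of downstream writes, assuming the connector flushes at batch_size
    and again at the end of every put() (one put() per consumer poll), and that
    every poll returns a full max_poll_records while data is available."""
    sizes = []
    remaining = total_records
    while remaining > 0:
        in_put = min(max_poll_records, remaining)  # records handed to one put()
        remaining -= in_put
        while in_put > 0:
            write = min(batch_size, in_put)
            sizes.append(write)
            in_put -= write
    return sizes


default_poll = downstream_batches(7000, max_poll_records=500, batch_size=3000)
raised_poll = downstream_batches(7000, max_poll_records=3000, batch_size=3000)
print(len(default_poll), max(default_poll))  # 14 500
print(raised_poll)  # [3000, 3000, 1000]
```

Under these assumptions, batch.size=3000 never takes effect until the poll size is raised too. For a single connector that can be done with consumer.override.max.poll.records in its config, provided the worker's connector.client.config.override.policy permits the override.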
Converters. The key.converter and value.converter deserialize bytes from Kafka into Connect’s internal data model before the connector writes them. They must match what the producer used. See the converters page for the full picture.
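A common mismatch is reading plain JSON with AvroConverter. Confluent's Schema Registry serializers prefix each payload with a magic byte 0x00 and a 4-byte big-endian schema ID, and the converter rejects data without that header. The hypothetical helper below peeks at raw bytes to tell which case you are in, for example when inspecting a DLQ record; it checks only the framing, not whether the schema ID exists in the registry.

```python
import struct


def schema_registry_id(payload: bytes):
    """Return the schema ID if payload uses the Confluent wire format, else None.

    Wire format: 1 magic byte (0x00) + 4-byte big-endian schema ID + serialized data.
    """
    if len(payload) < 5 or payload[0] != 0:
        return None
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id


avro_framed = b"\x00" + struct.pack(">I", 42) + b"\x02\x06abc"  # header + dummy body
plain_json = b'{"order_id": "o-1"}'
print(schema_registry_id(avro_framed))  # 42
print(schema_registry_id(plain_json))   # None
```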
Error handling and dead-letter queues
By default a single bad record (deserialization failure, schema mismatch, downstream rejection) stops the task. For resilient pipelines, configure error tolerance so poison messages are routed to a dead-letter queue (DLQ) topic instead:
```json
{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "orders-sink-dlq",
  "errors.deadletterqueue.topic.replication.factor": "3",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.retry.timeout": "60000",
  "errors.retry.delay.max.ms": "5000",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true"
}
```
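With errors.tolerance: all, records that fail conversion or transformation are written to the DLQ topic and the task keeps making progress; when errors.deadletterqueue.context.headers.enable is true, the original topic, partition, offset, and exception are recorded in the message headers. With the default none, any such failure fails the task. The errors.retry.* settings likewise apply to retriable failures in those conversion and transform stages. Failures inside the connector's own write logic reach the DLQ only if the connector reports them through the errant-record reporter added in Kafka 2.6 (KIP-610); otherwise they are governed by the connector's own retry and behavior settings.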
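When context headers are enabled, each DLQ record carries headers with the __connect.errors. prefix describing where and why it failed (for example __connect.errors.topic, __connect.errors.partition, __connect.errors.offset, __connect.errors.stage and __connect.errors.exception.class.name). The sketch below turns such a header list into a readable summary. It assumes headers arrive as (name, bytes) pairs, as Python Kafka clients commonly expose them; `summarize_dlq_headers` is a hypothetical helper and the sample headers are made up.

```python
PREFIX = "__connect.errors."


def summarize_dlq_headers(headers) -> dict:
    """Collect __connect.errors.* headers into a dict with the prefix stripped.

    `headers` is assumed to be an iterable of (name, value_bytes) pairs, or None.
    """
    summary = {}
    for name, value in headers or []:
        if name.startswith(PREFIX) and value is not None:
            summary[name[len(PREFIX):]] = value.decode("utf-8", errors="replace")
    return summary


sample = [
    ("__connect.errors.topic", b"orders"),
    ("__connect.errors.partition", b"2"),
    ("__connect.errors.offset", b"1187"),
    ("__connect.errors.stage", b"VALUE_CONVERTER"),
    ("__connect.errors.exception.class.name", b"org.apache.kafka.connect.errors.DataException"),
    ("trace-id", b"abc123"),  # unrelated application header, ignored
]
info = summarize_dlq_headers(sample)
print(f"{info['topic']}-{info['partition']}@{info['offset']} failed in {info['stage']}")
# orders-2@1187 failed in VALUE_CONVERTER
```

Because the original coordinates survive in the headers, a fixed record can be traced back to its source offset, and a repair job can decide whether to re-produce it to the input topic.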
Delivery semantics
Sink connectors provide at-least-once delivery by default: offsets are committed only after a batch is successfully written, so a crash between write and commit causes the last batch to be replayed. To make this safe, design downstream writes to be idempotent — for example, JDBC insert.mode=upsert keyed on a primary key, or Elasticsearch document IDs derived from the record key so a replay overwrites rather than duplicates.
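A small simulation makes the argument concrete. It delivers the same batch twice, as would happen after a crash between the write and the offset commit, into two hypothetical targets: an upsert-by-key table (modeled as a dict) and an append-only log (modeled as a list). Only the keyed target ends up with one row per order.

```python
batch = [("o-1", {"amount": 10}), ("o-2", {"amount": 25}), ("o-1", {"amount": 12})]

upsert_table = {}  # like insert.mode=upsert with the primary key on order_id
append_log = []    # like plain inserts with no primary key

# First delivery, then a replay of the same batch (no commit happened in between).
for _delivery in range(2):
    for order_id, row in batch:
        upsert_table[order_id] = row
        append_log.append((order_id, row))

print(len(upsert_table), len(append_log))  # 2 6
print(upsert_table["o-1"])  # {'amount': 12}: the last write for the key wins
```

The upsert converges to the right state only because records for one key are replayed in the same order, which Kafka guarantees within a partition when the key determines the partition.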
Some connectors offer exactly-once semantics (for instance, the S3 sink achieves it through deterministic, offset-based file naming so re-delivered records land in the same object, provided its partitioning and file rotation are also deterministic), but most rely on idempotent writes to make at-least-once effectively exactly-once.
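The S3 sink's determinism comes from naming each object after the Kafka coordinates of its first record, so re-processing the same offsets produces the same object key and overwrites rather than adds. The helper below is a hypothetical sketch of that naming, modeled on the default layout (topics/<topic>/partition=<n>/<topic>+<partition>+<start offset>.<ext>, with the offset zero-padded to 10 digits); the exact format may differ by connector version and partitioner, so check its documentation.

```python
def s3_object_key(topic: str, partition: int, start_offset: int, ext: str,
                  topics_dir: str = "topics", pad: int = 10) -> str:
    """Deterministic object key: same (topic, partition, start offset) gives the same key."""
    return (f"{topics_dir}/{topic}/partition={partition}/"
            f"{topic}+{partition}+{start_offset:0{pad}d}.{ext}")


first = s3_object_key("orders", 2, 1500, "avro")
replay = s3_object_key("orders", 2, 1500, "avro")  # same records re-delivered
print(first)  # topics/orders/partition=2/orders+2+0000001500.avro
print(first == replay)  # True
```

The key is only stable if the file also starts at the same offset on replay; rotation driven by wall-clock time (such as rotate.schedule.interval.ms) can move that boundary and weaken the guarantee.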
Treat duplicates as inevitable. If your sink cannot dedupe via keys or upserts, you will eventually see double-writes after a rebalance or restart.
Best practices
- Make downstream writes idempotent (upserts, deterministic keys/IDs) so at-least-once replays are harmless.
- Always configure a dead-letter queue with errors.tolerance: all for production pipelines so one bad record cannot halt the connector.
- Set tasks.max no higher than the total partition count of the consumed topics; extra tasks sit idle.
- Tune batch.size/flush.size and consumer.max.poll.records together to balance throughput, latency, and memory.
- Resolve secrets through a ConfigProvider instead of hard-coding credentials in the connector JSON.
- Match converters to the producer's serialization format, and pair Avro/Protobuf with Schema Registry to safely evolve schemas.
- Monitor connector and task state via the REST API (GET /connectors/<name>/status) and alert on FAILED tasks and DLQ topic growth.
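The status check and the tasks.max rule from the list above are easy to script. This sketch assumes the JSON shape returned by GET /connectors/<name>/status (a connector object plus a tasks array, each entry with a state field); `failed_tasks` and `idle_task_count` are hypothetical helpers, and the sample response is invented for illustration.

```python
import json


def failed_tasks(status: dict) -> list:
    """IDs of tasks reported as FAILED in a /connectors/<name>/status response."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def idle_task_count(tasks_max: int, partition_count: int) -> int:
    """Tasks beyond the total partition count receive no partitions."""
    return max(0, tasks_max - partition_count)


sample_status = json.loads("""
{"name": "orders-jdbc-sink",
 "connector": {"state": "RUNNING", "worker_id": "connect-1:8083"},
 "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect-1:8083"},
           {"id": 1, "state": "FAILED", "worker_id": "connect-2:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: ..."},
           {"id": 2, "state": "RUNNING", "worker_id": "connect-2:8083"}],
 "type": "sink"}
""")
print(failed_tasks(sample_status))  # [1]
print(idle_task_count(tasks_max=3, partition_count=2))  # 1
```

A failed task does not restart on its own; once the cause is fixed, POST /connectors/<name>/tasks/<id>/restart brings it back, and the trace field in the status response usually says why it stopped.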