
Source Connectors

A source connector is the side of Kafka Connect that moves data into Kafka: it reads records from an external system (a relational database, a log file, an HTTP API) and produces them onto topics, while Connect tracks how far it has read so it can resume after a restart. In production this is how many pipelines begin, turning existing systems of record into a continuous stream of events without anyone hand-writing a producer. This page shows how to configure a JDBC and a FileStream source through the REST API, how the JDBC polling modes decide which rows get pulled, how records map onto topics, and how to confirm the data actually landed.

How a source connector works

A source connector acts as a Kafka producer that you configure declaratively instead of writing code. Connect splits the connector into one or more tasks (up to tasks.max). Each task repeatedly polls the external system, converts each result into a Kafka record, and hands it to the runtime to be produced. Crucially, every record also carries a source offset, an opaque marker that means “this is the last position I read” (a database column value, a file byte offset, a log sequence number), and Connect periodically commits the latest offsets it has successfully produced. In distributed mode those offsets live in an internal topic (offset.storage.topic, conventionally named connect-offsets). A crashed or rescheduled task therefore resumes from the last committed offset rather than re-reading everything. Records produced after that commit may be sent again, so delivery is at-least-once unless you enable exactly-once support for source connectors (Kafka 3.3 and later).

 ┌──────────────┐    poll     ┌──────────────────┐   produce   ┌────────┐
 │  PostgreSQL  │────────────▶│  source task     │────────────▶│ topics │
 │   (tables)   │             │  + offset store  │             └────────┘
 └──────────────┘◀────────────└──────────────────┘
        resume from last source offset
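
To make the offset bookkeeping concrete, here is a minimal Python sketch of a task's poll, produce, and commit loop. It is not Connect's Java API. FileOffsetStore and run_task are hypothetical names. A list stands in for the external system, a JSON file stands in for offset storage, and "producing" just appends to a list.

```python
import json
import os
import tempfile


class FileOffsetStore:
    """Hypothetical stand-in for Connect's offset storage, keyed by source partition."""

    def __init__(self, path):
        self.path = path

    def _read_all(self):
        if not os.path.exists(self.path):
            return {}
        with open(self.path) as f:
            return json.load(f)

    def load(self, partition):
        # Last committed offset for this partition, or None on a first run.
        return self._read_all().get(partition)

    def commit(self, partition, offset):
        data = self._read_all()
        data[partition] = offset
        with open(self.path, "w") as f:
            json.dump(data, f)


def run_task(rows, store, partition, batch_size, max_batches, produced):
    """Poll `rows` in batches, starting from the last committed offset."""
    offset = store.load(partition) or 0
    for _ in range(max_batches):
        batch = rows[offset:offset + batch_size]
        if not batch:
            break
        produced.extend(batch)           # stand-in for producing to Kafka
        offset += len(batch)
        store.commit(partition, offset)  # record how far we have read
    return offset


# The first run stops ("crashes") after one batch; the second resumes at row 2.
with tempfile.TemporaryDirectory() as tmp:
    store = FileOffsetStore(os.path.join(tmp, "offsets.json"))
    rows = ["r0", "r1", "r2", "r3", "r4"]
    produced = []
    run_task(rows, store, "table=orders", 2, 1, produced)
    run_task(rows, store, "table=orders", 2, 10, produced)
    print(produced)  # ['r0', 'r1', 'r2', 'r3', 'r4']
```

Committing after every batch keeps the sketch simple. Connect itself commits source offsets on an interval (offset.flush.interval.ms), so a crash between producing and committing can cause a few records to be sent again.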

Configuring a JDBC source connector

The Confluent JDBC source connector reads rows from one or more tables and emits one record per row. You submit it as JSON to the Connect REST API. The example below ingests the orders and customers tables from the shop database and writes each one to its own topic.

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "shop-jdbc-source",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "tasks.max": "2",
      "connection.url": "jdbc:postgresql://db:5432/shop",
      "connection.user": "connect",
      "connection.password": "secret",
      "mode": "timestamp+incrementing",
      "incrementing.column.name": "id",
      "timestamp.column.name": "updated_at",
      "table.whitelist": "orders,customers",
      "poll.interval.ms": "5000",
      "topic.prefix": "shop-",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false"
    }
  }'
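
If you manage connectors from code rather than curl, one option is PUT /connectors/<name>/config. It creates the connector if it is missing and updates it otherwise, whereas POST /connectors returns 409 Conflict when the name already exists. The sketch below uses only Python's standard library and assumes a worker at http://localhost:8083. build_put_config is a hypothetical helper, and the request is only built here, not sent. Note that the PUT body is the flat config map, without the "name"/"config" wrapper that POST uses.

```python
import json
import urllib.request


def build_put_config(base_url, name, config):
    """Build an idempotent create-or-update request for a Connect connector."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


jdbc_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "2",
    "connection.url": "jdbc:postgresql://db:5432/shop",
    "connection.user": "connect",
    "connection.password": "secret",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "updated_at",
    "table.whitelist": "orders,customers",
    "poll.interval.ms": "5000",
    "topic.prefix": "shop-",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
}

req = build_put_config("http://localhost:8083", "shop-jdbc-source", jdbc_config)
# To actually send it against a running worker:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status)
```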

Polling and incrementing modes

The mode setting is the heart of a JDBC source: it tells the connector how to decide which rows are new since the last poll. Choosing the wrong mode is a common cause of missing or duplicated data.

| Mode | What it tracks | Catches inserts | Catches updates | Notes |
|---|---|---|---|---|
| bulk | nothing | yes (re-reads every row each poll) | yes (re-reads every row each poll) | Snapshots the whole table each interval; use for small, static lookup tables. |
| incrementing | a strictly increasing column | yes | no | Needs a monotonic key such as an auto-increment id. |
| timestamp | a last-modified column | yes | yes | Relies on updated_at being set on every insert and update. |
| timestamp+incrementing | both columns | yes | yes | Most robust; the id breaks ties when rows share a timestamp. |

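Every poll.interval.ms the task runs a query bounded by the stored offset, then advances the offset to the largest value it saw. In timestamp+incrementing mode the filter is roughly WHERE updated_at > ? OR (updated_at = ? AND id > ?), ordered by updated_at and then id. The connector also caps updated_at from above at the database's current time, less timestamp.delay.interval.ms. None of these modes detect deletes; for delete-aware capture you want log-based change-data-capture instead (see Debezium).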
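
The timestamp+incrementing filter can be tried out locally with Python's built-in sqlite3 module. This sketch is illustrative, not the connector's code. The orders table, its integer updated_at values, and the poll helper are assumptions, and the sketch omits the real connector's upper bound on updated_at. The offset is the (updated_at, id) pair of the last row returned.

```python
import sqlite3


def poll(conn, offset):
    """One timestamp+incrementing poll: return rows after `offset` and the new offset."""
    last_ts, last_id = offset
    rows = conn.execute(
        "SELECT id, updated_at, status FROM orders "
        "WHERE updated_at > ? OR (updated_at = ? AND id > ?) "
        "ORDER BY updated_at, id",
        (last_ts, last_ts, last_id),
    ).fetchall()
    if rows:
        last = rows[-1]
        offset = (last[1], last[0])  # advance to the largest (updated_at, id) seen
    return rows, offset


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, updated_at INTEGER, status TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", [(1, 100, "new"), (2, 100, "new")])

offset = (0, 0)
rows, offset = poll(conn, offset)  # both rows; offset becomes (100, 2)

# A late insert sharing timestamp 100, plus an update that bumps updated_at.
conn.execute("INSERT INTO orders VALUES (3, 100, 'new')")
conn.execute("UPDATE orders SET status = 'paid', updated_at = 105 WHERE id = 1")
rows, offset = poll(conn, offset)
print(rows)    # [(3, 100, 'new'), (1, 105, 'paid')]
print(offset)  # (105, 1)

# A deleted row leaves nothing to select, so the poll sees no change.
conn.execute("DELETE FROM orders WHERE id = 2")
print(poll(conn, offset)[0])  # []
```

With a timestamp-only filter (updated_at > ?), order 3 would have been skipped, because its timestamp equals the stored offset. The id column is what lets the query pick it up.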

Do not use bulk mode against a large table on a short interval. It issues a full table scan every poll and re-emits every row, hammering your database and flooding the topic with duplicates. Reserve bulk for small reference data and use a timestamp/incrementing mode for anything that grows.

Topic mapping

The JDBC source builds each destination topic name as topic.prefix + table name. With topic.prefix: "shop-" and the orders and customers tables, records land in shop-orders and shop-customers. These topics are not created automatically unless one of two things is true: the broker has auto.create.topics.enable=true, or the connector sets Connect's own topic-creation properties (topic.creation.default.replication.factor and topic.creation.default.partitions, available since Kafka 2.6). In production, create the topics ahead of time with the partition count and retention you want. By default the JDBC source emits records with a null key. Adding the ValueToKey SMT (usually followed by ExtractField$Key) copies a primary-key column such as id into the key, so that partitioning and log compaction behave sensibly.
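
The naming rule and the keying advice can be expressed as a small Python sketch that extends the shop-jdbc-source config. The helper names are hypothetical. The SMT classes (org.apache.kafka.connect.transforms.ValueToKey and ExtractField$Key) and the topic.creation.default.* properties are standard Kafka Connect settings. The partition count and replication factor are placeholder values; size them for your own cluster.

```python
def topic_for(prefix, table):
    """Destination topic for a table in the JDBC source's table mode."""
    return prefix + table


def with_key_and_topic_creation(config, key_field, partitions, replication):
    """Return a copy of `config` that keys records by `key_field` and lets Connect create topics."""
    updated = dict(config)
    updated.update({
        # Copy the column from the value into a struct key, then unwrap it to a plain key.
        "transforms": "createKey,extractKey",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": key_field,
        "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractKey.field": key_field,
        # Only used for destination topics that do not exist yet.
        "topic.creation.default.partitions": str(partitions),
        "topic.creation.default.replication.factor": str(replication),
    })
    return updated


base = {"topic.prefix": "shop-", "table.whitelist": "orders,customers"}
tables = base["table.whitelist"].split(",")
print([topic_for(base["topic.prefix"], t) for t in tables])  # ['shop-orders', 'shop-customers']
keyed = with_key_and_topic_creation(base, "id", partitions=6, replication=3)
```

If you pre-create the topics, the topic.creation.* entries have no effect on them, because Connect only applies them to topics that do not yet exist.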

Configuring a FileStream source connector

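For quick demos and local testing, the FileStreamSourceConnector tails a text file line by line and produces each line as a record to one topic. It ships with Apache Kafka as the connect-file JAR in libs/. Since Kafka 3.2, however, that JAR is no longer on the worker's default classpath, so you may need to add it to plugin.path before the connector class can be found.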

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "file-source",
    "config": {
      "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
      "tasks.max": "1",
      "file": "/data/access.log",
      "topic": "file-lines",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
  }'

The FileStream source is single-task by design (tasks.max above 1 is ignored) and stores the byte offset it has read so it resumes mid-file after a restart. It is excellent for learning Connect and verifying a cluster end-to-end, but it is not meant for production log shipping — use a purpose-built connector for that.
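
Resuming mid-file comes down to remembering a byte position. The following minimal Python sketch illustrates that idea; it is not the FileStreamSourceTask code itself. read_new_lines is a hypothetical helper that returns only complete lines written after a given position and leaves a partial trailing line for the next read.

```python
import os
import tempfile


def read_new_lines(path, position):
    """Return complete lines written after byte `position`, plus the next position to read from."""
    with open(path, "rb") as f:
        f.seek(position)
        data = f.read()
    lines, consumed = [], 0
    while True:
        newline = data.find(b"\n", consumed)
        if newline == -1:
            break  # no newline yet: keep the partial line for the next poll
        lines.append(data[consumed:newline].decode("utf-8"))
        consumed = newline + 1
    return lines, position + consumed


with tempfile.TemporaryDirectory() as tmp:
    log = os.path.join(tmp, "access.log")
    with open(log, "wb") as f:
        f.write(b"GET / 200\nGET /health 200\nGET /lo")
    lines, pos = read_new_lines(log, 0)
    print(lines, pos)  # ['GET / 200', 'GET /health 200'] 26
    with open(log, "ab") as f:
        f.write(b"gin 302\n")
    lines, pos = read_new_lines(log, pos)
    print(lines, pos)  # ['GET /login 302'] 41
```

In the real connector, the position is stored in Connect's offset storage rather than in a local variable. This is what lets a restarted task continue from the same byte.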

Verifying data lands in the topic

After submitting either connector, confirm it is healthy and then read the topic. First check the connector and its tasks are RUNNING:

curl -s http://localhost:8083/connectors/file-source/status

Output:

{"name":"file-source","connector":{"state":"RUNNING","worker_id":"10.0.0.4:8083"},
 "tasks":[{"id":0,"state":"RUNNING","worker_id":"10.0.0.4:8083"}],"type":"source"}

Then append a line to the file and consume from the topic to see it appear (the output below assumes the file was empty when the connector started):

echo "GET /health 200" >> /data/access.log
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic file-lines --from-beginning --timeout-ms 5000

Output:

GET /health 200
Processed a total of 1 messages

If the topic stays empty, inspect the task error with GET /connectors/<name>/status. A FAILED task includes its stack trace in the trace field. The usual causes are a bad connection URL, a JDBC driver JAR missing from plugin.path, or a misnamed incrementing or timestamp column.
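
To script this health check, you can parse the status JSON directly. The following is a minimal Python sketch, assuming a worker at http://localhost:8083. fetch_status and problems are hypothetical helper names, and the sample input is the RUNNING response shown above. For FAILED components, the sketch keeps only the first line of the trace field. "MISSING" is the sketch's own label for an empty task list, not a Connect state.

```python
import json
import urllib.request


def fetch_status(base_url, name):
    """GET /connectors/<name>/status from a Connect worker."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def problems(status):
    """List (component, state, first trace line) for everything that is not RUNNING."""
    found = []
    components = [("connector", status["connector"])]
    components += [(f"task {t['id']}", t) for t in status.get("tasks", [])]
    for label, info in components:
        if info["state"] != "RUNNING":
            trace = info.get("trace", "")
            found.append((label, info["state"], trace.splitlines()[0] if trace else ""))
    if not status.get("tasks"):
        # A connector with no tasks produces nothing, even if it reports RUNNING.
        found.append(("tasks", "MISSING", "no tasks reported"))
    return found


sample = json.loads(
    '{"name":"file-source","connector":{"state":"RUNNING","worker_id":"10.0.0.4:8083"},'
    '"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.0.0.4:8083"}],"type":"source"}'
)
print(problems(sample))  # []
# Against a live worker: problems(fetch_status("http://localhost:8083", "file-source"))
```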

Best Practices

  • Pick timestamp+incrementing for JDBC sources so you capture both inserts and updates and survive ties on the timestamp column.
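  • Index the columns named in timestamp.column.name and incrementing.column.name (for example, a composite index on (updated_at, id)) so the bounded poll query stays fast as the table grows.
  • Set tasks.max no higher than the number of tables. One task can own several tables, but a table is never split across tasks, so the connector creates at most one task per table and any higher value adds no parallelism.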
  • Pre-create destination topics with the right partitions and retention instead of relying on broker auto-creation.
  • Give records a real key (copy a primary-key column into it with the ValueToKey SMT) so partitioning and compaction work as intended.
  • Reach for log-based CDC (Debezium) when you need deletes, exact ordering, or low-latency capture — query-based JDBC polling cannot see deleted rows.
  • Keep FileStream and bulk mode for demos and small reference data only; never point them at high-volume production sources.
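
The tasks.max advice follows from how table work is split. Here is a rough Python sketch, assuming the JDBC source behaves like Connect's ConnectorUtils.groupPartitions helper: it forms contiguous groups whose sizes differ by at most one, and it caps the number of groups at the number of tables. assign_tables is a hypothetical name.

```python
def assign_tables(tables, tasks_max):
    """Split tables into at most min(tasks_max, len(tables)) contiguous groups."""
    num_groups = min(tasks_max, len(tables))
    if num_groups == 0:
        return []
    per_group, leftover = divmod(len(tables), num_groups)
    groups, start = [], 0
    for group in range(num_groups):
        # The first `leftover` groups take one extra table.
        size = per_group + 1 if group < leftover else per_group
        groups.append(tables[start:start + size])
        start += size
    return groups


print(assign_tables(["orders", "customers"], 8))    # [['orders'], ['customers']]
print(assign_tables(["a", "b", "c", "d", "e"], 2))  # [['a', 'b', 'c'], ['d', 'e']]
```

Eight requested tasks still yield only two groups for two tables. This is why raising tasks.max past the table count adds no parallelism.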
Last updated June 1, 2026