Connect REST API
Kafka Connect runs as a long-lived service, and every worker exposes an HTTP REST API on port 8083 by default. This API is the primary control plane for Connect: you create, reconfigure, pause, restart, and delete connectors through it, and you query it to find out whether your pipeline is healthy. In distributed mode the API is cluster-aware — a request sent to any worker is forwarded to the leader and applied across the whole group — which makes it the right surface for automation, CI/CD, and runtime operations.
Connector lifecycle endpoints
The core resources are connectors and their tasks. A connector owns configuration; the worker turns that configuration into one or more tasks that do the actual copying. The table below lists the endpoints you will reach for most often.
| Method & path | Purpose |
|---|---|
GET /connectors | List the names of all running connectors |
POST /connectors | Create a connector from a JSON body |
GET /connectors/{name} | Fetch a connector’s full configuration |
GET /connectors/{name}/config | Fetch just the config map |
PUT /connectors/{name}/config | Create or update (upsert) the config |
GET /connectors/{name}/status | Report connector and task health |
PUT /connectors/{name}/pause | Pause the connector and its tasks |
PUT /connectors/{name}/resume | Resume a paused connector |
POST /connectors/{name}/restart | Restart the connector and/or its tasks |
DELETE /connectors/{name} | Remove the connector and stop its tasks |
GET /connector-plugins | List installed connector plugins |
Creating a connector
To create a connector, POST a JSON object with a name and a nested config block to /connectors. The config keys are connector-specific, but connector.class and tasks.max are always required.
curl -s -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "jdbc-orders-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "2",
"connection.url": "jdbc:postgresql://db:5432/shop",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "shop-"
}
}'
Output:
{
"name": "jdbc-orders-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "2",
"connection.url": "jdbc:postgresql://db:5432/shop",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "shop-",
"name": "jdbc-orders-source"
},
"tasks": [],
"type": "source"
}
The tasks array is empty in the immediate response because tasks are assigned asynchronously. Poll the status endpoint to confirm they start.
Updating configuration idempotently
POST /connectors fails with HTTP 409 if the connector already exists. For automation and GitOps, prefer PUT /connectors/{name}/config, which is idempotent: it creates the connector if absent and updates it otherwise. Note that the body for PUT .../config is the flat config map only — there is no outer name/config wrapper.
curl -s -X PUT http://localhost:8083/connectors/jdbc-orders-source/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "4",
"connection.url": "jdbc:postgresql://db:5432/shop",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "shop-"
}'
Always send the complete desired config to
PUT .../config. Connect replaces the configuration wholesale rather than merging, so any key you omit is removed.
Inspecting status
GET /connectors/{name}/status is the endpoint you will wire into health checks and dashboards. It returns the connector state plus the state of every task and the worker each task runs on.
curl -s http://localhost:8083/connectors/jdbc-orders-source/status
Output:
{
"name": "jdbc-orders-source",
"connector": { "state": "RUNNING", "worker_id": "10.0.0.4:8083" },
"tasks": [
{ "id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8083" },
{ "id": 1, "state": "FAILED", "worker_id": "10.0.0.5:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: ..." }
],
"type": "source"
}
A connector can be RUNNING while individual tasks are FAILED — the top-level state does not aggregate task failures, so always check the tasks array. The full state set is RUNNING, PAUSED, FAILED, UNASSIGNED, STOPPED, and RESTARTING.
Pause, resume, and restart
Pausing keeps the configuration but stops the tasks from processing — useful during a downstream maintenance window. Resuming brings them back. Restarting is the right tool when a task has FAILED with a transient error.
# Pause processing without losing config or offsets
curl -s -X PUT http://localhost:8083/connectors/jdbc-orders-source/pause
# Resume later
curl -s -X PUT http://localhost:8083/connectors/jdbc-orders-source/resume
# Restart the connector AND all failed tasks (Connect 2.3+)
curl -s -X POST \
"http://localhost:8083/connectors/jdbc-orders-source/restart?includeTasks=true&onlyFailed=true"
The includeTasks and onlyFailed query parameters let you restart just the failed tasks instead of bouncing the entire connector. To restart a single task explicitly, call POST /connectors/{name}/tasks/{taskId}/restart.
Deleting connectors and listing plugins
Deleting stops all tasks and removes the connector’s config from the cluster. Offsets stored in the __consumer_offsets topic (for sinks) or the source offset topic remain, so recreating a connector with the same name resumes where it left off.
curl -s -X DELETE http://localhost:8083/connectors/jdbc-orders-source
# Discover which connector classes this worker can run
curl -s http://localhost:8083/connector-plugins
Output:
[
{ "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source", "version": "10.7.4" },
{ "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink", "version": "3.7.0" }
]
You can also validate a configuration before deploying it with PUT /connector-plugins/{class}/config/validate, which returns per-field errors without creating anything.
Best Practices
- Use
PUT /connectors/{name}/configrather thanPOST /connectorsso deployments are idempotent and safe to re-run from CI/CD. - Treat connector JSON as version-controlled artifacts and apply them through automation, not ad-hoc
curlfrom a laptop. - Poll
GET /connectors/{name}/statusin monitoring and alert on any task inFAILEDstate, since the connector-level state can stayRUNNING. - Restart with
?onlyFailed=trueto recover transient failures without disrupting healthy tasks. - Secure the REST endpoint in production: enable TLS, require authentication via a REST extension, and never expose port
8083to untrusted networks — anyone who can reach it can read configs and create connectors. - Validate configs with the
/config/validateendpoint in a pre-deploy step to catch bad properties before they hit the cluster.