AdminClient API
Shell scripts like kafka-topics.sh are great for one-off tasks, but real operational tooling needs to manage a cluster from code. The AdminClient is Kafka’s official Java API for administrative operations: creating and deleting topics, reading and altering configs, managing ACLs, and inspecting consumer groups. It speaks the same admin protocol the CLI tools use, so anything you can do from bin/ you can do programmatically — inside provisioning jobs, health checks, CI pipelines, or self-service platform APIs. Because every operation is asynchronous and returns a future, AdminClient fits naturally into automation that must create resources on demand and verify the result before proceeding.
Creating an AdminClient
AdminClient is a thread-safe, long-lived object. Build one from a Properties map, reuse it across operations, and close it when done. Only bootstrap.servers is strictly required; in production you also supply security settings.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Properties;
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
try (Admin admin = Admin.create(props)) {
// run admin operations here
}
Every method returns a *Result object whose accessors hand back KafkaFuture instances. Call .get() to block until the broker responds, or chain non-blocking callbacks. Failures surface as an ExecutionException wrapping a Kafka error such as TopicExistsException or UnknownTopicOrPartitionException.
Creating and listing topics
createTopics takes a collection of NewTopic, each describing a name, partition count, and replication factor. Wrapping .get() in a try/catch lets you treat “already exists” as success — a common pattern in idempotent provisioning.
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.errors.TopicExistsException;
import java.util.*;
import java.util.concurrent.ExecutionException;
NewTopic orders = new NewTopic("orders", 3, (short) 1)
.configs(Map.of(
"cleanup.policy", "delete",
"retention.ms", "604800000"));
CreateTopicsResult result = admin.createTopics(List.of(orders));
try {
result.all().get();
System.out.println("Created topic 'orders'");
} catch (ExecutionException e) {
if (e.getCause() instanceof TopicExistsException) {
System.out.println("Topic 'orders' already exists");
} else {
throw e;
}
}
Set<String> topics = admin.listTopics().names().get();
System.out.println("Topics: " + topics);
Output:
Created topic 'orders'
Topics: [orders, __consumer_offsets]
By default listTopics() hides internal topics like __consumer_offsets. Pass a ListTopicsOptions().listInternal(true) to include them.
Describing and deleting topics
describeTopics returns a TopicDescription per topic, exposing each partition’s leader, replicas, and in-sync replica (ISR) set — exactly what you need to verify replication health after a create.
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
Map<String, TopicDescription> described =
admin.describeTopics(List.of("orders")).allTopicNames().get();
TopicDescription desc = described.get("orders");
System.out.println("Partitions: " + desc.partitions().size());
for (TopicPartitionInfo p : desc.partitions()) {
System.out.printf(" p%d leader=%d isr=%d%n",
p.partition(), p.leader().id(), p.isr().size());
}
Output:
Partitions: 3
p0 leader=1 isr=1
p1 leader=1 isr=1
p2 leader=1 isr=1
Deleting is just as direct. deleteTopics marks the topics for deletion; the call returns once the controller accepts it, though physical log removal happens asynchronously.
admin.deleteTopics(List.of("orders")).all().get();
Topic deletion is irreversible and asynchronous. After
deleteTopics().all().get()succeeds, a brief window remains where the topic still appears in metadata. If you immediately recreate the same name, retry onTopicExistsExceptionuntil the old topic fully drains.
Reading and altering configs
Configs are addressed by ConfigResource, which pairs a type (TOPIC, BROKER) with a name. Use describeConfigs to read effective settings and incrementalAlterConfigs — the modern, additive API — to change them. Avoid the deprecated alterConfigs, which overwrites the entire config set and silently drops anything you omit.
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType;
ConfigResource topicRes = new ConfigResource(ConfigResource.Type.TOPIC, "orders");
// Read one config value
Config cfg = admin.describeConfigs(List.of(topicRes)).all().get().get(topicRes);
System.out.println("retention.ms = " + cfg.get("retention.ms").value());
// Change retention without disturbing other settings
AlterConfigOp op = new AlterConfigOp(
new ConfigEntry("retention.ms", "259200000"), OpType.SET);
admin.incrementalAlterConfigs(Map.of(topicRes, List.of(op))).all().get();
OpType | Effect |
|---|---|
SET | Set or overwrite a single config key |
DELETE | Remove an override, reverting to the broker default |
APPEND | Add values to a list-type config (e.g. cleanup.policy) |
SUBTRACT | Remove values from a list-type config |
Inspecting consumer groups
AdminClient is the programmatic equivalent of kafka-consumer-groups.sh. listConsumerGroups enumerates groups, describeConsumerGroups reports state and members, and listConsumerGroupOffsets returns committed offsets per partition — the inputs for a lag calculation.
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
for (ConsumerGroupListing g : admin.listConsumerGroups().all().get()) {
System.out.println("Group: " + g.groupId());
}
ConsumerGroupDescription cg = admin.describeConsumerGroups(List.of("order-processors"))
.all().get().get("order-processors");
System.out.printf("State: %s, members: %d%n", cg.state(), cg.members().size());
Map<TopicPartition, OffsetAndMetadata> offsets =
admin.listConsumerGroupOffsets("order-processors")
.partitionsToOffsetAndMetadata().get();
offsets.forEach((tp, om) ->
System.out.printf(" %s-%d committed=%d%n", tp.topic(), tp.partition(), om.offset()));
Output:
Group: order-processors
State: STABLE, members: 2
orders-0 committed=120
orders-1 committed=98
orders-2 committed=205
To compute lag, pair these committed offsets with the partitions’ end offsets from admin.listOffsets(...) using OffsetSpec.latest(), then subtract.
Best Practices
- Create one
AdminClientper application and reuse it; it pools connections and is fully thread-safe. Always close it in a try-with-resources block. - Treat provisioning as idempotent — catch
TopicExistsExceptionon create andUnknownTopicOrPartitionExceptionon delete so reruns are safe. - Prefer
incrementalAlterConfigsover the deprecatedalterConfigsto avoid accidentally wiping configs you did not explicitly set. - Set
REQUEST_TIMEOUT_MS_CONFIGand add retry handling; admin calls can fail transiently during controller failover. - Never log raw credentials — load security configs (SASL/SSL) from a secrets manager, not hard-coded
Properties. - Restrict who can call destructive operations with ACLs; an unprotected
AdminClientcan delete every topic in the cluster.