Converters & Serialization

Kafka stores everything as opaque byte arrays, but Kafka Connect works with a richer, structured representation of records. Converters are the pluggable bridge between those two worlds: they serialize Connect’s internal data model into bytes on the way out and deserialize bytes back into that model on the way in. Misconfigured or mismatched converters are one of the most common causes of production Connect failures, so it pays to understand exactly what they do.

The role of a converter

Every Connect worker runs source connectors, which write into Kafka, and sink connectors, which read from Kafka. Connectors never handle raw bytes themselves. Instead, a connector produces or consumes SourceRecord/SinkRecord objects whose keys and values are expressed in Connect’s data model. The converter sits at the edge:

  • For a source connector: connector → Struct/value → converter serializes → bytes → Kafka topic.
  • For a sink connector: Kafka topic → bytes → converter deserializes → Struct/value → connector.
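
The two directions can be sketched with a toy converter. This is a simplified stand-in, not Connect's API: the real org.apache.kafka.connect.storage.Converter interface also receives a Schema on the way out and returns a SchemaAndValue on the way in. The class name ToyStringConverter is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for Connect's Converter contract.
// It mimics what a string converter does: UTF-8 text in both directions.
final class ToyStringConverter {

    // Source side: Connect value -> bytes written to the Kafka topic.
    static byte[] fromConnectData(String topic, Object value) {
        return value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8);
    }

    // Sink side: bytes read from the Kafka topic -> Connect value.
    static Object toConnectData(String topic, byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] onTheWire = fromConnectData("orders", "acme");
        Object restored = toConnectData("orders", onTheWire);
        System.out.println(restored); // prints "acme"
    }
}
```

The round trip only works because both sides agree on the encoding, which is the property the rest of this page is about.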

You configure converters with two keys that apply per worker (as defaults) or per connector (as overrides):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

The key and value are handled by separate converters, so it is perfectly valid to use StringConverter for keys and AvroConverter for values.

The Connect data model: Schema and Struct

Connect defines a runtime-neutral type system in the org.apache.kafka.connect.data package. The two central abstractions are Schema (the shape and types of the data) and Struct (a concrete record conforming to a schema). This lets connectors describe data once and leaves it to the converter to decide how to represent it on the wire.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

Schema orderSchema = SchemaBuilder.struct().name("Order")
        .field("id", Schema.INT64_SCHEMA)
        .field("customer", Schema.STRING_SCHEMA)
        .field("amount", Schema.OPTIONAL_FLOAT64_SCHEMA)
        .build();

Struct order = new Struct(orderSchema)
        .put("id", 1001L)
        .put("customer", "acme")
        .put("amount", 249.99);

A converter takes this Struct plus its Schema and turns it into bytes; on the consuming side it reconstructs them. Whether the schema actually travels with the data depends on which converter you pick.

Built-in and common converters

Converter   | Class                                                 | Schema carried?       | Typical use
String      | org.apache.kafka.connect.storage.StringConverter      | No                    | Keys, plain text values
JSON        | org.apache.kafka.connect.json.JsonConverter           | Optional (envelope)   | Schema-aware JSON, no registry
ByteArray   | org.apache.kafka.connect.converters.ByteArrayConverter | No                   | Pass-through binary
Avro        | io.confluent.connect.avro.AvroConverter               | Yes (Schema Registry) | Compact, evolvable production data
Protobuf    | io.confluent.connect.protobuf.ProtobufConverter       | Yes (Schema Registry) | gRPC-style schemas
JSON Schema | io.confluent.connect.json.JsonSchemaConverter         | Yes (Schema Registry) | Validated JSON

StringConverter, JsonConverter, and ByteArrayConverter ship with Apache Kafka. The Avro, Protobuf, and JSON Schema converters come from Confluent: they must be installed on the worker’s plugin path and pointed at a running Schema Registry.

JsonConverter with and without schemas

JsonConverter has a critical toggle, schemas.enable. When true, every message is wrapped in an envelope containing both schema and payload, making the data self-describing but verbose:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

Output:

{
  "schema": { "type": "struct", "fields": [
    { "field": "id", "type": "int64" },
    { "field": "customer", "type": "string" }
  ] },
  "payload": { "id": 1001, "customer": "acme" }
}

Setting schemas.enable=false emits plain JSON ({"id":1001,"customer":"acme"}) but discards type information, which downstream sinks may need.

Gotcha: A sink connector reading plain JSON written with schemas.enable=false cannot reconstruct a Schema. Sinks that need typed structures (e.g. JDBC sink building DDL) will fail or treat everything as a schemaless map. Keep schemas enabled when a sink relies on them.
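
To get a feel for what the envelope costs per message, the sketch below wraps the example payload in the schema/payload structure shown above and compares UTF-8 sizes. It uses plain string concatenation rather than JsonConverter itself, so the exact bytes a real converter emits may differ (for instance, it may add fields such as "optional"). The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: builds the envelope by hand instead of calling JsonConverter.
final class EnvelopeOverheadSketch {

    // Wraps a payload in the {"schema": ..., "payload": ...} envelope.
    static String envelope(String schemaJson, String payloadJson) {
        return "{\"schema\":" + schemaJson + ",\"payload\":" + payloadJson + "}";
    }

    static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        String schema = "{\"type\":\"struct\",\"fields\":["
                + "{\"field\":\"id\",\"type\":\"int64\"},"
                + "{\"field\":\"customer\",\"type\":\"string\"}]}";
        String payload = "{\"id\":1001,\"customer\":\"acme\"}";

        System.out.println("plain:    " + utf8Length(payload) + " bytes");
        System.out.println("envelope: " + utf8Length(envelope(schema, payload)) + " bytes");
    }
}
```

The schema portion is repeated in every record, so the overhead grows with the number of fields rather than with the size of the data.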

AvroConverter with Schema Registry

Avro is a common choice for high-volume production pipelines. Instead of embedding the schema in every message, the converter registers it once in Schema Registry and stores only a compact schema ID in the record bytes. This yields small payloads and lets the registry enforce compatibility rules as schemas evolve.

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
key.converter=org.apache.kafka.connect.storage.StringConverter

For a source connector, the Avro schema is derived from the connector’s Connect Schema. For a sink, the converter looks up the schema ID, fetches the writer schema, and rebuilds the Struct.
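
The "compact schema ID" can be made concrete with a small sketch of the framing. It assumes the Confluent wire format: one magic byte with value 0, then a 4-byte big-endian schema ID, then the serialized payload. The class and method names are hypothetical, and the payload bytes are placeholders rather than real Avro output.

```java
import java.nio.ByteBuffer;

// Sketch of the registry framing; not the AvroConverter implementation.
final class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    // Prepends the header to an already-serialized payload.
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length) // big-endian by default
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    // Reads the schema ID back, rejecting records that are not framed this way.
    static int schemaIdOf(byte[] record) {
        if (record == null || record.length < HEADER_SIZE || record[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte or truncated record");
        }
        return ByteBuffer.wrap(record, 1, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {0x02, 0x08}); // placeholder payload
        System.out.println(framed.length + " bytes, schema id " + schemaIdOf(framed));
        // prints "7 bytes, schema id 42"
    }
}
```

The header costs five bytes per record regardless of how large the schema is, which is where the size advantage over a per-message schema envelope comes from.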

Matching converters across source and sink

The single most important rule: the converter that reads a topic must understand how the data was written. Bytes carry no inherent format, so a mismatch produces deserialization errors, not graceful degradation.

[Source: AvroConverter] --> topic "orders" (Avro + schema id) --> [Sink: JsonConverter]
                                                                         |
                                                                         v
                          DataException: Converting byte[] to Kafka Connect data failed due to serialization error

If a producer (or another source connector) wrote Avro, the sink must use AvroConverter pointed at the same registry. Likewise, mixing schemaless JSON producers with a schema-expecting sink will fail. When a topic has heterogeneous producers, standardize on one serialization format first.
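
When you are unsure how a topic was written, inspecting the first bytes of a sample record is a quick sanity check before choosing a converter. The sketch below is a heuristic only, with hypothetical names: it assumes registry-framed records start with a zero byte followed by a 4-byte schema ID, and that JSON text starts with `{` or `[` after optional whitespace. It cannot distinguish Avro from Protobuf or JSON Schema, which share the same framing.

```java
// Heuristic format check for a sampled record value; not part of Kafka Connect.
final class FormatSniffer {
    enum Guess { REGISTRY_FRAMED, JSON_TEXT, UNKNOWN }

    static Guess guess(byte[] value) {
        if (value == null || value.length == 0) {
            return Guess.UNKNOWN;
        }
        // Assumed registry framing: magic byte 0 plus a 4-byte schema ID.
        if (value[0] == 0x0 && value.length >= 5) {
            return Guess.REGISTRY_FRAMED;
        }
        // Skip leading whitespace, then look for the start of a JSON object or array.
        for (byte b : value) {
            if (b == ' ' || b == '\t' || b == '\n' || b == '\r') {
                continue;
            }
            return (b == '{' || b == '[') ? Guess.JSON_TEXT : Guess.UNKNOWN;
        }
        return Guess.UNKNOWN;
    }

    public static void main(String[] args) {
        byte[] framed = {0, 0, 0, 0, 42, 2, 8};
        byte[] json = "{\"id\":1001}".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(guess(framed)); // prints "REGISTRY_FRAMED"
        System.out.println(guess(json));   // prints "JSON_TEXT"
    }
}
```

A REGISTRY_FRAMED result on a topic read by a JSON-configured sink, or the reverse, points to exactly the kind of mismatch shown in the diagram.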

You can override converters per connector via the REST API, which is essential when one worker hosts connectors consuming different formats:

{
  "name": "jdbc-sink-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "orders",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
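
A configuration like this is submitted to the worker's REST API with a POST to /connectors. The following sketch uses Java's built-in HTTP client (Java 11+); the worker address http://localhost:8083 is an assumption, and the class name is hypothetical. Run without arguments it only prints the request it would send; pass the worker URL as the first argument to actually send it.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: create a connector with per-connector converter overrides via REST.
final class CreateConnectorSketch {

    // Builds POST {connectUrl}/connectors with the connector definition as the body.
    static HttpRequest createRequest(String connectUrl, String connectorJson) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"name\":\"jdbc-sink-orders\",\"config\":{"
                + "\"connector.class\":\"io.confluent.connect.jdbc.JdbcSinkConnector\","
                + "\"topics\":\"orders\","
                + "\"value.converter\":\"io.confluent.connect.avro.AvroConverter\","
                + "\"value.converter.schema.registry.url\":\"http://schema-registry:8081\","
                + "\"key.converter\":\"org.apache.kafka.connect.storage.StringConverter\"}}";

        if (args.length == 0) {
            // Dry run: show the request without contacting a worker.
            HttpRequest dryRun = createRequest("http://localhost:8083", json); // assumed address
            System.out.println(dryRun.method() + " " + dryRun.uri());
            return;
        }
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(createRequest(args[0], json), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```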

Tip: The internal converters (internal.key.converter / internal.value.converter) were deprecated in Kafka 2.0 and removed in 3.0; Connect now always uses schemaless JSON to store offsets, configs, and status. Do not set them; they are unrelated to your data converters.

Best Practices

  • Use Avro or Protobuf with Schema Registry for production data — compact payloads plus enforced, evolvable schemas beat self-describing JSON envelopes at scale.
  • Keep key.converter and value.converter independent; StringConverter keys with structured values is a common, sensible combination.
  • Always confirm that the writer’s format matches the reader’s converter before deploying a sink — most “magic byte” and DataException errors are converter mismatches.
  • If you use JsonConverter, decide schemas.enable deliberately: enable it when a sink needs typed records, disable it only for lightweight schemaless flows.
  • Override converters per connector rather than at the worker level when a cluster handles mixed formats.
  • Point every Avro/Protobuf converter at the same Schema Registry URL and configure a sensible compatibility mode to prevent breaking consumers.
Last updated June 1, 2026