Single Message Transforms (SMT)

Single Message Transforms (SMTs) are lightweight, stateless functions that Kafka Connect applies to each record as it flows through a connector: after a source connector produces it, or before a sink connector receives it. They let you reshape, enrich, mask, or reroute data declaratively in the connector configuration, with no custom code and no extra stream-processing job. In production they replace bespoke glue code with a config-driven pipeline, and they are cheap because they run in the same thread as the connector task with no network hop.

Where SMTs fit in the pipeline

A transform sees one ConnectRecord at a time. For a source connector the chain runs after the connector produces a record but before the converter serializes it to Kafka. For a sink connector the chain runs after the converter deserializes the record but before the connector writes it downstream. Because they are per-record and stateless, SMTs cannot join, aggregate, or window; for anything stateful, use Kafka Streams or ksqlDB instead.

Source: [Source Task] -> [SMT chain] -> [Converter] -> Kafka topic
Sink:   Kafka topic -> [Converter] -> [SMT chain] -> [Sink Task]
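
As a rough illustration of the sink side, the fragment below sketches a chain that prunes fields after the converter has deserialized the record and before the sink task writes it. The alias dropInternal and the field names internal_notes and debug_flags are hypothetical; ReplaceField and its exclude option are built in (older Connect releases named this option blacklist).

```properties
# Sink-side sketch: applied after deserialization, before the sink task writes.
# Alias and field names are illustrative assumptions.
transforms=dropInternal
transforms.dropInternal.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropInternal.exclude=internal_notes,debug_flags
```

The same keys work unchanged on a source connector; only the position of the chain relative to the converter differs.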

Configuring a transform chain

You declare transforms with a transforms list of aliases, then configure each alias by its name. Transforms execute strictly in the order listed, and the output of one becomes the input of the next. Each transform needs a type (its fully qualified class) plus its own options.

transforms=insertSource,maskEmail,route

transforms.insertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertSource.static.field=source_system
transforms.insertSource.static.value=billing-db

transforms.maskEmail.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskEmail.fields=email,ssn
transforms.maskEmail.replacement=***REDACTED***

transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=(.*)
transforms.route.replacement=secure_$1

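Note the $Value and $Key suffixes: most built-in transforms ship as a pair of nested classes so you can operate on the record key or the record value independently. Transforms that act on the record as a whole, such as RegexRouter, have no suffix. Pick the wrong variant and the transform either fails at runtime on an unexpected type or leaves the data you meant to change untouched.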
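
To make the key/value split concrete, here is a minimal sketch that copies a value field into the record key and then unwraps it, so the key ends up as the bare id. The aliases copyId and unwrapKey and the field name id are assumptions for illustration; ValueToKey and ExtractField are built-in transforms.

```properties
transforms=copyId,unwrapKey

# Build a struct key from the value's "id" field.
# ValueToKey acts on the whole record, so it has no $Key/$Value variant.
transforms.copyId.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.copyId.fields=id

# $Key: operate on the record key, pulling the single field out of the struct.
transforms.unwrapKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.unwrapKey.field=id
```

Using ExtractField$Value in the second step would instead replace the whole value with the id and leave the key as a struct, which is rarely the intent.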

The built-in transforms you will actually use

| Transform | Purpose | Key options |
| --- | --- | --- |
| InsertField | Add a static value or record metadata (topic, partition, offset, timestamp) as a field | static.field, static.value, timestamp.field, topic.field |
| ReplaceField | Drop (exclude), whitelist (include), or rename (renames) fields | exclude, include, renames |
| MaskField | Replace sensitive field values with a constant or type-default | fields, replacement |
| Cast | Convert field or whole-value types (e.g. string to int32) | spec (e.g. age:int32,price:float64) |
| RegexRouter | Rewrite the destination topic name via regex capture groups | regex, replacement |
| TimestampRouter | Append a formatted timestamp to the topic name | topic.format, timestamp.format |
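
The field-level transforms in the table can be combined in one chain. The following is a sketch only: the aliases (addMeta, tidy, fixTypes) and every field name are hypothetical, while the option keys are the ones listed in the table.

```properties
transforms=addMeta,tidy,fixTypes

# InsertField: copy record metadata into the value as new fields.
transforms.addMeta.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addMeta.topic.field=origin_topic
transforms.addMeta.timestamp.field=record_ts

# ReplaceField: drop one field and rename another (renames takes old:new pairs).
transforms.tidy.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.tidy.exclude=internal_notes
transforms.tidy.renames=cust_name:name

# Cast: coerce individual fields using field:type pairs.
transforms.fixTypes.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.fixTypes.spec=age:int32,price:float64
```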

RegexRouter and TimestampRouter change only the topic the record is written to; the payload is untouched. The router-style transforms are how you fan a single connector’s output into date-partitioned or environment-prefixed topics without touching the connector code.
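
A date-partitioned layout can be sketched with TimestampRouter as follows. The alias daily is an assumption; the two format values shown are the ones commonly used for daily topics.

```properties
transforms=daily
transforms.daily.type=org.apache.kafka.connect.transforms.TimestampRouter
# ${topic} and ${timestamp} are placeholders that the transform substitutes.
transforms.daily.topic.format=${topic}-${timestamp}
# Date pattern in java.text.SimpleDateFormat syntax.
transforms.daily.timestamp.format=yyyyMMdd
```

With this configuration, a record on a topic named orders whose timestamp falls on 1 June 2026 would be routed to a topic such as orders-20260601.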

Use case: PII masking plus topic routing

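Imagine a Debezium source connector streaming a customers table. Compliance requires that email and ssn never land in a topic in clear text, and that all customer data goes to topics carrying the pii. prefix. Debezium wraps each change in an envelope with before and after fields, so the chain first flattens it with ExtractNewRecordState, which makes email and ssn top-level fields of the value. MaskField then redacts them, and RegexRouter rewrites the topic name.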

{
  "name": "customers-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "pg.internal",
    "database.dbname": "shop",
    "table.include.list": "public.customers",
    "topic.prefix": "shop",
    "transforms": "unwrap,maskPii,routePii",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.maskPii.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.maskPii.fields": "email,ssn",
    "transforms.maskPii.replacement": "MASKED",
    "transforms.routePii.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.routePii.regex": "shop\\.public\\.(.*)",
    "transforms.routePii.replacement": "pii.$1"
  }
}

A record that Debezium would have written to shop.public.customers now lands on pii.customers with the sensitive fields already redacted. The order matters: masking happens on the value before routing decides the topic name.

Output: a consumer on pii.customers sees:

{"id": 42, "name": "Ada Lovelace", "email": "MASKED", "ssn": "MASKED"}
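
For comparison, the masking step can also be written without a replacement, which falls back to the type-default behavior mentioned in the table above. This is a sketch of that one step in properties form, reusing the maskPii alias from the example.

```properties
# Same masking step with no replacement configured.
transforms.maskPii.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskPii.fields=email,ssn
```

Assuming both columns are strings, the masked fields should then arrive as empty strings rather than the literal MASKED.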

Conditional transforms with predicates

You do not always want a transform to fire on every record. Predicates gate a transform so it applies only when a condition holds. Set the transform's negate option to true to invert the check, so the transform applies only when the predicate does not match.

transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.route.predicate=isMetrics

predicates=isMetrics
predicates.isMetrics.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.isMetrics.pattern=metrics-.*

Here TimestampRouter rewrites only topics whose names match metrics-.*; records on other topics pass through unchanged. The built-in predicates are TopicNameMatches, HasHeaderKey, and RecordIsTombstone.
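
Two further uses of predicates, sketched under stated assumptions: negate inverts the gate, and the built-in Filter transform (not covered above) drops whatever its predicate matches. The aliases dropTombstones and isTombstone are hypothetical.

```properties
transforms=route,dropTombstones
predicates=isMetrics,isTombstone

# negate=true: apply the router to every record EXCEPT those on metrics-.* topics.
transforms.route.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.route.predicate=isMetrics
transforms.route.negate=true

# Filter drops any record its predicate matches; here, tombstones (null values).
transforms.dropTombstones.type=org.apache.kafka.connect.transforms.Filter
transforms.dropTombstones.predicate=isTombstone

predicates.isMetrics.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.isMetrics.pattern=metrics-.*

predicates.isTombstone.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
```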

Gotcha: SMTs run synchronously in the task thread, so a heavy custom transform (regex over large strings, JSON re-parsing) becomes your throughput ceiling. Keep them cheap, and never put external I/O inside a transform — that belongs in a Kafka Streams app.

Best practices

  • Order deliberately: put masking and field-pruning before routing so sensitive data never influences a topic name or leaks via metadata fields.
  • Use $Key/$Value correctly: verify which half of the record you are transforming, since the wrong suffix can fail at runtime or leave the intended data untouched.
  • Prefer built-in SMTs and predicates over custom ones; reserve custom Transformation<R> implementations for genuinely missing logic, and keep them cheap, with minimal per-record allocation.
  • Gate transforms with predicates instead of running them on every record when only a subset needs them.
  • Keep SMTs stateless and side-effect-free — anything requiring joins, aggregation, or lookups belongs in Kafka Streams or ksqlDB, not the Connect pipeline.
  • Test transform chains against representative records in a staging connector before promoting config, because misordered or mistyped transforms surface only at runtime.
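
One way to make a staging run surface transform problems quickly is to add Connect's error-handling options to the connector config under test. This is a suggestion, not something the chain requires, and the values are a starting point to adjust.

```properties
# Staging-only sketch: fail fast and log the failing operation.
errors.tolerance=none
errors.log.enable=true
# Logs record contents too, so avoid this where real PII flows.
errors.log.include.messages=true
```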
Last updated June 1, 2026