How to transform and extract fields in the Kafka JDBC sink connector

I think you want ExtractField. Unfortunately, it's essentially a Map.get operation, which means 1) nested fields cannot be extracted in one pass, and 2) multiple fields need multiple transforms.

That being said, you might want to attempt this (untested):

transforms=ExtractData,ExtractHeaders
transforms.ExtractData.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractData.field=data
transforms.ExtractHeaders.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractHeaders.field=headers

If that doesn't work (transforms run in order, so the second one only sees what the first one returned), you might be better off implementing your own Transformation that can at least drop values from the Struct / Map.
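
One thing worth checking with the chained config is what the second transform actually receives. Here's a rough Python model (plain dicts, not Connect code) of ExtractField as a Map.get-style lookup that replaces the whole value. The `extract_field` helper and the sample record values are made up for illustration; only the field names `data` and `headers` come from the config above.

```python
def extract_field(value, field):
    # Model of ExtractField$Value as a Map.get-style lookup:
    # the whole record value is replaced by value[field].
    if not isinstance(value, dict):
        return None
    return value.get(field)


# Hypothetical record value with two sibling top-level fields.
record = {
    "data": {"USER_ID": 1, "USER_CATEGORY": "A"},
    "headers": {"operation": "INSERT", "timestamp": "2020-01-01T00:00:00"},
}

# Transforms are applied in the order listed in `transforms`.
after_first = extract_field(record, "data")
after_second = extract_field(after_first, "headers")

print(after_first)
print(after_second)
```

In this model the second lookup comes back empty, because `headers` is not a key inside `data`. That is why extracting two sibling fields this way probably won't give you both.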


If you're willing to list specific field names, you can solve this by:

  1. Using a Flatten transform to collapse the nesting (which converts the original structure's paths into dot-delimited field names)
  2. Using a ReplaceField transform with renames to give the fields the names you want the sink to emit
  3. Using another ReplaceField transform with whitelist to limit the emitted fields to the ones you select

For your case it might look like:

  "transforms": "t1,t2,t3",
  "transforms.t1.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.t2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t2.renames": "data.USER_ID:USER_ID,data.USER_CATEGORY:USER_CATEGORY,headers.operation:operation,headers.timestamp:timestamp",
  "transforms.t3.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t3.whitelist": "USER_ID,USER_CATEGORY,operation,timestamp",