Serdes

By default, Kpow ships with the following serdes available from data inspect:

  • JSON
  • JSON Schema
  • AVRO
  • AVRO (Strict)
  • Protobuf
  • String
  • EDN
  • Double
  • Float
  • Integer
  • Long
  • Short
  • UUID
  • Transit / JSON
  • Transit / JSON-Verbose
  • Transit / Msgpack

AVRO, JSON Schema, and Protobuf

Schema Registry serdes in Kpow follow the wire format documented here.

See Custom Serdes to use Kpow with Protobuf messages without a Schema Registry.

Kpow integrates with Confluent Schema Registry and allows for AVRO, JSON Schema and Protobuf serdes to be used in data inspect.

See Schema Registry for more on how to configure Confluent Schema Registry with Kpow.

Once configured, you can select the schema and subject strategy from within the data inspect UI when searching for records by key.

Configuring Serdes

Kpow offers some configuration on how serdes are presented in the UI.

Default Serdes

Set DEFAULT_KEY_SERDES, DEFAULT_VALUE_SERDES or DEFAULT_HEADERS_SERDES to specify which serdes should be selected from the dropdown by default when using data inspect.

Available Serdes

To restrict the serdes available to your users, set AVAILABLE_KEY_SERDES or AVAILABLE_VALUE_SERDES.

E.g. AVAILABLE_VALUE_SERDES=JSON,AVRO to only ever show JSON or AVRO serdes from within Kpow's UI.

When filtering serdes, use the same label as the one shown in the serdes dropdown, e.g. "AVRO (Strict)".

Custom Serdes

Kpow works with custom org.apache.kafka.common.serialization.Serde<String> implementations.

Why <String>?

Kpow expects your custom serdes to provide String key/value output when deserializing messages and will interpret that String key/value output according to the serdes format you configure.

Kpow will provide your custom serdes with String input when your users select a custom serdes to produce data to a topic. It is up to you how you interpret and encode that data for serialization.

Serdes with json or clojure format will have Data policies applied and can be searched with kJQ filters.
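
As a rough illustration of this String contract, the sketch below converts between bytes and a JSON String for a made-up binary layout. SensorCodec, its method names, and the layout (a 4-byte int followed by an 8-byte double) are assumptions for illustration only and are not part of Kpow or Kafka. A Deserializer<String> could delegate to toJson and a Serializer<String> to fromJson, with the serde's format set to json.

```java
import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical codec for an illustrative binary layout:
// a 4-byte big-endian int (sensorId) followed by an 8-byte double (reading).
final class SensorCodec {

    // Accepts the JSON shape produced by toJson, with optional whitespace.
    private static final Pattern JSON = Pattern.compile(
        "\\{\\s*\"sensorId\"\\s*:\\s*(-?\\d+)\\s*,"
            + "\\s*\"reading\"\\s*:\\s*(-?\\d+(?:\\.\\d+)?)\\s*\\}");

    private SensorCodec() {
    }

    // Deserializer side: bytes read from the topic become a JSON String for display.
    static String toJson(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int sensorId = buf.getInt();
        double reading = buf.getDouble();
        return "{\"sensorId\":" + sensorId + ",\"reading\":" + reading + "}";
    }

    // Serializer side: the String a user entered becomes bytes for the topic.
    static byte[] fromJson(String json) {
        Matcher m = JSON.matcher(json.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected input: " + json);
        }
        return ByteBuffer.allocate(Integer.BYTES + Double.BYTES)
            .putInt(Integer.parseInt(m.group(1)))
            .putDouble(Double.parseDouble(m.group(2)))
            .array();
    }
}
```

The regular expression keeps the sketch free of dependencies. A real serde would more likely use a JSON library to parse the user's input.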

Requirements

Kafka 2.1.0 introduced default functions on the Deserializer and Serializer interface that accept message headers as an argument.

Kpow calls those default functions, so your serdes must be compiled against Kafka 2.1.0 or later.

Your custom serdes should provide a Deserializer<String> and a Serializer<String>.

Example Custom Serde<String>

Here is an example custom serde that simply passes through to string serialization.

package example.serdes;

import org.apache.kafka.common.serialization.*;

import java.util.Map;

public class CustomSerde implements Serde<String> {

    // Config passed to configure, retained so it can be inspected later.
    private Map<String, ?> configs;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        this.configs = configs;
    }

    @Override
    public Serializer<String> serializer() {
        return new StringSerializer();
    }

    @Override
    public Deserializer<String> deserializer() {
        return new StringDeserializer();
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    public Map<String, ?> configs() {
        return configs;
    }
}

Implementation

When deserializing, Kpow calls the following default function on the org.apache.kafka.common.serialization.Deserializer returned by your custom serde.

default T deserialize(String topic, Headers headers, byte[] data)

When serializing, Kpow calls the following default function on the org.apache.kafka.common.serialization.Serializer returned by your custom serde.

default byte[] serialize(String topic, Headers headers, T data)

In the case of deserialization, the headers passed will be exactly as they exist off-the-wire.

In the case of serialization, the headers passed will have values of UTF-8 encoded byte arrays.

This difference exists because Kpow is a text-based web UI. When producing data, a user enters headers as text, and that text is what Kpow passes to your serializer.
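
To illustrate how header values might influence decoding, here is a minimal sketch that depends only on the JDK. HeaderAwareDecoder and the "charset" header it expects are hypothetical. In a real Deserializer<String>, the header bytes could be obtained inside deserialize(topic, headers, data) via Kafka's headers.lastHeader("charset"), which returns null when the header is absent, followed by value() on the result.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper a Deserializer<String> could call from
// deserialize(String topic, Headers headers, byte[] data).
final class HeaderAwareDecoder {

    private HeaderAwareDecoder() {
    }

    // charsetHeader is the raw value of an assumed "charset" header, or null when absent.
    static String decode(byte[] charsetHeader, byte[] data) {
        if (data == null) {
            return null; // e.g. a tombstone value or a missing key
        }
        Charset charset = StandardCharsets.UTF_8;
        if (charsetHeader != null) {
            // Header values are bytes; this sketch assumes the producer
            // wrote the charset name as UTF-8 text.
            charset = Charset.forName(new String(charsetHeader, StandardCharsets.UTF_8));
        }
        return new String(data, charset);
    }
}
```

On the serialization side, the same new String(value, StandardCharsets.UTF_8) call recovers the header text a user typed into Kpow, since those header values are UTF-8 encoded byte arrays.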

Setup

Provide your custom serdes on the Kpow classpath (see How to add a Jar to the Classpath).

Start Kpow with a plain java command similar to the one below, which sets the classpath explicitly. Note the main class, factorhouse.kpow.

java -Xmx8G -cp /opt/kpow/lib/kpow.jar:/opt/org/custom-serdes.jar factorhouse.kpow

Configure Kpow with the CUSTOM_SERDES environment variable:

# CUSTOM_SERDES accepts a comma-separated list of serdes classes

CUSTOM_SERDES=org.corp.XMLSerde,org.corp.MyCustomSerde

Configuration

Optionally, include a YAML configuration file in your jar file to configure custom serdes further.

E.g. org.corp.XMLSerde can be configured with org/corp/XMLSerde.yml

The following fields are available to configure your serdes:

  • name - the display name to use in Kpow's Data inspect UI.
  • format - json, clojure or string (default)
  • isKey - true for key data only, false for value data only, leave unset for both key/value data.
  • config - a map of config values passed into the serdes configure method

Config keys and values are converted to Strings and passed to your serde's configure method.
Config values starting with $ are resolved as environment variables (e.g. $BOOTSTRAP below).

E.g. Single Serdes Configuration (json format, available to key and value fields).

name: PROTO
format: json
config:
  bootstrap: $BOOTSTRAP
  limit: 22
  display: another-value
  abc: $SOME_ENV
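
Because config values reach the configure method as Strings, a numeric setting such as limit: 22 has to be parsed by the serde itself. The following is a minimal sketch of one way to do that. SerdeConfig and intValue are hypothetical names and are not part of Kpow or Kafka.

```java
import java.util.Map;

// Hypothetical helper for reading values inside a custom serde's configure method.
final class SerdeConfig {

    private SerdeConfig() {
    }

    // Reads an integer setting such as "limit", which arrives as a String (e.g. "22").
    // Returns defaultValue when the key is absent.
    static int intValue(Map<String, ?> configs, String key, int defaultValue) {
        Object raw = configs.get(key);
        if (raw == null) {
            return defaultValue;
        }
        return Integer.parseInt(raw.toString().trim());
    }
}
```

Inside configure(Map<String, ?> configs, boolean isKey), a call such as SerdeConfig.intValue(configs, "limit", 10) would then yield 22 for the configuration above, or the assumed default of 10 when limit is not set.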

E.g. Multi Serdes Configuration (json format, separately configured serdes for key and value fields).

serdes:
  - name: "PROTO 1"
    format: "json"
    isKey: true
    config:
      bootstrap: "some-value"
      limit: 22
      display: another-value
      abc: $SOME_ENV
  - name: "PROTO 2"
    format: "json"
    isKey: false
    config:
      bootstrap: "some-value"
      limit: "100"
      display: another-value
      abc: $ANOTHER_ENV

Where no configuration is provided, serdes default to string format and are available to both keys and values.

On startup, Kpow logs details of each custom serdes:

INFO operatr.kafka.serdes – initializing custom serdes: kpow.serdes.CustomSerdesExample
INFO operatr.kafka.serdes – found kpow/serdes/CustomSerdesExample.yml
INFO operatr.kafka.serdes – serde configuration: Custom Serde!
WARN operatr.kafka.serdes – environment variable $SOME_ENV not set
INFO operatr.kafka.serdes – config 'Custom Serde!', isKey?: null, format: json, config: ("bootstrap" "limit" "display" "abc").