Redpanda AVRO WASM — How To

Mark Olliver
9 min read · Jun 17, 2024


In this blog post, I’ll walk you through how to transform (specifically, mask) an AVRO data topic with a known schema using the Redpanda WASM engine and Golang. This is part one of a two-part series. The complete code is available on my GitHub. The repository contains code for both parts, so don’t be overwhelmed by the amount of content.

For an introduction to Redpanda and its WASM service, see my previous post here.

Scenario

We have a Kafka topic containing AVRO data on customers. We need to share this data with a downstream real-time consumer, but for data protection reasons, the downstream user shouldn’t see customers’ first or last names.

Possible Solutions

  1. Request changes from the source producer: They could mask the data or create a second topic with masked fields.
  2. Use Flink: Consume all messages, mask the sensitive information, and produce a new topic.
  3. Downstream redaction: Let the downstream consumer handle the masking.

Each of these options has drawbacks:

  • Trust and legal issues with letting downstream users manage data.
  • Flink introduces complexity and cost.
  • Lack of control over the source producer and potential data quality issues.

Redpanda WASM Solution

Redpanda’s WASM engine is ideal here. It’s fast and guarantees message delivery without loss. Let’s dive into the implementation.

Sample Data and Schema

Here’s a sample of the raw data in JSON format and its AVRO schema representation:

RAW Customer Data
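
The original post showed the raw data as a screenshot. A representative record matching the schema below might look like this (the values are illustrative, not taken from the real generator; timestamps are epoch millis and dates are days since epoch, per the logical types):

{
  "metadata": {
    "message_key": "c-0001",
    "created_date": 1718600000000,
    "updated_date": 1718600000000,
    "outbox_published_date": 1718600000000,
    "event_type": "CUSTOMER_CREATED"
  },
  "payload": {
    "id": "c-0001",
    "title": "Ms",
    "preferred_name": "Jane",
    "given_name": "Jane",
    "last_name": "Smith",
    "middle_name": null,
    "date_of_birth": 7305,
    "date_of_death": null,
    "gender": "female",
    "place_of_birth": "London",
    "country_of_residence": "GB"
  }
}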

Here is the AVRO Schema representation.

{
  "type": "record",
  "name": "CustomerEvent",
  "namespace": "com.demo.event.v1",
  "fields": [
    {
      "name": "metadata",
      "type": {
        "type": "record",
        "name": "EventMetadata",
        "fields": [
          {"name": "message_key", "type": "string"},
          {"name": "created_date", "type": {"type": "long", "logicalType": "timestamp-millis"}},
          {"name": "updated_date", "type": {"type": "long", "logicalType": "timestamp-millis"}},
          {"name": "outbox_published_date", "type": {"type": "long", "logicalType": "timestamp-millis"}},
          {"name": "event_type", "type": "string"}
        ]
      }
    },
    {
      "name": "payload",
      "type": {
        "type": "record",
        "name": "Customer",
        "fields": [
          {"name": "id", "type": "string"},
          {"name": "title", "type": ["null", "string"], "default": null},
          {"name": "preferred_name", "type": ["null", "string"], "default": null},
          {"name": "given_name", "type": ["null", "string"], "default": null},
          {"name": "last_name", "type": ["null", "string"], "default": null},
          {"name": "middle_name", "type": ["null", "string"], "default": null},
          {"name": "date_of_birth", "type": ["null", {"type": "int", "logicalType": "date"}], "default": null},
          {"name": "date_of_death", "type": ["null", {"type": "int", "logicalType": "date"}], "default": null},
          {"name": "gender", "type": ["null", "string"], "default": null},
          {"name": "place_of_birth", "type": ["null", "string"], "default": null},
          {"name": "country_of_residence", "type": ["null", "string"], "default": null}
        ]
      }
    }
  ]
}

We will not mask records where the customer’s first and last names are “Jane Smith”.

Environment Setup

To simplify the demonstration, a devcontainer is included in the GitHub repo, designed to work with VSCode and Docker. Here’s how to set it up:

  1. Clone the repository:
git clone https://github.com/pixie79/redpanda-demo.git

2. Open the repo in VSCode as a devcontainer. (For more information, look here.)

3. Once you have the devcontainer running, we need to complete the setup: open a zsh terminal within the environment, then run the following command:

setup-devcontainer.sh

This installs the missing components of the container build, along with the latest version of rpk, the Redpanda CLI tool.

As we are creating a Redpanda transform, the first step is to get Redpanda running. Here I have created a docker-compose file which should meet your needs. (It also contains other bits to allow you to use it as a base for future learning on Redpanda, including metrics monitoring and alerting with Prometheus and Grafana.)

To keep the commands simple, I have wrapped them in ‘task calls’. If you would like to see the underlying commands, please look in the Taskfile.yml.

4. Start Redpanda using docker-compose

task demo-start

Now, if all went well, you should be able to see the Redpanda console in your local browser at http://localhost:8080.

The Topics and Schema Registry tabs should show the source and destination topics/schemas.

Topics window in the Redpanda console

Slightly out of scope of this demo is the creation of the demo data, but this is also done using Go and some random functions. As a bit of background, we create 1000 random JSON customer records; later on we will encode these to AVRO and produce them to the demo topic.
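
As a rough illustration only (the repo's generator differs in detail, and the field set here is trimmed), the idea looks something like this:

package main

import (
    "encoding/json"
    "fmt"
    "math/rand"
    "os"
)

// Illustrative sketch: emit 1000 random customer records as JSON.
func main() {
    names := []string{"Jane", "John", "Asha", "Wei"}
    surnames := []string{"Smith", "Jones", "Patel", "Chen"}
    customers := make([]map[string]interface{}, 0, 1000)
    for i := 0; i < 1000; i++ {
        customers = append(customers, map[string]interface{}{
            "id":         fmt.Sprintf("c-%04d", i),
            "given_name": names[rand.Intn(len(names))],
            "last_name":  surnames[rand.Intn(len(surnames))],
        })
    }
    _ = json.NewEncoder(os.Stdout).Encode(customers)
}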

5. For now, run the following commands to build the generation tool and create the data:

task build-test-data-generator
task generate-test-data

If you are still following, you will be pleased to know that the setup is now complete :)

The Transform

The transformation code is written in TinyGo. Due to WASM restrictions, it operates on individual messages passed to it in the buffer, but it can reference any data (subject to memory and processing limits) either compiled in or passed as a runtime environment variable. Here’s a breakdown of the key functions:

The Init Function

func init() {
    var (
        err               error
        unmaskedCustomers string
    )

    pUtils.SetupLogger()

    unmaskedCustomers = os.Getenv("UNMASKED_CUSTOMERS")
    if unmaskedCustomers == "" {
        slog.Error("UNMASKED_CUSTOMERS environment variable is required")
        panic("UNMASKED_CUSTOMERS environment variable is required")
    }
    slog.Debug("UNMASKED_CUSTOMERS", "unmaskedCustomers", unmaskedCustomers)

    // destinationCodec, hdr and unmaskedCustomerMap are package-level
    // variables used later by the per-record transform function.
    destinationCodec, hdr, err = pTransforms.FetchAvroDestinationSchema()
    if err != nil {
        slog.Error("Error fetching destination schema", "Error", err)
        panic(fmt.Sprintf("Error fetching destination schema: %v\n", err))
    }

    _, unmaskedCustomerMap, err = pUtils.UnmarshalCustomers(unmaskedCustomers)
    if err != nil {
        slog.Error("Error unmarshalling customers", "Error", err)
    }
    slog.Debug("Not Masking Customers with the last_name", "unmaskedCustomerMap", unmaskedCustomerMap)
}

In the init function, I first set up logging using log/slog, wrapped into my helper function pUtils.SetupLogger() to keep it consistent across each of my programs.

We then grab an environment variable called UNMASKED_CUSTOMERS. This is a JSON-escaped string containing a list of customer first_name and last_name pairs that we do not wish to mask, for testing purposes. We can edit this list at deploy time by supplying the variable. (Be careful with the shell you use, to ensure the escaped quoting is correct. The version set up in the Taskfile.yml works fine within the devcontainer running zsh on OSX.)

To reduce the runtime workload, at this point we grab the destination AVRO codec from the Schema Registry; we only need to complete this task once, as the WASM is loaded into memory. This makes use of the Redpanda Go transforms schema registry SDK and looks for an environment variable called DESTINATION_SCHEMA_ID. This again is passed as a deploy-time environment variable; it would not generally change at run time, as it is presumed you would need to update the WASM if the payload changes.

Alternatively, we could compile the schema into the WASM, but that would increase management overhead, and we would still need to pass in the DESTINATION_SCHEMA_ID in order for the correct byte header to be encoded within the AVRO message. (Either that, or bloat each message with a full copy of the AVRO schema in addition to the payload.)
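
For context, that byte header follows the schema registry wire format: a zero magic byte followed by the 4-byte big-endian schema ID. Building it by hand would look roughly like this (a sketch of the format, not the SDK's internal code):

// Sketch: build the 5-byte wire-format header for an AVRO message.
// Byte 0 is the magic byte (0); bytes 1-4 are the schema ID, big-endian.
id, err := strconv.Atoi(os.Getenv("DESTINATION_SCHEMA_ID")) // needs strconv
if err != nil {
    panic(err)
}
hdr := make([]byte, 5)
hdr[0] = 0 // magic byte
binary.BigEndian.PutUint32(hdr[1:], uint32(id)) // needs encoding/binary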

To ensure the masking can run as fast as possible, we convert the JSON list of UNMASKED_CUSTOMERS into an indexed Go map, which enables much faster checking of the data.
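
Roughly speaking (the real pUtils.UnmarshalCustomers lives in the repo; this is just a sketch of the idea), that conversion looks like:

// Sketch: turn the UNMASKED_CUSTOMERS JSON list into a map keyed on
// last name, giving O(1) lookups instead of scanning a slice per record.
type Customer struct {
    FirstName string `json:"first_name"`
    LastName  string `json:"last_name"`
}

func unmarshalCustomers(raw string) ([]Customer, map[string]bool, error) {
    var customers []Customer
    if err := json.Unmarshal([]byte(raw), &customers); err != nil { // needs encoding/json
        return nil, nil, err
    }
    m := make(map[string]bool, len(customers))
    for _, c := range customers {
        m[c.LastName] = true
    }
    return customers, m, nil
}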

The Main

func main() {
    slog.Info("Running transformer")
    transform.OnRecordWritten(toAvro)
}

  • Called for each message received on the watched topic.
  • Passes the message to the toAvro function.

The Worker Function — toAvro

func toAvro(e transform.WriteEvent, w transform.RecordWriter) error {
    var (
        LastName  string
        GivenName string
        err       error
    )
    // Decode the raw event
    nestedMap, err := pTransforms.DecodeAvroRawEvent(e)
    if err != nil {
        slog.Error("Error decoding Avro", "Error", err)
        return err
    }

    Payload := nestedMap["payload"].(map[string]interface{})

    ln, ok := Payload["last_name"].(map[string]interface{})
    if ok {
        // Use the comma-ok form so a missing or non-string value
        // cannot panic the transform.
        LastName, ok = ln["string"].(string)
        if ok {
            // Check if the last name is in the list of customers to not mask
            if !pUtils.StringInMap(LastName, unmaskedCustomerMap) {
                if gn, ok := Payload["given_name"].(map[string]interface{}); ok {
                    GivenName, _ = gn["string"].(string)
                }

                Payload["given_name"] = pUtils.WrapUnionSimple(pUtils.MaskString(GivenName, "*", "fixed", 6), "string")
                Payload["last_name"] = pUtils.WrapUnionSimple(pUtils.MaskString(LastName, "*", "fixed", 6), "string")
                slog.Debug("Customer found - masking.")
            } else {
                slog.Info("Unmasked Customer found - not masking.")
            }
        } else {
            slog.Debug("Last name not available.")
        }
    } else {
        slog.Debug("Last name field not found.")
    }

    record, err := pTransforms.EncodeAvroRecord(nestedMap, destinationCodec, hdr, e.Record().Key, e.Record().Headers)
    if err != nil {
        slog.Error("Error encoding Avro", "Error", err)
        return err
    }
    slog.Debug("Returning AVRO", "record", record)
    return w.Write(record)
}

This function takes in the source record still in AVRO format and returns the new transformed record in AVRO format.

As the transform is deployed to watch a specific AVRO topic, it knows the name of the topic, so we do not need to set an environment variable for that. What we don't have, however, is the schema, as this can change over time for the source messages. Instead, this is encoded in the first few bytes of the AVRO message. You will see here that the first call is to pTransforms.DecodeAvroRawEvent(e). This takes the raw message and strips off the byte header, which it then uses to grab the schema from the schema registry. As Redpanda uses a single binary for its brokers, each broker also includes the schema registry, so no external calls are required for this.

The output of DecodeAvroRawEvent is a nested map[string]interface{}, ready for Go to use and process further as needed.
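
To make the union handling below concrete, a populated nullable field arrives as a one-entry map keyed by the AVRO branch name, while a null field decodes to nil (which is why the type assertions use the comma-ok form):

// Shape of the decoded payload for nullable union fields:
Payload["last_name"]   // map[string]interface{}{"string": "Smith"}
Payload["middle_name"] // nil when the union branch is null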

The first couple of checks we make are to ensure that last_name is set in the payload, as from the schema we can tell it could be null. If it is null, then we don't worry about masking it. If the field is not null, we then need to get the string value.

ln, ok := Payload["last_name"].(map[string]interface{})
if ok {
    LastName, ok = ln["string"].(string)

The next call uses our customer map to check if the customer should be excluded from masking.

// Check if the last name is in the list of customers to not mask
if !pUtils.StringInMap(LastName, unmaskedCustomerMap) {

If we need to mask, then we call the masking function for both LastName and GivenName, updating the payload as we go.

Payload["given_name"] = pUtils.WrapUnionSimple(pUtils.MaskString(GivenName, "*", "fixed", 6), "string")
Payload["last_name"] = pUtils.WrapUnionSimple(pUtils.MaskString(LastName, "*", "fixed", 6), "string")

I will discuss the function pUtils.MaskString more in the go-testcontainers post later.
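
As a preview, a fixed-length mask simply replaces the value with n copies of the mask character. A sketch of the behaviour (not the actual implementation from the repo):

// Sketch: maskString("Smith", "*", "fixed", 6) returns "******".
func maskString(value, maskChar, mode string, length int) string {
    if mode == "fixed" {
        return strings.Repeat(maskChar, length) // needs the strings package
    }
    // Otherwise mask each character of the original value.
    return strings.Repeat(maskChar, len(value))
}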

The last step is to encode the payload back into a valid AVRO message using the destination codec.

record, err := pTransforms.EncodeAvroRecord(nestedMap, destinationCodec, hdr, e.Record().Key, e.Record().Headers)
if err != nil {
    slog.Error("Error encoding Avro", "Error", err)
    return err
}

If there is an error in the process, we can return it and it will be handled by the WASM engine, or we could write the message out to a Dead Letter Topic.

Once encoded, we return the message back to the calling process, which handles the write to the destination topic.

Running the demo

Open two terminal windows:

  1. In the first window, monitor errors (these are written to a Kafka error-logs topic):
rpk transform logs demo -f

2. In the second window, build the data loader. This program will read in the JSON file and convert it to AVRO before writing it to the topic. (In a future post I will show how to do this with Redpanda Connect)

task build-test-data-loader

3. Build and deploy the WASM transform:

task deploy-demo

Commands wrapped by task deploy-demo:

cd go/transform/demo
rpk transform build
export DESTINATION_SCHEMA_ID=$(rpk registry schema get output-demo-value --schema-version latest --format json | jq '.[0].id')
export UNMASKED_CUSTOMERS="[{\"last_name\": \"Smith\",\"first_name\": \"Jane\"}]"
rpk transform deploy --file demo.wasm \
  --name demo \
  --input-topic demo \
  --output-topic output-demo \
  --var DESTINATION_SCHEMA_ID=$DESTINATION_SCHEMA_ID --var UNMASKED_CUSTOMERS=$UNMASKED_CUSTOMERS \
  --var LOG_LEVEL=DEBUG

4. Finally, run the data loader to produce the test data:

task load-td-demoEvent

Check the source (demo) and destination (output-demo) topics in the Redpanda console. The destination topic should have masked first and last name fields, except for “Jane Smith”.
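
You can also spot-check from the terminal; rpk can read a few records straight from the destination topic:

rpk topic consume output-demo --num 3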

Example output message in Redpanda Console

Those who have been following along carefully will notice that the demo does not quite match the original requirements. Currently the code only matches on the LastName, not a combination of both the Last and First Names. This is deliberate, as it will allow you to build on and play with the example to get it to meet the requirements.

To clean up and destroy the environment, run:

task clean

Feel free to follow along with my posts and suggest other topics you’d like me to cover.
