Avoid “Unknown Union Branch” and Send Avro Data Using the Kafka Avro Console Producer

Abhishek Giri
DataPebbles
Apr 10, 2024

In this blog, we will talk about the Schema Registry with Avro schemas, how to produce Avro data using the console producer, and how to solve a common issue we run into along the way: “Unknown Union Branch”.

What is the “Unknown Union Branch” error?

In Apache Avro, unions let a field hold values of more than one type. A union is defined by listing the allowed types in square brackets, like ["null","string"], meaning the field may contain either a null or a string value.

The catch lies in Avro's JSON encoding: a non-null union value must be wrapped in a JSON object with a single key that names the branch, for example {"string": "ASML"}. For named types such as records and enums, that key must be the full name including the namespace. If the JSON you feed to the producer is not wrapped this way, or names a branch that does not exist in the union, the decoder rejects it with the error “Unknown union branch” — exactly the failure we want to avoid.
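To make this concrete, here is a minimal sketch (assuming the Apache Avro Java library is on the classpath; the Demo record and its maturityDate field are made up purely for illustration) that decodes two JSON payloads against the same ["null","int"] union. The payload naming a branch that is not in the union fails with “Unknown union branch”, while the correctly wrapped one succeeds.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;

public class UnionBranchDemo {

    public static void main(String[] args) throws Exception {
        // Hypothetical record with a single ["null","int"] union field, for illustration only
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":"
                + "[{\"name\":\"maturityDate\",\"type\":[\"null\",\"int\"]}]}");

        // Wrong branch name: fails with "Unknown union branch long"
        decode(schema, "{\"maturityDate\": {\"long\": 20581}}");

        // Correct branch name: the value is wrapped with the branch it belongs to
        decode(schema, "{\"maturityDate\": {\"int\": 20581}}");
    }

    private static void decode(Schema schema, String json) {
        try {
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            GenericRecord record = reader.read(null, DecoderFactory.get().jsonDecoder(schema, json));
            System.out.println("OK: " + record);
        } catch (Exception e) {
            System.out.println("Failed: " + e.getMessage());
        }
    }
}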

Messages written to and consumed from a topic travel as plain byte arrays. Applying a schema ensures that a message written to a topic by a producer can be read and understood by any consumer of that topic.

Due to the decoupled nature of Kafka, producers and consumers do not communicate with each other directly; information is transferred via a Kafka topic. At the same time, the consumer still needs to know what type of data the producer is sending in order to deserialize it. If the producer starts sending bad data to Kafka, or if the agreed data types are violated, your downstream consumers will start breaking. We need a common data contract that both sides agree upon.

With the Schema Registry in place, the producer talks to the registry before sending data to Kafka and checks whether the schema is available. If the schema is not found, the producer registers it and caches the returned ID. The producer then serializes the data with that schema and sends it to Kafka in binary format, prepended with the unique schema ID. When the consumer processes the message, it fetches the schema from the registry using the ID embedded in the message and deserializes it with that same schema. If a producer tries to register a schema that violates the compatibility rules for the subject, the registry rejects it with an error, letting the producer know it is breaking the schema agreement.
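For reference, the framing the producer uses here is Confluent's standard wire format: a zero magic byte, a 4-byte big-endian schema ID, and then the Avro-encoded payload. The sketch below is a hypothetical helper (not part of any Confluent API) showing how the schema ID can be peeled off a raw record value before asking the registry for the schema.

import java.nio.ByteBuffer;

public class SchemaIdExtractor {

    // Minimal sketch: read the schema ID from a value framed in Confluent's wire format
    // (magic byte 0x0, 4-byte big-endian schema ID, then the Avro binary payload).
    public static int schemaIdOf(byte[] value) {
        ByteBuffer buffer = ByteBuffer.wrap(value);
        if (buffer.get() != 0) {
            throw new IllegalArgumentException("Not a schema-registry framed message");
        }
        return buffer.getInt(); // this ID is what the consumer looks up in the registry
    }
}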

We can send Avro data in two ways: from a Java application or with the console producer.

In this blog, we will focus on the console Avro producer.

From a shell on the Schema Registry host, a message can be sent via kafka-avro-console-producer.

The Kafka Avro console producer can be started from the Schema Registry package or from the Schema Registry Docker image:

kafka-avro-console-producer \
  --broker-list kafka:29092 \
  --topic day-trading-cmd \
  --property schema.registry.url=http://localhost:8081 \
  --property parse.key=true \
  --property key.schema='{"type":"record","name":"DayTradingIdRecord","namespace":"com.data.trading","fields":[{"name":"dateValue","type":{"type":"int","logicalType":"date"}},{"name":"counter","type":"long"}]}' \
  --property key.separator="-" \
  --property value.schema.id=24

To get the value.schema.id, query the Schema Registry REST endpoint http://localhost:8081/schemas and look for the entry with the correct subject:

{
  "subject": "day-trading-cmd-value",
  "version": 1,
  "id": 24,
  "schema": "{\"type\":\"record\",\"name\":\"DayTradingRecord\", ... }"
}
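If you prefer to look this up programmatically, here is a minimal sketch using Java's built-in HTTP client against the registry's standard /subjects/{subject}/versions/latest endpoint, assuming the subject day-trading-cmd-value and a registry on localhost:8081 as above:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SchemaIdLookup {
    public static void main(String[] args) throws Exception {
        // Fetch the latest registered schema for the value subject; the response JSON contains "id"
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/subjects/day-trading-cmd-value/versions/latest"))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // use the "id" field as value.schema.id
    }
}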

Prepare JSON data and paste it into the producer console. Note that the console producer reads one record per line, so the key JSON, the separator (-), and the value JSON must be collapsed onto a single line before pasting; they are pretty-printed below only for readability:

{
  "dateValue": 19655,
  "counter": 1701090001876
}-{
  "commandType": "INTERNAL_DAY_TRADING_COMPLETED_COMMAND",
  "assignmentId": {
    "dateValue": 19389,
    "counter": 1698916848637
  },
  "creationTimestamp": 1698916849082,
  "sourceSystem": "SERVICE",
  "command": {
    // In case of UNION provide namespace
    "com.data.trade.command.InternalDayTradingCompletedCommandRecord": {
      "quantity": {
        "longVal": 10,
        "shortVal": 0
      },
      "clearingBusinessDate": 19663,
      "instrumentId": {
        "symbol": "ASML",
        "instrumentType": "STOCK",
        "maturityDate": {
          // If it is UNION like ["null","int"]
          "int": 20581
        },
        "optionType": {
          "com.data.trade.OptionType": "CALL"
        },
        "strikePrice": {
          // If it is UNION like ["null","string"]
          "string": "67002.100239"
        }
      },
      "currency": "EUR",
      "isin": "EU09825673",
      "micsTransaction": {
        "transactionReference": 1921,
        "quantity": 10,
        "position": {
          "positionId": {
            "accountNumber": {
              "clientNumber": 9985,
              "accountType": "CLNU",
              "accountNumber": 1,
              "subAccountNumber": 1
            },
            "instrumentId": {
              "symbol": "ASML",
              "instrumentType": "STOCK",
              "maturityDate": {
                "int": 20581
              },
              "optionType": {
                "com.data.trade.OptionType": "CALL"
              },
              "strikePrice": {
                "string": "67002.100239"
              }
            }
          },
          "quantity": {
            "longVal": 0,
            "shortVal": 0
          }
        },
        "adjustment": {
          "adjustmentId": {
            "accountNumber": {
              "clientNumber": 9985,
              "accountType": "CLNU",
              "accountNumber": 1,
              "subAccountNumber": 1
            },
            "instrumentId": {
              "symbol": "symbol",
              "instrumentType": "OPTION",
              "maturityDate": {
                "int": 20581
              },
              "optionType": {
                "com.data.trade.OptionType": "CALL"
              },
              "strikePrice": {
                "string": "67002.100239"
              }
            }
          },
          "adjustment": {
            "longVal": -10,
            "shortVal": 0
          }
        },
        "positionEffect": {
          "com.data.trade.OpenClose": "OPEN"
        },
        "transactionTypeCode": "SELL",
        "currency": "EUR",
        "comment": null,
        "externalMember": "12qw23",
        "externalAccount": "34ewqs",
        "uti": {
          "string": "E2ed3-4450-d859dc5c060d"
        },
        "externalNumber": {
          "long": 1698916848637
        },
        "transactionPrice": null,
        "exchangeCode": "NSE",
        "transactionDate": 1675209600000,
        "isin": {
          "string": "EU09825673"
        },
        "side": {
          // If it is UNION with enum provide whole namespace
          "com.data.trade.Side": "LEFT"
        },
        "settlementAmount": null,
        "settlementType": null,
        "dummyKeyProcessed": "0"
      },
      "reportId": "bsg5532-bsva44312-ndbv55c"
    }
  }
}

In the JSON sample above, I have added inline comments with the key findings for avoiding the Unknown Union Branch error. Remove these comments (and collapse the payload onto a single line) before pasting, since the producer's JSON parser will not accept them.

Then you can paste your JSON into the console producer terminal:

[Screenshot: console producer terminal]

Then start the Kafka Avro console consumer in another terminal:

kafka-avro-console-consumer \
  --topic day-trading-cmd \
  --bootstrap-server kafka:29092 \
  --property schema.registry.url=http://localhost:8081 \
  --property key.schema='{"type":"record","name":"DayTradingIdRecord","namespace":"com.data.trading","fields":[{"name":"dateValue","type":{"type":"int","logicalType":"date"}},{"name":"counter","type":"long"}]}' \
  --property print.key=true \
  --property key.separator="-" \
  --from-beginning

Once you paste your JSON into the console producer and press Enter, you will see the consumed data in your consumer console.

Also, you can validate your JSON payload against the Avro schema with a small block of Java code using the Avro library (the schema string itself can be fetched from the Schema Registry endpoint shown above):

import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

public static void main(String[] args) throws Exception {
    // Paste the Avro schema string (e.g. the "schema" field returned by the registry) here
    Schema schema = new Schema.Parser().parse("<your Avro schema string>");
    // Paste the JSON payload you intend to send here
    String json = "<your JSON payload>";
    System.out.println(validateJson(json, schema));
}

public static boolean validateJson(String json, Schema schema) throws Exception {
    try {
        // GenericDatumReader validates the JSON against the schema without generated classes
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
        reader.read(null, decoder);
        return true;
    } catch (AvroTypeException e) {
        // Prints messages such as "Unknown union branch ..." when the payload does not match
        System.out.println(e.getMessage());
        return false;
    }
}

This was a quick demo of how to produce and consume Avro data from the console. It can be handy for debugging during development, or for replaying events that were missed.

For any queries, please reach out to me @ agiri@datapebbles.com

Till then, Happy Streaming :)
