Building Real-Time Schema Pipelines from Messaging Topics

Tim Spann
3 min read · Jan 4, 2023

--

Code: https://github.com/tspannhw/pulsar-admintool

Set up your connection

# Spring Boot configuration for the admin tool.
# Nesting is significant in YAML: without indentation the repeated `pulsar:` and
# `service-url:` keys collide, so the property hierarchy is spelled out here.
spring:
  pulsar:
    administration:
      # NOTE(review): the Java snippet passes its URL to serviceHttpUrl(), which
      # suggests an HTTP admin endpoint rather than a pulsar:// one. Confirm.
      service-url: pulsar://localhost:6650
    client:
      service-url: pulsar://localhost:6650
    producer:
      batching-enabled: false
      send-timeout-ms: 90000
      producer-name: admintool
      topic-name: persistent://public/default/schema-alerts
# Keep every logger at error level so only the tool's own output is shown.
logging:
  level:
    org.apache.pulsar: error
    root: error
    ROOT: error
    com.pulsardeveloper: error
    org:
      apache:
        pulsar:
          shade: error
server.port: 8599

Connect to the Pulsar Admin API

// Build the admin client (`admin`) that the later snippets use for topic and schema lookups.
// `url` is passed as the HTTP service URL of the admin endpoint.
PulsarAdmin admin = PulsarAdmin.builder() 
.serviceHttpUrl(url)
// NOTE(review): presumably this accepts untrusted TLS certificates; confirm
// against the Pulsar admin docs before using it outside local development.
.allowTlsInsecureConnection(true)
.build();

Get all the topics for your tenant/namespace

// Ask the admin API for the topic names it reports for `namespace`.
List<String> list = admin.topics().getList(namespace);

Build a list of schemas

// Collect one SchemaDisplay for every topic whose schema is fully populated
// (name, type and definition all present). Topics without a usable schema are skipped.
List<SchemaDisplay> schemaList = new ArrayList<>(list.size());
for (String topic : list) {
    if (topic == null) {
        continue;
    }
    try {
        SchemaInfo schemaInfo = admin.schemas().getSchemaInfo(topic);

        if (schemaInfo != null && schemaInfo.getName() != null
                && schemaInfo.getType() != null && schemaInfo.getSchemaDefinition() != null) {
            // Scoped to the iteration so a stale instance can never be re-added.
            SchemaDisplay schemaDisplay = new SchemaDisplay();
            schemaDisplay.setSchemaInfo(schemaInfo);
            schemaDisplay.setTopic(topic);
            schemaDisplay.setSchemaVersion(admin.schemas().getVersionBySchema(topic, schemaInfo));
            schemaList.add(schemaDisplay);
        }
    } catch (Exception ignored) {
        // Best effort: a failed schema lookup for one topic (for example, a topic
        // with no schema) must not abort the scan, so the topic is skipped.
        // JVM Errors are deliberately no longer swallowed.
    }
}

Build Your Maven Java App Skeleton

# Generate the Maven project skeleton non-interactively (-B = batch mode).
# The trailing backslashes keep all options on one logical command line.
mvn -B archetype:generate \
  -DarchetypeGroupId=org.apache.maven.archetypes \
  -DgroupId=com.pulsardeveloper \
  -DartifactId=admintool

Build Your Java App

# Clean and package the app. The slf4j simple-logger default level is set to
# "error" twice: once through MAVEN_OPTS and once as a -D argument to mvn.
MAVEN_OPTS=-Dorg.slf4j.simpleLogger.defaultLogLevel=error mvn clean package -Dorg.slf4j.simpleLogger.defaultLogLevel=error

Run Your Java App

# Compile, then run the main class com.pulsardeveloper.App through exec:java,
# using the same error-only slf4j simple-logger settings as the build step.
MAVEN_OPTS=-Dorg.slf4j.simpleLogger.defaultLogLevel=error mvn compile exec:java -Dexec.mainClass="com.pulsardeveloper.App" -e -Dorg.slf4j.simpleLogger.defaultLogLevel=error -q

Send Your Schema Alerts

if ( schemaList.size() > 0) {
PulsarClient client = PulsarClient.builder()
.serviceUrl(url)
.build();

Producer<SchemaAlert> producer = client.newProducer(Schema.JSON(SchemaAlert.class))
.topic("persistent://public/default/schema-alert")
.create();

// TODO: cache current schema definition and version and check if change
for (SchemaDisplay schema : schemaList) {
if ( schema != null ) {
UUID uuidKey = UUID.randomUUID();
SchemaAlert schemaAlert = new SchemaAlert();
schemaAlert.setTopic ( schema.getTopic());
schemaAlert.setSchemaType ( schema.getSchemaInfo().getType().toString() );
schemaAlert.setSchemaName ( schema.getSchemaInfo().getName() );
schemaAlert.setSchemaDefinition( schema.getSchemaInfo().getSchemaDefinition() );
schemaAlert.setMessageId( admin.topics().getLastMessageId(schema.getTopic()).toString() );
schemaAlert.setUUID(uuidKey.toString() );
schemaAlert.setSchemaVersion( schema.getSchemaVersion() );

Timestamp timestamp = new Timestamp(System.currentTimeMillis());
schemaAlert.setTs( timestamp.getTime() );

Locale loc = new Locale("en", "US");
DateFormat dateFormat = DateFormat.getDateInstance(DateFormat.DEFAULT, loc);

schemaAlert.setAlertDateTime( dateFormat.format(new Date()) );
MessageId schemaAlertMessageID = producer.newMessage().
key(uuidKey.toString())
.value(schemaAlert)
.send();

System.out.println("Sent Schema Alert " + schemaAlertMessageID.toString());
// TODO: Can make rest call to send to a registry
// TODO: Send to a database
}
}
}

Schema Alert Class Snippet

/**
 * Payload published to the schema-alert topic: a snapshot of one topic's schema
 * together with identifying and timing metadata.
 *
 * <p>Unknown JSON properties are ignored on deserialization. Accessors and other
 * members are elided from this snippet.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonPropertyOrder({
    "topic",
    "schemaType",
    "schemaName",
    "schemaDefinition",
    "messageId",
    // Was "UUID": the property is serialized as "uuid", so the old entry never
    // matched and the key was emitted last instead of in this position.
    "uuid",
    "ts",
    "alertDateTime",
    "schemaVersion"
})
public class SchemaAlert implements Serializable {
    /** Name of the topic the schema belongs to. */
    private String topic = "";
    /** String form of the schema type. */
    private String schemaType = "";
    /** Schema name as reported for the topic. */
    private String schemaName = "";
    /** Schema definition text. */
    private String schemaDefinition = "";
    /** String form of a message id for the topic. */
    private String messageId = "";
    /** Unique id for this alert; serialized as {@code uuid}. */
    private String UUID = "";
    /** Alert creation time in epoch milliseconds. */
    private long ts = 0;
    /** Human-readable formatted date of the alert. */
    private String alertDateTime = "";

    /** Schema version; {@code null} until set. */
    private Long schemaVersion = null;
    // ....
}

Example Output

----- got message -----
key:[36a613b0-8ba6-444c-8432-ea0e8adfe187], properties:[],
content:{"topic":"persistent://public/default/aircraft-partition-0",
"schemaType":"JSON",
"schemaName":"aircraft-partition-0",
"schemaDefinition":"{\"type\":\"record\",\"name\":\"Aircraft\",\"namespace\":\"dev.pulsarfunction.adsb\",\"fields\":[{\"name\":\"alt_baro\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"alt_geom\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"baro_rate\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"category\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"emergency\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"flight\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"gs\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"gva\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"hex\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"lat\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"lon\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"mach\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"messages\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"nac_p\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"nac_v\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"nav_altitude_mcp\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"nav_heading\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"nav_qnh\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"nic\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"nic_baro\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"rc\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"rssi\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"sda\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"seen\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"seen_post\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"sil\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"sil_type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"speed\",\"type\":[\"
null\",\"double\"],\"default\":null},{\"name\":\"squawk\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"track\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"version\",\"type\":[\"null\",\"int\"],\"default\":null}]}","messageId":"24612:377:0:0","ts":1670953809183,"alertDateTime":"Dec 13, 2022","schemaVersion":4,"uuid":"36a613b0-8ba6-444c-8432-ea0e8adfe187"}

----- got message -----
key:[f55b9705-9739-442e-897c-4891348301bd], properties:[], content:{"topic":"persistent://public/default/mtaalert-partition-0","schemaType":"JSON","schemaName":"mtaalert-partition-0","schemaDefinition":"{\"type\":\"record\",\"name\":\"Alert\",\"namespace\":\"dev.datainmotion.gtfsrealtime.model\",\"fields\":[{\"name\":\"activePeriodEnd\",\"type\":\"long\"},{\"name\":\"activePeriodStart\",\"type\":\"long\"},{\"name\":\"agency\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"cause\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"descriptionLanguage\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"descriptionText\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"directionId\",\"type\":\"long\"},{\"name\":\"effect\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"feedEntityID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"headerLanguage\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"headerText\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"routeId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"severityLevel\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"stopId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tripDirectionId\",\"type\":\"long\"},{\"name\":\"tripRouteId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tripStartDate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tripStartTime\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"url\",\"type\":[\"null\",\"string\"],\"default\":null}]}","messageId":"15015:129:0","ts":1670953809188,"alertDateTime":"Dec 13, 2022","schemaVersion":0,"uuid":"f55b9705-9739-442e-897c-4891348301bd"}

The next step is to ingest the alerts and use them to populate a database, a Spark input or a Flink SQL join.

You could also use this data to send your schema to a different Pulsar cluster, a third-party schema registry or to build a SQL table.

Let me know what you would want for next steps.

--

--

Tim Spann

Principal Developer Advocate, Zilliz. Milvus, Attu, Towhee, GenAI, Big Data, IoT, Deep Learning, Streaming, Machine Learning. https://www.datainmotion.dev/