Cassandra, Pulsar and good ol’ Spring Boot — build reactive applications like a boss!

In this article, we look at how to improve your application experience by taking real-time data and using it to power decision-making within your application. The example in this article sends flight radar data from a streaming platform to the application to notify users of real-time events.

How can an application be reactive when it’s working with data stored in the database? By polling every second, millisecond, microsecond?

The challenge, though, is to keep data in motion once it’s stored in a database, so you can build data pipelines and react to events in real time. Even better if you can avoid the overhead of installing self-hosted databases and streaming solutions.

Imagine focusing on developing business value that can be verified with unit tests instead of complex integration test suites. Finally, all that without any vendor lock-in based on open source software and widely known programming languages.

In this blog post, we’ll build a solution sending flight radar data to a NoSQL database. Then this data is moved to a streaming platform that powers a business application making decisions based on the received events. In particular, the whole application consists of the following parts:

  • A scheduled cloud function that fetches flight data from the OpenSky portal and stores it in Astra DB.
  • Change Data Capture (CDC) that turns every database change into an event on Astra Streaming.
  • A reactive Spring Boot application that consumes these events and notifies users.

At the end, the application will notify us about flights captured for a given region if:

  • A plane is flying below 1 km.
  • During the last five minutes, more than five unique flights have been captured.

Figure 1: A diagram showing the technologies interacting with each other

Move it! From OpenSky to Astra

The first exercise is to move data from the OpenSky portal to Astra.

Let’s begin with setting up our database. Sign in to DataStax Astra and create a new database called opensky with a vectors keyspace in Google Cloud’s europe-west1 region.

Figure 2: A screenshot of creating a database in DataStax Astra

Copy or download the token details, go to your database, click on the CQL Console tab and create a live table:

CREATE TABLE vectors.live (
    icao24 TEXT,
    callsign TEXT,
    time_position TIMESTAMP,
    longitude FLOAT,
    latitude FLOAT,
    altitude FLOAT,
    PRIMARY KEY ((icao24, callsign), time_position)
);

Next, we’ll create a lambda in JavaScript that triggers every 90 seconds. A number of serverless runtimes are available — Cloud Functions from Google, among others.

Our function aims to call the OpenSky REST API to fetch all flights from within a particular area bound by lamin, lomin, lamax, and lomax. These parameters stand for minimum latitude, minimum longitude, maximum latitude, and maximum longitude, respectively.

The given parameters restrict the area to a small region, including an airport:

Figure 3: Can you spot the noisy neighbor?
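
For illustration, with such a bounding box the call could look like this (the exact coordinates are assumptions, based on the longitude/latitude values in the sample output below):

https://opensky-network.org/api/states/all?lamin=52.1&lomin=20.8&lamax=52.3&lomax=21.1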

The output of the fetch is:

{
  "time": 1658778191,
  "states": [
    [
      "48af05",      # icao24
      "LOT2ET ",     # callsign
      "Poland",
      1658778186,    # timestamp
      1658778186,
      20.8367,       # longitude
      52.2022,       # latitude
      563.88,
      false,
      77.83,
      114.19,
      -4.55,
      null,
      609.6,         # altitude
      "4626",
      false,
      0
    ],
    [...]
  ]
}

Now we can send a curated form of the output to our Astra database. Instead of connecting directly via a CQL NodeJS client, we can leverage Stargate, a set of APIs allowing us to interact with our database through REST, GraphQL, and gRPC:

const config = {
  headers: {
    'content-type': 'application/json',
    'x-cassandra-token': process.env.ASTRA_DB_APPLICATION_TOKEN
  }
}

const payload = {
  "icao24": state[0],
  "callsign": state[1],
  "time_position": state[3] * 1000,
  "longitude": state[5],
  "latitude": state[6],
  "altitude": state[13]
}

axios.post(
  `https://${process.env.ASTRA_DB_ID}-${process.env.ASTRA_DB_REGION}.apps.astra.datastax.com` +
    `/api/rest/v2/keyspaces/${process.env.ASTRA_DB_KEYSPACE}/${process.env.ASTRA_DB_TABLE}`,
  payload,
  config
)
.then(resp => {...});

The values for $ASTRA_DB_ID, $ASTRA_DB_REGION and $ASTRA_DB_KEYSPACE can be obtained from the Connect tab under Rest API:

Figure 4: The OpenSky dashboard REST API instructions

For $ASTRA_DB_TABLE we’ll use live, and the $ASTRA_DB_APPLICATION_TOKEN value needs to be taken from the token details we were presented with while creating the database.

This is the complete Google Cloud Functions code, tying the fetch and the write together.
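
A minimal sketch of the full function, assuming axios is declared as a dependency and the function is exposed as an HTTP handler; the export name fetchFlights and the bounding-box values are assumptions, while the endpoints match the snippets above:

const axios = require('axios');

// Bounding box for the area of interest (values are assumptions).
const BOUNDS = 'lamin=52.1&lomin=20.8&lamax=52.3&lomax=21.1';

exports.fetchFlights = async (req, res) => {
  const config = {
    headers: {
      'content-type': 'application/json',
      'x-cassandra-token': process.env.ASTRA_DB_APPLICATION_TOKEN
    }
  };

  // Fetch the current state vectors for the configured area from OpenSky.
  const opensky = await axios.get(`https://opensky-network.org/api/states/all?${BOUNDS}`);

  // Send a curated form of each state vector to the Astra REST API.
  const posts = (opensky.data.states || []).map(state =>
    axios.post(
      `https://${process.env.ASTRA_DB_ID}-${process.env.ASTRA_DB_REGION}.apps.astra.datastax.com` +
        `/api/rest/v2/keyspaces/${process.env.ASTRA_DB_KEYSPACE}/${process.env.ASTRA_DB_TABLE}`,
      {
        icao24: state[0],
        callsign: state[1],
        time_position: state[3] * 1000,
        longitude: state[5],
        latitude: state[6],
        altitude: state[13]
      },
      config
    )
  );

  await Promise.all(posts);
  res.status(200).send(`Stored ${posts.length} state vectors`);
};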

The environment variables are provided when the function is created:

Figure 5: A screenshot using the runtime environment variables

Finally, we need to set up a scheduler that will trigger this function regularly. Google Cloud Scheduler is a good fit here:

Figure 6: Google Cloud Scheduler settings

Once all is set up, we can use the CQL Console to verify our first pipeline works:

Figure 7: ENT78NB seems to be going down, whereas SPTIR is bouncing around
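
The check itself can be as simple as a SELECT over the live table, for example:

SELECT icao24, callsign, time_position, altitude
FROM vectors.live
LIMIT 20;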

Keep it moving! From Astra DB to Astra Streaming

Data in a database is data at rest, isn’t it? A database is a pull-based kind of container. Applications have to request data, and they still do so — typically on clients’ requests. To keep the data moving, you need an additional mechanism that unfreezes the data and loads it into a streaming platform — ideally in real time.

Change Data Capture (CDC) is a solution available to Astra users. Each new record (insert) or modification (update, delete) is transformed into an event and sent to Astra Streaming, a streaming platform backed by Apache Pulsar.

There is no code to be written for this step. Just click on Create Stream in the left pane and choose the europewest1 region in Google Cloud:

Figure 8: The Create a Stream settings in DataStax Astra

Next, in the database’s CDC tab, enable CDC for the vectors.live table:

Figure 9: The CDC tab in the database section
Figure 10: The CDC parameters

That’s it for clicking around… Let’s. Write. Code.

Be event-driven, be reactive! From Astra Streaming to Spring Boot

With the data in a streaming platform, the sky is the limit (no pun intended). You can either use an existing connector or the Pulsar client to consume the data, transform it with a Pulsar Function, or build a reactive application that — you guessed it — reacts to events.

In this post, we’re building a Spring Boot app with the support of a reactive Pulsar adapter; documentation and sample code for this adapter are available online.

Reactive Streams… what? Long story short, Reactive Streams is a specification following the ideas of the Reactive Manifesto. It is available on GitHub. One library that implements this spec is Project Reactor, which is available in Spring Boot. For a more in-depth view of the core concepts and usage examples, I can recommend the Project Reactor reference documentation.
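
As a quick taste of the programming model (a toy example, not taken from the project), a Reactor pipeline is declarative and only starts processing once someone subscribes:

import reactor.core.publisher.Flux;

public class ReactorTaste {
    public static void main(String[] args) {
        // Nothing happens until subscribe() is called; then each element
        // flows through the operators one by one.
        Flux.just(350f, 900f, 1200f)
            .filter(altitude -> altitude < 1000f)
            .subscribe(altitude -> System.out.println("Low flight at " + altitude + " m"));
    }
}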

Schemas… Everywhere… Everytime…

One last note before we get into coding. There is one topic that is easy to grasp but difficult to work with: serialization. Be it Avro, Protobuf, or Thrift. To not leave you alone with the intricacies of working with a schema, I’ll show two approaches — one with self-written classes and one with classes generated from an Avro schema. You have to understand how to work with Avro because Astra CDC serializes events in that format.

To recap, the flight data moved from OpenSky to Astra DB, and besides being available in the database, events are also sent to Astra Streaming. These events are serialized with Avro, and the schema can be looked up in Astra Streaming under the Topics tab, in the astracdc namespace, for the data-<dbid>-vectors.live topic:

Figure 11: OpenSky Topics page

It is a KeyValue type of schema, with a dedicated schema for the key, corresponding to the partition key in the Astra database, as well as a dedicated schema for the value:

KeyValue schema type; note “kv.encoding.type” property set to “SEPARATED”

As mentioned, there are now two approaches to creating classes for the key and value objects.

Generate from Avro schema

Since the Spring Boot application is built with gradle, let’s introduce a gradle plugin that will read the Avro schema files and generate the required class files:
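
A sketch of what the two schema files could look like, derived from the live table columns (the record names, the namespace, and the nullable unions are assumptions; check the actual schema in the Topics tab):

{
  "type": "record",
  "name": "Key",
  "namespace": "reactive.demo",
  "fields": [
    { "name": "icao24", "type": "string" },
    { "name": "callsign", "type": "string" }
  ]
}

{
  "type": "record",
  "name": "Value",
  "namespace": "reactive.demo",
  "fields": [
    { "name": "time_position", "type": [ "null", "long" ], "default": null },
    { "name": "longitude", "type": [ "null", "float" ], "default": null },
    { "name": "latitude", "type": [ "null", "float" ], "default": null },
    { "name": "altitude", "type": [ "null", "float" ], "default": null }
  ]
}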

The two files are essentially a copy-paste from the original schema, just split in two with adjusted name and namespace parameters. Both files need to be put in the src/main/avro directory. The required plugin is configured as follows:

plugins {
    ...
    id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
}

dependencies {
    ...
    implementation 'org.apache.avro:avro:1.11.0'
}

The compileJava task will run the generateAvroJava task and create the class files in the build/generated-main-avro-java directory.
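
To inspect the generated sources in isolation, you can also run the generation task directly:

./gradlew generateAvroJava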

Hand-crafted POJOs

Knowing the schema, you can also create the classes manually. And with Lombok it’s done with even less boiler-plate code:
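
A minimal sketch of such POJOs; the nested LiveEvent.Key/LiveEvent.Value layout matches the class names used in the Troubleshooting section below, and the field names mirror the Avro schema so that Schema.AVRO can map them:

package reactive.demo;

import lombok.Data;

public class LiveEvent {

    @Data
    public static class Key {
        private String icao24;
        private String callsign;
    }

    @Data
    public static class Value {
        // Wrapper classes, not primitives; see the note below.
        private Long time_position;
        private Float longitude;
        private Float latitude;
        private Float altitude;
    }
}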

Make sure to use the Float/Long classes instead of the float/long primitives.

Business Value!

There has been a lot of clicking around in various clouds so far, and not a single line of code related to the problem we want to solve has been written yet. Let’s develop two reactive services that will:

  • Send a notification (log it in the console) if a plane is flying below 1 km around my geographic area and is therefore going to disturb my flow.
  • Send a notification if, during the last five minutes, more than five different planes have been noticed around my geographic area.

AltitudeService utilizes the self-written classes, whereas in RushHourService you’ll find the generated ones.

The AltitudeService subscribes to a Pulsar topic and acts on each incoming event. The notification is sent if a particular property (altitude) is below a threshold.
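
A sketch of the core pipeline, assuming the events arrive as a Flux of KeyValue pairs from the reactive reader shown in the Troubleshooting section (the method and variable names are mine):

package reactive.demo;

import org.apache.pulsar.common.schema.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

public class AltitudeService {

    private static final Logger log = LoggerFactory.getLogger(AltitudeService.class);

    private static final float THRESHOLD_METERS = 1000f;

    // Fires a notification for every event whose altitude is below the threshold.
    public void notifyOnLowFlights(Flux<KeyValue<LiveEvent.Key, LiveEvent.Value>> events) {
        events
            .filter(kv -> kv.getValue().getAltitude() != null
                    && kv.getValue().getAltitude() < THRESHOLD_METERS)
            .subscribe(kv -> log.info("Soon you gonna hear {}", kv.getKey().getCallsign().trim()));
    }
}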

The RushHourService, on the other hand, collects events for a given time window and evaluates the whole set of accumulated flights at once.
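
A sketch of the windowed evaluation with Reactor, again over a Flux of KeyValue events. Key and Value here stand for the classes generated from the Avro schema sketch above; generated getters typically return a CharSequence, hence the toString():

package reactive.demo;

import java.time.Duration;

import org.apache.pulsar.common.schema.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

public class RushHourService {

    private static final Logger log = LoggerFactory.getLogger(RushHourService.class);

    // Collects five-minute windows of events and fires once a window
    // contains more than five unique callsigns.
    public void notifyOnRushHour(Flux<KeyValue<Key, Value>> events) {
        events
            .map(kv -> kv.getKey().getCallsign().toString().trim())
            .window(Duration.ofMinutes(5))
            .flatMap(window -> window.distinct().collectList())
            .filter(callsigns -> callsigns.size() > 5)
            .subscribe(callsigns ->
                log.info("Rush Hour! Collected more than 5 unique flights: {}", callsigns));
    }
}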

The complete application can be run with:

./gradlew bootRun

Before doing so, edit the application.properties file and replace <token value> with a real value that can be found in the Settings tab of the opensky Streaming Tenant in the Token Management pane:

Figure 12: Token Management pane

Soon you should see notifications showing up in the application’s console, like:

2022-07-27 16:58:02.530  INFO 4380 --- [nt-internal-4-1] reactive.demo.AltitudeService            : Soon you gonna hear ENT75XU
...
2022-07-27 17:00:28.234  INFO 4380 --- [     parallel-1] reactive.demo.RushHourService            : Rush Hour! Collected more than 5 unique flights: [LOT34P, SPEWA, ENT75XU, WZZ3453, SPSIVA, LOT8NE]

Wrapping up

In this blog post, you’ve seen how easy it is to develop event-driven applications. The tedious part — setting up a lambda run-time environment, a database, and a streaming platform — was outsourced, letting us focus on the problem to solve. And with the support of Reactive Streams, we can write modern applications following the non-blocking, asynchronous processing paradigm.

Troubleshooting

As said, working with schemas can be tricky sometimes. Here are a few examples, or rather exceptions:

  • IncompatibleSchemaException
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: org.apache.avro.SchemaValidationException: Unable to read schema: 
{
...
}
using schema:
{
...
}

Make sure you use classes (Long, Float) instead of primitives (long, float) in your POJOs.

  • BufferUnderflowException
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.nio.BufferUnderflowException
Caused by: java.nio.BufferUnderflowException: null
at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:183) ~[na:na]
at java.base/java.nio.ByteBuffer.get(ByteBuffer.java:826) ~[na:na]
at org.apache.pulsar.common.schema.KeyValue.decode(KeyValue.java:137) ~[pulsar-client-api-2.8.2.jar:2.8.2]
at org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl.decode(KeyValueSchemaImpl.java:156) ~[pulsar-client-2.8.2.jar:2.8.2]
at org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl.decode(KeyValueSchemaImpl.java:40) ~[pulsar-client-2.8.2.jar:2.8.2]

When you pass the Schema.KeyValue parameter to the messageReader, make sure to set the encoding type to KeyValueEncodingType.SEPARATED:

reactiveMessageReader = reactivePulsarClient
    .messageReader(
        Schema.KeyValue(
            Schema.AVRO(LiveEvent.Key.class),
            Schema.AVRO(LiveEvent.Value.class),
            KeyValueEncodingType.SEPARATED)
    )
  • ClassNotFoundException: org.conscrypt.Conscrypt
2022-07-27 17:13:11.483  WARN 4809 --- [r-client-io-1-1] o.a.pulsar.common.util.SecurityUtility   : Conscrypt isn't available. Using JDK default security provider.
java.lang.ClassNotFoundException: org.conscrypt.Conscrypt
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[na:na]
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[na:na]
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[na:na]
at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
at java.base/java.lang.Class.forName(Class.java:375) ~[na:na]
at org.apache.pulsar.common.util.SecurityUtility.loadConscryptProvider(SecurityUtility.java:122) ~[pulsar-client-2.8.2.jar:2.8.2]

Add a dependency in your build.gradle file:

dependencies {
    ...
    runtimeOnly 'org.conscrypt:conscrypt-openjdk-uber:2.5.2'
    ...
}

Thanks to Chris Bartholomew.

Follow the DataStax Tech Blog for more developer stories. Check out our YouTube channel for tutorials, and follow DataStax Developers on Twitter for the latest news about our developer community.
