Cassandra, Pulsar and good ol’ Spring Boot — build reactive applications like a boss!
Author: Jaroslaw Kijanowski
In this article, we look at how we can improve your application experience by taking real-time data and using it to power decision-making within your application. The example in this article is sending flight radar data from a streaming platform to the application to notify users of real-time events.
How can an application be reactive when it’s working with data stored in the database? By polling every second, millisecond, microsecond?
The challenge though is to keep data in motion once it’s stored in a database so you can build data pipelines and react to events in real-time. Even better if you can avoid the overhead of installing self-hosted databases and streaming solutions.
Imagine focusing on developing business value that can be verified with unit tests instead of complex integration test suites. Finally, all that without any vendor lock-in based on open source software and widely known programming languages.
In this blog post, we’ll build a solution sending flight radar data to a NoSQL database. Then this data is moved to a streaming platform that powers a business application making decisions based on the received events. In particular, the whole application consists of the following parts:
- A Google Cloud Function calling the OpenSky API every 90 seconds (to not hit the limit of 1000 API calls per day) and sending the data to
- Astra, an Apache Cassandra in-da-cloud solution,
- Astra CDC — a change-data capture mechanism streaming data in real-time from Astra Database to
- Astra Streaming, an Apache Pulsar in-da-cloud solution exposing flight data to
- A Spring Boot Application built with Reactive Pulsar, a Reactive Streams adapter for Apache Pulsar Java Client.
At the end, the application will notify us about flights captured for a given region if:
- A plane is flying below 1km.
- During the last five minutes, more than five unique flights have been captured.
Move it! From OpenSky to Astra
The first exercise is to move data from the OpenSky portal to Astra.
Let’s begin with setting up our database. Log in to Astra and create a new database called opensky
with a vectors keyspace
in Google Cloud’s europe-west1
region.
Copy or download the token details, go to your database, click on the CQL Console tab and create a live
table:
CREATE TABLE vectors.live(
icao24 TEXT,
callsign TEXT,
time_position TIMESTAMP,
longitude FLOAT,
latitude FLOAT,
altitude FLOAT,
PRIMARY KEY ((icao24, callsign), time_position)
);
Next, we’ll create a lambda in JavaScript that triggers every 90 seconds. A few popular FaaS providers are available — Cloud Functions from Google, among others.
Our function aims to call the OpenSky REST API to fetch all flights from within a particular area bound by lamin
, lomin
, lamax
, and lomax
. These parameters stand for min latitude, min longitude, max latitude, and max longitude, respectively.
https://opensky-network.org/api/states/all?lamin=52.0&lomin=20.6&lamax=52.4&lomax=21.3
The given parameters restrict the area to a small region, including an airport:
The output of the fetch is:
{
"time": 1658778191,
"states": [
[
"48af05", # icao24
"LOT2ET ", # callsign
"Poland",
1658778186, # timestamp
1658778186,
20.8367, # longitude
52.2022, # latitude
563.88,
false,
77.83,
114.19,
-4.55,
null,
609.6, # altitude
"4626",
false,
0
],
[...]
}
Now we can send a curated form of the output to our Astra database. Instead of connecting directly via a CQL NodeJS client, we can leverage Stargate, a set of APIs allowing us to interact with our database through REST, GraphQL, and gRPC:
const config = {
headers: {
'content-type': 'application/json',
'x-cassandra-token': process.env.ASTRA_DB_APPLICATION_TOKEN
} } const payload = {
"icao24": state[0],
"callsign": state[1],
"time_position": state[3] * 1000,
"longitude": state[5],
"latitude": state[6],
"altitude": state[13]
}
axios.post(
`https://${process.env.ASTRA_DB_ID}-${process.env.ASTRA_DB_REGION}
.apps.astra.datastax.com/api/rest/v2/keyspaces
/${process.env.ASTRA_DB_KEYSPACE/${process.env.ASTRA_DB_TABLE}`,
payload,
config
)
.then(resp => {...});
The values for $ASTRA_DB_ID
, $ASTRA_DB_REGION
and $ASTRA_DB_KEYSPACE
can be obtained from the Connect
tab under Rest API
:
For $ASTRA_DB_TABLE
we’ll use live
and the $ASTRA_DB_APPLICATION_TOKEN
value needs to be taken from the token details we were presented while creating a new database.
This is the complete Google Cloud Functions code:
The environment variables are provided when the function is created:
Finally, we need to set up a scheduler that will trigger this function regularly. Google Cloud Scheduler is a good fit here:
Once all is set up, we can use the CQL Console
to verify our first pipeline works:
Keep it moving! From Astra DB to Astra Streaming
It’s go time for real-time data, isn’t it? A database is a pull-type kind of container. Applications have to request data, and they still do so — typically on clients’ requests. To keep the data moving, you need an additional mechanism that unfreezes the data and loads it into a streaming platform — ideally in real time.
Change Data Capture (CDC) is a solution available to Astra users. Each new record (insert) or modification (update, delete) is transformed into an event and sent to Astra Streaming, a streaming platform backed by Apache Pulsar.
There is no code to be written for this step. Just click on Create Stream
in the left pane and choose the europewest1
region in Google Cloud
:
Next, in the CDC
database tab, just enable it for the vectors.live
table:
That’s it for clicking around… Let’s. Write. Code.
Be event-driven, be reactive! From Astra Streaming to Spring Boot
Having the data in a streaming platform, sky is the limit (no pun intended). You can either use an Apache Pulsar client or the Apache Pulsar CLI to consume the data, transform it with a Pulsar Function in Astra, or build a reactive
application that — you guessed it — reacts to events.
In this post we’re building a Spring Boot app with the support of a Reactive Streams adapter for the Apache Pulsar Java client. There is a tutorial for this adapter, a 5MAP (5 minutes about Pulsar) introduction as well as a video from the SpringOne 2021 conference.
Reactive Streams… what? Long story short, Reactive Streams is a specification following the rules of the Reactive Manifesto. It is available for the JVM. One library that implements this spec is Project Reactor, which is available in Spring Boot. For a more in-depth view on the core concepts and usage examples, I can recommend this DZone article.
Schemas… Everywhere… Everytime…
Last note before we get into coding. There is one topic that is easy to grasp but difficult to work with: serialization. Be it Avro, Protobuf or Thrift. To not leave you with the Pulsar documentation about working with a schema, I’ll show two approaches — one with self-written classes and one with generated classes from an Avro schema. You have to understand how to work with Avro because Astra CDC is serializing events in that format.
To recap, the flight data moved from OpenSky to Astra DB and besides being available in the database, events are also sent to Astra Streaming. These events are serialized with Avro and the schema can be looked up in Astra Streaming under the Topics
tab in the astracdc
namespace for the data-<dbid>-vectors.live
topic:
It is a KeyValue
type of a schema, having a dedicated schema for the key which corresponds to the partition key in the Astra database, as well as a dedicated schema for the value:
As mentioned, there are now two approaches to creating classes for the key and value objects.
Generate from Avro schema
Since the Spring Boot application is built with gradle, let’s introduce a gradle plugin that will read the Avro schema files and generate the required class files:
This is a copy-paste from the original schema, just split into two files and adjusted name
and namespace
parameters. Both files need to be put in the src/man/avro
directory. The required gradle plugin is configured as follows:
plugins {
...
id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
}
dependencies {
....
implementation 'org.apache.avro:avro:1.11.0'
}
The compileJava
task will run the generateAvroJava
task and create the class files in the build/generated-main-avro-java
directory.
Hand-crafted POJOs
Knowing the schema, you can also create the classes manually. And with Lombok it’s done with even less boiler-plate code:
Make sure to use the Float/Long classes instead of the float/long primitives.
Business Value!
There has been a lot of clicking around in various clouds for now, and not a single line of code related to the problem we want to solve has been written yet. Let’s develop two reactive services that will:
- Send a notification (log it in the console), if a plane’s altitude is below 1km around my geographic area and therefore is going to disturb my flow.
- Send a notification, if during the last five minutes, there have been more than five different planes noticed around my geographic area.
AltitudeService
is utilizing the self-written classes, whereas in RushHourService
you’ll find the generated ones.
The AltitudeService
subscribes to a Pulsar topic and acts on each one. The notification is sent if a particular property (altitude) is below a threshold.
The RushHourService
, on the other hand, collects events for a given time window and evaluates the whole set of accumulated flights at once.
The complete application is available in Github and can be run with:
./gradlew bootRun
Before doing so, edit the application.properties
file and replace <token value>
with a real value that can be found in the Settings
tab of the opensky
Streaming Tenant in the Token Management
pane:
Soon you should see notifications showing up in the application’s console, like:
2022-07-27 16:58:02.530 INFO 4380 --- [nt-internal-4-1] reactive.demo.AltitudeService : Soon you gonna hear ENT75XU
...
2022-07-27 17:00:28.234 INFO 4380 --- [ parallel-1] reactive.demo.RushHourService : Rush Hour! Collected more than 5 unique flights: [LOT34P, SPEWA, ENT75XU, WZZ3453, SPSIVA, LOT8NE]
Wrapping up
In this blog post, you’ve seen how easy it is to develop event-driven applications. The tedious part — setting up a lambda run-time environment, a database, and a streaming platform — was outsourced, letting us focus on the problem to solve. And with the support of Reactive Streams, we can write modern applications following the non-blocking, asynchronous processing paradigm.
Troubleshooting
As said, working with schemas can be tricky sometimes. Here are a few examples, or rather exceptions:
- IncompatibleSchemaException
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: org.apache.avro.SchemaValidationException: Unable to read schema:
{
...
}
using schema:
{
...
}
Make sure you use classes (Long, Float) instead of primitives (long, float) in your POJOs.
- BufferUnderflowException
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.nio.BufferUnderflowException
Caused by: java.nio.BufferUnderflowException: null
at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:183) ~[na:na]
at java.base/java.nio.ByteBuffer.get(ByteBuffer.java:826) ~[na:na]
at org.apache.pulsar.common.schema.KeyValue.decode(KeyValue.java:137) ~[pulsar-client-api-2.8.2.jar:2.8.2]
at org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl.decode(KeyValueSchemaImpl.java:156) ~[pulsar-client-2.8.2.jar:2.8.2]
at org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl.decode(KeyValueSchemaImpl.java:40) ~[pulsar-client-2.8.2.jar:2.8.2]
When you pass the Schema.KeyValue
parameter to the messageReader
, make sure to set the encoding type to KeyValueEncodingType.SEPARATED
:
reactiveMessageReader = reactivePulsarClient
.messageReader(
Schema.KeyValue(
Schema.AVRO(LiveEvent.Key.class),
Schema.AVRO(LiveEvent.Value.class),
KeyValueEncodingType.SEPARATED)
)
- ClassNotFoundException: org.conscrypt.Conscrypt
2022-07-27 17:13:11.483 WARN 4809 --- [r-client-io-1-1] o.a.pulsar.common.util.SecurityUtility : Conscrypt isn't available. Using JDK default security provider.java.lang.ClassNotFoundException: org.conscrypt.Conscrypt
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[na:na]
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[na:na]
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[na:na]
at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
at java.base/java.lang.Class.forName(Class.java:375) ~[na:na]
at org.apache.pulsar.common.util.SecurityUtility.loadConscryptProvider(SecurityUtility.java:122) ~[pulsar-client-2.8.2.jar:2.8.2]
Add a dependency in your build.gradle
file:
dependencies {
...
runtimeOnly 'org.conscrypt:conscrypt-openjdk-uber:2.5.2'
...
}
Thanks to Chris Bartholomew.
Follow the DataStax Tech Blog for more developer stories. Check out our YouTube channel for tutorials, and follow DataStax Developers on Twitter for the latest news about our developer community.
Resources
- Google Cloud Functions
- Astra DB
- CDC for Astra DB
- Cloud Native Data Streaming
- Spring Boot
- Reactive Pulsar
- FaaS Providers: The Top Names on the Market
- Astra Streaming: Real-Time Data Just Got a Lot Easier
- Pulsar Client Libraries
- Developing and Running Serverless Apache Pulsar Functions
- Reactive Applications with Apache Pulsar and Spring Boot
- The Reactive Manifesto
- Reactive Streams Specification for the JVM
- Project Reactor
- Reactive Programming With Spring
- Gradle Avro Plugin
- Reactive Astra demo
- Join our Discord: Fellowship of the (Cassandra) Rings
- DataStax Community Platform
- DataStax Academy
- DataStax Certifications
- DataStax Workshops