Building and Deploying Your First Cloudflow Application

using Scala and Akka Streams

Jeroen Rosenberg
Dec 10, 2019 · 13 min read

Cloudflow is a relatively new framework that helps you build distributed streaming applications and deploy them on Kubernetes. Its powerful abstractions allow you to easily split your application into independent stream processing components, called streamlets. Streamlets can be developed using several runtimes such as Akka Streams and Flink. A streamlet can have one or more input streams, inlets, and one or more output streams, outlets. You deploy your application as a whole, while Cloudflow deploys streamlets individually. At runtime Cloudflow ensures data flows between the inlets and outlets of your streamlets, through Kafka, according to your Avro schemas and your pipeline definition, called a blueprint.

In this post we are going to build a simple application using the Cloudflow framework and Akka Streams. We are going to start off with a simple sbt project to set up a stream processing pipeline, run it locally and eventually deploy it to GKE using the Cloudflow CLI.

Prerequisites

First a few prerequisites.

For building and running locally:

  • Scala 2.12+
  • sbt 1.2.8+
  • JDK 11

For deploying to GKE:

  • Kubectl
  • Google Cloud SDK
  • A GCP project with GKE
  • jq
  • Helm 2 (installer is not compatible with Helm 3 at time of writing)

Building a Cloudflow application with Akka Streams

For our fictive problem domain we are going to stay in the clouds. We are going to build a simple rain radar. Let’s assume we are getting precipitation data of several locations from different sources. Based on a simple algorithm we want to determine in which cities it’s raining and which of our measurements can be classified as clutter. We could later extend this application to calculate rain intervals or even draw maps, but for the purpose of this post let’s keep things simple.

Following the convention over configuration principle, the anatomy of a Cloudflow application roughly looks like this:

|-project
|---cloudflow-plugins.sbt # cloudflow plugin config
|-src
|---main
|-----avro
|-----blueprint
|-------blueprint.conf # pipeline definition lives here
|-----resources
|-----scala
|-build.sbt
|-target-env.sbt # docker repository config

First of all, we need to add the sbt-cloudflow plugin to help us package our streamlets into deployables later on. We define the plugin in the file project/cloudflow-plugins.sbt.

project/cloudflow-plugins.sbt
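It contains little more than the plugin declaration; roughly something like this (the exact version is an assumption, pick the Cloudflow release you target):

// sbt-cloudflow plugin; pin the version to the Cloudflow release you are using
addSbtPlugin("com.lightbend.cloudflow" % "sbt-cloudflow" % "1.3.0")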

In our build.sbt we enable the CloudflowAkkaStreamsApplicationPlugin, because we are only going to use the Akka Streams runtime. You can optionally add other plugins as well if you plan to also write Spark or Flink streamlets. We are going to use Akka Http Spray JSON for serialization.
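For reference, a minimal build.sbt along these lines might look as follows (versions and the exact settings are assumptions, not the original listing):

lazy val rainRadar = (project in file("."))
  .enablePlugins(CloudflowAkkaStreamsApplicationPlugin) // Akka Streams runtime only
  .settings(
    name := "rain-radar",
    organization := "com.github.jeroenr",
    scalaVersion := "2.12.10", // assumed patch version
    libraryDependencies ++= Seq(
      // Akka Http Spray JSON, used for (de)serialization in the HTTP ingress
      "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.10" // assumed version
    )
  )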

Now that we have our project set up we can start developing. First we will create our data model. Cloudflow encourages a schema-first approach where you describe your models using Avro specifications, placed in src/main/avro, and the corresponding classes will be generated.

Let’s start with our top level input record which represents a precipitation measurement on a certain time for a certain location. We’ll call this model PrecipitationData from now on. Its Avro schema looks like this:

src/main/avro/PrecipitationData.avsc
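A sketch of the schema, using a timestamp-millis logical type so the generated case class gets an Instant field (the exact definition is an assumption based on the fields used throughout this post):

{
  "namespace": "com.github.jeroenr.rain.radar",
  "type": "record",
  "name": "PrecipitationData",
  "fields": [
    { "name": "timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "location", "type": "com.github.jeroenr.rain.radar.Location" },
    { "name": "value", "type": "double" }
  ]
}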

As mentioned, our Avro schemas should be placed in src/main/avro so we store our first schema in src/main/avro/PrecipitationData.avsc. Our PrecipitationData model embeds a Location, so we have to define the schema for that as well:

src/main/avro/Location.avsc
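Again a sketch, with the three fields the rest of this post relies on:

{
  "namespace": "com.github.jeroenr.rain.radar",
  "type": "record",
  "name": "Location",
  "fields": [
    { "name": "lat", "type": "double" },
    { "name": "lng", "type": "double" },
    { "name": "city", "type": "string" }
  ]
}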

This completes the data model of our input records. As for the output we have two possibilities. In our simplified model we are either dealing with Rain or we are dealing with Clutter. Let's define schemas for both:

src/main/avro/Rain.avsc

and

src/main/avro/Clutter.avsc
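The post doesn't depend on the exact shape of these records, so the two sketches below (each in its own .avsc file) are assumptions; carrying the city and the measured value is enough for the loggers at the end of the pipeline:

{
  "namespace": "com.github.jeroenr.rain.radar",
  "type": "record",
  "name": "Rain",
  "fields": [
    { "name": "city", "type": "string" },
    { "name": "value", "type": "double" }
  ]
}

{
  "namespace": "com.github.jeroenr.rain.radar",
  "type": "record",
  "name": "Clutter",
  "fields": [
    { "name": "city", "type": "string" },
    { "name": "value", "type": "double" }
  ]
}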

Now we have both our input and output records defined. The built-in code generation creates the corresponding Scala classes in target/scala-2.12/src_managed/main/compiled_avro/com/github/jeroenr/rain/radar.

Now that we have our data model defined we can start creating some streamlets. Let's start with the data ingestion part. For the purpose of this example we are going to assume that we have some job that will Http POST precipitation data to our service. Hence, we need to set up an Http ingress streamlet which will be responsible for ingesting, validating and parsing this input into our PrecipitationData model. Let's also configure a single outlet so we can pipe our data to our downstream streamlets.

src/main/scala/com/github/jeroenr/rain/radar/PrecipitationDataHttpIngress.scala
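A sketch of such an ingress, assuming Cloudflow's Akka Streams scaladsl (package and helper names may differ slightly between Cloudflow versions):

package com.github.jeroenr.rain.radar

import cloudflow.akkastream.AkkaServerStreamlet
import cloudflow.akkastream.util.scaladsl.HttpServerLogic
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro.AvroOutlet

class PrecipitationDataHttpIngress extends AkkaServerStreamlet {
  import JsonFormats._ // spray-json formats, defined further down in this post

  // partition by city so downstream streamlets can process cities in parallel
  val out = AvroOutlet[PrecipitationData]("out", _.location.city)

  def shape = StreamletShape.withOutlets(out)

  // default server logic: unmarshal the POSTed JSON into PrecipitationData and emit it to the outlet
  override def createLogic = HttpServerLogic.default(this, out)
}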

As you can see the code is very minimal. We are extending the abstract class AkkaServerStreamlet, which ensures this streamlet gets an endpoint in Kubernetes.

We define an AvroOutlet[PrecipitationData] which will output any successfully parsed incoming message to the "out" outlet. By partitioning by city we ensure that we could potentially process multiple cities in parallel in streamlets connected to our outlet. Defining a partitioner ensures values with the same city end up in the same partition on Kafka.

Every streamlet needs to define a StreamletShape, which configures the inlets and outlets. For our ingress we just have a single outlet ("out") and no inlets, since we are ingesting input over Http rather than from any upstream streamlet.

The last step is to define the logic of the streamlet. For our ingress we can simply use the built-in default HttpServerLogic. We pass it the outlet we defined above and let Cloudflow handle the rest.

Our ingress needs to know how to deserialize our models. For this purpose we will define a JsonFormat for our models using spray-json. We’ll have to define a custom format to deal with the timestamp field, which uses the Instant type, but we can use the built-in jsonFormat* for the generated case classes:

src/main/scala/com/github/jeroenr/rain/radar/JsonFormats.scala
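A sketch of these formats; the epoch-millis encoding of Instant matches the test data generated later on, but the exact listing is an assumption:

package com.github.jeroenr.rain.radar

import java.time.Instant

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json._

object JsonFormats extends SprayJsonSupport with DefaultJsonProtocol {

  // custom format for the Instant timestamp, encoded as epoch milliseconds
  implicit object InstantJsonFormat extends RootJsonFormat[Instant] {
    def write(instant: Instant): JsValue = JsNumber(instant.toEpochMilli)
    def read(json: JsValue): Instant = json match {
      case JsNumber(millis) => Instant.ofEpochMilli(millis.toLong)
      case other            => deserializationError(s"Expected epoch millis, got $other")
    }
  }

  // built-in jsonFormatN for the Avro-generated case classes
  implicit val locationFormat: RootJsonFormat[Location] = jsonFormat3(Location.apply)
  implicit val precipitationDataFormat: RootJsonFormat[PrecipitationData] = jsonFormat3(PrecipitationData.apply)
}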

Now that we are able to ingest PrecipitationData and make it available in our pipeline we would like to differentiate the cases where it’s dry, raining or we are dealing with clutter. We want to feed measurements that we classify as rain to a different processor than measurements we classify as clutter. We basically want to partition our stream into a stream of Rain and a stream of Clutter. In case we didn’t measure any value we assume it’s dry and disregard the particular PrecipitationData.

For this type of streamlet we use the AkkaStreamlet, which is the foundation for building Akka Streams based streamlets. As with our ingress streamlet, we start off by defining our inlets and outlets: an inlet for PrecipitationData called "in", an outlet called "clutter" for Clutter and an outlet called "rain" for Rain. For rain we again partition by city in order to potentially parallelise the processing downstream, but for clutter we don't really care about this and use a simple RoundRobinPartitioner.

Then we define the StreamletShape again. In this case we have a single inlet and two outlets, one for clutter and one for rain.

Lastly we have to implement this streamlet's logic. Luckily, there's a SplitterLogic abstraction available which does exactly what we need; it splits a stream over two outlets based on Scala's Either. This abstract class requires us to implement a Flow[PrecipitationData, Either[Clutter, Rain]]. We can use flowWithOffsetContext to acknowledge that we've processed a PrecipitationData: when we successfully emit an element, this construct automatically commits the offset (check out this explanation of offsets if you're not sure what I'm referring to), giving us at-least-once semantics. The implementation of the flow is straightforward. First, we filter out the PrecipitationData where it's dry. Then, depending on whether we exceed the "rain threshold" (0.1 in this case), we map to Left(Clutter) or Right(Rain).

src/main/scala/com/github/jeroenr/rain/radar/RainClutterPartitioner.scala
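A sketch along those lines; the SplitterLogic constructor shown here (left outlet first, right outlet second) and the dryness check on value are assumptions:

package com.github.jeroenr.rain.radar

import cloudflow.akkastream.AkkaStreamlet
import cloudflow.akkastream.util.scaladsl.SplitterLogic
import cloudflow.streamlets.{ RoundRobinPartitioner, StreamletShape }
import cloudflow.streamlets.avro.{ AvroInlet, AvroOutlet }

class RainClutterPartitioner extends AkkaStreamlet {
  val in = AvroInlet[PrecipitationData]("in")
  // clutter doesn't need any grouping, so spread it round robin over the partitions
  val clutter = AvroOutlet[Clutter]("clutter").withPartitioner(RoundRobinPartitioner)
  // keep measurements for the same city in the same partition
  val rain = AvroOutlet[Rain]("rain", _.city)

  def shape = StreamletShape(in).withOutlets(clutter, rain)

  val RainThreshold = 0.1

  override def createLogic = new SplitterLogic(in, clutter, rain) {
    def flow =
      flowWithOffsetContext()
        .filter(_.value > 0) // no measured precipitation means it's dry: drop it (assumption)
        .map { data =>
          if (data.value > RainThreshold) Right(Rain(data.location.city, data.value))
          else Left(Clutter(data.location.city, data.value))
        }
  }
}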

Here, too, the required code is quite minimal. We can focus on implementing the business logic, rely on some very basic operators (e.g. filter and map) and let Cloudflow do the heavy lifting.

The final pieces of our pipeline are two simple logger streamlets. We have a stream of Rain and a stream of Clutter. In a real application you might want to store those measurements in a database, but for the purpose of this post we simply print out each element on both streams. This "logging" functionality is pretty generic: whether we are dealing with Rain, with Clutter or with any other record, the implementation is really similar. Hence, it seems like a good idea to create a LoggerStreamlet abstraction.

We base the LoggerStreamlet on the AkkaStreamlet once again, so that we can benefit from the Akka Streams semantics. Our streamlet has one inlet for record type T. As long as we know that this is some type of Avro record, we know how to deal with it. We don’t have to define any outlets, because our logger is the end of our pipeline. The StreamletShape therefore has just one inlet and no outlets.

For the logic of this streamlet we will use the RunnableGraphStreamletLogic building block. This requires us to implement the runnableGraph function, which defines how data flows from the source, our inlet, to the sink. This is something that the SplitterLogic building block, which we used in the previous step, does under the hood. Since we have no outlets we use the parameterless sinkWithOffsetContext overload. This ensures we still commit offsets even though we are not emitting any elements to an outlet downstream. To define the source we simply pass our inlet to the sourceWithOffsetContext function. The Flow in between our source and our sink is a simple call to our logger on each element. Here, we can use the map operator again.

src/main/scala/com/github/jeroenr/rain/radar/RainLogger.scala

Based on our LoggerStreamlet abstraction we can create two simple implementations: a RainLogger and a ClutterLogger with corresponding log templates.
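A sketch of the abstraction and the two concrete loggers; the type bound AvroInlet needs on T, and logging via system.log, are assumptions:

package com.github.jeroenr.rain.radar

import scala.reflect.ClassTag

import org.apache.avro.specific.SpecificRecordBase

import cloudflow.akkastream.AkkaStreamlet
import cloudflow.akkastream.scaladsl.RunnableGraphStreamletLogic
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro.AvroInlet

// Generic logger: one inlet, no outlets. T must be an Avro record so AvroInlet can decode it.
abstract class LoggerStreamlet[T <: SpecificRecordBase: ClassTag](prefix: String) extends AkkaStreamlet {
  val in = AvroInlet[T]("in")
  def shape = StreamletShape.withInlets(in)

  override def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph =
      sourceWithOffsetContext(in)              // read from the inlet, carrying the committable offset
        .map { record =>
          system.log.info(s"$prefix: $record") // simplified "log template": print each element
          record
        }
        .to(sinkWithOffsetContext())           // no outlet downstream, only commit the offsets
  }
}

class RainLogger extends LoggerStreamlet[Rain]("Detected rain")
class ClutterLogger extends LoggerStreamlet[Clutter]("Detected clutter")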

Wonderful, we have built all the required streamlets for our stream processing pipeline. The only thing left is to connect all inlets to outlets in a pipeline definition or blueprint, in Cloudflow terminology. This blueprint is defined in a blueprint.conf file, which lives under the src/main/blueprint directory as we have seen in the project structure.

First, we define all our streamlets under a short alias:

  1. PrecipitationDataHttpIngress as http-ingress
  2. RainClutterPartitioner as partitioner
  3. RainLogger as rain-logger
  4. ClutterLogger as clutter-logger

Secondly, we define the connections between the inlets and outlets of our streamlets. You could connect a single outlet to multiple downstream inlets, but in this case we don’t need that. Our simple pipeline looks like this:

http-ingress → partitioner → rain-logger
                           → clutter-logger

src/main/blueprint/blueprint.conf
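A sketch of this blueprint, using the pre-Cloudflow-2.0 connection syntax (treat the exact keys as assumptions if you are on a newer release):

blueprint {
  streamlets {
    http-ingress = com.github.jeroenr.rain.radar.PrecipitationDataHttpIngress
    partitioner = com.github.jeroenr.rain.radar.RainClutterPartitioner
    rain-logger = com.github.jeroenr.rain.radar.RainLogger
    clutter-logger = com.github.jeroenr.rain.radar.ClutterLogger
  }
  connections {
    http-ingress.out = [partitioner.in]
    partitioner.rain = [rain-logger.in]
    partitioner.clutter = [clutter-logger.in]
  }
}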

This straightforward definition is enough for Cloudflow to understand how data should flow through our pipeline at runtime.

Testing our app locally

To run our app locally we can simply hit sbt runLocal. This will first of all verify our application blueprint: it checks that all inlets and outlets are connected and fails with a descriptive error if they are not.

So we fail early, without having to find out about the problem after deploying to GKE. Quite cool!

Once you’re blueprint is successfully verified we are getting a message indicating all our streamlets have spun up, like:

It tells us that we are expecting precipitation data ingress on HTTP port 3001 and where we can find our log output.

Great, we have our first Cloudflow application running locally. Let’s throw some data at it to see if it generates the log statements we expect.

To generate a bunch of JSON input records I used the excellent tool provided by http://json-generator.com. I used the following template:

[
  '{{repeat(10, 20)}}',
  {
    timestamp: '{{integer(1574000000000, 1574973230815)}}',
    location: {
      lat: '{{floating(-90.000001, 90)}}',
      lng: '{{floating(-180.000001, 180)}}',
      city: '{{city()}}'
    },
    value: '{{floating(0, 1)}}'
  }
]

The above template generates between 10 and 20 precipitation data records which I store in precipitation-data.json. Now we can make it rain, e.g. by using curl and jq.

for str in $(cat precipitation-data.json | jq -c '.[]')
do
echo "Using $str"
curl -i -X POST http://localhost:3001 -H "Content-Type: application/json" --data "$str"
done

You should see output similar to the one below.

Output from curl command indicating successful processing by ingress through Http 202 status

This means we have successfully hit our Http ingress streamlet and it was able to parse our requests. Otherwise, we would have gotten a 400 Bad Request. The pipeline output log file, whose location you can find at the bottom of the output of the sbt runLocal command (it should be in a path starting with /var/folders/ss/), should contain the log statements written by RainLogger and ClutterLogger. If we scroll to the end of this log file we see output similar to the one below.

Output of rain and clutter logger indicating the amount of precipitation detected

Deploying Cloudflow

Now that we have verified that our new stream processing application can run fine on our local machine we can deploy it. I’ll show you how to do this for GKE.

Make sure you meet the criteria for deployment to GKE mentioned in the prerequisites section. Verify that you have an active GCP configuration by running gcloud config configurations list.

Follow these steps to setup a new GKE cluster and install Cloudflow:

  1. Install the Cloudflow kubectl integration. Run kubectl cloudflow version to check whether you've installed it correctly.
  2. Clone the Cloudflow repo to your local machine and navigate to the installer directory.
  3. Run ./create-cluster-gke.sh my-cloudflow-cluster to create the cluster
  4. Wait for the cluster to be deployed and fully running and responsive (this could take a while, because sometimes auto updates are triggered as well). Use the GCP console to monitor the progress.
  5. Run ./install-gke.sh my-cloudflow-cluster to install the Cloudflow operators. These include Kafka, ZooKeeper, Flink and Spark. Follow the steps of the installer, i.e. selecting storage classes of your choice for each of the stateful operators. If you get connection errors during the installation, it probably means your cluster wasn't fully ready yet; in that case you can rerun the installer once it is.
  6. Verify that you have access to the cluster by running gcloud container clusters get-credentials my-cloudflow-cluster and kubectl cloudflow list. The latter command should return an empty list, because we didn’t deploy our application yet.

Now that our environment is ready we can proceed with the deployment of our rain-radar application.

Follow the following steps to publish and deploy your application:

  1. Verify that you have access to GCR by running gcloud auth configure-docker. It should say something like: gcloud credential helpers already registered correctly.
  2. As we’ve seen in the project setup section you should create a file called target-env.sbt in the root of your project (i.e. on the same level as build.sbt). The content of that file should be:
    ThisBuild / cloudflowDockerRegistry := Some("eu.gcr.io")
    ThisBuild / cloudflowDockerRepository := Some("my-gke-project-123456")
  3. In your terminal, navigate to the root of your project and run sbt buildAndPublish.

  4. The output of the previous command tells you how to deploy your application. You just need to append -u oauth2accessToken -p "$(gcloud auth print-access-token)" to authenticate. The full command will be something like kubectl-cloudflow deploy eu.gcr.io/my-gke-project-123456/rain-radar:8-0bd6086 -u oauth2accessToken -p "$(gcloud auth print-access-token)".

  5. Awesome! That's really all there is to it and I didn't have to touch a single YAML file. Monitor the progress of the deployment using kubectl cloudflow status rain-radar until all streamlets report as running.

You can see that the deployment created a dedicated namespace with the name of our application: rain-radar. All our streamlets are deployed as separate pods. Sweet!

If we want to scale (parts of) our application, we can simply use the kubectl cloudflow scale command. For instance, if we want to run multiple instances of our ingress we run: kubectl cloudflow scale rain-radar http-ingress 2. Really cool :)

Let’s repeat the test we performed on our local machine, but this time we will hit our deployed application on GKE. To make this easy we set up port forwarding and then run the same curl script.

  1. First, we have to figure out the port of our Http ingress. We can use the streamlet-name label to find it:
    export INGRESS_PORT=$(kubectl -n rain-radar get po -lcom.lightbend.cloudflow/streamlet-name=http-ingress -o jsonpath="{.items[0].spec.containers[0].ports[0].containerPort}").
  2. Then we set up the port-forwarding by running:
    kubectl -n rain-radar port-forward \
    $(kubectl -n rain-radar get po -lcom.lightbend.cloudflow/streamlet-name=http-ingress -o jsonpath="{.items[0].metadata.name}") \
    3001:$INGRESS_PORT
  3. Lastly, we run our curl script again:
    for str in $(cat precipitation-data.json | jq -c '.[]')
    do
    echo "Using $str"
    curl -i -X POST http://localhost:3001 -H "Content-Type: application/json" --data "$str"
    done
  4. Let’s verify that our rain logger streamlet logged the detected rain by running kubectl -n rain-radar logs -l com.lightbend.cloudflow/streamlet-name=rain-logger

Using kubectl -n rain-radar logs -l com.lightbend.cloudflow/streamlet-name=clutter-logger you can verify that clutter has been logged too.

Conclusion

Cloudflow provides very useful building blocks to quickly design and implement a stream processing pipeline. If you are familiar with Akka Streams, Flink and/or Spark you should be able to implement your streamlets without too much trouble. Once you are satisfied with your implementation and verified your blueprint locally, it is literally a breeze to deploy it to a cloud environment like GKE.

Thanks to the use of Avro schemas for our data models and having Kafka as a broker between the different streamlets, we don't have to fear data loss or incompatibilities. The Cloudflow framework ensures the necessary topics are created and offsets are committed with at-least-once semantics. We didn't have to worry about this at all.

I hope that with this post I inspired you to start playing around with this amazing piece of software. All sources are available on my github. Thanks for reading!

Jeroen Rosenberg

Dev of the Ops. Founder of Amsterdam.scala. Passionate about Agile, Continuous Delivery. Proud father of three.
