Data Processing Using Spring Cloud Data Flow

Published in Naukri Engineering
8 min read · May 26, 2020

By Tabish Khan

Introduction

The efficient flow of data from one location to the other — from a SaaS application to a data warehouse, for example — is one of the most critical operations in today’s data-driven enterprise.

Let’s suppose we have some independent Spring Boot applications and want to build a data pipeline by stitching these apps together, one that can be deployed on top of different runtimes (e.g. Cloud Foundry, Kubernetes etc.). Or we want to perform data ingestion, ETL, real-time data analytics, or data import/export. These are some common use cases for Spring Cloud Data Flow.

Spring Cloud Data Flow, or SCDF, is an open-source, Spring-based project that builds on other established Java-based technologies to create streaming and batch data processing pipelines.

What is a data pipeline?

A data pipeline can be defined as a group of applications interconnected so that together they serve a larger purpose, processing data in a serial fashion.

In the following image, we can see a basic data pipeline, which receives data from an HTTP endpoint (much like an API), transforms it into the required format by truncating and picking selected fields, and dumps the data into a MySQL table. Note that httpReceiver, transform and jdbc are all independent Spring Boot applications.

A basic data pipeline
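
To make the source, processor and sink roles concrete, here is a minimal sketch of the same three-stage shape in plain Java. It is not how SCDF wires applications (there, each stage is a separate Spring Boot app connected through messaging middleware); the class name, the canned record and the field names are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical in-process stand-in for the httpReceiver | transform | jdbc pipeline.
final class BasicPipelineSketch {

    // Source: stands in for httpReceiver; it simply hands out one canned record.
    static Supplier<List<Map<String, String>>> source() {
        return () -> {
            Map<String, String> record = new LinkedHashMap<>();
            record.put("name", "Alice");
            record.put("email", "alice@example.com");
            record.put("comment", "a long free-text comment");
            return List.of(record);
        };
    }

    // Processor: stands in for transform; keeps the selected fields and truncates their values.
    static Function<Map<String, String>, Map<String, String>> transform(List<String> fields, int maxLen) {
        return in -> {
            Map<String, String> out = new LinkedHashMap<>();
            for (String field : fields) {
                String value = in.get(field);
                if (value != null) {
                    out.put(field, value.length() > maxLen ? value.substring(0, maxLen) : value);
                }
            }
            return out;
        };
    }

    // Wires source -> processor -> sink; the sink stands in for the jdbc app writing to MySQL.
    static void run(Supplier<List<Map<String, String>>> source,
                    Function<Map<String, String>, Map<String, String>> processor,
                    Consumer<Map<String, String>> sink) {
        for (Map<String, String> record : source.get()) {
            sink.accept(processor.apply(record));
        }
    }
}
```

Passing a list's add method as the sink (for example `rows::add`) collects the transformed records, which is a convenient way to inspect what would have been written to the table.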

Architectural Overview

The following image shows the primary components of SCDF. The Data Flow Server is responsible for deploying applications to a runtime. It can be accessed over HTTP (for example with cURL), through the Dashboard UI, or through the Shell Client. It also provides a deployer SPI, which allows us to deploy Java applications to various application runtimes such as Kubernetes, Cloud Foundry or even a local machine.

Spring Cloud Data Flow Server

Components of Spring Cloud Data Flow

  1. Applications: Applications are categorized into source, processor and sink. A source app takes data in from persistent storage, a cache or even an HTTP endpoint. A processor app (optional) is open to the implementation of any business logic. A sink app stores data in some location or endpoint, that is, it takes (sinks) the data out of the current pipeline.
  2. Application runtime: To run SCDF, we need an application runtime like Kubernetes, Cloud Foundry, Apache YARN, Apache Mesos, or a local server.
  3. Messaging middleware: Spring Cloud Data Flow supports two messaging middleware brokers, Apache Kafka and RabbitMQ, through which the Spring Boot apps communicate and are connected.
  4. Data Flow Server: It is responsible for preparing applications (custom and/or starter apps) and the messaging middleware so that the applications can be deployed on the runtime using the shell, the dashboard or the REST API directly. It can be found here.
  5. RDBMS and Redis (optional): A database for storing stream and task metadata. If none is provided, an embedded in-memory H2 database is used, so this data lasts only as long as the server's JVM. Redis is used for performing analytics on pipelines.
  6. Maven repository (optional): Applications are resolved from Maven coordinates at runtime, although apps can also be loaded as a Docker image or even a standalone JAR.
  7. Data Flow Shell Client (optional): Provides a command line interface for interacting with Data Flow Server. It helps with common functionalities of Data Flow Server like deploying/un-deploying an app (or task) and creating/deploying a pipeline. More details can be found here.

Steps for Installing Spring Cloud Data Flow

  1. Start messaging middleware (RabbitMQ/Kafka).
  2. Download Spring Cloud Data Flow (Ver. 1.2.2) from here, and run the downloaded JAR, passing the messaging middleware properties (spring.rabbitmq.host, spring.rabbitmq.port) and, if you want to persist metadata, the DB properties (spring.datasource.url, spring.datasource.username etc.). The full list of properties can be found here.
  3. Download Spring Cloud Data Flow Shell: You can download the client shell from here, and run the downloaded JAR.
  4. Import starter apps: SCDF provides out-of-the-box apps that perform some basic tasks and can be used directly in pipelines.
    To import these apps using the shell: “app import --uri https://dataflow.spring.io/rabbitmq-maven-latest”
    Now, we can list the available apps with the command: “app list”
Sample response of app list

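5. Create custom stream apps: To make any app compliant with SCDF, we need to add a Spring Cloud Stream binder dependency for the chosen messaging middleware (for example, spring-cloud-starter-stream-rabbit for RabbitMQ).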

Also, we need to add the annotation @EnableBinding(Source.class), @EnableBinding(Processor.class) or @EnableBinding(Sink.class), as appropriate, to the implementing bean, and annotate the implementing method with @Filter or @ServiceActivator. And we are good to go. Then, we can use the “app register” command to register our custom application. More info about this here.

6. Create stream (pipeline): To create a stream, we issue the command “stream create”.

For example:

dataflow:>stream create --name myStream --definition "jdbc | fileNew: file"

Here, fileNew is a label (alias) for the file app, which is needed because a stream can contain more than one instance of the same app. You can use any number of apps when creating a stream, but it should have exactly one source and one sink.
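
To illustrate how the pipe-and-label syntax reads, here is a rough sketch of a parser that splits a definition into (label, app) pairs. It is only an illustration, not the Data Flow parser: the class and method names are hypothetical, and it assumes bare app names with no options (such as --key=value) and no taps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified reader for definitions such as "jdbc | fileNew: file".
final class StreamDefinitionSketch {

    // One step of the stream: the label used inside the stream and the registered app name.
    static final class Step {
        final String label;
        final String app;

        Step(String label, String app) {
            this.label = label;
            this.app = app;
        }
    }

    // Splits on '|' and treats "label: app" as an aliased app; otherwise the label is the app name.
    static List<Step> parse(String definition) {
        List<Step> steps = new ArrayList<>();
        for (String part : definition.split("\\|")) {
            String trimmed = part.trim();
            int colon = trimmed.indexOf(':');
            if (colon >= 0) {
                steps.add(new Step(trimmed.substring(0, colon).trim(),
                                   trimmed.substring(colon + 1).trim()));
            } else {
                steps.add(new Step(trimmed, trimmed));
            }
        }
        return steps;
    }
}
```

For the definition above, this yields two steps: jdbc (label jdbc) as the source and file (label fileNew) as the sink.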

7. Deploy stream: Now we can deploy the stream using command “stream deploy <streamName>”.
More about shell client can be found here.

We can also use the GUI dashboard instead of the SCDF shell to perform the above operations. You can read all about it here.

Insights

Spring Cloud Data Flow (or SCDF) provides a layer of abstraction that saves you the hassle of declaring and maintaining queues and exchanges between every pair of apps, and it automatically handles retries and dead letter queues (through a few basic properties). However, if you want to handle these on your own, you can do that as well by changing the defaults and writing your own implementation.

Deployer SPI

Deployer SPI (spring-cloud-deployer) is responsible for deploying the applications of a stream/pipeline to an application runtime. It lets us decide how our applications are going to be deployed, i.e. define memory limits and CPU limits, pass environment variables, and configure liveness and readiness probes, by passing properties of the form ‘deployer.<app>.<local/kubernetes>.<property>’ along with our apps.
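
As a small illustration of that property format, the sketch below assembles deployer property keys for one app. The class and method names are hypothetical, and the property names used in the usage note (memory, cpu) are placeholders; the names actually supported depend on the deployer and its version.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that builds keys of the form deployer.<app>.<platform>.<property>.
final class DeployerPropertiesSketch {

    // Builds a single key, e.g. deployer.transform.kubernetes.memory
    static String key(String app, String platform, String property) {
        return "deployer." + app + "." + platform + "." + property;
    }

    // Prefixes every setting for one app on one platform, keeping insertion order.
    static Map<String, String> forApp(String app, String platform, Map<String, String> settings) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : settings.entrySet()) {
            out.put(key(app, platform, entry.getKey()), entry.getValue());
        }
        return out;
    }
}
```

Calling `forApp("transform", "kubernetes", settings)` with placeholder settings for memory and cpu would produce keys such as deployer.transform.kubernetes.memory, ready to be passed at deployment time.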

Partitioning Streams

Often we need a stream to split, upon some condition, into two or more sub-streams. This is called partitioning of streams, and it is done by using the queue that feeds an app in one stream as the source of another stream. Partitioning can also be done using the routing key of the messaging middleware.

For example, consider these streams:
stream1: “http | filter --filter.expression='${1}<18' | file”
stream2: “:stream1.http > filter --filter.expression='${1}>18' | file”
Here, the source for stream2 is the queue between stream1.http and stream1.filter. So records with ${1}<18 are served by stream1 and those with ${1}>18 by stream2, while messages with ${1}==18 are not processed by either stream.
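
The routing outcome can be checked with a tiny sketch. It models only the two filter conditions as Java predicates, not the messaging middleware or the filter app itself, and the class and method names are hypothetical.

```java
import java.util.function.IntPredicate;

// Hypothetical model of the two filters: both streams see every message from stream1.http.
final class StreamPartitionSketch {

    static final IntPredicate STREAM1_FILTER = value -> value < 18;
    static final IntPredicate STREAM2_FILTER = value -> value > 18;

    // Reports which stream's filter lets the value through, or "dropped" if neither does.
    static String route(int value) {
        if (STREAM1_FILTER.test(value)) {
            return "stream1";
        }
        if (STREAM2_FILTER.test(value)) {
            return "stream2";
        }
        return "dropped";
    }
}
```

The boundary value 18 passes neither predicate, which is the gap described above; changing one condition to an inclusive comparison (<= or >=) would close it.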

Retry mechanism

If a consumer fails to process a message, processing is retried up to the number of attempts defined by the property ‘spring.cloud.stream.bindings.input.consumer.max-attempts’ (default: 3). The initial and maximum back-off intervals between retries can be defined by the properties ‘spring.cloud.stream.bindings.input.consumer.back-off-initial-interval’ and ‘spring.cloud.stream.bindings.input.consumer.back-off-max-interval’.
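
To show how these three settings interact, here is a minimal sketch that computes the waiting time before each retry. It assumes an exponential back-off with a multiplier (the multiplier is an assumption, since it is not mentioned above) and that max-attempts counts the first attempt; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical calculator for the delays between attempts under exponential back-off.
final class BackoffScheduleSketch {

    // Returns one delay (in ms) per retry, so maxAttempts - 1 entries in total.
    static List<Long> delays(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        List<Long> out = new ArrayList<>();
        long delay = initialMs;
        for (int retry = 1; retry < maxAttempts; retry++) {
            out.add(Math.min(delay, maxMs));
            // Grow the delay for the next retry, never beyond the configured maximum.
            delay = (long) Math.min(delay * multiplier, (double) maxMs);
        }
        return out;
    }
}
```

With three attempts, an initial interval of 1000 ms, a multiplier of 2.0 and a maximum of 10000 ms, the sketch yields waits of 1000 ms and 2000 ms; with more attempts the waits grow until they are capped at the maximum interval.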

Dead Letter Queue (DLQ)

DLQ

A dead letter queue is a queue that holds messages which have exhausted their retries. We have to define a dead letter queue for each application through the property ‘spring.cloud.stream.rabbit.bindings.input.consumer.autoBindDlq’, which automatically binds a DLQ to it. The actual republishing of a message to the DLQ after its retries fail is controlled by the property ‘spring.cloud.stream.rabbit.bindings.input.consumer.republishToDlq’.
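
The behavior described above can be pictured with a small in-memory sketch: a handler is tried up to a maximum number of attempts, and the message is parked in a dead-letter list once they are exhausted. This only models the idea; it is not the RabbitMQ binder's implementation, it does not wait between attempts, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model of retry exhaustion followed by dead-lettering.
final class DeadLetterSketch {

    final List<String> deadLetters = new ArrayList<>(); // stands in for the DLQ
    int attemptsMade = 0;                                // total handler invocations

    // Returns true if the handler eventually succeeded, false if the message was dead-lettered.
    boolean deliver(String message, Consumer<String> handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            attemptsMade++;
            try {
                handler.accept(message);
                return true;
            } catch (RuntimeException e) {
                // Processing failed; loop around for the next attempt, if any remain.
            }
        }
        deadLetters.add(message);
        return false;
    }
}
```

A handler that always throws ends up in deadLetters after exactly maxAttempts invocations, which is the point at which a monitoring job would start counting it.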

Our Implementation

Custom apps

We tried to use the out-of-the-box applications, but they lacked certain functionality that we required, such as the use of logback for centralized and proper logging and actuator support to provide health and liveness checks for the apps. We also needed custom prefetching and dead letter queue logic. So, we decided to write these basic applications on our own. We created apps such as jdbc-source, filter-processor, http-processor, jdbc-processor, mongo-processor, transform-processor and jdbc-sink, which served our use cases. For example, http-processor is responsible for making HTTP requests and, optionally, adding the HTTP response (or some part of it) to the outgoing message.

Data related problems

In order to make the above apps generic, we felt the need for dynamic rule evaluators. We tried to use SpEL, but as our use cases grew more complex, the size of the properties for each application increased dramatically (since SpEL requires us to write actual Java code). At this point, ScriptEngine came to our rescue. We used ScriptEngine to allow rule evaluation in JavaScript syntax, where rules were passed through properties.
For example, the http-processor app may need to make an HTTP request with a header “abc” whose value is extracted from the JSON payload at the path “abc.def.0.ghi”.
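
To make the path notation concrete, here is a sketch of resolving such a dotted path against an already-parsed payload. It deliberately uses plain Java maps and lists rather than ScriptEngine, so it is not the rule evaluator described above; the class name and the sample value are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Hypothetical resolver for dotted paths such as "abc.def.0.ghi" over parsed JSON
// (objects as Map, arrays as List). Returns null when the path cannot be followed.
final class PayloadPathSketch {

    static Object resolve(Object payload, String path) {
        Object current = payload;
        for (String segment : path.split("\\.")) {
            if (current instanceof Map) {
                // Object: the segment is a field name.
                current = ((Map<?, ?>) current).get(segment);
            } else if (current instanceof List) {
                // Array: the segment must be a valid index.
                List<?> list = (List<?>) current;
                int index;
                try {
                    index = Integer.parseInt(segment);
                } catch (NumberFormatException e) {
                    return null;
                }
                if (index < 0 || index >= list.size()) {
                    return null;
                }
                current = list.get(index);
            } else {
                return null;
            }
        }
        return current;
    }
}
```

For a payload shaped like {"abc": {"def": [{"ghi": "token-123"}]}}, resolving “abc.def.0.ghi” returns the string to be used as the header value.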

Infrastructure

To start using SCDF, we initially set it up as a local deployment. The steps defined above are manual, but we needed our SCDF infrastructure to be automatable. So, we leveraged the REST APIs provided by the Data Flow server to create cURL requests for deploying apps and for deploying, undeploying and destroying streams.
We created a Git repository for this, containing app definitions, stream definitions (which apps to use in a stream) along with the properties associated with the apps of each stream, and scripts that in turn create the cURL requests. This also ensures version control of stream definitions. We can also keep multiple property sets for the same stream definition, one per environment.
The init-script is then able to start the Data Flow server (if not started), load pipeline definitions from this Git repo (if not loaded) and deploy these streams.
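
As an illustration of what such scripted requests look like, the sketch below builds (but does not send) the HTTP requests for deploying, undeploying and destroying a stream using Java's built-in HTTP client types. The endpoint paths are assumptions based on the Data Flow REST API and should be checked against the server version in use; the class name, base URL and stream name are hypothetical, and the stream name is assumed to be URL-safe.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical request builders for a Data Flow server; nothing is sent over the network here.
final class DataFlowRequestsSketch {

    // Deploy an existing stream definition (assumed endpoint: POST /streams/deployments/{name}).
    static HttpRequest deploy(String baseUrl, String stream) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/streams/deployments/" + stream))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Undeploy a stream but keep its definition (assumed endpoint: DELETE /streams/deployments/{name}).
    static HttpRequest undeploy(String baseUrl, String stream) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/streams/deployments/" + stream))
                .DELETE()
                .build();
    }

    // Destroy the stream definition itself (assumed endpoint: DELETE /streams/definitions/{name}).
    static HttpRequest destroy(String baseUrl, String stream) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/streams/definitions/" + stream))
                .DELETE()
                .build();
    }
}
```

Each request can then be sent with `java.net.http.HttpClient`, or the same method and URL can be passed to cURL from a shell script, as described above.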

Monitoring

We also required our pipelines to be monitored at all times. Therefore, we included some basic monitoring scripts in the Git repository mentioned above. One of them monitors the RabbitMQ exchanges and sends mail alerts when the number of messages in a dead letter queue exceeds a fixed threshold. Another script acts as a health check and connects to the Data Flow API to check whether all pipelines are in the deployed state. If it is unable to connect, it triggers the init-script.
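
The threshold check at the heart of such an alerting script can be sketched as follows. How the queue depths are obtained (for example from the RabbitMQ management interface) and how mail is sent are left out; the class name and the queue names in the usage note are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical check: which dead letter queues currently exceed the alert threshold?
final class DlqAlertSketch {

    // Returns the names of queues whose depth is strictly greater than the threshold, sorted by name.
    static List<String> queuesToAlert(Map<String, Integer> depths, int threshold) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : new TreeMap<>(depths).entrySet()) {
            if (entry.getValue() > threshold) {
                out.add(entry.getKey());
            }
        }
        return out;
    }
}
```

Given depths of 120 for a queue named myStream.jdbc.dlq and 3 for myStream.file.dlq with a threshold of 100, only the first queue would be reported.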

What’s next

Kubernetes

We are currently working on moving our implementation to Kubernetes as the application runtime, so that we can scale the streams according to load and/or need.

Cloud Tasks

We would also like to explore the use of Cloud Tasks instead of complete Spring Boot applications for executing short-lived microservices, which can be Spring Batch tasks or simply Spring Cloud tasks. The benefit of using these tasks is that they are very lightweight and also persist their execution history in a database, so upscaling is very easy.
