Building a scalable reactive pipeline to ingest data into a Graph DB

Dhananjay Ghanwat
Dec 13, 2019 · 9 min read

Abstract

In this article, we will be building a highly scalable reactive pipeline to ingest data from a file data store (like CSV, fixed-width, JSON, etc.) into a graph data store (like Neo4j, TinkerGraph, etc.) using Akka Streams and Actors. We further make it event-driven and serverless using AWS Lambda. We will be using Gremlin to abstract our graph data store, which gives us the flexibility to choose the data store that best fits our requirements.

Overview

Let us briefly introduce the pieces of the domain which we will stitch together, by the end of this article, into an end-to-end, event-driven, reactive ingestion pipeline into a graph datastore.

Data Ingestion

Systems today gather a large amount of data in an effort to use that data to discover real-time or near real-time insights that inform decision making and support digital transformation. Data ingestion is the process of importing, transforming, processing and persisting this data in some kind of database for later use. Data can be ingested in different ways, like batch, real-time or streaming. Batch processing is a very common way of processing large volumes of data collected over a period of business activity. With technologies like Akka Streams (https://doc.akka.io/docs/akka/current/stream/index.html) and Actors, it is possible to process these large files as streams of data in order to get benefits like parallel processing and non-blocking I/O. Streaming helps us load, validate, transform and persist data in a quick, declarative and reactive manner.

Graph Datastore

In the last few years, because of their increased efficiency over traditional databases for highly connected data, graph databases have seen massive adoption in modern solutions for the e-commerce, transportation and social networking business domains. A graph database, as the name suggests, organizes data in the form of objects, referred to as nodes, and the relationships between them, referred to as edges.

Event-Driven using AWS Lambda and S3

Event-driven architectures allow for services to be loosely coupled to each other. Each system can be responsible for its domain and emit events when actions occur without ever caring about how those events are being used. Meanwhile, other systems can react to those events to execute logic in their own domains.

The AWS ecosystem allows us to enable event notifications on an S3 bucket when a new file is added, updated or removed. We can also define fine-grained rules which trigger events based on metadata of the file, such as its extension. These events can be delivered to other services like SNS, SQS and AWS Lambda.

That’s all for introductions, let us get cracking!

Implementing an ingestion service for Media Company

During the rest of this article, we will implement an ingestion service for a newly started media company which wants to store all the movies watched by its users, so that it can later run recommendation analysis on that data. For simplicity, let us limit the service to a one-time read of a file on an S3 bucket, transforming that data set into a graph model using Lambda.

AWS Lambda using Serverless

Serverless computing is a cloud computing model which aims to abstract away infrastructure management and provide an inherently scalable design with a pay-per-use model. It gives us availability and fault tolerance out of the box, which lets developers focus on writing business logic and not worry about other things. Lambda is the serverless computing offering from AWS, and I have found it very efficient for deploying business solutions. A Lambda function is an independent unit of executable code which can be triggered by various events, like a file being created on S3.

We use the Serverless Framework to create and manage our Lambda function. The serverless framework helps us to develop and deploy Lambda functions, along with managing AWS infrastructure resources enabling us to focus on code and not worry about infrastructure.

npm install -g serverless
serverless create --template aws-scala-sbt --path MoviesIngestionService

If all goes well you will get a response like below

Serverless: Generating boilerplate...
Serverless: Generating boilerplate in "path\to\service\MoviesIngestionService"

[Serverless ASCII banner: The Serverless Application Framework, serverless.com, v1.53.0]

Serverless: Successfully generated boilerplate for template: "aws-scala-sbt"

This creates the serverless.yml file and your handler file. serverless.yml contains the configuration of our serverless code: the provider where the service is deployed, function details, any custom plugins used, the events which trigger each function, the resources required by the functions, etc.

The Handler.scala file contains our lambda function code for the project. Each function defined in serverless.yml will point to a function in this handler. We are going to write all our code in this file inside a function which is going to be invoked.

Add the S3 plugin to our serverless configuration, as we are going to trigger our Lambda function via S3 events:

npm install serverless-plugin-existing-s3
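
The plugin also needs to be registered in serverless.yml. A minimal sketch of the plugins section (the rest of the file is unchanged):

plugins:
  - serverless-plugin-existing-s3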

We can now have S3 events trigger our Lambda function by adding the following event to the functions section of the serverless.yml file:

functions:
  users:
    handler: Handler
    events:
      - s3:
          bucket: user-movies
          event: s3:ObjectCreated:*
          rules:
            - prefix: uploads/
            - suffix: .csv
The above configuration can be read as follows:

trigger Handler when an object with the uploads/ prefix and the .csv suffix is created in the S3 bucket user-movies

We are now ready to configure AWS credentials and deploy this empty handler. Details can be found here.

serverless config credentials --provider aws --key AWS_ACCESS_KEY --secret AWS_SECRET
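
With credentials configured, deploying the service (assuming the default stage and region) is a single command:

serverless deploy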

Akka Streams

We will be using Akka Streams to process the file in S3 as a stream. Akka Streams is an open-source library for processing and transferring a sequence of elements using bounded buffer space. In simple words, it means we can express the processing of entities as a chain, allowing each entity to be processed independently. If you are new to the world of Akka Streams, I recommend reading these articles:

A Journey into Reactive Streams (https://blog.redelastic.com/a-journey-into-reactive-streams-5ee2a9cd7e29)

and

Getting Started with Akka Streams (https://stackoverflow.com/questions/35120082/how-to-get-started-with-akka-streams)

Some terminology of Akka Streams:

Source: the entry point of the process; all our entities are emitted from this source.

Flow: a processing block. We can have more than one flow in our processing. A Flow has exactly one input and one output.

Sink: a terminal operation that triggers all computations in the entire Flow.
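
As a quick, self-contained illustration of these three pieces (separate from our pipeline), a stream that doubles a few integers and prints them looks like this:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

object StreamsIntro extends App {
  implicit val system = ActorSystem("intro")
  implicit val mat = ActorMaterializer()
  import system.dispatcher

  val numbers = Source(1 to 5)             // Source: emits the integers 1 to 5
  val doubler = Flow[Int].map(_ * 2)       // Flow: one input, one output
  val printer = Sink.foreach[Int](println) // Sink: terminal operation

  // materialize and run the stream, then shut the system down when done
  numbers.via(doubler).runWith(printer).onComplete(_ => system.terminate())
}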

Our ingestion pipeline will be created using the following flow: read the file from S3 as a Source, parse each line via a Flow, group the parsed records into batches, and persist them through a Sink.

Streaming from S3

We will be using the Alpakka AWS S3 connector (https://doc.akka.io/docs/alpakka/current/s3.html), which provides Akka Stream sources and sinks to connect to Amazon S3.

After integrating with S3, our Handler.scala looks like

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
// needed to map over the Future returned by runWith below
implicit val ec: scala.concurrent.ExecutionContext = system.dispatcher

def handleRequest(event: S3Event, context: Context): String = {
  logger.trace(s"Received a request: $event")
  val bucket = event.getRecords.get(0).getS3.getBucket.getName
  val bucketKey = event.getRecords.get(0).getS3.getObject.getKey
  logger.trace(s"Reading from bucket ${bucket} with file name ${bucketKey}")
  try {
    // S3.download emits an optional pair of (file contents as a stream, object metadata)
    val s3File: Source[Option[(Source[ByteString, NotUsed], ObjectMetadata)], NotUsed] =
      S3.download(bucket, bucketKey)
    s3File.runWith(Sink.head).map(f => {
      f.map { case (dataSource, metadata) =>
        new Ingestor().ingestS3(dataSource)
      }
    })
  } catch {
    case e: Throwable => logger.error(s"Some error in reading S3 ${e}", e)
  }
  ""
}

To be consistent with the Single Responsibility Principle, we introduce a new Ingestor class to handle all our ingestion processing.
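
The exact shape of this class is not shown in the article; a minimal sketch, assuming it only needs the streaming context and an entry point for the downloaded bytes, could look like this (the constructor arguments are assumptions, not from the original code):

class Ingestor()(implicit system: ActorSystem, mat: ActorMaterializer) extends LazyLogging {

  def ingestS3(source: Source[ByteString, NotUsed]): Unit = {
    // connect the S3 source to the parsing flow and the persisting sink,
    // as shown in the "Connecting Source to Sink via Flow" section below
  }
}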

Defining our Flows and Sink

We have our Source ready; the next piece to implement is our Flow.

Using Flow to parse our Input

Let's create a parseLine function that will parse a line from the CSV file and transform it into our model UserMovies:

case class UserMovies(userId: String, userName: String, movieId: String, movieName: String)

def parseLine(line: String): Option[UserMovies] = {
  val cols = line.split(",").map(_.stripPrefix("\"").stripSuffix("\"")).map(_.trim)
  try {
    // columns are zero-indexed, so indices 1 to 4 assume the first column of the file is skipped
    Some(UserMovies(cols(1), cols(2), cols(3), cols(4)))
  } catch {
    case e: Throwable =>
      logger.error(s"Error in converting ${e}")
      None
  }
}
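
For example, assuming a hypothetical five-column line whose first column (say, a row id) is not part of our model:

parseLine("\"1\",\"u42\",\"Alice\",\"m7\",\"The Matrix\"")
// => Some(UserMovies("u42", "Alice", "m7", "The Matrix"))

parseLine("malformed line")
// => None (the ArrayIndexOutOfBoundsException is caught and logged)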

We create our Flow, which applies parseLine to the input, giving a Flow with input type String and output type Option[UserMovies]:

def parseContent(): Flow[String, Option[UserMovies], NotUsed] =
  Flow[String].map(parseLine)

When we pass the parseLine function to map, the compiler knows that its argument will be a String, the same as the input type of our Flow.

Adding Sink to our Flow

To start the execution of a Flow we need to attach a Sink. In our case, the sink operation will invoke an Akka Actor to persist the model in the graph data store.

def saveModel(model: Seq[Option[UserMovies]]): Future[Seq[Option[String]]] =
  (userMoviesActor ? ParsedUserMovies(model)).mapTo[Seq[Option[String]]]

Our saveModel function accepts a Seq so that we can persist the model in batches as a performance optimization.

We create the Sink from a Flow using toMat with Sink.ignore and Keep.right, because we want to keep the sink's materialized Future, which completes once processing has finished:

def storeContent() = {
  // mapAsync (rather than map) makes the stream wait for each batch's Future,
  // so the materialized Future[Done] only completes once every batch is persisted
  Flow[Seq[Option[UserMovies]]].mapAsync(parallelism = 4)(saveModel).toMat(Sink.ignore)(Keep.right)
}

Connecting Source to Sink via Flow

With all the pieces implemented, we are now ready to connect them together to form a pipeline.

source
  .via(Framing.delimiter(ByteString("\n"), 256, false).map(_.utf8String))
  .via(parseContent())
  .grouped(10)
  .runWith(storeContent())
  .onComplete { _ =>
    system.terminate()
  }

Note: we are collecting the output of the parseContent flow in groups of 10 so that we can persist the data in batches of 10.

Actor Model

By using Akka Actors we can efficiently write asynchronous code without the need for locks and synchronization, which enables parallel processing. Actors can also be distributed, giving us room to scale up or down with load.

Because actors communicate via messages, the sender thread does not block waiting for a return value when it sends a message to another actor. The receiving actor responds with the result by sending a reply message to the sender.

Another advantage of using Actors is that we don't have to worry about synchronization in a multi-threaded environment: Akka guarantees that each actor processes its messages sequentially.

We will be using Akka Actors to handle our processing to persist our model to the graph data store. (https://doc.akka.io/docs/akka/current/actors.html)

Implementing Actor

import UserMoviesActor.ParsedUserMovies
import akka.pattern.pipe

object UserMoviesActor {
  case class ParsedUserMovies(userMovies: Seq[Option[UserMovies]])
}

class UserMoviesActor(graphDb: ScalaGraph)
  extends Actor with LazyLogging with UserMoviesOps {

  // the actor's dispatcher runs the pipeTo callback
  import context.dispatcher

  implicit val timeout = Timeout(1.minutes)

  def graph = graphDb

  def receive: Receive = {
    case p: ParsedUserMovies =>
      logger.trace(s"Actor received message ${p}")
      // persistUserMovies returns a Future, so pipe its result back to the
      // asker instead of replying with the Future object itself
      persistUserMovies(p).pipeTo(sender())
    case _ =>
      logger.error(s"Unknown message received by the Actor")
  }
}

We would like to keep our classes small and focused on doing only one thing, so we separate the Actor from its operations. We have a trait UserMoviesOps which implements the operations for our Actor.
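
The article only shows persistUserMovies from this trait (see the "Persisting Model" section below); a minimal sketch of its shape, assuming it only needs access to the graph, could be:

trait UserMoviesOps extends LazyLogging {
  def graph: ScalaGraph

  // implemented in the "Persisting Model" section below
  def persistUserMovies(p: ParsedUserMovies): Future[Seq[Option[String]]]
}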

Gremlin

Gremlin is the graph traversal language of Apache TinkerPop. It is a functional, data-flow language that enables users to succinctly express complex traversals on (or queries of) their application’s property graph.

We can use TinkerPop as an abstraction for our underlying data store. You can read more about it here http://tinkerpop.apache.org/docs/current/tutorials/getting-started/

Further, we use the library https://github.com/mpollmeier/gremlin-scala as a Scala wrapper over Apache TinkerPop. It has a lot of examples for different providers; you can refer to them here https://github.com/mpollmeier/gremlin-scala-examples
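
For example, to obtain the ScalaGraph that our actor expects, a minimal sketch using an in-memory TinkerGraph (any other TinkerPop provider can be swapped in here) would be:

import gremlin.scala._
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph

// wrap the TinkerPop graph as a gremlin-scala ScalaGraph
val graphDb: ScalaGraph = TinkerGraph.open.asScala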

Persisting Model

def persistUserMovies(p: ParsedUserMovies): Future[Seq[Option[String]]] = {
  // Future {} needs an implicit ExecutionContext in scope
  Future {
    logger.trace(s"Storing Model ${p}")
    val g = graph.traversal
    val insertVertexIds = for {
      s <- p.userMovies
    } yield {
      s match {
        case Some(userMovie) =>
          try {
            // Name and Id are property keys, Watched is the edge label,
            // and a / b are step labels, all defined elsewhere in UserMoviesOps
            val user = g.addV(label = "User").property(Name, userMovie.userName)
              .property(Id, userMovie.userId).head()
            val movie = g.addV(label = "Movie").property(Name, userMovie.movieName)
              .property(Id, userMovie.movieId).head()
            // head() forces the traversal so the edge is actually created
            val relationShip = g.V(user.id()).as(a).V(movie.id()).as(b)
              .addE(Watched).from(a).to(b).head()
            Some(relationShip.id.toString)
          } catch {
            case e: Throwable =>
              logger.error(s"Error in storing ${e}")
              None
          }
        case None =>
          None
      }
    }
    try {
      graph.tx().commit()
    } catch {
      case e: Throwable =>
        logger.error("Error in committing the transaction, most likely because this graph " +
          "does not support transactions", e)
    }
    insertVertexIds
  }
}
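
To tie things together, the userMoviesActor referenced in saveModel has to be created somewhere with a graph instance. A hypothetical wiring, reusing the implicit ActorSystem from the handler and the ScalaGraph from the Gremlin section (the actor name string is my own):

import akka.actor.Props

val userMoviesActor = system.actorOf(Props(new UserMoviesActor(graphDb)), "user-movies-actor")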

Conclusion

In this article, we implemented an end-to-end ingestion pipeline. We explained how we can use AWS S3 in conjunction with AWS Lambda for an event-driven architecture. We showed how we can use Akka to consume a file from S3 as a stream and process it from Source, via Flows, to a Sink.

We also implemented an Actor to persist the data model, making our application scalable, non-blocking and asynchronous.

Finally, we used Gremlin, which gives a nice abstraction over various providers, to transform our original CSV file into a graph.

Further Reading

Akka — https://akka.io/

Akka Actors — https://doc.akka.io/docs/akka/current/actors.html

AWS Lambda — https://aws.amazon.com/lambda/

S3 Bucket — https://aws.amazon.com/s3/

Gremlin — http://tinkerpop.apache.org/docs/current/tutorials/getting-started/

Gremlin Scala Examples — https://github.com/mpollmeier/gremlin-scala-examples

Alpakka S3 Connector — https://doc.akka.io/docs/alpakka/current/s3.html

Serverless — https://serverless.com/

Dhananjay Ghanwat

https://www.linkedin.com/in/dhananjay-ghanwat-3321b6ab/


I am a solution architect, technology leader & full-stack engineer whose passion lies in building great products.
