Building Analytics Engine Using Akka, Kafka & ElasticSearch

Knoldus Inc.
Knoldus - Technical Insights
Jul 7, 2016

In this post, I will share my experience of building a scalable, distributed and fault-tolerant analytics engine using Scala, Akka, Play, Kafka and ElasticSearch.

I would like to take you through the journey of building an analytics engine that was primarily used for text analysis. The inputs were structured, semi-structured and unstructured data, and we did a lot of data crunching with it. The engine was accessible through a REST client and a web client (built into the engine), as shown in the diagram below.

architecture-00-b

Here is a quick overview of the technology stack:

  1. Play Framework as the REST server and web application. (Play is an MVC framework based on a lightweight, stateless and web-friendly architecture.)
  2. Akka Cluster as the processing engine. (Akka is a toolkit and runtime for building highly concurrent, distributed and resilient message-driven applications on the JVM.)
  3. ClusterClient (a contrib module at the time) for communication with the Akka cluster. It ran on the REST server and sent tasks to the Akka cluster. Using ClusterClient turned out to be a horrible decision: it did not maintain a continuous connection with the Akka cluster, which resulted in broken connections. To re-establish the connection, we had to restart the JVM on which the client was running.
  4. ElasticSearch as the search engine and data store (for both raw and analyzed data).
  5. Kibana as the visualization platform. (Kibana is a flexible analytics and visualization platform.)
  6. Akka Actors as the import/export data service for ElasticSearch. Akka Actors are amazing: this service never broke.
  7. S3 as centralized file storage.
  8. Elastic Load Balancing for load balancing between nodes.
  9. MySQL as metadata storage.

We started with Akka version 2.2.x and ran into some serious challenges. Here are our observations:

  1. Cluster client’s broken connection with the Akka cluster: Under heavy load, when CPU utilization was high, the cluster client would mysteriously lose its connection with the cluster. It was a third-party library back then, and we had to restart the JVMs, even at midnight :(, to get it working again.
  2. Resource utilization: When we looked at the REST server nodes, CPU usage was only 2 to 5%. That was a waste of resources, and Amazon EC2 nodes are not cheap.
  3. Latency: The REST servers were running on separate nodes. This introduced latency because every client request was deserialized on the REST server and then serialized again to be sent to the Akka cluster. Similarly, the response from the Akka cluster was deserialized and then serialized again to be sent back to the client. This repeated serialization and deserialization often resulted in timeout exceptions. Moreover, we were using Play only for REST endpoints, not as a full-featured web framework. This was our design flaw, I would say.

To address these issues we came up with the Second Architecture, in which we:

  1. Removed the Akka cluster client.
  2. Replaced Play with Spray, since Play was not the right choice for a pure REST service. (Spray is a lightweight HTTP server.)
  3. Started the REST service on the same JVM as each Akka cluster node, instead of on separate nodes, to reduce end-to-end latency.

The result was this architecture:

architecture-02-b

Awesome!!! It was working fine. Life was good again and the team got splendid appraisals :)

After the third month, we got a new requirement: integrating a data provider (Datasift), which supplied both streaming and historical data. To handle it, we simply added a shiny new service that pulled data from Datasift and sent it to the analytics cluster.

architecture-03-b-00

The integration itself was pretty simple, but it led to a new set of problems:

  1. Since the above architecture is a push-based model, the cluster became overwhelmed whenever there was a high volume of streaming and historical data.
  2. We decided to scale the cluster up from 4 nodes to 8. When the stream ran at normal speed, most of the nodes sat idle. Since we were using Amazon EC2 4xlarge nodes, this was quite costly. Hence a new problem: infrastructure cost.
  3. We decided to use Amazon Auto Scaling. We were able to scale up whenever the load on the cluster increased, but we were not able to scale down. Amazon Auto Scaling did not yield accurate results for us.
  4. Akka cluster inter-node communication used to break whenever CPU usage went above 90%. (Maybe we were new to Akka Cluster, and Akka Cluster was also not as mature then as the current version.)
  5. When a node died, all the processing in flight on it was lost.
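
To make the first two problems concrete, here is a small simulation in Python (standard library only). It is not our Scala/Akka code, just a rough sketch: the arrival numbers and the `capacity` parameter are invented for illustration. It tracks how many messages pile up inside a node when a sender keeps pushing regardless of how busy the node is.

```python
def simulate_push(arrivals, capacity):
    """Return the in-memory backlog of one node after each tick.

    arrivals: messages pushed to the node in each tick (hypothetical numbers)
    capacity: messages the node can process per tick (hypothetical)
    """
    backlog = 0
    history = []
    for pushed in arrivals:
        backlog += pushed                  # the sender pushes regardless of load
        backlog -= min(backlog, capacity)  # the node drains what it can
        history.append(backlog)
    return history


# A quiet stream, then a burst of historical data, then quiet again.
arrivals = [50, 50, 400, 400, 400, 50, 50]
print(simulate_push(arrivals, capacity=100))
# prints [0, 0, 300, 600, 900, 850, 800]
```

In this toy run the backlog keeps growing during the burst and drains only slowly afterwards. Sizing for the burst (400 per tick) would need four such nodes, while the quiet stream (50 per tick) keeps even one node half idle, which is the cost trade-off described in point 2.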

While we were fighting to find a solution to these problems, the product needed yet another data provider integration. OHH!! (Time for another beer.)

After a lot of brainstorming we understood the problem with our current architecture, and came up with a simple, highly scalable and fault-tolerant Third Architecture:

architecture-04-b

In this new architecture, we removed Akka Cluster and rewrote the analytics engine. It is based purely on Akka Actors, and the REST service runs on the same JVM. The REST service simply accepts a request from the client, performs authentication/authorization, then creates a processing message and sends it to a Kafka queue. Each node (analytics engine) pulls work from the Kafka queue, processes it and pulls again. It never gets overwhelmed.
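
The flow above can be sketched in a few lines of Python. This is only an illustration, not our production code: a `queue.Queue` stands in for the Kafka queue, threads stand in for analytics nodes, and `accept_request`, `analytics_node`, the `allowed_users` check and the word-count "analysis" are all hypothetical placeholders.

```python
import queue
import threading

work_queue = queue.Queue()   # stand-in for the Kafka queue
results = []
results_lock = threading.Lock()
STOP = object()              # sentinel telling a node to shut down


def accept_request(user, text, allowed_users=("alice",)):
    """Hypothetical REST handler: authorize, enqueue, return immediately."""
    if user not in allowed_users:
        return False
    work_queue.put({"user": user, "text": text})
    return True


def analytics_node(name):
    """Pull one message, process it, pull again."""
    while True:
        msg = work_queue.get()                 # blocks until work is available
        if msg is STOP:
            break
        word_count = len(msg["text"].split())  # placeholder "analysis"
        with results_lock:
            results.append((name, word_count))


nodes = [threading.Thread(target=analytics_node, args=(f"node-{i}",))
         for i in range(3)]
for n in nodes:
    n.start()

accepted = [accept_request("alice", "some text to analyse " * k)
            for k in range(1, 6)]
rejected = accept_request("mallory", "not authorised")

for _ in nodes:
    work_queue.put(STOP)     # one sentinel per node, queued after the real work
for n in nodes:
    n.join()

print(sorted(count for _, count in results))
# prints [4, 8, 12, 16, 20]
```

Each node holds at most one message at a time, so a burst only makes the queue longer; it never floods a node's memory. That is the property the push-based design lacked.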

If any node dies, Kafka automatically assigns its processing messages to another live node, which means no processing message is lost.
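
One common way to get this behaviour is to acknowledge (commit) a message only after it has been processed, so that anything a dead node had not yet acknowledged is read again by a survivor. The sketch below shows that idea with a tiny in-memory stand-in. The `Partition` class, `run_node` and the `die_on` switch are hypothetical names for illustration; real Kafka consumer groups and rebalancing involve far more than this.

```python
class Partition:
    """Tiny in-memory stand-in for one Kafka partition with a committed offset."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = 0   # index of the next message not yet acknowledged

    def poll(self):
        """Return (offset, message) for the next uncommitted message, or None."""
        if self.committed >= len(self.messages):
            return None
        return self.committed, self.messages[self.committed]

    def commit(self, offset):
        self.committed = offset + 1


class NodeDied(Exception):
    pass


def run_node(partition, processed, die_on=None):
    """Process messages until the partition is drained or the node 'dies'."""
    while (item := partition.poll()) is not None:
        offset, msg = item
        if msg == die_on:
            raise NodeDied(msg)    # crash before commit: the offset stays put
        processed.append(msg)
        partition.commit(offset)   # acknowledge only after the work is done


partition = Partition(["m1", "m2", "m3", "m4"])
processed = []

try:
    run_node(partition, processed, die_on="m3")  # first node crashes on m3
except NodeDied:
    pass

run_node(partition, processed)                   # a surviving node takes over
print(processed)
# prints ['m1', 'm2', 'm3', 'm4']
```

Note that this gives at-least-once processing: a node that dies after finishing a message but before committing it causes that message to be processed a second time, so the processing step should tolerate repeats.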

With this architecture we were able to sustain the full load on Amazon EC2 2xlarge instances instead of the 4xlarge instances we were using earlier. (:-) Lots of money saved: time for another appraisal.)

It is a fully pull-based architecture. All requests and streams are processed through the Kafka cluster, and because everything is pull-based it never gets overwhelmed. The whole system is deployed on 26 EC2 nodes, and we are about to complete two years without a single issue in production.

We also integrated analysis of server logs for performance, security and user activity using Kafka. A Kafka producer pushes logs into the Kafka server, and since we already had an import/export service for ElasticSearch, we use the same service to push these logs into ElasticSearch. We can then easily visualize user activity in Kibana.
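
As a rough illustration of the last hop, here is how a batch of log events already consumed from Kafka could be turned into a request body for the ElasticSearch bulk endpoint, which expects newline-delimited JSON with an action line before each document. This is a sketch, not our import/export service: the `to_bulk_body` helper, the `server-logs` index name and the log fields are assumptions, and older ElasticSearch versions also expected a `_type` in the action line.

```python
import json


def to_bulk_body(log_events, index="server-logs"):
    """Build a newline-delimited JSON body for the ElasticSearch _bulk endpoint.

    `index` is a hypothetical index name; `log_events` is a batch of dicts
    assumed to have been consumed from the Kafka log topic already.
    """
    lines = []
    for event in log_events:
        lines.append(json.dumps({"index": {"_index": index}}))  # action line
        lines.append(json.dumps(event))                          # document line
    return "\n".join(lines) + "\n"  # the bulk API requires a trailing newline


batch = [
    {"level": "INFO", "user": "alice", "action": "login"},
    {"level": "WARN", "user": "bob", "action": "failed_login"},
]
body = to_bulk_body(batch)
print(body, end="")
```

Sending logs in batches like this keeps the number of requests to ElasticSearch low, which matters when every node is producing log lines continuously.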

Conclusion:

  1. Akka Actors are best for building highly concurrent, distributed and resilient applications.
  2. Spray is best for a lightweight HTTP server. It now lives on as Akka HTTP.
  3. Play Framework is best for building highly concurrent, scalable web applications, as it is built on top of Akka.
  4. ElasticSearch is best used as a search engine: since it is based on Lucene, it provides full-text search. We also use it for data storage, but it is not the best choice for persistence (e.g. in comparison to Cassandra).
  5. Kafka is best for stream processing and log aggregation. It is scalable, distributed by design and fault-tolerant.

Till I update this post with the Fourth Architecture…, happy programming!!! Keep innovating.
