Kappa Architecture: In Practice

Most of us who have worked with big data systems have come across the two most common architecture patterns: Kappa Architecture and Lambda Architecture. Both define a way to combine real-time and historical views of whatever computations, reports, or processing you need, and both use largely the same set of technologies and frameworks. Either can achieve the desired output, depending on the business context and the existing infrastructure.

I work at a startup in the adtech space and was an initial member of the team chartered to build a subsystem that derives insights from data, which would power the products our company was pivoting into. Our existing systems were already receiving bid streams from many ad exchanges over the OpenRTB protocol. The existing business provided a service to clients for running marketing campaigns on digital platforms around mobile devices.

The product had the following objectives:

  • Build a SaaS-based self-serve platform that clients can use to run their campaigns.
  • Allow clients to create audiences: the platform provides the ability to identify audiences based on the client's needs for a campaign. For example, a client might want to target males aged 26–35 who are teachers living in a given suburb of London.
  • Provide real-time insights into the audiences they create and the campaigns they run, allowing them to make quicker decisions about their marketing strategy.

As part of building the various stepping stones for the product, we took all the data we were getting from ad exchanges and put it into internal Kafka queues, built ingestion systems into HDFS for analysis over longer timelines, and deployed various Apache Storm topology pipelines to filter, enrich, correct, capture, infer, and select people based on criteria. Various other operations, such as de-duplication of users across exchanges, were also part of the system we had built. All our Apache Storm pipelines were tuned to run in real time: as data arrived from the exchanges we processed it immediately, sustaining an incoming rate of about 20,000 QPS with an end-to-end latency that varied between 40–700 milliseconds per request, depending on the pipeline.
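One of the pipeline steps mentioned above, de-duplicating users across exchanges, can be sketched as mapping each exchange-specific user ID to a single canonical ID. This is an illustrative sketch only, not the production code; all names here are hypothetical, and an in-memory dict stands in for whatever store the real pipeline used.

```python
# Hypothetical sketch: de-duplicating users across exchanges by mapping
# each (exchange, exchange_user_id) pair to one canonical user ID.

def canonical_user_id(exchange: str, exchange_user_id: str,
                      id_map: dict) -> str:
    """Return the canonical ID for an (exchange, user) pair, creating
    a new mapping the first time the pair is seen."""
    key = (exchange, exchange_user_id)
    if key not in id_map:
        # First sighting: adopt this exchange's ID as the canonical one.
        id_map[key] = f"{exchange}:{exchange_user_id}"
    return id_map[key]

def link_ids(exchange_a: str, id_a: str,
             exchange_b: str, id_b: str, id_map: dict) -> None:
    """Record that two exchange-specific IDs belong to the same user."""
    canon = canonical_user_id(exchange_a, id_a, id_map)
    id_map[(exchange_b, id_b)] = canon

id_map = {}
link_ids("exchange_a", "u-1", "exchange_b", "u-99", id_map)
# The same user now resolves to one ID regardless of which exchange sent them.
assert canonical_user_id("exchange_b", "u-99", id_map) == \
       canonical_user_id("exchange_a", "u-1", id_map)
```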

Since clients could join or leave at any time, and it was a self-serve platform, we had to automate every aspect of the whole process. This meant providing:

  • An Audience Selection Query Language (the "rule")
  • A framework to process these queries as soon as clients created them
  • The ability to store insights at reasonable granularity
  • The ability to aggregate those insights at runtime with sub-second latency

Insights are essentially counters across various dimensions; given the right granularity, we should be able to sum them up quickly.
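A minimal sketch of that counter model, under assumed key and bucket names (the real schema is not described in the article): events increment a counter per dimension and time bucket, and an insight query aggregates by summing the buckets in the requested window. A real deployment would use Redis commands such as INCRBY and MGET; a plain dict stands in for the store here.

```python
# Illustrative sketch: insights as counters keyed by audience, dimension,
# and hourly time bucket, aggregated at read time by summing buckets.
from datetime import datetime, timedelta

store = {}  # stands in for Redis

def record_event(audience_id: str, dimension: str, ts: datetime) -> None:
    """Increment an hourly counter, e.g. key 'aud42:country=UK:2024031512'."""
    bucket = ts.strftime("%Y%m%d%H")
    key = f"{audience_id}:{dimension}:{bucket}"
    store[key] = store.get(key, 0) + 1

def count_last_hours(audience_id: str, dimension: str,
                     now: datetime, hours: int) -> int:
    """Aggregate at query time by summing the relevant hourly buckets."""
    total = 0
    for h in range(hours):
        bucket = (now - timedelta(hours=h)).strftime("%Y%m%d%H")
        total += store.get(f"{audience_id}:{dimension}:{bucket}", 0)
    return total

now = datetime(2024, 3, 15, 12)
record_event("aud42", "country=UK", now)
record_event("aud42", "country=UK", now - timedelta(hours=1))
record_event("aud42", "country=UK", now - timedelta(hours=30))  # outside 24h
assert count_last_hours("aud42", "country=UK", now, 24) == 2
```

Because each bucket is a single integer, widening or narrowing the query window only changes how many keys are summed, which is what keeps runtime aggregation sub-second.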

Until this point there was essentially nothing we needed from historical data apart from the counters, but that changed as clients started requesting that the existing audience inventory be processed through newly defined selection criteria, in addition to running every new user against the same rule. This meant we had to select all the users seen in the past 30 / 60 / 90 days and re-run them against the new rule.

Since we already had a system that tagged users matching client rules, we had two choices:

Build a batch system that would read data from HDFS and run it against the rule defined by a client (Lambda Architecture style)

OR

Not build a separate batch system, but re-run the existing data through the same pipelines very quickly (Kappa Architecture style)

For us, the following factors helped decide which way to go:

  • The infrastructure required to process the historical data should not be an order of magnitude larger than the existing real-time system. Because a real-time system must meet certain throughput and latency guarantees, it runs on different infrastructure than a Hadoop cluster would need. If we now want to run 30 or more days of historical data through the same pipelines, the infrastructure cost of doing so should not grow linearly with the volume.
  • We are able to define parameters within the business that identify useful data on a historical timeline. To explain with our use case: clients wanted to re-run the profiles of users we had seen in the last 30 or more days against the rule they had just created, so we needed some way to identify these users on a timeline based on when they were last seen.
  • Since we run everything on the same kind of infrastructure and have automated the historical replay of data, we can complete the processing faster than would have been possible with a batch system. This should give the product capabilities that current products and providers in the market do not have.
  • Since we were heavily constrained by time, we would not have been able to build a batch system with the same functionality as the real-time system within the required timelines. A batch system would also have to integrate with the serving layer (Redis + front end) to show correct analytics to clients.

Tech Details

Some of this I have mentioned before:

  • Apache Kafka: holds data in transit from both external systems (ad exchanges) and internal systems.
  • Apache Storm: real-time processing framework.
  • Elasticsearch: rule-processing framework.
  • Cassandra: our data store.
  • Redis: store for counters.
  • Akka: library that gives us fast replay of historical data.

The incoming data to us is bid streams, which carry varying information such as:

  • some attributes about the device, carrier, etc.
  • some information about the user
  • detailed information about the ad space on the device, such as the available height × width

As you can deduce, we can get bid streams from the same user multiple times within an hour / day / week. To run our replay, we first had to store unique users on a timeline such that if a user is seen multiple times, we only store the latest instance we saw them. We achieved this using log compaction in Kafka. The messages on the topic had key: user_id and value: {user_id, time}. Every time we got a bid request we pushed a message to this topic with the user_id and the time we saw them, which allowed us to maintain an ordered sequence of the most recently seen users. Now, for a replay of, say, 30 days, we can start reading from the beginning of the topic, reject users older than 30 days, and push each remaining message to another topic that the Storm pipelines read in order to run the client rule against the user.
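The selection logic above can be sketched as follows. This is a simulation, not Kafka code: a dict models what a compacted topic converges to (only the newest value survives per key), and the filter models the replay consumer; all names are illustrative.

```python
# Sketch of the replay selection, assuming Kafka log compaction has already
# reduced the topic to the latest (user_id -> last_seen) message per user.
from datetime import datetime, timedelta

def latest_sightings(messages):
    """Model log compaction: keep only the newest value per key."""
    compacted = {}
    for user_id, seen_at in messages:  # messages in arrival order
        compacted[user_id] = seen_at
    return compacted

def users_to_replay(compacted, now, days):
    """Select users last seen within the replay window; in the real system
    these would be pushed to a topic read by the Storm pipelines."""
    cutoff = now - timedelta(days=days)
    return [u for u, t in compacted.items() if t >= cutoff]

now = datetime(2024, 3, 15)
stream = [
    ("user-1", now - timedelta(days=45)),
    ("user-2", now - timedelta(days=10)),
    ("user-1", now - timedelta(days=5)),   # seen again: latest sighting wins
]
compacted = latest_sightings(stream)
assert sorted(users_to_replay(compacted, now, 30)) == ["user-1", "user-2"]
```

Note that user-1 is included even though their first sighting was 45 days ago: compaction keeps only the most recent sighting per key, which is exactly the "last seen" semantics the replay needs.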

Our rules are defined in Elasticsearch as percolator queries, against which we throw user documents containing the relevant information to find out which rules match. Since the percolator queries can themselves be selected by a criterion, we can run a user against all rules defined by all clients, or against a specific percolator query; this only requires small variations in the document sent to Elasticsearch.
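To make that concrete, here is a hedged sketch of the two request bodies involved, written against the Elasticsearch 5+ percolator syntax (the original system may have used the older `.percolator` API; the index, field, and client names here are illustrative). A client rule is stored as a percolator query; later, each user document is percolated to find which rules match, optionally filtered to one client's rules.

```python
# 1) A client's audience rule ("males 26-35, teachers, in London"),
#    stored in an index whose "query" field has the percolator mapping type.
rule_doc = {
    "query": {
        "bool": {
            "must": [
                {"range": {"age": {"gte": 26, "lte": 35}}},
                {"term": {"gender": "male"}},
                {"term": {"occupation": "teacher"}},
                {"term": {"city": "london"}},
            ]
        }
    },
    "client_id": "client-7",  # metadata used to select which rules to run
}

# 2) Percolate one user document against the stored rules. The extra term
#    clause narrows the run to a single client's rules; drop it to run the
#    user against every rule from every client.
percolate_request = {
    "query": {
        "bool": {
            "must": [
                {"percolate": {
                    "field": "query",
                    "document": {
                        "age": 29, "gender": "male",
                        "occupation": "teacher", "city": "london",
                    },
                }},
                {"term": {"client_id": "client-7"}},
            ]
        }
    }
}

assert percolate_request["query"]["bool"]["must"][0]["percolate"]["field"] == "query"
```

Because the percolate clause is an ordinary query clause, combining it with filters on rule metadata is what gives the "all rules or one specific rule" flexibility described above.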