Data Distributor — The Journey of Data from Zeotap to the Advertising World

Zeotap is a Customer Intelligence Platform (CIP) that helps companies better understand their customers and predict behaviors, so that they can invest in more meaningful experiences. We enable brands to build on a nucleus of first-party data to win new customers and grow their loyal base. To do that, these users, in the form of ad IDs or cookies, need to be passed to different data management platforms (DMPs) or demand-side platforms (DSPs). This is where the journey of a distributor platform begins.

Introduction

In this article, we focus on the journey of our distributor platform: moving from a legacy Apache Spark and Python setup to Akka actors, with a performance improvement of 30–40x and savings of around 35 percent in infrastructure cost. We will briefly cover the problems we faced, the different approaches we tried, and how the Akka actor system helped us establish a performance-oriented distribution system.

The Thinking Phase

The journey starts with users creating a segment. A segment is simply a list of attributes or behaviours. After a traversal of the Zeotap ID graph, the data matching the criteria of a segment is stored at an S3/GCS path. From there, the distributor platform needs to send the data to different DMPs and DSPs (for easier reference, we will call them channels from now on). The channels vary along the following factors:

Challenges to egress the data:

  • Transformation: The data from our ecosystem needs to be converted into the format each channel requires, such as Protobuf objects for Google, CSV files for AppNexus, JSON for Lotame, etc.
  • Modes of Transport: The ability to handle different modes of uploading the data to the channel:
      ◦ API calls to the channel server: For API-based integrations, handling the huge number of API calls, recording the status of each one, and throttling according to channel specifications.
      ◦ Cloud storage like S3/GCP/Azure: Transfer to different cloud storage services with daily/monthly data limits.
      ◦ Server-to-server transfer (SFTP/SCP): Transfer to different servers over a secure network.

  • File Specific requirements: Some file level changes like Renaming, Splitting, Merging, Compressing etc.
  • Other Channel Related Requirements: Encryption, Hashing, Authentication etc.
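
To make the transformation challenge concrete, here is a minimal sketch of one way to keep per-channel formats behind a single interface. It is an illustration under assumptions, not the production code: the `ChannelFormatter` interface, the two implementations, the CSV column layout and the JSON field names are all hypothetical, and real channels define their own schemas.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical abstraction: one formatter per channel output format.
interface ChannelFormatter {
    String format(List<String> ids, String segmentId);
}

// CSV output: one "id,segment" row per id (the column layout is an assumption).
final class CsvFormatter implements ChannelFormatter {
    @Override
    public String format(List<String> ids, String segmentId) {
        return ids.stream()
                .map(id -> id + "," + segmentId)
                .collect(Collectors.joining("\n"));
    }
}

// JSON-lines output: one small JSON object per id (field names are assumptions).
final class JsonLinesFormatter implements ChannelFormatter {
    @Override
    public String format(List<String> ids, String segmentId) {
        return ids.stream()
                .map(id -> "{\"id\":\"" + escape(id)
                        + "\",\"segment\":\"" + escape(segmentId) + "\"}")
                .collect(Collectors.joining("\n"));
    }

    // Minimal escaping of backslashes and quotes; a JSON library would be safer.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

With this shape, adding a channel means adding one more implementation rather than touching the upload code, which is the kind of extensibility the list above calls for.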

Tech Stack: Based on Zeotap's business requirements and Spark's out-of-the-box capabilities, we decided to use Spark for transformation jobs and Python for the remaining tasks.

Problem: In the early days of the business, Zeotap had less data, the segments created on top of it numbered only a few thousand, and there were only a few channels to push data to. So running Spark jobs for transformations and doing the other channel-related tasks in Python worked perfectly well for us.

But as we grew, the system, which had been built with the early requirements in mind, started failing.

  1. Data in the system increased exponentially (thanks to all the data deals) and the creation of segments was handed over to clients, leading to more and more segments. As Spark uses most of the RAM for transformations, running multiple data-intensive jobs in parallel caused contention over RAM, which choked the whole system and led to failures and high turnaround times.
  2. Customers work with specific channels, hence expanding our integrations with different channels became a priority. This meant dealing with newer data formats, modes of transport, etc. The system wasn't extensible, and each integration increased code complexity. This stretched the time needed for new integrations and for production debugging.

Horizontal and Vertical Scaling: To support all this and keep the business running as usual, we started adding more and more machines to handle the requests. We also began throttling the number of requests and upgraded the existing 16-core machines to 40 cores, so that we could at least push as much data as possible.

Debugging the problems became a headache, and we started putting more and more engineering hours into it. Even with all of these efforts, we missed a lot of SLAs, which hurt both revenue and the efficiency of the system.

Goals of the System:

Thanks to all the problems described above, we now had clear goals for the new system.

  1. An increase in the number of segments leads to more distribution requests, hence the system should be scalable.
  2. Channel integrations come in different types, hence the system should be extensible.
  3. Performance is a major requirement: we should not miss, at any cost, the SLAs we have communicated to our customers.
  4. Use only as many resources as required and save as much as possible.

The Experimentation Phase

After carefully identifying the problem, we thought of trying two different approaches.

Go with Goroutines and channels

We tried uploading the data using goroutines and channels. The important points we observed are:

  1. Goroutines are lightweight and don't consume much memory.
  2. Around a 4x improvement over the existing Python version: it took 40 minutes to upload 65 million IDs with 676 goroutines for an API-based channel, whereas the Python upload took 2 hours 30 minutes for the same data on an 8-core machine.
  3. While reading directly from S3, the AWS SDK for Go took more memory than usual, so we frequently ran into heap memory issues. We found a report of this memory issue which mentions that the write buffer takes 8x more memory when reading from S3 than when reading from a local file, because both unmarshalling and marshalling of objects take a lot of memory.

Akka Actors with Async Http

We tried implementing our distributed uploads using Akka actors in Java, with an async HTTP client for all the API calls. The important points we observed are:

  1. Akka actors are also lightweight, like goroutines. You can have several million Akka actors in your program for a few GB of heap memory.
  2. The performance improvement was beyond our expectations. It took only 35 seconds to upload 54 million IDs with 30 actors and a maximum of 10,000 HTTP connections at any one time on a 16-core machine. After more thorough benchmarking, the average upload rate came to around 1.3–1.4 million IDs per second for API-based channels. This involves streaming the file directly from S3, inline transformation, and uploading to the server through API calls.
  3. Streaming support for reading from S3/GCP was already present in the respective SDKs, and we didn't see any major difference in memory use compared to reading directly from a local file.
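
The timings quoted for the two experiments can be put on a common IDs-per-second scale. The snippet below is plain arithmetic on the figures above; the class and method names are hypothetical. Note that the runs are not like-for-like: the ID counts differ, and the Go and Python runs used an 8-core machine while the Akka run used 16 cores.

```java
// Converts the quoted run times into ids-per-second for a rough comparison.
final class ThroughputComparison {
    static double idsPerSecond(long ids, long seconds) {
        return (double) ids / seconds;
    }

    public static void main(String[] args) {
        double python = idsPerSecond(65_000_000L, 150 * 60); // 2 h 30 min, 8 cores
        double go     = idsPerSecond(65_000_000L, 40 * 60);  // 40 min, 8 cores
        double akka   = idsPerSecond(54_000_000L, 35);       // 35 s, 16 cores

        System.out.printf("python: %.0f ids/s%n", python);
        System.out.printf("go:     %.0f ids/s (%.2fx python)%n", go, go / python);
        System.out.printf("akka:   %.0f ids/s (single run)%n", akka);
    }
}
```

This works out to roughly 7,200 IDs per second for Python, about 27,000 for Go (3.75x, in line with the "around 4x" above), and about 1.54 million for the single Akka run, slightly above the 1.3–1.4 million average seen in the longer benchmarks.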

After the above benchmarking, the decision was pretty clear, and we went ahead with the Akka actor framework in Java with an async HTTP client.
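
The "stream, transform inline, upload" pattern from the benchmark can be sketched with the JDK alone. This is an assumption-laden illustration rather than the actual implementation: a generic `InputStream` stands in for the object stream an S3/GCS SDK would return, the `sink` callback stands in for the API call, and `StreamingBatcher` and its parameters are hypothetical names. The point is that memory use is bounded by the batch size, not by the file size.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class StreamingBatcher {
    // Reads one id per line, applies the inline transform, and hands batches of
    // at most batchSize ids to the sink. Returns the number of ids processed.
    static long streamInBatches(InputStream source,
                                Function<String, String> transform,
                                int batchSize,
                                Consumer<List<String>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        long total = 0;
        List<String> batch = new ArrayList<>(batchSize);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(source, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isBlank()) {
                    continue; // skip empty lines
                }
                batch.add(transform.apply(line.trim()));
                total++;
                if (batch.size() == batchSize) {
                    sink.accept(batch);                 // e.g. one API call per batch
                    batch = new ArrayList<>(batchSize); // fresh list for the next batch
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!batch.isEmpty()) {
            sink.accept(batch); // flush the final partial batch
        }
        return total;
    }
}
```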

The Implementation Phase

After the experimentation phase, we had the framework we wanted to use and a clear picture of the challenges we were facing in the existing Apache Spark and Python setup. Performance was one of the most important aspects while building the new system. We were already pushing around 100 billion IDs a week to different channels, and the number was growing rapidly. We were also using around 14 machines with 16 or 40 cores each, which led to high infrastructure cost.

The idea was simple: as the number of IDs grows, we shouldn't have to grow the infrastructure to a similar extent. SLAs should be respected, and upload jobs should complete well within the time communicated to clients.

We started dividing upload jobs into different phases. A simple upload job request can consist of:

  1. Transforming data from raw to channel format.
  2. The files might need to be merged together.
  3. Splitting of files in order to handle the maximum limit given by channels.
  4. Files might need to be renamed.
  5. Different compression types might need to be applied.
  6. Other channel related tasks like hashing, encryption, adding headers etc.
  7. Uploading the file to the channel.
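
Two of the phases above, splitting and compressing, can be sketched as small independent functions. This is a hedged illustration, not the production code: `FileSteps` and its methods are hypothetical, the split is by line count (a channel limit could just as well be in bytes), and gzip is only one of the compression types a channel may ask for.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class FileSteps {
    // Splits lines into consecutive chunks of at most maxLines lines each.
    static List<List<String>> split(List<String> lines, int maxLines) {
        if (maxLines <= 0) {
            throw new IllegalArgumentException("maxLines must be positive");
        }
        List<List<String>> chunks = new ArrayList<>();
        for (int i = 0; i < lines.size(); i += maxLines) {
            int end = Math.min(i + maxLines, lines.size());
            chunks.add(new ArrayList<>(lines.subList(i, end)));
        }
        return chunks;
    }

    // Gzips one chunk, writing one line per row.
    static byte[] gzip(List<String> lines) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buffer)) {
            gz.write(String.join("\n", lines).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray(); // stream is closed, so the gzip trailer is written
    }

    // Inverse of gzip(), used here only to check the round trip.
    static String gunzip(byte[] data) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return new String(gz.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping each phase as a function with plain inputs and outputs is what makes it easy to hand every phase to its own set of workers.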

We created separate actors for each of these tasks and came up with the following architecture:

Once the data is received by the data distributor, the flow looks like this:

We have different kinds of steps, and each step is handled by a separate set of actors. The number of actors for any step can be configured based on its type: for example, transform can have 200–300 parallel actors while merging requires only 4–5, and so on.
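
The idea of per-step parallelism can be illustrated without Akka. The sketch below is a plain-JDK stand-in and not the Akka configuration itself: each step gets its own fixed-size thread pool, and `StepPools`, `submit` and `shutdownAndWait` are hypothetical names. Threads are far heavier than actors, so treat the worker counts here as an analogy rather than a recommendation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class StepPools {
    private final Map<String, ExecutorService> pools = new LinkedHashMap<>();

    // parallelism maps a step name (e.g. "transform") to its number of workers.
    StepPools(Map<String, Integer> parallelism) {
        parallelism.forEach(
                (step, workers) -> pools.put(step, Executors.newFixedThreadPool(workers)));
    }

    // Queues a task on the pool that belongs to the given step.
    void submit(String step, Runnable task) {
        ExecutorService pool = pools.get(step);
        if (pool == null) {
            throw new IllegalArgumentException("unknown step: " + step);
        }
        pool.execute(task);
    }

    // Stops accepting work and waits for queued tasks; returns false on timeout.
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        pools.values().forEach(ExecutorService::shutdown);
        boolean allDone = true;
        try {
            for (ExecutorService pool : pools.values()) {
                allDone &= pool.awaitTermination(timeout, unit);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return allDone;
    }
}
```

A configuration such as `Map.of("transform", 200, "merge", 4)` would mirror the ratio described above, with the busy step getting many workers and the light step only a few.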

The actors can easily be configured using Guice bindings like this:

Transform actor configuration with 200 parallel actors with one for one strategy

To receive a message and perform the function of your choice, we used the following implementation:

Actor receiving message and performing action

Once the actors are configured and the tasks assigned to each of them are implemented, testing is straightforward. As actors interact only through asynchronous messages and never through direct method calls, we can exercise the functionality with a simple call like:

ApplicationActorIntances.transformActor.tell(new UploadBasedTask(), ActorRef.noSender());
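
For readers new to the model, the fire-and-forget semantics of `tell` can be imitated with the JDK alone. The sketch below is not Akka and not the code used in the system: `MiniActor` and `UploadTask` are hypothetical stand-ins, and a single-threaded executor plays the role of the mailbox, so messages are handled one at a time in arrival order while the caller returns immediately.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical message type carrying the segment to upload.
final class UploadTask {
    final String segmentId;

    UploadTask(String segmentId) {
        this.segmentId = segmentId;
    }
}

// Actor-like wrapper: one queue, one thread, sequential message handling.
final class MiniActor<M> {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final Consumer<M> onReceive;

    MiniActor(Consumer<M> onReceive) {
        this.onReceive = onReceive;
    }

    // Enqueues the message and returns at once; the sender never waits for a result.
    void tell(M message) {
        mailbox.execute(() -> onReceive.accept(message));
    }

    // Processes what is already queued, then stops; returns false on timeout.
    boolean stopAndDrain(long timeout, TimeUnit unit) {
        mailbox.shutdown();
        try {
            return mailbox.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A caller would create `new MiniActor<UploadTask>(task -> ...)` and send work with `actor.tell(new UploadTask("segment-1"))`. Because the only interaction is a message, a test can send a message and assert on the side effect, which is the property the paragraph above relies on.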

More details about the configuration and setup can be found here.

The Relaxation Phase

After building the system around actors and asynchronous communication between the different steps, it was time to benchmark it against the existing production setup and see whether it actually worked.

Here are some of the production stats:

Scenario 1: We tested the channel that we had tried in the experimentation phase:

40 cores / 64 GB machine. Java: 30 actors and 3,000 max HTTP connections. Python: Spark for transformation with 10 threads.

Scenario 2: We tested a channel where throttling was required: at most 1,000 HTTP connections can be open at any given time.

40 cores / 64 GB machine. Java: 10 actors and 1,000 max async HTTP connections. Python: Spark for transformation with 10 threads.
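
A cap on concurrent connections like the one in this scenario can be enforced with a counting semaphore. The sketch below is a generic JDK illustration rather than the mechanism used in production (async HTTP clients usually expose their own connection limit): `ConnectionThrottle` is a hypothetical name, and the `Supplier` stands in for a real HTTP call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

final class ConnectionThrottle {
    private final Semaphore permits;

    ConnectionThrottle(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Starts the call only once a permit is free, and returns the permit when
    // the call finishes, whether it succeeded or failed.
    <T> CompletableFuture<T> submit(Supplier<T> call, ExecutorService executor) {
        permits.acquireUninterruptibly(); // blocks the submitter while at the cap
        try {
            return CompletableFuture.supplyAsync(call, executor)
                    .whenComplete((result, error) -> permits.release());
        } catch (RuntimeException e) {
            permits.release(); // e.g. the executor rejected the task
            throw e;
        }
    }
}
```

With `new ConnectionThrottle(1000)`, no more than 1,000 calls are in flight at any moment, regardless of how many workers are producing requests.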

The first use case, with 293 million IDs, was run on a 16-core machine for Java and compared with a 40-core machine for Python.

The use case with 420 million entries had a more complex in-line transformation than the others, hence it took more time in both setups.

In both of the above scenarios, the performance improvement with Akka actors is easy to see. The process, which used to take hours, now completes within a few minutes. Our rough estimate of the improvement is around 30–40x, although the examples above show improvements of more than 50x.

After running the code for more than 6 months and handling billions of IDs smoothly and efficiently, the Akka actor implementation never fails to amaze us. Recently, we shut down one of our machines, as the jobs were completing within a very short span of time and we were not using the resources to their full extent.

From 14 machines (16/40 cores) in the Python/Spark setup, we moved to 6 machines (40 cores) in the Akka actor setup. We saved around 35 percent in infrastructure cost and successfully uploaded more IDs than we ever did in Python. Failures dropped to less than 5 percent, and we now have a bunch of happy engineers.
