How we Achieved a 2x Reduction in Data Processing Costs

Ashish Kumar
affinityanswers-tech
6 min read · Feb 2, 2023
Processing Cost Timeline

The chart above shows that over the past few months our data processing cost dropped from roughly $20 per hundred million records to around $7 per hundred million records, even as the amount of data we process almost doubled. How we did it is the story in the rest of this post.

At Affinity Answers, one of our core businesses is data enrichment, i.e. adding more to the data using the social affinities model we have built over a period of time.

On a daily basis we have to enrich north of 2 billion records. We get this data from various data management platforms such as Oracle Data Cloud, Eyeota and LiveRamp, among others. To deal with this we built a custom data platform which we internally call the Unified Data Enrichment Platform, or UDEP for short.

UDEP was built in Python; we use Cython, NumPy and pandas wherever possible to make it fast. One of the goals of UDEP was to make integrating with our data partners easier: previously we had to build a separate tool for each data provider, and we noticed that, apart from some small pieces, the bulk of the work is the same for most providers.

Architecture of UDEP

Architecture of UDEP at a 10K altitude

Components of UDEP

Mapper: Reads data from the file received from the data provider, maps it to the entities understood by our model using a catalog, and returns the data in a generalized format to the next stage.

Score Calculation: The data from the mapper is passed to our recommender-system-based model, which returns a set of recommendations along with our score for each. For example, an input to our recommender system can be Pepsi, which spews out Oreo with a score of, say, 9 (among many such entities), denoting the affinity of the fans of Pepsi to Oreo. We call this score the True Affinity score (TA score).

Enricher: This component receives the recommendations from the previous stage and decides whether we can enrich the record with a segment, based on a set of rules we define for that segment, such as the TA score being greater than some value x.

Writer: This component receives the enriched records and writes them to a file in the format the data provider needs.

Manager: Like the name says, it manages the entire process: communication between the above components, syncing with other nodes running the same task (UDEP is distributed), downloading, uploading, splitting and so on.
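
To make the flow concrete, here is a deliberately simplified, single-node sketch of how these stages chain together. The function names, record layout and model.recommend() call are illustrative assumptions for this post, not UDEP's actual interfaces, and the Manager's distribution and syncing are left out entirely.

# Illustrative only: a toy, single-node version of the mapper -> score ->
# enricher -> writer flow; all names and the record layout are assumptions.
def run_pipeline(raw_rows, catalog, model, min_ta_score, out_path):
    with open(out_path, "w") as out:
        for row in raw_rows:
            # Mapper: translate provider-specific values into model entities.
            entities = [catalog[v] for v in row["values"] if v in catalog]

            # Score calculation: ask the recommender model for TA scores.
            recommendations = model.recommend(entities)

            # Enricher: keep recommendations that satisfy the segment rules,
            # e.g. TA score greater than a configured threshold.
            segments = [r for r in recommendations if r["ta_score"] > min_ta_score]

            # Writer: emit the enriched record in the provider's format.
            out.write(f"{row['id']},{';'.join(s['entity'] for s in segments)}\n")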

Features of UDEP

  1. UDEP runs on AWS Batch and uses Spot Instances to run at a discounted rate.
  2. UDEP is distributed, i.e. it can use multiple nodes to process data as it scales.
  3. Each node in UDEP is synchronized with the other nodes of the same task so that we do not over-enrich; this is done using Redis.
  4. UDEP uses Redis to cache recommendations, so that if it sees a similar record again it does not need to infer from the model (a minimal sketch of this caching pattern follows this list).
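
For point 4, the caching pattern looks roughly like the following sketch. The key format, the 24-hour TTL and the model.recommend() call are assumptions for illustration, not UDEP's actual code.

# Illustrative only: cache recommendations in Redis so repeated records skip
# model inference. Key naming, TTL and model.recommend() are assumptions.
import json

import redis

cache = redis.Redis(host="localhost", port=6379)

def recommendations_for(entities, model):
    key = "reco:" + "|".join(sorted(entities))
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)                    # cache hit: no inference
    recommendations = model.recommend(entities)      # cache miss: run the model
    cache.set(key, json.dumps(recommendations), ex=24 * 3600)
    return recommendations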

The only part we felt we could further optimize was the score calculation component, so that is where we focused.

Before we go into the details of the optimizations, it is worth understanding the starting point:

  1. The score calculation component gets a set of input entities and returns a recommendation list with TA scores.
  2. Internally we use multiple models; the same input is passed to each model and the resulting scores are averaged across the models.
  3. We use NumPy and pandas for calculating the scores and averaging them.

Based on this, we could optimize the following:

  1. Calculate Scores only for the required entities.
  2. Batch Scoring for n records together instead of one record at a time.
  3. Optimizing the Averaging process of scores.

Calculate Scores only for the required entities

The Score Matrix in numpy for each model

For each model we have an n * n matrix, where n is the number of entities the model can recommend for. We noticed that at any given time we need recommendations only for a subset of entities, up to about 80% of them, and that the set we need recommendations for is always the same or smaller. So we decided to reduce the matrix from n * n to n * m, where m is the number of entities we need recommendations for.

The recommendation matrix was reduced to n * m

Luckily, this can be done easily in NumPy by indexing, like so:

recommendation_matrix = recommendation_matrix[:, col_index].copy()

By default, indexing in NumPy returns a view. While this is fast for one-time use, it becomes expensive when you query it repeatedly; in our case we wanted to query this matrix repeatedly, so materializing an actual array made much more sense, hence the copy() method. Just doing the above saved around 20% of our costs and time.
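
As a toy illustration of what the reduction looks like (the sizes below are made up, with m at roughly 80% of n as in our case, and the column selection is simplified, not our production matrices):

# Illustrative only: toy sizes to show the n x n -> n x m reduction.
import numpy

rng = numpy.random.default_rng(0)
n, m = 1_000, 800                               # m is ~80% of n, as in our case

recommendation_matrix = rng.random((n, n))      # full n x n score matrix for one model
col_index = numpy.arange(m)                     # the entity columns we actually need

# Advanced indexing plus copy() materializes a smaller, contiguous n x m array.
reduced = recommendation_matrix[:, col_index].copy()

print(recommendation_matrix.nbytes // 1024)     # 7812 KiB
print(reduced.nbytes // 1024)                   # 6250 KiB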

Batch Scoring for n records together instead of one record at a time

Until now we had been scoring records one by one, but after some research we found that scoring records in batches is faster. While batching increases throughput by exploiting Single Instruction Multiple Data (SIMD) and other parallelization techniques, it also increases the memory consumption of the process; this was a tradeoff that was acceptable to us.

The technique for batch inference differs from algorithm to algorithm; ours boils down to a matrix multiplication, so do check how it applies to your own algorithm. In our case this doubled the inference throughput and reduced the costs of the score calculation component by a similar amount.
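
To show what batching can look like when scoring is a matrix multiplication, here is a minimal sketch. The multi-hot record encoding and the idea that a record's scores are the sum of the matrix rows for its input entities are simplifying assumptions, not our model's actual formula.

# Illustrative only: one-by-one scoring vs. a single batched matmul.
import numpy

rng = numpy.random.default_rng(0)
n, m = 1_000, 800
reduced = rng.random((n, m))                    # toy n x m score matrix from the previous step

batch_size = 512
# Multi-hot encoding: row i marks which input entities appear in record i.
records = (rng.random((batch_size, n)) < 0.01).astype(numpy.float64)

# One record at a time: batch_size separate (1 x n) @ (n x m) products.
scores_one_by_one = numpy.vstack([records[i] @ reduced for i in range(batch_size)])

# Whole batch at once: a single (batch_size x n) @ (n x m) product, which lets
# the underlying BLAS use SIMD and multiple cores much more effectively.
scores_batched = records @ reduced

assert numpy.allclose(scores_one_by_one, scores_batched)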

Optimizing the Averaging process of scores

As mentioned before, we pass the input entities to multiple models, and the average of their scores becomes our final TA score.

Averaging scores from all the models
concatenated_df = pandas.concat(model_ta_scores)
final_ta_score = concatenated_df.groupby(level=0).mean()

This is our code to calculate the average. model_ta_scores is a list where each item is a recommendation DataFrame for one model; the DataFrame's columns contain the scores for each entity, and each row corresponds to a record in the batch.

In this case, a transpose of the DataFrames was enough for the average to be calculated a lot faster.

Attached is a Jupyter notebook that shows the same; we were able to improve our averaging speed by 2x just by taking a transpose!

A Jupyter notebook demonstrating the performance difference a transpose can make!
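
The notebook is embedded in the original post; for a quick feel of the idea, here is a minimal sketch of one way the transpose trick can be written, with made-up sizes, showing that both forms produce the same averages. Time both on your own data, as the notebook does.

# Illustrative only: averaging per-model score DataFrames with and without a
# transpose, assuming every frame shares the same record index and columns.
import numpy
import pandas

rng = numpy.random.default_rng(0)
n_models, n_records, n_entities = 3, 512, 2_000

# One records x entities score DataFrame per model, with made-up values.
model_ta_scores = [
    pandas.DataFrame(rng.random((n_records, n_entities)))
    for _ in range(n_models)
]

# Original approach: stack the per-model frames and group rows by record id.
baseline = pandas.concat(model_ta_scores).groupby(level=0).mean()

# Transposed approach: stack entities instead, group by entity, transpose back.
transposed = pandas.concat([df.T for df in model_ta_scores]).groupby(level=0).mean().T

# Both compute the same per-record, per-entity average across the models.
assert numpy.allclose(baseline.to_numpy(), transposed.to_numpy())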

That’s all, folks! Using all of the above improvements we were able to bring down our data processing costs and time.

Conclusion

It is always a good idea to take another look at code you developed long ago: you might find something that helps bring down costs, and you will learn something new along the way. There are a lot of hidden optimizations waiting to be unlocked by the knowledge you have gained since.

So keep learning, and do let us know if there is anything else you would have done here, or in your own development journey. Excited to hear from you!

Immense gratitude to Santhosh Kumar, who created UDEP and helped me in this phase of optimization by untangling many knots in my mind as I worked through it.
