How data nearly broke us, and how we tamed it back!

Milind Utsav · Published in YFret · 7 min read · Sep 29, 2017

YFret is, simply put, an ecommerce marketing assistant. It helps marketing professionals reach out to their users in the most efficient manner, with the right content. The assistant demands a lot of data crunching — recommendation engines, user categorization, performance tracking, and so on. Obviously, there’s a lot of data involved, from various sources. We can now combine all the sources of user data — direct entries, user actions on the website or a mobile app, third-party user lists, marketing campaign responses — to create near-real-time segmentation and one-to-one personalization based on each user’s profile. This post details our (long!) journey of scaling up our user segmentation and processing complex queries on millions of rows in under ten seconds.

There is one absolute thing about data that we know now — data grows. Fast. Our initial stack was on MongoDB; it suited our purpose very well. It was flexible enough to serve our customers from different sectors and verticals. We grew our tables somewhat breadthwise, with support for dynamic columns. For quite some time, our most complex queries (mostly batch processing) fetched results within ten seconds. The most common query source, and also the bottleneck later, was our ingenious user filter, which can combine a multitude of user attributes, their actions and the objects of those actions, giving our customers a powerful and comprehensive tool to segment their users most effectively.

If you observe carefully, the relationship between a user and her actions would require a JOIN between the users and actions tables — but alas! NoSQL doesn’t have a JOIN clause like SQL systems do (MongoDB now has a $lookup operator which can satisfy some JOIN requirements). What we did to overcome this can only be called a workaround in hindsight, but at the time it was the most logical solution we could come up with. We embedded the actions inside the users table, every user record containing a list of all the actions she had done.

A very crude version of our Segmentation one-dot-zero
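In SQL terms, the relationship our user filter needed to express looks roughly like the hedged sketch below (table and column names are illustrative, not our actual schema); this is the JOIN we could not write directly in MongoDB:

    -- Illustrative only: the user-to-action relationship our filter needed
    SELECT u.user_id
    FROM users u
    JOIN actions a
      ON a.user_id = u.user_id       -- link each user to the actions she performed
    WHERE a.action_type = 'purchase' -- filter on what the user did
    GROUP BY u.user_id;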

Later, as our queries started crawling, we began indexing keys in the user record and inside the embedded list of actions as well. We applied a few more optimizations over the basic structure shown above:

  • We stored only a user’s most important actions, the ones that actually mattered to our customers.
  • Only signed-in actions were stored, on the premise that it is impossible to target guest users on any channel.
  • We set up MongoDB replication, so that read queries could be distributed across different nodes and had more compute to work on.

This was around the time when we signed up a couple of big customers on our platform; we were not ready for the onslaught coming our way. The large amount of data from these specific customers was almost always in memory, owing to the frequent querying and the relatively large number of campaigns being run.

The effect was disastrous. As is usual in such cases, it triggered a chain reaction.

The customers’ queries were slow — they almost never completed due to server timeouts, and increasing the timeouts only made the servers themselves unresponsive for several minutes, since there were no available threads left to service the requests.

Our entire system was bobbing up and down in this muck. Within days, we had to create dedicated MongoDB replicated clusters for these large customers. We set up archiving mechanisms on these clusters on the assumption that queries on data older than a few months were not immediately useful.

The last bit of juice we could have extracted out of Mongo was through sharding. We studied sharding extensively and tried to fit our data into sharded clusters to run the queries in a truly distributed fashion. For sharding to be beneficial, the selection of the shard key is very, very crucial. After looking at our data, and the kind of almost random queries that we were going to run from our user filter, we decided against sharding. None of our possible shard keys matched all of the crucial criteria. A good shard key should at least:

  • be randomly distributed
  • have good cardinality
  • be present in most of the queries

For every 10X growth in the data, an architecture redesign from the ground up becomes imminent. And necessary.

That milestone was long overdue. Our system was not built to handle the huge traffic from our large customers — the dedicated clusters were raising our cloud costs, and even those were starting to become inefficient. MongoDB has an interesting catch — the database-level lock. If you are writing to a table (collection, in MongoDB lingo), all reads to all the tables in that database are blocked until the write operation completes. And a write operation naturally takes time if you have many indexes on the table. Starting with v3.0, MongoDB supports a collection-level lock; at the time we were using v2.6. We knew we had to part ways with MongoDB soon. For our user segmentation, at least. We wanted to store more data, and we desperately wanted JOINs.

We started looking for SQL query engines on Hadoop, and came across Apache Drill. At the outset, it seemed a fantastic choice for our user filter — it fit our query structure and requirements like a glove. Drill gave us a lot of flexibility in terms of data sources. We could query Mongo tables, Hadoop files, even JSON files, individually or together, through a single query interface. We segregated the data in such a way that the fixed, static part of it (basically, what the user did) went to Hadoop as Parquet files, and the dynamic part of the data (the user information, etc.) was stored in MongoDB, because updates! Most of the other major query engines on Hadoop do not support updates on a row already inserted.

A basic query on Apache Drill. Returns all users from Bengaluru who bought products worth more than $1000. Segmentation two-dot-zero.
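Roughly, such a Drill query could look like the sketch below; it is a hedged reconstruction, and the storage plugin names, file paths and column names are illustrative rather than our actual schema:

    -- Illustrative only: users come from MongoDB, their purchase actions from Parquet on Hadoop
    SELECT u.email, u.name
    FROM mongo.yfret.`users` u                      -- dynamic user profiles (MongoDB storage plugin)
    JOIN dfs.`/data/actions/purchases.parquet` a    -- static user actions (Parquet files on DFS)
      ON u.user_id = a.user_id                      -- in practice Drill often needed explicit CASTs here
    WHERE u.city = 'Bengaluru'
    GROUP BY u.email, u.name
    HAVING SUM(a.price) > 1000;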

Note how easy it is to JOIN Mongo collections and DFS/Parquet files, and query them, without any transformation. Drill has lots of other data connectors.

Apache Drill, though, came with its own set of problems. Query performance dropped a bit, mainly because of JOINs between different data sources, a compromise we were okay with given the flexibility and scaling room we got. The community support was poor at the time; it took a long time to find fixes and solutions to problems, since the project was still under heavy development. After prototyping and porting a section of our user filter to Drill, we also faced stability issues with the software itself. The MongoDB storage connector used to disconnect randomly after days of usage. The queries threw exceptions whenever Drill encountered an unexpected column — it expected explicit casts everywhere, which affected our code quality and consistency.

All things considered, Apache Drill is evolving into a very promising engine, and these kinds of bugs are continuously being squashed.

Since we were on Google Cloud at the time, we decided to try out BigQuery, a PaaS solution offered by Google for large data workloads. The query performance was off the charts. For 300 million rows of Wikipedia articles’ data, the queries returned in under 10 seconds, even the complex ones. BigQuery is built on Google’s Dremel engine, the same one Google uses internally for many of its own services, including parts of Search. We could rest assured of Google’s excellent infrastructure and compute capabilities. BigQuery doesn’t support updates to a row, and it was time to finally tackle that problem — we didn’t have any other way.

Luckily, the basic timestamped row method worked for us. Whenever we need to update a user record, we instead do an insert with the timestamp. For every user, we maintain a bunch of rows with the timestamp of the insertion.

The timestamped rows of a user — with times when the respective record was updated.
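Whether the rows actually arrive through a load job, the streaming API or a DML statement, the shape is the same; here is a hedged, DML-style sketch with made-up table, column and value names:

    -- Illustrative only: an "update" to a user becomes a fresh row with the current timestamp
    INSERT INTO yfret.users (user_id, email, city, updated_at)
    VALUES ('u_123', 'jane@example.com', 'Bengaluru', CURRENT_TIMESTAMP());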

We perform a self-join with the users table on the timestamp, and we get the record with the largest timestamp (using SQL’s MAX function).

The self-join technique for getting the last updated row of every user
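A hedged sketch of that pattern, with dataset, table and column names being illustrative: the table is joined against a grouped copy of itself so that only each user's most recent row survives.

    -- Keep only the latest row per user: self-join on (user_id, MAX(updated_at))
    SELECT u.*
    FROM yfret.users u
    JOIN (
      SELECT user_id, MAX(updated_at) AS max_updated_at
      FROM yfret.users
      GROUP BY user_id
    ) latest
      ON u.user_id = latest.user_id
     AND u.updated_at = latest.max_updated_at;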

This is done for every query that involves user attributes, ensuring that we always get the latest record of every user. The actions of the users were stored in a separate table, and we were in the world of JOINs now. We had no problems with the actions table being append-only. The queries became more complex this way, so we had to put considerable effort into a JS query generator which transforms the selections in our user filter UI into the corresponding SQL query.
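To give a feel for what the generator emits, here is a hedged, simplified sketch (all table, column and value names are illustrative) of one such segment query: the deduplicated users subquery is joined with the append-only actions table.

    -- Illustrative segment: users whose latest record has an email and who made at least two purchases
    SELECT lu.user_id, lu.email
    FROM (
      SELECT u.user_id, u.email
      FROM yfret.users u
      JOIN (
        SELECT user_id, MAX(updated_at) AS max_updated_at
        FROM yfret.users
        GROUP BY user_id
      ) latest
        ON u.user_id = latest.user_id
       AND u.updated_at = latest.max_updated_at
    ) lu                               -- latest record per user
    JOIN yfret.actions a               -- append-only actions table
      ON a.user_id = lu.user_id
    WHERE lu.email IS NOT NULL
      AND a.action_type = 'purchase'
    GROUP BY lu.user_id, lu.email
    HAVING COUNT(*) >= 2;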

There were sweeping changes to our entire system due to this migration, which we have made over time. Data types have to be sacrosanct now, since we have shifted to SQL-like constructs from a weakly typed database. To be able to query the raw data directly, we now have to process clickstream events before they are inserted into BigQuery tables. Since we are an omnichannel platform, we had to devise ways to get the appropriate fields (email address, phone number, etc.) cascading through a lot of nested queries, their projections, GROUP BY clauses and JOIN conditions. We have plugged all the points of user entry in our system and connected them to BigQuery — direct user uploads, events, direct registrations, etc. Our product catalog modules export data to BigQuery tables at regular intervals.

Now, we have a user segmentation system that can query millions of rows for our customers in under ten seconds in most cases. There is a small overhead of about five seconds in BigQuery irrespective of the query complexity or data size. After that, it is blazing fast. BigQuery also has its own SQL-like clauses and functions that greatly extend the usual ANSI SQL constructs.
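For example, BigQuery's approximate aggregation functions, such as APPROX_COUNT_DISTINCT, make it cheap to estimate a segment's size; the table and column names in this sketch are illustrative:

    -- Roughly how many distinct purchasers are there in the actions table?
    SELECT APPROX_COUNT_DISTINCT(user_id) AS approx_purchasers
    FROM yfret.actions
    WHERE action_type = 'purchase';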

Since BigQuery is a PaaS-based system, we don’t really have to worry about its resource allocation, performance tuning and management — Google takes care of all that internally. Now that all of this effort has finally culminated in a robust and scalable product, we can bear the brunt of huge incoming data from large e-commerce players. Our next steps on this platform are possibly going to be around query optimisation (reducing that large number of JOINs, for one!) and, more importantly, porting our dynamic product segmentation to BigQuery as well. More tinkering on the way!
