Analyzing Mobile Users Activity with ViyaDB

Not long ago, I worked on a very interesting and technologically challenging use case. Let’s say you have a stream of events, where each event represents a mobile application user activity. Such activity can start even before an application gets installed on a device, usually, whenever a user is “engaged” with a mobile application by viewing or clicking on an advertisement. A significant event, of course, is the install, which acts as a “starting point” for all the subsequent events that might be generated throughout application usage.

Now, if we group our users by installation times and by the advertisement sources they came from, we can spot some interesting patterns. For example, some users tend to stay with an application (or produce more revenue through it) if they come from a specific source. A particular time of day/week might generate more installs etc. Marketers care about these patterns as they help them identify where to invest their money next time to get the best ROI.

How do all this translate from the marketing talk to a technical challenge? Here are several chalanges that make data engineer’s life less easy:

  • Data volume can get to be very big: billions of daily events.
  • Events analysis must happen in real-time — Firstly for ingestion of events and updating the aggregations, and secondly for the query response time that needs to be suitable for human consumption — i.e. <1 second, preferably milliseconds).
  • Events arrive in an arbitrary order. Remember: we use installation time in the aggregation key, which means that old records’ metrics must be updated every time.
  • Marketers issue analytical queries on relatively wide time frames.

When it comes to choosing a database capable of handling this kind of workload, there are several conclusions that we can make:

  • Only in-memory database can handle random writes accompanied with analytical queries, which require full table scans.
  • Data can be partitioned by at least an application ID, since 99% of marketer’s questions are about a single mobile application.

There’s a single writer (usually, events are ingested from multiple sources, then written to some kind of streaming system that optionally pre-aggregates them into small batches from where they can be written into a relevant database instance).

The Solution

We’ve tried several “out of the box” solutions when I originally worked on this problem, but none of them was really great — either because it did more than required (e.g. ACID compliance through write ahead log, which slowed down the ingestion process significantly), or it provided pure analytical performance because it was designed for OLTP workloads from the beginning. From another perspective, re-inventing a wheel can be justified sometimes, especially when custom wheel solves a specific problem better than all the wheels in the world:

The first implementation was very naive: a vector of records stored in RAM contiguously, which enabled both fast updates and full scans. Even though the filtering operations consisted of a dynamic code full of branches, it was machine-sympathetic enough to give a performance boost by 50% over the most modern “out of the box” solutions.

The next version was written from scratch as part of the open source ecosystem that I’m trying to establish. It has better data modeling (column based vs. row/record based), run-time code generation mechanisms allowing avoiding CPU branch mispredictions and improve cache locality, more supported data types, etc.

Let’s play with it a bit!

Querying

I didn’t have any real data to test the database against, but I wrote a simple simulator that generates mobile application user events. Let’s generate 100 millions of events:

docker run --log-driver=none --rm -ti -e EVENTS_NUMBER=10000000 \ -e OUTPUT_FORMAT=tsv viyadb/events-generator:latest > data.tsv

Now, let’s start a ViyaDB instance, and load all the events:

docker run -p 5000 : 5000 --rm -ti \ -v $(pwd):/tmp/viyadb viyadb/viyadb:latest

To create a table, save table definition in table.json file:

{
"name": "activity",
"dimensions": [
{"name": "app_id"},
{"name": "user_id"},
{
"name": "event_time", "type": "time",
"format": "millis", "granularity": "day"
},
{"name": "country"},
{"name": "city"},
{"name": "device_type"},
{"name": "device_vendor"},
{"name": "ad_network"},
{"name": "campaign"},
{"name": "site_id"},
{"name": "event_type"},
{"name": "event_name"},
{"name": "organic", "cardinality": 2},
{"name": "days_from_install", "type": "ushort"}
],
"metrics": [
{"name": "revenue" , "type": "double_sum"},
{"name": "users", "type": "bitset", "field": "user_id"},
{"name": "count" , "type": "count"}
]
}

To learn more about ViyaDB data modeling please refer to the documentation.

Then send a CREATE TABLE command to the database via HTTP endpoint:

curl -d @table.json http://localhost:5000/tables

To load the data, create a load descriptor, and save it in load.json file:

{
"table": "activity",
"format": "tsv",
"type": "file",
"columns": [
"app_id", "user_id", "event_time", "country", "city",
"device_type", "device_vendor", "ad_network", "campaign",
"site_id", "event_type", "event_name", "organic",
"days_from_install", "revenue"
],
"file": "/tmp/viyadb/data.tsv"
}

Then send a load command to the database:

curl -d @load.json http://localhost:5000/load

After loading all of the 100 millions of events we will find that this single database instance process occupies about 11 GB of RAM, which is not very much for a single partition.

While the data is loading, we can start querying it already. For this purpose, we can use Zeppelin with ViyaDB interpreter installed. Note that first queries take more time because of the compilation step, but all subsequent queries of the same type run significantly faster.

Finding top 10 applications by install count

Query times: first time — 1.6 secs, second time — 74 ms.

Application event types statistics

Query times: first time — 1.7 secs, second time 40 ms.

Simple retention query

The following query shows number of unique sessions since install date grouped by ad network:

Query times: first time — 1.9 secs, second time — 60 ms.

As we can see, the fact that we’re loading new data at about 300K records / sec have very little effect on query performance time.

What about query concurrency? The following chart shows how single database instance attached to one CPU core scales when using different read concurrency level (from 1 to 6):

Clustering

Creating a clustered solution that makes everyone happy is not trivial, and frankly no generic clustering solution works great for every business. Instead of finding a way to please everyone a business-specific clustering infrastructure can be developed in a more straightforward manner. For this to work we have to have several constraints (more correctly, degrees of freedom) which open up options :

  • Keeping all the data in RAM is quite expensive, therefore we would only want to store a hot dataset there while keeping the cold one in a slower disk-based database. Rolling over unordered data from one dataset to another is not intuitive, therefore the accepted approach is periodically switching to a new pre-bootstrapped cluster once it’s up to date with latest events, and dropping the old one once all the clients have switched away from it.
  • All the queries have a specific partition identifier in them (in our case it’s application ID), therefore all the routing logic can be part of the client: what machine to send a request to, which replica to choose, etc.
  • Cross-partition queries do not exist or pretty rare.
  • Persistence on partition level is not needed, since all the data is always available in some kind of deep storage (S3, HDFS, etc.). An ability to recover fast in conjunction with sufficient redundancy (replication factor) covers the case of one partition losing the data for some reason.

Keeping these points in mind, one shared-nothing approach that can be developed is:

  • Having database process per partition replica running on a single CPU.
  • Having some kind of streaming pipeline that pre-aggregates the data, and partitions it based on equality by partition size.
  • Having a management layer that coordinates data ingestion into a cluster.
  • Having batch processing layer that rolls over data from the hot to the cold place.

A sample diagram of such approach can be found here. It would be interesting for me to see whether such design be adopted by any company, and what can be improved.

Next Steps

I’m planning to continue evolve the open source ecosystem around ViyaDB. There are three directions for development that will help make it more useful:

  • Clustering solution (I’m looking for an integration partner that will help me find a proper solution).
  • Adding more database features like: HAVING clause, SQL syntax, etc.
  • Tooling and integrations: Zeppelin, integration with Apache Spark, etc.

I hope this project becomes useful by filling the niche no other solution I foud, really solves. And, as they say in such cases, please come and help the project by providing your opinion or by contributing!


Originally published at https://www.linkedin.com on October 19, 2017.