DATA INFRASTRUCTURE @ UDEMY

Designing the New Event Tracking System at Udemy

How Udemy built a new event ingestion infrastructure to meet the organization's growing data analytics needs

Baris Ozkuslar
Udemy Tech Blog

--

By: Ahmet Aksoy, Baris Ozkuslar, Ercument Yildirim, Kerem Kahramanogullari, Mert Tunc, Seref Acet, Umut Gulkok

As a leading global destination for learning and teaching online, Udemy has billions of data points generated by over 44M students worldwide. Being a data-driven organization, Udemy leverages the event data generated by user actions, such as clicks, page views, and impressions. This data is crucial for delivering a better experience to our users. Some use cases of event data are:

  • Course recommendations
  • Search result ranking
  • A/B testing
  • Marketing and payment channels attribution
  • Usage statistics

In this blog post, we will cover our new analytics event tracking system. We will describe the problems of the legacy system, outline the key strategic factors influencing our approach, and finally, explain the technical design of the new system.

The Problems of the Legacy System

Udemy had been using a proprietary data collection system for years. The basic data flow in the legacy system is shown in this diagram:

Legacy system architecture

In the legacy system, each log line includes a fixed collection of fields applicable to every event and a small, generic payload containing additional event-specific details. These logs are parsed once a day and loaded to S3 to be queried via Hive.

Although log parsing is a common practice, it comes with the following problems:

  1. Low scalability: The log aggregation server is not scalable.
  2. High latency: Logs are ingested only once a day. The latency could be reduced, but real-time processing is not possible.
  3. Lack of custom event structures: Since the same fields are used across multiple applications, their meaning may vary by context, creating confusion for downstream data consumers.
  4. Low data quality: There is no real-time event validation mechanism in the old system. We only become aware of invalid events when downstream consumers start to fail. Worse, the problem may go unnoticed entirely and silently produce incorrect results.
  5. Lack of documentation: The generic event structure also means the events are effectively undocumented.

The state of the legacy system was an obstacle to Udemy’s goal of having high-quality data. It also became obvious that solving the problems step-by-step within the legacy system wouldn’t be feasible. An entire overhaul was necessary. With the goal of replacing this system, we built the new ‘Event Tracking System’.

Analysis and Technology Selection

In developing a new event tracking system, we began by working with our internal stakeholders to identify the key requirements. We interviewed stakeholders across the organization and reviewed the architecture of the legacy system. From this process, we arrived at the following key requirements:

  • The system should support all Udemy applications (both web and native mobile) and provide both client-side and server-side tracking capabilities.
  • The system should enforce schemas and prevent collection of invalid data.
  • The system should not cause degradation in website performance.
  • The system should support both real-time and batch processing for collected events.

Before starting to design a new solution, we needed to answer two higher-level strategic questions that would have a profound influence on the approach we ultimately took:

  1. Should we buy or build?
  2. What core technologies should we use?

To answer these questions, we ran an initial pilot to evaluate existing open source solutions as well as paid SaaS alternatives. During the pilot phase, we built and stress-tested multiple proof-of-concept implementations that were designed to help us understand the capabilities and limitations of different approaches.

Buy or build?

In our evaluation of open source tools and SaaS vendors, we found many of the systems to be quite capable. However, one of the primary motivations for revamping our event tracking system was to drive higher data quality, and none of the off-the-shelf systems we evaluated provided everything we were looking for. In particular, we felt that in the context of a large, distributed engineering team, having tools that would support safe schema evolution was important. Additionally, we believed that synchronous event validation would provide application developers with the real-time feedback necessary to ensure high data quality.

Buy vs. build decisions are always a matter of trade-offs, but after much internal discussion within the team and with our executive sponsor, it was clear that the company saw high-quality data collection as a strategic investment, core to its strategy of driving impact through data innovation. As a result, we were able to proceed confidently with building our own solution, knowing that there would be internal support for realizing our complete vision.

Which Serialization Technology?

The choice of serialization technology affected many important aspects of the system. The simplest solution would be to use JSON throughout the system. However, that would mean storing the data as raw text, which uses space inefficiently. We evaluated other alternatives, and two stood out: Apache Avro and Google Protobuf.

Avro and Protobuf are both very powerful tools. They both support:

  • Binary encoding according to a schema: Storing the data is more efficient compared to text-based storage.
  • Validation: Schemas bring data validation to the system essentially for free; both technologies reject data that does not conform to the schema.
  • Schema evolution: Data may change according to business requirements; sometimes a field is added or removed. Both technologies support evolving the schema according to a set of rules.

There are also some crucial differences:

  • Avro has a concept of required/optional fields. In Protobuf (v3), every field is optional.
  • Avro is integrated natively with some of the data systems we use, such as Hive and Presto.
  • Protobuf natively supports code generation across many languages, while Avro only supports it for Java.
  • Reading Avro data requires the writer’s schema to be present along with the reader’s schema.
  • If your Avro schemas need to evolve over time, you need a schema registry so that data can reference its schema without storing the whole schema alongside it.

There were plenty of advantages to both formats that made it hard to choose. In the end, we decided to go with Avro because it’s better supported in the data ecosystem and its schema compatibility rules were stricter.

Avro is also compatible with JSON: it defines a JSON encoding alongside the binary one. This allowed us to use JSON for frontend clients while using binary Avro serialization for internal events.
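To make this concrete, here is a minimal Kotlin sketch of the kind of JSON-to-binary conversion this enables, using the Apache Avro library. The PageViewEvent schema and its fields are hypothetical and purely for illustration; real schemas live in the schema repository.

```kotlin
import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.EncoderFactory
import java.io.ByteArrayOutputStream

// Hypothetical schema for illustration only.
val schemaJson = """
  {
    "type": "record",
    "name": "PageViewEvent",
    "fields": [
      {"name": "userId", "type": "long"},
      {"name": "page", "type": "string"}
    ]
  }
"""

fun jsonToAvroBytes(json: String): ByteArray {
    val schema = Schema.Parser().parse(schemaJson)
    // Decode the JSON-encoded event into a generic Avro record.
    // This step fails if the JSON does not conform to the schema.
    val reader = GenericDatumReader<GenericRecord>(schema)
    val record = reader.read(null, DecoderFactory.get().jsonDecoder(schema, json))
    // Re-encode the same record using the compact binary encoding.
    val out = ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    GenericDatumWriter<GenericRecord>(schema).write(record, encoder)
    encoder.flush()
    return out.toByteArray()
}

fun main() {
    val bytes = jsonToAvroBytes("""{"userId": 42, "page": "/course/kotlin-basics/"}""")
    println("Binary Avro payload: ${bytes.size} bytes")
}
```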

A Closer Look at The Components & Services

Informed by the key decisions described above, we then began designing and implementing the new event tracking system. Here, we’ll describe some of the key choices that we made during our design process.

Architecture

High level architecture of the system

Let’s explain what is happening in the above illustration. Avro schemas and Confluent Schema Registry are at the center of this project. Some connections are omitted from the diagram, but in practice every component that performs Avro serialization or deserialization accesses Schema Registry.

The Udemy clients (web app, iOS, Android) and backend web servers include a component called a ‘Tracker’, written in each platform’s language. With the help of trackers, these clients send events to a service called ‘Event Collector’. The events are serialized into Avro format (from JSON) according to the schemas fetched from Schema Registry. These events are then written to raw-event topics in Kafka.

Event Enricher reads the raw events, “enriches” them, and writes back to the final event topics in Kafka. Now, they are ready to be consumed by various stream processors.

To enable querying and batch processing with Hive, the Kafka S3 connector reads the events from Kafka and offloads them to Amazon S3.

All components are developed according to the Twelve-Factor App methodology and are deployed on Kubernetes. The system is thoroughly monitored by anomaly and threshold monitors in Datadog, which fire alerts via PagerDuty in case of an emergency.

Avro Schemas

As mentioned, Avro schemas are at the center of this project. The schemas are not only a way to increase data quality by rejecting invalid events; they are also a communication medium, acting as a contract between the event producer and event consumer teams. When a new event is required, the team that will produce it must first create its schema and register it in Schema Registry before producing event data.

Let’s examine the anatomy of a schema and see the custom features we implemented on top of Avro. (Note that the example is in the Avro IDL format, not the actual Avro schema format; IDL is easier for developers to edit and review, and it is converted to the Avro schema format during registration.)

An example Avro IDL schema

Schema Registry

Avro deserialization requires the exact version of the schema that was used during serialization. While it’s possible to embed the Avro schema in each message written to Kafka, this is very inefficient because the schemas are usually bigger than the events themselves.

A better solution is provided by Confluent as an open-source tool called Schema Registry. Schema Registry maps each schema to a unique id, so instead of including the whole schema, only the schema id is included in each message written to Kafka.
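As an illustration, a Kotlin producer configured with Confluent’s KafkaAvroSerializer delegates this schema-id handling to the serializer. The broker and registry addresses below are placeholders, not Udemy’s actual endpoints.

```kotlin
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import java.util.Properties

// Placeholder endpoints for illustration only.
fun buildAvroProducer(): KafkaProducer<String, GenericRecord> {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // KafkaAvroSerializer registers/looks up the schema in Schema Registry
        // and prepends the schema id to each message instead of the full schema.
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer::class.java.name)
        put("schema.registry.url", "http://schema-registry:8081")
    }
    return KafkaProducer<String, GenericRecord>(props)
}
```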

Schema Registry is accessed by every service that writes events to Kafka, so it is deployed with multiple instances to provide high availability.

Compatibility checks

The other key feature of Schema Registry is compatibility checks for schema evolution. We use the FULL (BACKWARD+FORWARD) compatibility configuration for all event schemas by default. This way, we ensure that no change to an existing schema breaks existing producers or consumers. The compatibility level is configurable per event type, to give event owners more flexibility.
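Schema Registry exposes this setting through its REST API. The following Kotlin sketch shows how a per-subject compatibility level could be set; the registry URL is a placeholder and the subject name is supplied by the caller.

```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// A sketch of setting a per-subject compatibility level via the Schema Registry
// REST API; the registry address is a placeholder.
fun setCompatibility(subject: String, level: String = "FULL") {
    val request = HttpRequest.newBuilder()
        .uri(URI.create("http://schema-registry:8081/config/$subject"))
        .header("Content-Type", "application/vnd.schemaregistry.v1+json")
        .PUT(HttpRequest.BodyPublishers.ofString("""{"compatibility": "$level"}"""))
        .build()
    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    println("Compatibility for $subject: ${response.body()}")
}
```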

Event Collector

The primary purpose of this service is to provide a simple endpoint to publish events to the event tracking system. Clients send event data in JSON format, then Event Collector serializes them to Avro format and publishes them to Kafka.

This is a scalable and highly available microservice deployed on Udemy’s Kubernetes environment. It is written in Kotlin with Spring Boot.

Event Collector internals

The features of Event Collector are listed below, followed by a minimal controller sketch:

  • Serialization: Converts the JSON formatted event data to Avro.
  • Validation: Verifies that the event conforms to the Avro schema during serialization.
  • Persisting: Sends the events to Kafka with KafkaProducer.
  • Sync enrichments: Enriches events with data contained in the request, e.g., the receive timestamp.
  • Dead letter queue: Sends invalid events to a separate Kafka topic.
  • Monitoring & logging: Emits metrics to collect general statistics and also produces logs about the invalid events.

Trackers

While it’s possible to send events directly to the Event Collector service, there are some low-level concerns one would need to address, such as buffering, batching, retries, and HTTP communication. Instead of leaving event producer teams to find their own solutions for these concerns, we provide tracker libraries that abstract them away. Publishing an event is simplified to creating an event object and calling the publishEvent function, as in the sketch below.
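Here is a hedged sketch of what that looks like from a producer’s point of view. The Tracker interface and the CourseEnrolledEvent class are illustrative names; in practice the event class would be generated from its Avro schema.

```kotlin
// Illustrative event type; the real classes are generated from Avro schemas.
data class CourseEnrolledEvent(val userId: Long, val courseId: Long)

// Illustrative tracker interface; buffering, batching, retries, and transport
// are handled inside the tracker implementation.
interface Tracker {
    fun publishEvent(event: Any)
}

fun trackEnrollment(tracker: Tracker, userId: Long, courseId: Long) {
    // The producing code only builds the event object and hands it off.
    tracker.publishEvent(CourseEnrolledEvent(userId = userId, courseId = courseId))
}
```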

We currently have five different tracker libraries:

  • Python and Kotlin trackers for backend services.
  • JavaScript tracker for the web app.
  • iOS tracker, written in Swift.
  • Android tracker, written in Kotlin.

The Kotlin tracker (backend) differs from the others in that it writes events directly to Kafka instead of using the HTTP endpoint of Event Collector. It is specifically designed for backend microservices written in Kotlin. With this approach, application developers use auto-generated Kotlin classes that natively include Avro serialization.

Udemy’s mobile apps support offline use, so the mobile trackers must also support tracking events while offline. They buffer events to disk, to be sent later when the device reconnects to the internet.

For the web app, we have some additional features to reduce the event loss rate. We cannot eliminate all possible causes of event loss on users’ devices; for example, their internet connection may drop indefinitely, or their device may shut down unexpectedly. But we do everything we can to reduce the number of event-loss scenarios. It’s important to observe the page lifecycle and handle sending the events correctly. In some cases, such as tab close or page navigation, browsers do not guarantee sending asynchronous requests, and each browser behaves differently for each lifecycle change. The JavaScript tracker uses the Beacon API in combination with asynchronous XHR requests, depending on the current state of the page (signaled by browser events such as pagehide and visibilitychange).

Kafka

Kafka is at the core of the system, filling the role of the primary data storage system that each component produces to and consumes from. Once an event ends up in Kafka (just after Event Collector receives it and sends it to Kafka successfully), it is persisted to disk, and from that point on the data is processed with an at-least-once guarantee.* Clients may choose to consume the events directly from Kafka for real-time processing use cases.

We run a Kafka 2.1.1 cluster with 5 brokers that handles 50K messages/s of write throughput and about 100K messages/s of read throughput at peak time. Each event type has its own Kafka topic, providing isolation. Each event topic has a replication factor of 3 and a partition count of 3. A single Kafka message holds one Avro-encoded event, taking up 500 bytes on average. The retention period is configured to 4 days.

* By default, we are using acks=1, not acks=all, because we accept the possibility of event loss in the leader failover scenario to gain increased performance on writes.
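For reference, the sketch below creates a topic with the settings described above (3 partitions, replication factor 3, 4-day retention) using Kafka’s AdminClient. The topic name and broker address are placeholders.

```kotlin
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import java.util.Properties

// A sketch of creating an event topic with 3 partitions, replication factor 3,
// and a 4-day retention period; the topic name and broker address are placeholders.
fun createEventTopic(topicName: String = "event.page-view") {
    val props = Properties().apply {
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
    }
    AdminClient.create(props).use { admin ->
        val topic = NewTopic(topicName, 3, 3.toShort())
            .configs(mapOf("retention.ms" to (4L * 24 * 60 * 60 * 1000).toString()))
        admin.createTopics(listOf(topic)).all().get()
    }
}
```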

Enricher

Sometimes there may be a need to add additional information to events. For example, assume you have the IP address of the user in the event but instead of the IP address, you require the approximate geolocation of the user. An operation that modifies the event, such as adding geolocation information from IP addresses, is an enrichment.

The most straightforward way to add enrichments would be to perform them in Event Collector. However, these enrichment operations may be expensive in terms of resource consumption, and while designing the system we decided to keep Event Collector as light as possible. Therefore, we created another service, Event Enricher, that is responsible for performing the enrichment operations asynchronously.

Event Enricher also supports removing or obfuscating fields that store sensitive information.
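A minimal sketch of such an enrichment step is shown below. The RawEvent, EnrichedEvent, and GeoIpResolver types are hypothetical stand-ins for the real Avro records and IP-to-location lookup; in the real system this logic runs in a consume-transform-produce loop between the raw and final Kafka topics.

```kotlin
// Hypothetical types standing in for the real Avro records.
data class RawEvent(val clientIp: String, val fields: Map<String, Any?>)
data class EnrichedEvent(val countryCode: String?, val fields: Map<String, Any?>)

// Hypothetical IP-to-location lookup.
interface GeoIpResolver {
    fun countryOf(ip: String): String?
}

class EventEnricher(private val geo: GeoIpResolver) {
    fun enrich(raw: RawEvent): EnrichedEvent =
        // Derive the approximate location from the IP address; the raw IP itself
        // could also be dropped here if it is considered sensitive.
        EnrichedEvent(countryCode = geo.countryOf(raw.clientIp), fields = raw.fields)
}
```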

Kafka Connect

Kafka Connect is a scalable and reliable tool for streaming data between Kafka and other data stores. While the components described so far operate on Kafka, Kafka Connect makes the data in Kafka available in Udemy’s data lake on Amazon S3.

Kafka Connect lets you define entities named connectors to carry out the process. These connector definitions contain your source or sink data store information, SerDes, batching-related configurations, partitioning strategy, schema compatibility enforcements, and more.

Kafka Connect does not natively support streaming data to S3, but Confluent provides an S3 connector implementation that can be used to sync Kafka with Amazon S3 through Kafka Connect.
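For illustration, a connector definition of this kind (submitted to the Kafka Connect REST API) might look like the Kotlin map below. The bucket name, topic, and sizing values are placeholders, not Udemy’s actual configuration.

```kotlin
// A hedged sketch of an S3 sink connector configuration of the kind posted to the
// Kafka Connect REST API; bucket, topic, and sizing values are placeholders.
val s3SinkConfig = mapOf(
    "connector.class" to "io.confluent.connect.s3.S3SinkConnector",
    "storage.class" to "io.confluent.connect.s3.storage.S3Storage",
    "format.class" to "io.confluent.connect.s3.format.avro.AvroFormat",
    "topics" to "event.page-view",                          // hypothetical topic
    "s3.bucket.name" to "example-event-data-lake",          // hypothetical bucket
    "s3.region" to "us-east-1",
    "flush.size" to "10000",
    // Partition objects by time so Hive partitions (year/month/day/hour) line up.
    "partitioner.class" to "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format" to "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "partition.duration.ms" to "3600000",
    "locale" to "en-US",
    "timezone" to "UTC"
)
```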

Amazon S3

Amazon S3 provides scalability, data availability, security and performance at a low cost. It is the primary data lake in the entire system. All events are uploaded to Amazon S3 in Avro format through Kafka Connect S3 Connector.

Amazon EMR

Consumers of event data want to work with the big data frameworks already in use at Udemy. For this purpose, Amazon EMR is used to manage Hive, Spark, and Presto clusters and run them directly on the event data in S3.

Hive Partitioner

Hive Partitioner architecture

Whenever an Avro schema is deployed to production via Event Schema Manager, the corresponding external Hive table is created in the Hive metastore automatically.

Downstream consumers (Superset, Hive, Presto, and Spark) utilize Hive partitions (year, month, day, hour) to query the event data on Amazon S3 efficiently. For this reason, we need to add partitions to the Hive external tables whenever we have events in the corresponding S3 paths.

Furthermore, it is important that partitions are created without delay. To meet this requirement, we implemented Hive Partitioner, a microservice that creates partitions on external Hive tables in near real time, driven by S3 Event Notifications.

In this solution, S3 Event Notifications sends a notification to a dedicated Amazon SQS queue whenever Kafka Connect finishes uploading an Avro file to the S3 bucket. Hive Partitioner then consumes these notifications, parses them, and adds any missing partitions to the Hive external tables, as sketched below.
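The core of that step could look roughly like this Kotlin sketch, which extracts partition values from the S3 object key and issues an ALTER TABLE statement over JDBC. The key layout, table name handling, and HiveServer2 endpoint are assumptions for illustration only.

```kotlin
import java.sql.DriverManager

// Assumed S3 key layout of the form .../year=YYYY/month=MM/day=dd/hour=HH/...
private val partitionRegex = Regex("""year=(\d+)/month=(\d+)/day=(\d+)/hour=(\d+)""")

// Adds the Hive partition that corresponds to an uploaded S3 object, if missing.
// The table name and HiveServer2 endpoint are placeholders.
fun addPartitionForObject(table: String, s3Key: String) {
    val match = partitionRegex.find(s3Key) ?: return
    val (year, month, day, hour) = match.destructured
    val sql = """
        ALTER TABLE $table ADD IF NOT EXISTS
        PARTITION (year='$year', month='$month', day='$day', hour='$hour')
    """.trimIndent()
    DriverManager.getConnection("jdbc:hive2://hive-server:10000/default").use { conn ->
        conn.createStatement().use { stmt -> stmt.execute(sql) }
    }
}
```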

Event Schema Manager

Event Schema Manager operation flow

Our schema repository on GitHub is the main collaboration point for schema preparation, and it also conveniently stores our Avro IDL schemas. When a GitHub pull request is ready to create a new schema or modify an existing one, ESM (Event Schema Manager) carries out a number of management operations for us. It is conveniently triggered via GitHub comments.

Here are the operations that it handles:

  • Uploading the Schema Registry backup to an S3 bucket
  • Converting the Avro IDL schema format to the actual Avro schema format (avsc)
  • Schema compatibility checks using Schema Registry
  • Registering the schemas to Schema Registry
  • Creating Kafka topics for the new events
  • Creating or updating Hive external tables

Monitoring & Alerting

We have a thorough monitoring setup for event tracking. We monitor the system from different perspectives:

  • Each component is independently monitored.
  • E2E performance and correctness are monitored.
  • Correctness and completeness of the produced data are monitored.

Monitors watching Event Collector

Monitoring the components

All components in the event tracking system, including client libraries, services, and Kafka, emit various metrics that help us identify possible issues. We have monitors that watch these metrics and alert us to potential problems. For all systems, common metrics include throughput, error rates, CPU/memory usage, data processing rate, and latency.

The Event Collector service and Kafka are monitored most closely because even a few minutes of downtime for either means data loss: the trackers have buffers, but they are limited in size. That is also why Event Collector is given plenty of headroom to absorb unexpected load.

After the events are written to Kafka, spiky load is less of a concern because the rest of the components are consumers; they can proceed at their own pace without causing data loss. However, it is still important to monitor consumer lag. Since Kafka holds the data for a limited time, a lagging consumer would start losing data if its lag exceeds the retention period.

Alerts for event producers

Besides making sure that the system runs reliably, monitors also notify teams when their events start to have issues.

We monitor Avro validation errors to catch mis-structured events, i.e., events that don’t conform to their Avro schema, and we provide detailed logs for these errors to make them debuggable.

Monitoring data correctness & completeness

To check completeness, we monitor the traffic patterns of events for anomalies. To better locate anomalies in these traffic patterns, we dimensionalize the metrics by page, client type, and version.
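A hedged sketch of the kind of dimensionalized counter behind such monitors, using the DogStatsD client, is shown below; the metric name, tag names, and agent address are illustrative, not Udemy’s actual metric schema.

```kotlin
import com.timgroup.statsd.NonBlockingStatsDClient

// Illustrative metric prefix and local Datadog agent address.
val statsd = NonBlockingStatsDClient("event_tracking", "localhost", 8125)

// Emits a counter tagged by page, client type, and app version so that traffic
// anomalies can be located along those dimensions.
fun recordEventCollected(page: String, clientType: String, appVersion: String) {
    statsd.increment(
        "events.collected",
        "page:$page",
        "client_type:$clientType",
        "app_version:$appVersion"
    )
}
```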

In addition to these real-time monitors, we also support batch data quality checks to verify the correctness of the data more thoroughly. For these use cases, Udemy uses Soda SQL on top of Airflow to generate data quality reports. When defining these checks, we focused mainly on common semantic properties and on relational properties between events that are critical for our consumers.

Closing

In this blog post, we explored the history and evolution of the event tracking infrastructure in Udemy. We discussed the important factors that affected our decisions and the design of the system. We also briefly explained how each component works, and how they are connected.

With this new system, we brought producers and consumers into the lifecycle of the collected data by providing them with a common contract and the ability to track the health of that data. With these improvements, we made the data more visible and reliable while gradually building an eventing culture as more teams adopt the new system. The new event tracking system has now been in production for over a year, providing access to analytics data with sub-second latency. It is used by over 20 teams to publish over 250 types of events, with a rapid pace of adoption.
