Realtime data processing with Apache Beam and Google Dataflow at Dailymotion
Dailymotion is a leading video platform that lets users discover and watch new videos based on their tastes and on trending subjects. To know precisely what our users actually want to see, our data team must be able to determine in realtime which videos are currently being viewed, and how. Discover how we collect, process and redistribute billions of events across our systems in realtime using the Apache Beam framework and the Google Dataflow platform.
A significant part of Dailymotion’s traffic comes not only from videos embedded on external websites, but also from our own constantly improving website. This means we need to:
- Analyze data to understand user paths and improve their journey
- Recommend relevant videos to our users: similar ones based on video metadata & new discoveries based on their history
In order to fulfill both of these needs, we collect, process and redistribute billions of events across our systems in realtime (on average more than 50 000 per second) without managing any processing infrastructure, using the Apache Beam framework & the Google Dataflow platform.
What’s Apache Beam?
Apache Beam is a unified programming model to define data pipelines, originally developed by Google, now open-source and supported by the Apache Software Foundation. It allows us to develop our batch and streaming data processing jobs with the same codebase and to plug them into our different data storage and messaging systems through Google Cloud connectors.
We use Apache Beam in combination with Google Dataflow Platform, which provides us with a fully-managed environment to deploy our pipelines across multiple regions in the world.
Build and Deployment
Let’s start with an overview of our build process for our pipelines developed in Java (which we mainly use for our streaming pipelines).
Beam pipelines can also be developed in Python and many people in the Dailymotion Data Team actually use it (mainly for batch processing).
We extensively use Docker to build and deploy our pipelines as the containers can carry all the required dependencies.
Beam pipelines are built by our Continuous Integration Platform (Jenkins) inside a Docker container to create an “überjar” which contains all Beam and runner (Dataflow) dependencies. The überjar is then pushed to an artifact repository for later use.
A Beam pipeline is deployed through a Docker image with a ready-to-use entry point. This entry point first drains any already-running instance of the job (i.e. stops ingesting new data while finishing in-flight work, so the job can be stopped without data loss), then starts a new one with a set of pre-configured parameters.
Streaming jobs are redeployed through CI when pushing to an “environment” branch. For example, a push to the stage branch redeploys a staging pipeline and a push to the prod branch redeploys a production pipeline.
Deploying to production means deploying the job in a Google Cloud production project, reading from production sources and writing to production sinks. Multiple pipelines can be redeployed at the same time:
- The same pipeline in different regions (US, Europe, Asia).
- Multiple parts of the pipeline, which usually share the same codebase and are codependent.
A few minutes after the code is pushed, the job is deployed in Google Cloud Platform and is ready to process data.
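The drain-then-launch behaviour of the entry point and the branch-to-environment mapping can be sketched as follows. This is a minimal illustration, not our actual entry point: the class name `DeployPlan`, the project IDs and the job IDs are hypothetical placeholders. The `gcloud dataflow jobs drain` command and the `--runner`, `--project`, `--region` and `--streaming` pipeline options are standard, but the exact flags used in a given setup may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: maps a Git branch to the commands a deployment entry point might run. */
final class DeployPlan {

    // Assumed branch-to-project mapping; the project IDs are placeholders.
    static final Map<String, String> PROJECT_BY_BRANCH =
            Map.of("stage", "example-staging-project", "prod", "example-production-project");

    /** Drains a running job so that in-flight data is processed before it stops. */
    static List<String> drainCommand(String jobId, String region) {
        return List.of("gcloud", "dataflow", "jobs", "drain", jobId, "--region=" + region);
    }

    /** Launches the überjar on Dataflow in the given project and region. */
    static List<String> launchCommand(String jar, String project, String region) {
        return List.of("java", "-jar", jar,
                "--runner=DataflowRunner",
                "--project=" + project,
                "--region=" + region,
                "--streaming=true");
    }

    /** Builds one optional drain plus one launch per region; runningJobIds maps region to job ID. */
    static List<List<String>> plan(String branch, String jar,
                                   Map<String, String> runningJobIds, List<String> regions) {
        String project = PROJECT_BY_BRANCH.get(branch);
        if (project == null) {
            throw new IllegalArgumentException("Not an environment branch: " + branch);
        }
        List<List<String>> commands = new ArrayList<>();
        for (String region : regions) {
            String jobId = runningJobIds.get(region);
            if (jobId != null) {
                commands.add(drainCommand(jobId, region));
            }
            commands.add(launchCommand(jar, project, region));
        }
        return commands;
    }
}
```

Draining is asynchronous, so a real entry point would also have to wait for the drained job to finish before starting its replacement. That wait is left out of this sketch.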
Another way to deploy jobs is the Cloud Dataflow Templates system, which makes Dataflow jobs runnable from Google Cloud Storage with the gcloud CLI: `gcloud beta dataflow jobs run <job>`
We still chose to rely on Docker containers for the following reasons:
- Pipelines can be launched from any environment with no specific dependencies having to be installed on the host machine. Everything is in the container.
- Pipelines can be launched by any developer for development iterations and for emergency fixes in case of critical issues. A manual way to restart jobs is provided.
- We can embed a bunch of resources, files or scripts, when necessary.
Beam jobs are not only deployed through CI but also, in some cases, scheduled and therefore launched as batch jobs.
We sometimes use our realtime processing jobs in combination with batch jobs, launched through our scheduler: Airflow.
The processing part of the batch job is actually the same Beam / Dataflow job as for realtime. The only thing which differs is the source (input) and the sink (output). Both are parameterized at launch.
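The idea of one processing step shared between realtime and batch, with only the source and sink chosen at launch, can be illustrated with the following sketch. It deliberately uses plain Java interfaces rather than the Beam API so that it runs without dependencies. The class name, the `--mode=batch` flag, the sample events and the placeholder transform are all assumptions made for the example. In Beam itself, such launch-time parameters are typically declared on a custom `PipelineOptions` interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Plain-Java stand-in (not the Beam API) for "same transforms, different source and sink". */
final class SharedProcessing {

    /** The processing step shared by both modes; trimming and lower-casing is a placeholder. */
    static String transform(String rawEvent) {
        return rawEvent.trim().toLowerCase(Locale.ROOT);
    }

    /** Runs the shared transform between whichever source and sink were chosen at launch. */
    static void run(Supplier<List<String>> source, Consumer<String> sink) {
        for (String event : source.get()) {
            sink.accept(transform(event));
        }
    }

    public static void main(String[] args) {
        // Hypothetical flag: "--mode=batch" reads an export, anything else reads the live stream.
        boolean batch = args.length > 0 && args[0].equals("--mode=batch");
        Supplier<List<String>> source = batch
                ? () -> List.of(" VIEW:x1 ", " CLICK:x2 ") // stands in for a warehouse export
                : () -> List.of(" VIEW:x3 ");              // stands in for a Pub/Sub subscription
        List<String> out = new ArrayList<>();
        run(source, out::add);
        System.out.println(out);
    }
}
```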
Typically, an Airflow batch job consists of 3 steps:
- A DataflowSensor over a data source to check data freshness
- A data extraction job (example: export a table from our data warehouse)
- The launch of the Beam job itself. As the überjar has already been built, it’s retrieved from our artifact repository & launched on the Google Dataflow platform.
Full representation of the Build, Realtime jobs and Batch launch process:
Processing and Redistribution of the Data
We extensively use the Google Cloud Platform to collect, store and perform analysis over our data. Naturally, we then use connectors provided by Beam Google Cloud Runner to plug our jobs into every component of the Cloud Platform.
Beam, through the Google Dataflow runner, provides connectors for BigQuery, Pub/Sub, Cloud Storage and more.
As the code is easily extensible, it’s also possible to plug it into any of our systems, such as key-value databases, or to make REST calls to APIs.
Most of the data we process in realtime passes through Pub/Sub, Google Cloud’s messaging system. We receive a tremendous amount of events through our own infrastructure and dispatch them to other jobs or to our databases for further transformations. Typical final sinks are Google BigQuery and key-value datastores, where the data is either processed in batch or queried through services.
What does it technically mean to process data in realtime with Beam? Basically, reading from an unbounded source (such as a messaging system like Pub/Sub) and applying a series of transforms and enrichments (e.g. aggregation, type checks, enrichment of data through an API…):
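import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.values.PCollection;
final Pipeline p = Pipeline.create(options);
PCollection<String> rawMessages = p.apply("Read topic", PubsubIO.readStrings().fromTopic("projects/yourproject/topics/yourtopic"));
// Apply some transform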
Except for some aggregation functions, transformation code should be roughly the same whether the job is launched in streaming or in batch. Only the IO part changes.
Example of an enrichment pipeline:
This is a simplified representation of an enrichment pipeline, based on Dailymotion’s API to enrich video metadata.
- The pipeline continuously reads messages from Pub/Sub
- It then enriches the events with video metadata: events are grouped into bundles within a window, and the metadata is first requested from a memcache instance
- If it’s not there, the pipeline calls the Dailymotion API and updates the memcache instance. This operation avoids making too many calls to the API and is particularly efficient when there are peaks of views on a restricted number of videos.
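The cache-then-API lookup described in these steps can be sketched as below. This is a simplified illustration rather than our production code: a `Map` stands in for the memcache client and a `Function` for the Dailymotion API call, and the class and field names are hypothetical. The point is that each distinct video ID in a batch triggers at most one API call, and none at all once its metadata is cached.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Cache-aside enrichment sketch; the Map stands in for memcache, the Function for the API. */
final class VideoEnricher {
    private final Map<String, String> cache;    // stand-in for a memcache client
    private final Function<String, String> api; // stand-in for a metadata API call
    int apiCalls = 0;                           // exposed only to make the behaviour observable

    VideoEnricher(Map<String, String> cache, Function<String, String> api) {
        this.cache = cache;
        this.api = api;
    }

    /** Returns videoId -> metadata for one batch of events, calling the API only on cache misses. */
    Map<String, String> enrich(List<String> videoIdsInWindow) {
        // Deduplicate first: a peak of views on one video costs a single lookup per batch.
        Set<String> distinctIds = new LinkedHashSet<>(videoIdsInWindow);
        Map<String, String> metadata = new HashMap<>();
        for (String id : distinctIds) {
            String value = cache.get(id);
            if (value == null) {
                value = api.apply(id); // cache miss: fall back to the API
                apiCalls++;
                cache.put(id, value);  // populate the cache for later batches
            }
            metadata.put(id, value);
        }
        return metadata;
    }
}
```

A real implementation would also need cache expiry and error handling for failed API calls, which are left out here.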
Architecture of an events processing pipeline
Here is an architectural representation of our events processing pipeline.
It outputs enriched events to BigQuery and forwards them to the Google Pub/Sub messaging system for more specific uses:
- Events from our Player and Apps are first pushed to our internal event bus.
- Events are then forwarded to our Data Processing Jobs through Pub/Sub and processed
- At the same time, enriched events are:
- streamed into our BigQuery Data Warehouse
- forwarded to other jobs for more specific processing
Within a few seconds of an event actually happening (like a user clicking on a button), it’s available in our Data Warehouse, ready for analysis and integrated into our recommendation systems.
On the way to stability and sanity
As our jobs are deployed on Google Cloud Platform, we can rely on several features to cope with the tremendous amount of data processed every minute:
- The number and type of workers are very simply specified at launch time. Example of 5 workers with 8 cores and 30 GB of RAM:
Dataflow launch parameters
- Dataflow provides an autoscaling feature, which means we can absorb peaks of traffic as more firepower (more machines!) will be added to unburden current workers if needed.
Dataflow launch parameters
- Metrics like the number of unprocessed Pub/Sub messages & lateness are automatically collected and made available through the Stackdriver tool.
- Custom counters can also be configured directly in the job with a simple variable to increment.
This allows us to report in realtime the number and kind of errors raised in a job, to monitor and dashboard them, and to automatically trigger alerts when thresholds are reached.
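As an illustration of the worker and autoscaling settings mentioned in the list above, here is a sketch of the kind of arguments that can be passed when launching a job. The option names (`numWorkers`, `maxNumWorkers`, `workerMachineType`, `autoscalingAlgorithm`) are standard Dataflow pipeline options, and `n1-standard-8` is the machine type with 8 vCPUs and 30 GB of memory. The class name and the numeric values are assumptions for the example and not necessarily the exact parameters we launch with.

```java
import java.util.List;

/** Sketch of Dataflow worker options passed at launch; the values are illustrative. */
final class LaunchParameters {

    /** Fixed pool of workers, each with 8 vCPUs and 30 GB of memory (n1-standard-8). */
    static List<String> fixedWorkers(int workers) {
        return List.of(
                "--runner=DataflowRunner",
                "--numWorkers=" + workers,
                "--workerMachineType=n1-standard-8",
                "--autoscalingAlgorithm=NONE");
    }

    /** Autoscaled pool: starts at initialWorkers and may grow up to maxWorkers under load. */
    static List<String> autoscaled(int initialWorkers, int maxWorkers) {
        if (maxWorkers < initialWorkers) {
            throw new IllegalArgumentException("maxWorkers must be >= initialWorkers");
        }
        return List.of(
                "--runner=DataflowRunner",
                "--numWorkers=" + initialWorkers,
                "--maxNumWorkers=" + maxWorkers,
                "--workerMachineType=n1-standard-8",
                "--autoscalingAlgorithm=THROUGHPUT_BASED");
    }
}
```

For example, `LaunchParameters.fixedWorkers(5)` yields the arguments for the 5-worker setup described above, while `LaunchParameters.autoscaled(5, 20)` lets Dataflow add machines during traffic peaks, up to the chosen ceiling.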
By using the Apache Beam framework and Google Dataflow, we managed to easily deploy processing jobs which handle billions of events every day. Our jobs can also handle big variations of traffic while giving us precise insights into what’s happening. There is still work to do to remain cost-effective as more and more events are processed and our analysis needs keep growing, while keeping our platform as resilient as possible.