From Kafka to BigQuery: A Guide for Streaming Billions of Daily Events
Over the past few months, we have been building a new event processing pipeline for MyHeritage. We collect billions of events a day of many different types: request logs originating in web servers and backend services, change data capture logs generated by our databases, and activity events from our users across different platforms.
In this post, I’ll share the journey we’ve made building our data pipeline, while exploring a scalable, reliable and efficient way to deliver data for analysis, providing quick insights for our stakeholders.
We decided to ship our events to Apache Kafka because of its proven ability to handle high volumes of traffic, its durability, and its fault tolerance. Different event types in our system are kept in separate Kafka topics, allowing us to control their scale independently. We use Kafka as a message distributor for multiple consumers, each streaming the data for its own needs. One of our main goals is streaming data for analysis.
We searched for a data warehouse system that can ingest the data instantly and perform near real-time analysis. We evaluated BigQuery - a scalable, fully managed, no-ops system that is highly available and runs SQL queries. While analyzing data on a massive scale, the BigQuery daily partitioning feature allowed us to avoid costly full scans, and we paid only for the days queried. Partitioning can be done using one of the following concepts:
- Processing time: partition data according to the time a record was observed in the ingesting system, that is, the wall-clock processing time. This approach is simple to implement, but note that delayed processing may lead to storing data in the wrong partition. Such delays can occur as a result of a system failure, a service upgrade, or any unexpected latency.
- Event time: partition an event by a timestamp generated at the source. This is useful when data arrives late. Imagine that your user sits on an airplane with no internet connection, performing offline activities in your mobile app. These events will be produced to Kafka only when the plane lands and the internet connection resumes.
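To make the difference between the two concepts concrete, here is a minimal sketch in plain Python. The event, its field names and both timestamps are made up for illustration, and days are assumed to be computed in UTC. It shows how a delayed event lands in a different daily partition depending on which clock is used.

```python
from datetime import datetime, timezone


def daily_partition(ts: datetime) -> str:
    """Return the UTC day a timestamp falls on, formatted as YYYYMMDD."""
    return ts.astimezone(timezone.utc).strftime("%Y%m%d")


# Hypothetical event: created offline late on June 14,
# but only observed by the ingesting system early on June 15.
event = {
    "type": "photo_upload",
    "event_time": datetime(2017, 6, 14, 22, 30, tzinfo=timezone.utc),
}
observed_at = datetime(2017, 6, 15, 3, 5, tzinfo=timezone.utc)

by_processing_time = daily_partition(observed_at)
by_event_time = daily_partition(event["event_time"])

print(by_processing_time)  # 20170615
print(by_event_time)       # 20170614
```

A query restricted to the June 14 partition would miss this event under processing-time partitioning and find it under event-time partitioning.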
As one of our main focuses has been making data available for analysis within seconds of an event, we made streaming a top priority. Because BigQuery supports both batch and streaming ingestion into its native tables, we chose it as our cloud data warehouse. Now let’s start our search for the best data loading technique.
Batching data to GCS
The first technique we tried was batch loading to GCS (Google Cloud Storage). We used Secor, a tool built by Pinterest, designed to deliver data from Kafka to object storage systems, such as S3 and GCS. Secor itself is not capable of loading data directly into BigQuery; however, once the data is available on GCS, it can easily be loaded into BigQuery using a load job. The diagram below illustrates the process:
Secor has some nice features. It supports multiple output formats, including Hadoop Sequence files, columnar formats and delimited text files. Secor can parallelize its work by assigning a worker for every Kafka partition. It also allows you to specify how the data should be partitioned in the destination, that is, how to store your data in a logically ordered way.
We have considered three alternatives for partitioning:
- Partition by Kafka offsets. This method fully mimics the source of the data, storing records in GCS by Kafka’s record offsets. Persisting data by offsets disregards the event timestamp, making it difficult to load it to date-partitioned tables in BigQuery. In a pay-as-you-go system, this approach can be very costly, as it may force full scans for most queries.
- Partition by ingestion date (processing time). Data can be loaded to GCS partitioned by Secor’s processing time. Although this gets us one step forward, delayed processing may place data in the wrong bucket partition.
- Partition by record timestamp (event time). Within the underlying data of every record, we placed a timestamp field representing the time of the occurrence. Reading this timestamp in Secor and acting upon it delivered all events to their correct GCS bucket, making the final queries as accurate as possible. This was our chosen alternative.
Using Secor presented a couple of issues:
First, as Secor lacks support for the Avro input format, we had to use JSON-based topics. Second, Secor wasn’t designed to load data directly to BigQuery; it only uploads files to GCS. We could have created federated tables and queried the GCS source directly, but this model performs worse than querying data stored in BigQuery, which made it an unrealistic option for our use case.
There are several alternatives for loading data from GCS to BigQuery. It can be done manually from the BigQuery UI, with a job object in Google’s REST API, or with its command-line SDK. Another option is to use Google Cloud Functions: a trigger-based Node.js function that executes when a file is created in a bucket. We chose to work with the bq load command, part of Google’s SDK. Using this method, we were able to load data to a specific daily partition in BigQuery by specifying it with the ‘$’ decorator:
bq load 'dataset.table$20170615' 'gs://bucket/topic/dt=2017-06-15/*'
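When this load runs once per day and per topic, the partition decorator and the GCS prefix have to stay in sync. The following sketch, in plain Python, derives both from a single date. The helper name bq_load_args is hypothetical, the dt=YYYY-MM-DD layout is taken from the command above, and any extra flags your data needs (such as the source format) are left out, as in the original command.

```python
import shlex
from datetime import date


def bq_load_args(dataset, table, bucket, topic, day):
    """Build the argument list for loading one day of files into one partition."""
    # The '$YYYYMMDD' decorator targets a specific daily partition.
    partition = f"{dataset}.{table}${day:%Y%m%d}"
    # Assumed GCS layout: one dt=YYYY-MM-DD prefix per day, per topic.
    source = f"gs://{bucket}/{topic}/dt={day:%Y-%m-%d}/*"
    return ["bq", "load", partition, source]


args = bq_load_args("dataset", "table", "bucket", "topic", date(2017, 6, 15))

# shlex.join quotes arguments containing '$' or '*', so the shell
# does not expand them before bq sees them.
print(shlex.join(args))
```

Passing the list directly to subprocess.run avoids shell quoting altogether; the joined string is only useful for logging or for pasting into a terminal.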
Load jobs, like the other options above, run in batches, triggered either manually or automatically after a file has been created in GCS. This is therefore not pure streaming, and does not provide real-time analysis to the end users.
Streaming with BigQuery API
In order to get near real-time results, we examined the option of streaming data directly to BigQuery, without GCS as an intermediate step. The BigQuery API allows streaming data in, up to a quota of 100K rows per second, per project. As opposed to batch loading, where you pay only for storage, real-time data streaming comes with a cost (currently $0.05 per GB).
In our use case, where the data is stored in Kafka, you either write your own consumer or use your favorite stream processing framework, ingesting the data to BigQuery as you process your records.
Our stream processing framework of choice was Kafka Streams, an open-sourced library for building scalable streaming applications on top of Apache Kafka. Kafka Streams supports both processing time and event time semantics and presents an auto-replicated application state.
Kafka Streams’ ease of use allows you to execute your code as a regular Java application. Below is a basic code sample, which reads its input from a Kafka topic, filters some records, and sends them to BigQuery:
The missing part is how to deliver the data using BigQuery API. The following example will stream one record to the BigQuery table MyTable:
As you can see, BigQuery provides streaming support for simple types (string, bool, etc.) and nested records. This makes data ingestion quite convenient, as nested data can be persisted as is, without flattening your table. Here’s another example, this time streaming a full record, represented as a JSON file:
As seen in the examples, the streaming API allows you to specify an insert id. This id helps BigQuery determine whether a record was already uploaded when a request is retried after a failure.
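For the insert id to be useful, a retry must send the same id as the original attempt. One way to get that, sketched below in plain Python, is to derive the id from the Kafka coordinates of the record. This derivation is an assumption for illustration, not something the API requires, and the topic name and row contents are made up. The body follows the rows / insertId / json layout of the REST tabledata.insertAll request.

```python
import json


def insert_id(topic, partition, offset):
    """Deterministic id: the same Kafka record always maps to the same id."""
    return f"{topic}-{partition}-{offset}"


def to_insert_all_body(records):
    """Build an insertAll request body from (topic, partition, offset, row) tuples."""
    return {
        "rows": [
            {"insertId": insert_id(topic, partition, offset), "json": row}
            for topic, partition, offset, row in records
        ]
    }


# Hypothetical page-view record with a nested "user" field.
records = [
    ("pageviews", 0, 42, {"url": "/home", "user": {"id": 7, "is_bot": False}}),
]

first_attempt = to_insert_all_body(records)
retry = to_insert_all_body(records)

print(json.dumps(first_attempt, indent=2))
print(first_attempt == retry)  # True
```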
Using the streaming API has its benefits, as it gives you complete control over the records you sink downstream. But with this control comes the responsibility to write a robust service. You have to group your records into batches and tune their size, as you don’t want to make a request for every record observed. You should design your code to handle possible streaming errors, and you may want to parallelize your requests. Of course, this process can create a lot of headaches. This is where Kafka Connect comes to the rescue.
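As a rough illustration of the batching concern, here is a minimal sketch in plain Python. The RecordBatcher class, its parameters and the limits used are hypothetical. The send callback stands in for whatever actually performs the streaming request, and error handling and parallelism are deliberately left out.

```python
import time


class RecordBatcher:
    """Buffer rows and hand them to `send` in batches, by size or by age."""

    def __init__(self, send, max_rows=500, max_wait_seconds=1.0, clock=time.monotonic):
        self._send = send
        self._max_rows = max_rows
        self._max_wait = max_wait_seconds
        self._clock = clock
        self._rows = []
        self._first_row_at = None

    def add(self, row):
        if not self._rows:
            self._first_row_at = self._clock()
        self._rows.append(row)
        too_big = len(self._rows) >= self._max_rows
        too_old = self._clock() - self._first_row_at >= self._max_wait
        if too_big or too_old:
            self.flush()

    def flush(self):
        """Send whatever is buffered; call this on shutdown as well."""
        if self._rows:
            batch, self._rows = self._rows, []
            self._send(batch)


sent = []
batcher = RecordBatcher(send=sent.append, max_rows=3, max_wait_seconds=60.0)
for i in range(7):
    batcher.add({"n": i})
batcher.flush()

print([len(batch) for batch in sent])  # [3, 3, 1]
```

Note that the age check only runs when a new row arrives, so a quiet topic would need a timer or a periodic flush call as well.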
Streaming with Kafka Connect
As we discussed, building a stable, fault-tolerant system that streams data to BigQuery is not an easy task. Confluent’s Kafka Connect was designed for the purpose of delivering data in and out of Kafka, integrating with file systems, databases, key-value stores and search indexes.
Confluent provides connectors making your life easier by dealing with redundancy, scale and robustness of your system. A “sink” connector continuously consumes data from configured Kafka topics and streams them to an external system of your choice, within seconds. Its flexible scaling allows the execution of multiple workers, distributing the workload throughout several nodes.
As there was no BigQuery connector bundled with Confluent’s installation, we used an open-source connector implemented by WePay. With this connector, BigQuery tables can be auto-generated from your Avro schema, giving you the ease of plug-and-play.
The connector also deals with schema updates. BigQuery streaming supports only backward compatibility, so you can easily add new fields with default values, and data streaming will continue flawlessly.
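To illustrate what an additive, backward-compatible change looks like on the BigQuery side, here is a simplified check in plain Python. It is a sketch under stated assumptions, not the connector's logic: it only looks at top-level fields described as name / type / mode dictionaries, and it accepts a new schema only if every existing field keeps its type and every added field is not REQUIRED. The field names are made up.

```python
def is_additive_update(old_fields, new_fields):
    """Return True if new_fields only adds optional fields to old_fields."""
    new_by_name = {field["name"]: field for field in new_fields}

    # Every existing field must still be there with the same type.
    for old in old_fields:
        new = new_by_name.get(old["name"])
        if new is None or new["type"] != old["type"]:
            return False

    # Added fields must be optional (a missing mode is treated as NULLABLE).
    old_names = {field["name"] for field in old_fields}
    return all(
        field.get("mode", "NULLABLE") != "REQUIRED"
        for field in new_fields
        if field["name"] not in old_names
    )


current = [
    {"name": "url", "type": "STRING", "mode": "REQUIRED"},
    {"name": "user_id", "type": "INTEGER"},
]
with_optional_field = current + [{"name": "referrer", "type": "STRING", "mode": "NULLABLE"}]
with_required_field = current + [{"name": "referrer", "type": "STRING", "mode": "REQUIRED"}]
with_dropped_field = current[:1]

print(is_additive_update(current, with_optional_field))  # True
print(is_additive_update(current, with_required_field))  # False
print(is_additive_update(current, with_dropped_field))   # False
```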
Kafka Connect handles errors out-of-the-box, by committing the consumer offsets to Kafka only after a successful write to BigQuery. Retries will be made in case of failures.
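The ordering described here (write first, commit afterwards) is what gives at-least-once delivery. The sketch below shows that ordering in plain Python with stand-in functions. It is not Kafka Connect's code: deliver, flaky_write and the retry limit are hypothetical, and a real service would also back off between attempts.

```python
def deliver(batch, write, commit, max_attempts=3):
    """Write a batch, retrying on failure; commit offsets only after success."""
    for attempt in range(1, max_attempts + 1):
        try:
            write(batch)
        except IOError:
            if attempt == max_attempts:
                raise  # offsets stay uncommitted, so the batch is re-read later
            continue
        # Kafka convention: commit the offset of the next record to read.
        commit(batch[-1]["offset"] + 1)
        return attempt


# Stand-in sink that fails on its first call only.
calls = {"count": 0}


def flaky_write(batch):
    calls["count"] += 1
    if calls["count"] == 1:
        raise IOError("simulated BigQuery error")


committed = []
batch = [{"offset": offset} for offset in range(3)]
attempts = deliver(batch, flaky_write, committed.append)

print(attempts)   # 2
print(committed)  # [3]
```

Because a crash between the write and the commit causes the same batch to be written again, this pattern pairs naturally with a deterministic insert id on the BigQuery side.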
A few important configurations you should know about:
- Decide on your topology first. You have the option to work with one node and configure the number of tasks that are performing simultaneous work (up to the number of Kafka input partitions), or execute on several nodes, building a redundant system by running multiple workers.
- Control the interval of committing consumer offsets using offset.flush.interval.ms (the default is one minute).
- The connector’s Java heap size is set to 256MB by default. For some pipelines this is sufficient, but at large scale you will quickly run out of memory. Set the KAFKA_HEAP_OPTS environment variable to change it.
Kafka Connect provided a robust solution to our problem. By pushing data to BigQuery quickly and reliably, it enabled us to provide near real-time, accurate analytics.
Streaming with Apache Beam
There are still a few issues left unresolved.
First, while using the existing BigQuery connector in the previous step, we were only able to ingest data by its processing time, which can lead to data landing in the wrong partition.
Second, we wanted to reprocess old events, providing a system that supports both batch and streaming.
Third, we wanted to split events from a single stream into their relevant BigQuery tables. As an example, a stream of pageview events can be divided into real users and bots, where different queries should be applied.
Apache Beam was developed as a unified model for both batch and streaming, supporting real-time processing and replaying old data. Furthermore, Beam is a portable platform, where you write your code once and are able to deploy it on multiple execution engines, either on-premise or in the cloud. With its Cloud Dataflow runner, your code is executed as a cloud job, making it fully managed and auto-scaled. Google adjusts the number of workers according to your current load, and the cost of execution changes accordingly.
Loading data by event time using Beam is described in detail in this post, implemented by windowing the data in event-time semantics, and ingesting it to BigQuery by its windowed event time. I’ll take a different approach, by fetching the event timestamp from the metadata of the record. As for dividing a stream into separate tables, Beam’s DynamicDestinations was designed to do just that, giving you full freedom to choose a destination table for each event. Here’s a code sample achieving both these goals:
In this example, you are examining a Beam pipeline handling page-view events. The core of this sample is within the getDestination function, receiving a page-view event and acting upon it:
- Composing the table name by the event type: bot or user pageview
- Deciding on the partition according to an embedded timestamp
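The two decisions above can also be sketched independently of Beam. The plain Python function below is only an illustration of the routing logic, not the pipeline's code: the dataset and table names, the is_bot and timestamp_ms fields, and the use of UTC days are all assumptions.

```python
from datetime import datetime, timezone


def get_destination(event, dataset="analytics"):
    """Pick a table by event type and a daily partition by embedded timestamp."""
    kind = "bot" if event["is_bot"] else "user"
    day = datetime.fromtimestamp(event["timestamp_ms"] / 1000, tz=timezone.utc)
    # '$YYYYMMDD' is the partition decorator for date-partitioned tables.
    return f"{dataset}.pageviews_{kind}${day:%Y%m%d}"


# Hypothetical page-view events; the timestamp is built in code to keep it readable.
occurred_at = datetime(2017, 6, 14, 22, 30, tzinfo=timezone.utc)
timestamp_ms = int(occurred_at.timestamp() * 1000)

user_view = {"url": "/home", "is_bot": False, "timestamp_ms": timestamp_ms}
bot_view = {"url": "/home", "is_bot": True, "timestamp_ms": timestamp_ms}

print(get_destination(user_view))  # analytics.pageviews_user$20170614
print(get_destination(bot_view))   # analytics.pageviews_bot$20170614
```

Since the destination depends only on the event itself, replaying old events routes them to the same tables and partitions as the first run.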
To summarize what we have learned, here are the main tradeoffs between batch and streaming load to BigQuery:
As there’s always a tradeoff, every project will embrace a solution fitting its own needs. Choosing your stack is not an easy task, especially when running a real-time data pipeline, where tools are created and evolve every day.
Integrating systems at a massive scale will force you to choose carefully, as you want to build a robust system that can handle the unexpected. This will allow you to bring the data to your users as fast as you can, while preserving the stability of the system.