Cooling down hot data: From Kafka to Athena
By Nicolas Goll-Perrier (Lead Data Engineer)
For the last 15 years, leboncoin has been providing a classified ads service in France, growing to its current size of more than 40 million ads in 70+ categories (ranging from real estate to video games) and 30 million active users every month.
This led to an increasingly large amount of data being generated on the platform, growing in both volume and complexity. Around 2015, we realized we needed to improve our technical and organisational capabilities to leverage data and provide better services to our customers.
Among other things, this led us to shift our data consolidation pattern from raw logs and database dumps to a more stream-based approach, and so began our history with Apache Kafka, a distributed event streaming/pub-sub platform.
But Kafka is an “online” system, better used for inter-service communication and storing logs of events as they occur, albeit not indefinitely. In order to be able to analyze, transform, and consolidate all those events in an “offline” data-storage — such as a datalake — more suitable for large batch jobs and machine-learning training tasks, an event archiving workflow was required.
This article is the story of how we reworked this workflow, exposing large quantities of heterogeneous data from our “online” Kafka Clusters to our datalake for long term data retention, analytics, legal requirements, and as a machine learning “fertilizer”. We’ll explore how we went from a manual and tedious process to a fully automated workflow, including the creation and evolution of tables on our datalake on AWS S3/Athena/Spark, with human controls & checks required only where they mattered most.
A bit of history
The first actual use-cases for Kafka were the audience tracking services, which were already reaching close to 100 million daily records at the time. At that volume, JSON-based serialization would have been costly in terms of both storage and network bandwidth.
We were therefore lucky enough to start with a few best practices for event serialization, using Confluent’s integration of the Avro format through their Schema Registry. We even came up with some norms for our future topics, as well as baseline configuration options for the expected topic typologies (based on throughput, sensitivity and consistency requirements).
We then implemented our own custom solution for archiving topics on S3, enabling cold storage and access to topics as datasets with a longer retention, as well as their use for analytics and machine-learning training (using Spark at first, then AWS Athena). This solution took care of converting events from Avro serialization (best suited for atomic events) to the Parquet format, better suited for large datasets and read-intensive operations.
This was woven into our Airflow scheduler as an hourly set of DAGs and, for a couple of years, it matched our expectations while the usage of Kafka grew across the company.
Consolidating the schema catalog
As usage grew, new challenges started to appear, both technical and organisational. More contributors meant more ways of working and of defining event schemas, and although the Avro specification is pretty complete, on its own it is insufficient to guarantee consistency across the whole company when it comes to:
- Mandatory fields (identifiers, event ids, timestamps…)
- Date format: String or Numeric? RFC 3339 or other? Timestamps in seconds or milliseconds?
- Backward compatibility checks
- Field name conventions
Along with many other “details” that are very important to streamline working with the datasets. This particular set of issues was solved through:
- Moving to a dedicated platform team for Kafka’s infrastructure
- Unified topics & schema catalog repository on git
- Automated ACL management
- Rigorous schema continuous integration and deployment
- Backward-compatibility checks and better tooling
- Training/documentation through a set of RFC & conventions
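To make the schema CI checks above more concrete, here is a minimal sketch of a convention linter that could run against each Avro schema before registration. The mandatory field names (`event_id`, `event_timestamp`), the snake_case rule, and the timestamp convention are hypothetical stand-ins for the conventions described in our RFCs, not the actual rules:

```python
import json
import re
from typing import List

# Hypothetical conventions standing in for the internal RFCs.
MANDATORY_FIELDS = {"event_id", "event_timestamp"}
SNAKE_CASE = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")


def check_schema(schema_json: str) -> List[str]:
    """Return the convention violations found in a top-level Avro record schema."""
    schema = json.loads(schema_json)
    if not isinstance(schema, dict) or schema.get("type") != "record":
        return ["top-level schema must be an Avro record"]
    fields = schema.get("fields", [])
    names = {field["name"] for field in fields}
    errors = [f"missing mandatory field: {name}" for name in sorted(MANDATORY_FIELDS - names)]
    errors += [f"field name is not snake_case: {name}"
               for name in sorted(names) if not SNAKE_CASE.match(name)]
    for field in fields:
        # Hypothetical date convention: epoch milliseconds through the Avro logical type.
        if field["name"] == "event_timestamp":
            ftype = field["type"]
            if not (isinstance(ftype, dict) and ftype.get("type") == "long"
                    and ftype.get("logicalType") == "timestamp-millis"):
                errors.append("event_timestamp must be a long with logicalType timestamp-millis")
    return errors
```

In CI, a non-empty list would fail the build before the schema ever reaches the registry.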
It also laid a clear foundation on which we built our referential for all schemas across the company, and beyond: both our subsidiaries and our parent company (Adevinta) are also able to contribute and publish events. The repository is therefore structured around three static levels for each topic:
- Company (leboncoin, Adevinta, L’Argus…)
- Domain (ads, payments, messaging…)
- Environment (schemas are consistent across staging/production, but are not deployed at the same pace)
This gives us the following layout:
Each environment then contains a “simplified” topic configuration, which will be translated into a more detailed Kafka topic configuration by our continuous delivery tools:
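As a hedged illustration of that translation, the sketch below expands a hypothetical simplified declaration (fields `name`, `throughput`, `consistency`, `retention_days`) into a partition count and Kafka topic-level settings. The typology values are placeholders, not our production defaults; only the Kafka property names (`cleanup.policy`, `retention.ms`, `min.insync.replicas`) are real:

```python
from typing import Any, Dict

# Hypothetical baseline typologies (placeholder values, not production settings).
PARTITIONS_BY_THROUGHPUT = {"low": 3, "medium": 12, "high": 24}
MIN_ISR_BY_CONSISTENCY = {"relaxed": "1", "strong": "2"}
MS_PER_DAY = 24 * 60 * 60 * 1000


def expand_topic_config(simple: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a simplified topic declaration into a detailed topic spec."""
    return {
        "name": simple["name"],
        "partitions": PARTITIONS_BY_THROUGHPUT[simple.get("throughput", "low")],
        "replication_factor": 3,
        "configs": {
            "cleanup.policy": "delete",
            "retention.ms": str(simple.get("retention_days", 7) * MS_PER_DAY),
            "min.insync.replicas": MIN_ISR_BY_CONSISTENCY[simple.get("consistency", "strong")],
        },
    }
```

Keeping the typologies in a single table means a producer team only picks a category, while the platform team controls what each category means.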
Soon after those actions, we started seeing a clear decrease in the number of malformed events being published, and a reduction in discrepancies across schemas.
Human processing unit scaling
But this only solved part of our scaling problem, mostly for producer teams (mainly micro-service owners). We still had to configure every single topic we wanted to archive on AWS S3, defining in a dedicated configuration:
- The name of the topic to archive
- The path where the dataset should be stored
- The partitioning scheme of the output
- The event date/time column to use
- The expected date/time format
- The name of the table to create in our Hive metastore (AWS Glue)
- The sensitivity of the dataset created and its lifecycle
We even had to write (and maintain) a custom Avro to Parquet conversion tool using Spark. Moreover, though Avro serialization was the rule, there were exceptions for certain topics, which we hadn’t planned for, and had to deal with using other approaches.
Finally, each table schema had to be translated from Avro to Hive/Presto to be usable in Athena or Spark properly, by manually declaring the appropriate DDL in our Glue metastores in all environments.
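For a sense of what that manual translation involved, here is a simplified sketch of an Avro-to-Hive type mapping and the resulting Athena DDL. It handles primitives, nullable unions, records, arrays, maps, enums and fixed types, keeps logical types as their underlying primitive, and does not resolve references to previously named types; the database, table, location and partition names used with it are hypothetical:

```python
from typing import Any, Dict, List

AVRO_TO_HIVE = {
    "boolean": "BOOLEAN", "int": "INT", "long": "BIGINT", "float": "FLOAT",
    "double": "DOUBLE", "bytes": "BINARY", "string": "STRING",
}


def to_hive_type(avro_type: Any) -> str:
    """Translate an Avro type into a Hive/Athena column type (simplified)."""
    if isinstance(avro_type, str):
        if avro_type not in AVRO_TO_HIVE:
            raise ValueError(f"unsupported type or named type reference: {avro_type}")
        return AVRO_TO_HIVE[avro_type]
    if isinstance(avro_type, list):
        # Nullable union ["null", T]: Hive columns are nullable anyway.
        non_null = [t for t in avro_type if t != "null"]
        if len(non_null) != 1:
            raise ValueError(f"unsupported union: {avro_type}")
        return to_hive_type(non_null[0])
    kind = avro_type["type"]
    if kind == "record":
        fields = ",".join(f"{f['name']}:{to_hive_type(f['type'])}" for f in avro_type["fields"])
        return f"STRUCT<{fields}>"
    if kind == "array":
        return f"ARRAY<{to_hive_type(avro_type['items'])}>"
    if kind == "map":
        return f"MAP<STRING,{to_hive_type(avro_type['values'])}>"
    if kind == "enum":
        return "STRING"
    if kind == "fixed":
        return "BINARY"
    # A primitive wrapped in an object keeps its base type (timestamp-millis stays BIGINT).
    return to_hive_type(kind)


def create_table_ddl(database: str, table: str, schema: Dict[str, Any],
                     location: str, partition_keys: List[str]) -> str:
    """Build a CREATE EXTERNAL TABLE statement for a Parquet dataset."""
    columns = ",\n  ".join(f"`{f['name']}` {to_hive_type(f['type'])}" for f in schema["fields"])
    partitions = ", ".join(f"`{key}` STRING" for key in partition_keys)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS `{database}`.`{table}` (\n  {columns}\n)\n"
        f"PARTITIONED BY ({partitions})\n"
        "STORED AS PARQUET\n"
        f"LOCATION '{location}'"
    )
```

Multiply this by every topic, every schema change and every environment, and the maintenance burden becomes obvious.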
All in all, what was doable by a small data-engineering team with a few topics and a few producers was quickly becoming unmanageable by 2018, when we started shifting to a Feature Teams organisation with an even more distributed pattern of producers and stakeholders…
As the number of eligible topics grew close to 100 (now closer to 160), we had to rethink our workflow to fully automate this tedious and error-prone archiving process.
The plan was to automate it in two steps: first from Kafka to S3 (while retaining the Avro to Parquet conversion already implemented in our custom process), then from S3 to Athena.
The expected improvements were multiple:
- Gains in productivity for the Data-Engineering team, as we would no longer have to worry about propagating the configuration
- Gains in data availability for new topics, as any newly published topic would be available as an Athena table less than 30 minutes after its first event was published
- Faster detection and isolation of failures due to switching to a non-batch ingress process (though the writing on S3 remains per batch, the archiving process itself would be made by live consumers)
- Clearer, real-time observability of the state, event rate, error count, lag, and memory consumption of each individual connector
- Additional Kafka-specific information in the data structure, for debugging and to detect abnormal behaviors (late events, skewed partitions…)
- Improved alignment between the product & data teams processes and tools
This was also an opportunity to fix some of our past mistakes in our datalake layout:
- Ideally, the bucket prefix and Athena table name should be deterministic and directly inferable from the topic name, but past configurations were inconsistent after years of manual configuration
- Move from a simple “per day” Hive partitioning scheme on S3 to a “per hour” approach, while also adding the schema version as a partition (even though our Avro practices ensure our schemas are compatible, splitting them into separate paths improves debuggability and gives us options in case of unforeseen schema-merging complications)
- Normalize the prefix structure to enable fine-tuning of dataset storage policies (expiration for GDPR, and storage class transitions for cost reduction)
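Deterministic naming boils down to a pure function of the topic name. The sketch below assumes a hypothetical `<company>.<domain>.<event>` topic naming convention and a hypothetical bucket called `datalake-archive`; the point is that the database, table and prefix can all be derived without any per-topic configuration:

```python
import re
from typing import NamedTuple


class DatasetLocation(NamedTuple):
    database: str
    table: str
    prefix: str


# Hypothetical naming convention: "<company>.<domain>.<event>".
TOPIC_PATTERN = re.compile(
    r"^(?P<company>[a-z0-9_]+)\.(?P<domain>[a-z0-9_]+)\.(?P<event>[a-z0-9_-]+)$"
)


def locate(topic: str, bucket: str = "datalake-archive") -> DatasetLocation:
    """Derive the Glue database, Athena table and S3 prefix from the topic name alone."""
    match = TOPIC_PATTERN.match(topic)
    if match is None:
        raise ValueError(f"topic does not follow the naming convention: {topic}")
    company, domain, event = match.group("company", "domain", "event")
    # Athena table names should stick to lowercase letters, digits and underscores.
    table = f"{domain}_{event}".replace("-", "_")
    return DatasetLocation(database=company, table=table,
                           prefix=f"s3://{bucket}/archive/{company}/{topic}/")
```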
The right tool for the job
Fortunately, by that time (2019), the ecosystem around Kafka had evolved quite a bit. Long gone were the days of Kafka 0.8, and the Confluent platform was getting richer by the minute. We wanted to base our next approach on a robust, well-maintained, and scalable tool, able to flush all those events to AWS S3 in an orderly and efficient fashion.
Kafka Connect and its AWS S3 Sink Connector were chosen as the platform for this new setup. Kafka Connect is a platform dedicated to moving data between a Kafka cluster and many other types of data systems (RDBMS, ElasticSearch, DynamoDB, S3…): source connectors push data into Kafka, and sink connectors pull data out of it. All the wiring is done mostly through configuration rather than code, via calls to a dedicated REST API.
Internally, it works as a Java-based distributed “manager” of connectors (which are either producers or, in our case, consumers), relying entirely on the existing Kafka cluster for state storage and synchronization across workers. No infrastructure is needed beyond the workers themselves.
The deployment itself is done on Kubernetes, using a Docker image extended from the official Confluent Platform image to add some minimal custom elements (which we’ll detail later) and some plugins, most notably the S3 Sink plugin and the Single Message Transforms (a set of handy atomic transformations that can be applied to each message at processing time).
We then deployed it to our EKS Kubernetes cluster (deploying to any bare-metal or virtual instance is obviously also possible) using the official Helm chart, after tuning it a bit to fit our specific monitoring flavors and conventions (such as the use of JMX+Datadog for monitoring, as the provided Helm chart assumes Prometheus will be used).
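As an example of that configuration-over-code wiring, the Kafka Connect REST API exposes `PUT /connectors/{name}/config`, which creates the connector if it does not exist and updates its configuration otherwise, making it a natural fit for idempotent deployments. A minimal standard-library sketch that builds such a call (the worker URL and connector name are hypothetical):

```python
import json
import urllib.parse
import urllib.request
from typing import Dict


def upsert_connector_request(connect_url: str, name: str,
                             config: Dict[str, str]) -> urllib.request.Request:
    """Build a create-or-update call for one connector on the Kafka Connect REST API."""
    url = f"{connect_url.rstrip('/')}/connectors/{urllib.parse.quote(name, safe='')}/config"
    return urllib.request.Request(
        url,
        data=json.dumps(config).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )
```

Sending it is then a matter of `urllib.request.urlopen(request)` against any worker of the cluster.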
Adjusting the workflow
The main goal of the project was to automate the process to improve its stability, and reduce the manpower required to maintain it to nearly zero.
The only way to achieve this was to leverage our existing conventions and CI/CD toolset for our schema repository on Git: every time a change is made to this repository, the following operations occur (per environment):
- Creation of the topic (with the expected configuration for throughput, number of partitions, and ACL management) using a JSON configuration provided by the producer (generally, a Feature Team in charge of a micro-service)
- Validation of the schema, and registration in the Kafka cluster’s schema registry
- Creation of a new connector on the Kafka Connect platform, with its configuration inferred directly from the information provided in the topic and schema configuration. A set of overridable defaults is used everywhere to set up the connector
This last step is the one we added for archiving purposes, through a simple Python CLI which auto-configures the connectors on the cluster for each topic declared in the schema repository. Our continuous deployment tool, Concourse, calls this CLI on each merge to the master branch.
The idea behind having a single consumer per topic is to keep each topic as a separate workflow, and to ensure that no undesired side-effects can be induced by a misconfigured topic (although those are now rare, they are not impossible, and we want to limit the scope of errors in the process to the smallest unit). This deliberate choice has its caveats, as we’ll see later on.
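The heart of such a CLI is a merge with a clear precedence: platform defaults first, then values inferred from the topic declaration, then explicit per-topic overrides. A minimal sketch, where the declaration fields (`name`, `company`, `partitions`, `archiving_overrides`), the connector naming scheme and the task-sizing rule are all hypothetical:

```python
from typing import Any, Dict, Tuple

# Hypothetical platform defaults (placeholder values).
DEFAULTS: Dict[str, str] = {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "flush.size": "100000",
    "rotate.schedule.interval.ms": str(20 * 60 * 1000),
}


def infer_connector_config(topic: Dict[str, Any]) -> Tuple[str, Dict[str, str]]:
    """Return the connector name and its full configuration for one topic declaration."""
    name = f"s3-archiver-{topic['name']}"
    inferred = {
        "topics": topic["name"],
        "topics.dir": f"archive/{topic['company']}",
        # Hypothetical sizing rule: one task per 8 Kafka partitions, at least one.
        "tasks.max": str(max(1, topic["partitions"] // 8)),
    }
    # Later dictionaries win: defaults < inferred values < explicit overrides.
    config = {**DEFAULTS, **inferred, **topic.get("archiving_overrides", {})}
    return name, config
```

Because the output depends only on the repository content, running the CLI twice on the same commit yields the same connectors.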
AWS S3 Sink Connector configuration overview
Here’s an example of the resulting configuration for such a connector (partial for brevity’s sake, but most of what’s important is here):
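An illustrative configuration along those lines is sketched below as a Python dict, ready to be serialized to JSON for the REST API. The property names are documented S3 Sink, error-handling and Single Message Transform settings; the values, bucket, region, topic and the custom partitioner class `com.example.connect.BusinessTimePartitioner` are hypothetical placeholders rather than our actual configuration. The `Filter` transform and the `RecordIsTombstone` predicate require Apache Kafka 2.6 or later:

```python
import json

TOPIC = "leboncoin.ads.ad_published"  # hypothetical topic

connector_config = {
    # Base configuration and error handling
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "errors.tolerance": "none",             # block on any error
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",  # staging only: log full events
    # Format and serialization (input converters inherited from the worker)
    "topics": TOPIC,
    "s3.bucket.name": "datalake-archive",   # placeholder
    "s3.region": "eu-west-1",               # placeholder
    "topics.dir": "archive/leboncoin",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    # Partitioning: the (hypothetical) custom partitioner prepends the schema version
    "partitioner.class": "com.example.connect.BusinessTimePartitioner",
    "path.format": "'dt'=YYYY-MM-dd/'hour'=HH",
    "partition.duration.ms": str(60 * 60 * 1000),
    "locale": "en-US",
    "timezone": "UTC",
    # Single Message Transforms: drop tombstones, then add Kafka metadata
    "transforms": "dropTombstones,kafkaMeta",
    "transforms.dropTombstones.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.dropTombstones.predicate": "isTombstone",
    "predicates": "isTombstone",
    "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
    "transforms.kafkaMeta.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.kafkaMeta.timestamp.field": "kafka_timestamp",
    "transforms.kafkaMeta.partition.field": "kafka_partition",
    "transforms.kafkaMeta.offset.field": "kafka_offset",
    # Flushing policy
    "flush.size": "100000",
    "rotate.schedule.interval.ms": str(20 * 60 * 1000),  # wall clock: ~20 minutes
    "rotate.interval.ms": str(60 * 60 * 1000),           # data clock: hourly buckets
}

print(json.dumps(connector_config, indent=2))
```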
There’s quite a few things to unravel, so let’s review each of the main sections step by step.
Base configuration and error handling
The connector.class property specifies which connector (a pluggable class in Kafka Connect) to use. Here it is the S3 Sink Connector class, which Kafka Connect will use to instantiate the consumers.
We also specify how to deal with errors (connectivity, de-serialization, data extraction, transforms…). Available behaviors are to either block, ignore, or push the raw event to a Dead Letter Queue and keep going. Here we specify that we want to block on any error (no tolerance). Also, since this is a staging configuration, we log the full events as they don’t contain any data from our customers and may help greatly in debugging.
As for other kinds of errors (connectivity issues with Kafka or S3, crashes due to system errors, or pods rebooting), the good news is that since the whole process is a standard consumer, offsets are only committed after a write to S3 has succeeded. There are even some guarantees of exactly-once delivery to avoid any duplicates on S3, but these require some specific configurations:
Even with these guarantees, having some kind of unique id in your events should always be considered, to still be able to perform de-duplication on those events downstream. The archiving process is not the only place where duplication can occur.
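Downstream, de-duplication on such an id can be as simple as keeping the first occurrence of each value; in Athena or Spark this is typically a `ROW_NUMBER()` window partitioned by the id. A plain-Python sketch of the same rule, assuming a hypothetical `event_id` field:

```python
from typing import Any, Dict, Iterable, Iterator


def deduplicate(events: Iterable[Dict[str, Any]], key: str = "event_id") -> Iterator[Dict[str, Any]]:
    """Yield each event the first time its unique id is seen."""
    # Every id is kept in memory: fine for a bounded batch, not for an unbounded stream.
    seen = set()
    for event in events:
        event_id = event[key]
        if event_id in seen:
            continue
        seen.add(event_id)
        yield event
```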
Format and serialization
Then, there are the “what” and “where to” fields, which define the topic(s) to consume, the bucket, prefix, and storage class (for other implementations, such as GCP).
We also define the key.converter and value.converter classes used to deserialize the input (not specified here, as Avro is the default in our setup) and the format.class used to serialize the output (Parquet here, but you can also use JSON, Avro, or “raw” to avoid any conversion).
Most importantly, there is the partition path.format, which defines the partition part of the S3 path under which the Parquet files are written. To handle breaking changes more smoothly, we use the Avro schema version as a first prefix, followed by a standard date/hour partitioning scheme (more on that topic below).
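To make the resulting layout concrete, here is a sketch of how an object key is assembled: schema version first, then the hourly partitions computed in UTC, then a file name. The `schema_version`/`dt`/`hour` key names, the `archive/<company>` prefix and the `.parquet` extension are hypothetical; the `<topic>+<kafkaPartition>+<startOffset>` file name follows the S3 Sink’s default naming as we understand it, with a 10-digit zero-padded offset:

```python
from datetime import datetime, timezone


def encoded_partition(schema_version: int, business_time: datetime) -> str:
    """Schema version first, then hourly Hive-style partitions in UTC."""
    if business_time.tzinfo is None:
        raise ValueError("business_time must be timezone-aware")
    utc = business_time.astimezone(timezone.utc)
    return f"schema_version={schema_version}/dt={utc:%Y-%m-%d}/hour={utc:%H}"


def object_key(topics_dir: str, topic: str, kafka_partition: int, start_offset: int,
               partition_path: str, extension: str = ".parquet") -> str:
    """Full S3 key: <topics.dir>/<topic>/<encoded partition>/<file name>."""
    filename = f"{topic}+{kafka_partition}+{start_offset:010d}{extension}"
    return f"{topics_dir}/{topic}/{partition_path}/{filename}"
```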
Hive-compatible Partitioning scheme
Then there is the most interesting part: the partitioner and the timestamp extractor, working hand in hand to find which time should be used to partition the events on S3.
Each streamed event contains two distinct pieces of time information, similar but fundamentally different:
- The time the event occurred, which means the time something happened in the real world (a user publishes an ad, a payment goes through, a click is performed). Let’s call it “business time”. By convention, we always put it in the schema (with some varying interpretation of our conventions when it comes to format, but it has to be present)
- The time the event was ingested into the streaming solution, meaning the time it was produced on Kafka, which is by default stored in the meta-data of each event (so outside of the schema). Let’s call it “Kafka time”.
There is also the time at which the archiving process occurs, but this one is non-deterministic, as several attempts may be needed to store the data. Never use it as a reference.
We need to store this data on S3 using Hive-style partitions, so that the different query engines reading it can prune partitions and read the data as efficiently and cheaply as possible.
But we can only use ONE of the two to partition the data, and there is no perfect solution:
- Using the Kafka time is the simplest, as it is provided by each event’s metadata (so it’s always there, in a standard format, and can be retrieved easily), and should be monotonically increasing (per partition). However, it does not reflect when the event actually happened: events ingressed into Kafka might be delayed by minutes, hours, or days, for technical or business reasons. So analytics and downstream algorithms can’t use it as a reliable source of truth, and stored datasets would have to be re-interpreted and reprocessed to match the actual business time before they could be used.
- Using the business time is trickier, as even with conventions, there is no guarantee that the field will be there, will be valid, and will be coherent. Moreover, if the producer is poorly written, it may produce events out of order, even in a single partition, thus complicating the process. But it has the nice advantage of already being storable in its target partitioning schema (something which will have to be done anyway), even for late events (which also need to be handled/monitored specifically anyway).
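Monitoring late events becomes straightforward once both clocks are available for each event. A minimal sketch that classifies an event by comparing its business time with its Kafka time; the one-hour lateness threshold and the five-minute clock-skew tolerance are hypothetical values:

```python
from datetime import datetime, timedelta


def classify(business_time: datetime, kafka_time: datetime,
             late_after: timedelta = timedelta(hours=1),
             skew_tolerance: timedelta = timedelta(minutes=5)) -> str:
    """Classify an event by how long it took to reach Kafka after it happened."""
    delay = kafka_time - business_time
    if delay < -skew_tolerance:
        # Business time after ingestion time: clock skew or a buggy producer.
        return "future"
    if delay > late_after:
        # Likely written to an hourly partition that readers may already consider complete.
        return "late"
    return "on_time"
```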
At leboncoin, we use the second pattern, mainly due to the fact that our data platform’s history is deeply rooted in analytics.
Although Kafka Connect comes with its own set of partitioners, which can be tuned for both behaviors, we had to implement our own, which “automatically” tries to locate the business time field among a list of candidates and uses the first one it finds as the base for time partitioning. It also takes care of injecting the schema version into the partitioning scheme.
This comes as a custom class implementing a standard interface, which we bundled with our Kafka Connect workers. The interface itself is fairly straightforward, so it’s not a huge hurdle to write the adequate few lines of Java and bundle them in a jar in our custom Docker images:
The implementation should also take care of ensuring that the UTC timezone is used when extracting date/time information, for obvious reasons.
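Our partitioner is written in Java against the Kafka Connect partitioner interface; the sketch below reproduces only the candidate-field logic, in Python. The candidate names and the heuristics (epoch seconds vs. milliseconds, strings without an offset treated as UTC) are hypothetical choices for illustration:

```python
from datetime import datetime, timezone
from typing import Any, Dict

# Hypothetical candidate fields, tried in order.
CANDIDATES = ("event_timestamp", "event_time", "timestamp", "created_at")
# 1e11 seconds is around the year 5138, while epoch milliseconds have exceeded
# 1e11 since 1973, so larger numbers are read as milliseconds.
MILLIS_THRESHOLD = 1e11


def extract_business_time(record: Dict[str, Any]) -> datetime:
    """Return the first usable business timestamp of a record, as an aware UTC datetime."""
    for field in CANDIDATES:
        value = record.get(field)
        if value is None or isinstance(value, bool):
            continue
        if isinstance(value, (int, float)):
            seconds = value / 1000 if value > MILLIS_THRESHOLD else value
            return datetime.fromtimestamp(seconds, tz=timezone.utc)
        if isinstance(value, str):
            # RFC 3339 strings; "Z" is rewritten because older Pythons reject it.
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)  # hypothetical policy
            return parsed.astimezone(timezone.utc)
    raise ValueError("no business time field found among candidates")
```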
Next, we have the Single Message Transformations, which are a set of configurable, simple transformations which can be handy to alter the output format or add information fields.
Here we add the time the event was written to Kafka (from the event’s metadata, so the Kafka time according to our previous definition), the partition, and the offset to the output, and discard all tombstones (which are handled by a separate process for GDPR compliance).
Flushing to S3 policy
Finally, we define how events are flushed to S3.
Since S3 is NOT a stream-friendly datastore, we can’t send it atomic events, and need to buffer/flush on specific conditions, which are handled per partition:
- When a specific number of events (flush.size) has been processed: this avoids very large files on S3 and limits memory consumption on the connector (which has an internal in-memory buffer).
- When a specific amount of run-time (rotate.schedule.interval.ms) has elapsed (wall-clock): this ensures regular flushes even if no new event arrives, which is particularly useful for low-throughput environments and topics. Here we force the creation of a new file on S3 every ~20 minutes, for each partition.
- When a specific amount of time has passed in the extracted partition fields (data-clock): since our hive partitions are 1h buckets of events by business-time, we want to create new files if we enter a new bucket.
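The interplay of the three rules can be modeled in a few lines. This is a deliberately simplified model of one Kafka partition’s buffer with a single open file, not the connector’s actual implementation; records are represented only by their business timestamps in milliseconds:

```python
from dataclasses import dataclass, field
from typing import List, Optional

HOUR_MS = 60 * 60 * 1000


@dataclass
class PartitionBuffer:
    """Simplified model of the rotation rules for one Kafka partition."""

    flush_size: int                   # flush.size
    rotate_schedule_interval_ms: int  # rotate.schedule.interval.ms (wall clock)
    bucket_ms: int = HOUR_MS          # width of a business-time partition
    records: List[int] = field(default_factory=list)
    opened_at_ms: Optional[int] = None
    bucket: Optional[int] = None

    def _flush(self) -> List[int]:
        batch, self.records = self.records, []
        self.opened_at_ms, self.bucket = None, None
        return batch

    def add(self, record_ts_ms: int, now_ms: int) -> List[List[int]]:
        """Buffer one record (its business timestamp); return the files written, if any."""
        files = []
        record_bucket = record_ts_ms // self.bucket_ms
        # Data clock: a record from another hourly bucket closes the current file.
        if self.records and record_bucket != self.bucket:
            files.append(self._flush())
        if not self.records:
            self.opened_at_ms, self.bucket = now_ms, record_bucket
        self.records.append(record_ts_ms)
        # Size: caps both the file size and the in-memory buffer.
        if len(self.records) >= self.flush_size:
            files.append(self._flush())
        return files

    def tick(self, now_ms: int) -> List[List[int]]:
        """Wall clock: flush a buffer open for too long, even if no new record arrived."""
        if self.records and now_ms - self.opened_at_ms >= self.rotate_schedule_interval_ms:
            return [self._flush()]
        return []
```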
The Steering Wheel
All this wouldn’t be complete without adequate monitoring and alerting on all topics (a few of which often dwarf the rest in terms of throughput), to ensure we can deal with any anomaly in a timely fashion and keep the data “fresh” on our datalake for our data scientists, analysts, and data services.
Our main metrics are the number of tasks actually running, the lag for each topic/partition, the load (especially memory) and its skew across workers, and the number of events processed per time-frame.
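Two of these metrics reduce to simple arithmetic once the raw numbers are collected (in our case through JMX and Datadog): lag is the distance between a partition’s latest offset and the connector’s committed offset, and skew can be expressed as the busiest worker’s load divided by the average load. A hedged sketch with hypothetical inputs:

```python
from statistics import mean
from typing import Dict


def consumer_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag: offsets written to Kafka but not yet committed by the archiver."""
    # A partition with no committed offset is reported with its full end offset
    # (a simplification that ignores records already removed by retention).
    return {p: end - committed.get(p, 0) for p, end in end_offsets.items()}


def skew(load_per_worker: Dict[str, float]) -> float:
    """Busiest worker's load divided by the mean load; 1.0 means perfectly balanced."""
    values = list(load_per_worker.values())
    return max(values) / mean(values)
```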
Still a few shortcomings
All in all, this solution works well, but there are a few shortcomings that could be improved in future works:
- Memory consumption is pretty high. Kafka Connect buffers events in memory, not on disk. This makes sense for most Sinks, but for S3, where we only flush every 20 minutes, it can mean a lot of buffered events for some topics. Moreover, priming a new connector on an already full topic requires many of those buffers to be maintained in parallel, in particular if the flush configuration is not properly set. A better solution would be to cache this buffer in temporary disk space, which is better suited to this kind of Sink. A future feature contribution to Kafka Connect, or some investigation into the inner workings of the S3 Sink plugin, maybe?
- Too many files. We deliberately chose to flush every 20 minutes. This means a new file per partition every 20 minutes, with some of the biggest topics having more than 70 partitions: that’s 210 Parquet files per hour, or 5,040 files per day, for a single topic. Some of our topics have a throughput large enough to justify that many, but this is far from ideal for most of them (by default, the Parquet block size is 128 MB). A second stage of table “re-compaction” will be required to optimize and reduce cost for readers.
- Load balancing could be perfected. Each connector has a maximum number of “tasks”, each taking care of a subset of the topic’s partitions, and these tasks are distributed to workers in round-robin fashion. This is not ideal, even when we take the expected throughput of each topic into account to increase its number of tasks: large topics, or topics with skewed partitions, may consume more memory, leading to uneven load across the nodes of the cluster.
- No auto-restart of failed tasks. Although rare, task failures do happen. Most failures involve a retry mechanism, but once the retry threshold has been reached, the task moves to a “failed” state. There is currently no explicit mechanism to periodically retry failed tasks, some of which might be able to resume their operations (for instance, we sometimes have mishaps in topic ACLs that lead to task failures, and fixing the ACLs requires an explicit restart of the task afterwards).
- Untimely re-balancing. When a node fails, or is rescheduled by our Kubernetes cluster, the consumers are re-balanced to take care of the missing connectors and tasks, which is expected. However, when more resources become available, the time it takes to redistribute the tasks evenly does not seem very tunable, and can be longer than expected, for obscure reasons.
- Conversion from JSON to Parquet requires an explicit JSON schema (though this is probably for the better anyway). We chose to keep the archiving process output in JSON for those few topics.
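Until something better exists, the missing auto-restart can be approximated by a periodic job built on the Kafka Connect REST API: `GET /connectors` lists the connectors, `GET /connectors/{name}/status` reports each task with its state, and `POST /connectors/{name}/tasks/{id}/restart` restarts a single task. A minimal sketch (the worker URL and how the job is scheduled are assumptions):

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, List


def _connector_url(connect_url: str, name: str) -> str:
    return f"{connect_url.rstrip('/')}/connectors/{urllib.parse.quote(name, safe='')}"


def failed_task_restarts(connect_url: str, status: Dict[str, Any]) -> List[urllib.request.Request]:
    """Build one restart request per FAILED task found in a connector status payload."""
    base = _connector_url(connect_url, status["name"])
    return [
        urllib.request.Request(f"{base}/tasks/{task['id']}/restart", method="POST")
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


def restart_failed_tasks(connect_url: str) -> None:
    """Check every connector on the cluster and restart its failed tasks."""
    with urllib.request.urlopen(f"{connect_url.rstrip('/')}/connectors") as response:
        names = json.load(response)
    for name in names:
        with urllib.request.urlopen(f"{_connector_url(connect_url, name)}/status") as response:
            status = json.load(response)
        for request in failed_task_restarts(connect_url, status):
            urllib.request.urlopen(request).close()
```

Restarting blindly can loop on a persistent error (a revoked ACL stays revoked), so such a job would also need to count restarts per task and alert beyond a threshold.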
In spite of these shortcomings, which we will have to improve over the next months/years, the solution is extremely stable, and allowed us to unplug our old system for Parquet conversion and topic archiving.
But with this, we were only halfway there…
Automating the exposition through Athena
Now that we have a clean flow of topics entering our S3, we want our users to be able to discover them and query them. Wouldn’t it be nice if new archived topics were automatically added to our Glue Metastore (our AWS managed Hive Metastore) as tables, usable by Athena (an AWS managed Presto implementation) and our Spark jobs?
Well, first, we need to solve a few issues as to how we will:
- Infer the DDL of the table from the Parquet files on S3, including the partitioning scheme and storage location
- Create/Update the table definition in metastore upon new table/new schema version
- Register new partitions in the metastore as they arrive on S3
Fortunately, “there’s an app for that”. The AWS Glue service is composed not only of the managed Hive metastore, but also of an ETL tool, and a system of dataset crawlers. This last feature offers us the ability to “scan” a path on S3 (or other data sources), and discover the corresponding datasets (as long as schema compatibility is covered, which our Avro workflow guarantees). Then, it adds and updates the Hive metastore with the schema and the partitioning scheme.
Which is exactly what we want to do, so no coding needed? At first it seemed too good to be true… And, unfortunately, it was…
Crawling is not the fastest way to move forward
There are several issues with this approach:
- It’s slow. Crawlers re-list the whole S3 tree under existing partitions to detect any new partitions/changes, which can take a while for topics with several hundred thousand Parquet files stored… This also means we need one crawler per topic, to run each discovery in parallel.
- It’s expensive. In our region, crawlers are billed at $0.44 per DPU-Hour, with a minimum billable run-time of 10 minutes. That’s more than $0.05 per run, which we want to trigger every hour, for every topic. No need to do the math, this is over our budget.
- It doesn’t scale. There appears to be a hard limit of 100 concurrent crawlers on AWS, meaning that no more than 100 crawlers can run at the same time. We currently have ~160 topics (per environment) to archive, and the number keeps growing…
- It’s opaque. The code for the actual runtime of the Glue Crawlers is not open source, and the documentation is quite lacking about the implementation specifics of how each operation (schema conversion, inference, conflict resolution) is performed.
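Spelled out, under the assumption that each run uses a single DPU and finishes within the 10-minute minimum (both assumptions on our side), one run costs 0.44 × 10/60 ≈ $0.073, and an hourly run for each of ~160 topics adds up to roughly $280 per day, per environment:

```python
PRICE_PER_DPU_HOUR = 0.44  # USD, in our region
MIN_BILLED_MINUTES = 10


def crawler_cost_per_day(topics: int, runs_per_day: int = 24,
                         dpus: float = 1.0, run_minutes: float = 0.0) -> float:
    """Daily crawler bill, applying the 10-minute minimum to every run (dpus is an assumption)."""
    billed_hours = max(run_minutes, MIN_BILLED_MINUTES) / 60
    return PRICE_PER_DPU_HOUR * dpus * billed_hours * runs_per_day * topics
```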
But maybe all is not lost. The only complex part we need from the crawler is the schema discovery and metastore maintenance (obviously we could code it ourselves with some Spark magic, but the idea was to reduce the maintenance footprint of the whole archiving and discovery process, not increase it).
But this schema maintenance part only needs to run in very specific cases:
- When a new schema is added
- When an existing schema changes
We currently have around 10 daily commits on our schema repository, some of which won’t even require a change to the schema. So it’s doable, with lots of room to spare, if we can reduce to the strict minimum the amount of times we need to run the crawlers.
Now, how do we do that?
From Lambda with love ❤️
The solution we went for is fully event-driven:
Our Kafka Connect archiver pushes new files to S3. These trigger an event, which is pushed to an SQS queue. This queue serves as a buffer of all new files to be processed.
Then, every 10 minutes, a Lambda function is triggered, popping those events from the queue and inferring from the path:
- the company (and thus the database name)
- the topic name (and thus the table name)
- the schema version
- the partition info
This is no more complex than a simple regexp on the object key carried by each S3 event notification in the queue, like so:
Then, all that remains is to query:
- The Glue crawler list, to see if a crawler exists for the specific table. This crawler will have been created preemptively by the CD process, following a naming convention that contains the topic/table name. If the crawler exists, then we need to maintain the corresponding table.
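A sketch of that parsing step, assuming a hypothetical `archive/<company>/<topic>/schema_version=<n>/dt=<YYYY-MM-DD>/hour=<HH>/<file>` layout. Two details are worth handling: S3 event notifications URL-encode object keys (so a `+` in a file name arrives as `%2B`, while a literal `+` stands for a space), and S3 sends an `s3:TestEvent` message without `Records` when notifications are first configured:

```python
import json
import re
import urllib.parse
from typing import Iterable, Iterator, NamedTuple, Optional

# Hypothetical key layout: database = company, table derived from the topic.
KEY_PATTERN = re.compile(
    r"^archive/(?P<company>[^/]+)/(?P<topic>[^/]+)/"
    r"schema_version=(?P<schema_version>\d+)/"
    r"dt=(?P<dt>\d{4}-\d{2}-\d{2})/hour=(?P<hour>\d{2})/[^/]+$"
)


class NewFile(NamedTuple):
    company: str
    topic: str
    schema_version: int
    dt: str
    hour: str


def parse_key(key: str) -> Optional[NewFile]:
    match = KEY_PATTERN.match(key)
    if match is None:
        return None
    return NewFile(match["company"], match["topic"], int(match["schema_version"]),
                   match["dt"], match["hour"])


def new_files(message_bodies: Iterable[str]) -> Iterator[NewFile]:
    """Yield the parsed location of every new object in a batch of SQS message bodies."""
    for body in message_bodies:
        notification = json.loads(body)
        for record in notification.get("Records", []):  # absent in s3:TestEvent
            key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
            parsed = parse_key(key)
            if parsed is not None:
                yield parsed
```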
- The metastore to see if the table and partition set already exist. If the table does not exist, we trigger the crawler immediately (it will take care of the table creation). If the table exists, but not the partition, we add it. If the partition exists, we can safely ignore it.
- If the schema version does not exist in the partitioning scheme, the schema is new: we re-trigger the crawler, which should pick up the new schema and publish it shortly.
This ensures that we only trigger crawlers when necessary (at most a few times a day), while maintaining the partitions in a very efficient and homogeneous way.
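These rules form a small decision table, sketched below as a pure function. In a real Lambda the four booleans would come from Glue API calls (with boto3, for instance `get_crawler`, `get_table` and `get_partitions`), and the resulting actions would map to `start_crawler` and `batch_create_partition`; collecting actions in a set before executing them ensures each crawler is started at most once per run. Treating a schema version as known when at least one partition with that version already exists is an assumption of this sketch:

```python
from enum import Enum


class Action(Enum):
    SKIP = "skip"                    # no crawler: the table is not managed by this process
    START_CRAWLER = "start_crawler"  # new table or new schema version
    ADD_PARTITION = "add_partition"  # known table and schema, new partition
    NOTHING = "nothing"              # partition already registered


def decide(crawler_exists: bool, table_exists: bool,
           schema_version_known: bool, partition_exists: bool) -> Action:
    """Decide what to do for one (table, partition) seen in the new files."""
    if not crawler_exists:
        return Action.SKIP
    if not table_exists or not schema_version_known:
        return Action.START_CRAWLER
    if not partition_exists:
        return Action.ADD_PARTITION
    return Action.NOTHING
```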
For performance and cost reasons, the lambda caches all its results from the AWS API in memory, so every query is only performed once every 10 minutes.
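A minimal sketch of such a cache, with a 10-minute time-to-live matching the schedule. Kept at module level, it may also survive between invocations when Lambda reuses a warm execution environment, which is why entries expire instead of living forever. The injectable clock only exists to make the behavior testable:

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Memoize loader results per key for a fixed time-to-live."""

    def __init__(self, ttl_seconds: float = 600.0,
                 clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get_or_load(self, key: Hashable, loader: Callable[[], Any]) -> Any:
        now = self.clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self.ttl_seconds:
            return entry[1]
        value = loader()
        self._entries[key] = (now, value)
        return value


# Module-level instance, reused across warm invocations.
GLUE_CACHE = TTLCache()
```

A lookup then reads like `GLUE_CACHE.get_or_load(("table", database, table), lambda: table_exists(database, table))`, where `table_exists` is a hypothetical helper wrapping the Glue API.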
Although the result is perfectly satisfying, there are still some annoyances here and there:
- Up until recently, the Athena engine was unable to merge schemas within a table containing evolving nested structures. This was a huge pain, as these became even more common once we automated the whole process, and could sometimes break the readability of some tables. It was thankfully fixed by the recent release of Athena engine v2, which upgrades the underlying Presto engine, and things now work well even for complex nested schemas.
- The Glue crawler part is still opaque, and not being able to anticipate its every move in case of incompatible schemas (which should not happen in our process, but…) is quite disappointing. We’re not 100% comfortable depending on closed-source, potentially quite complex logic for such a critical task as schema inference, even though it has never failed us so far.
- Partition completeness detection, or “when should I consider the partition to be complete enough for processing?” (apart from the inevitable but quite rare late events), is harder than it looks. At first we had a very complex approach, checking the offsets of each consumer of the Kafka Connect cluster to ensure that all partitions had passed a certain threshold. However, this proved as unreliable as any other process, since some topics do not always publish enough events on all partitions. The clean solution is to enforce paced watermarks on all producers, which we currently don’t do. So we went the lazy, simple way instead: if H+1 starts appearing, then H is there (modulo a safety threshold). It may look simplistic, but it does the job and nothing of value was ever lost. It also makes things much easier to read and debug.
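The H+1 rule fits in a few lines. In this sketch, hour H is considered complete once hour H+1 has data and H+1 started at least a safety margin ago; the 30-minute margin is a hypothetical value, and hours without any data are simply never reported:

```python
from datetime import datetime, timedelta
from typing import Iterable, Set

ONE_HOUR = timedelta(hours=1)


def complete_hours(seen_hours: Iterable[datetime], now: datetime,
                   safety: timedelta = timedelta(minutes=30)) -> Set[datetime]:
    """Return the hourly partitions considered complete at `now`."""
    seen = set(seen_hours)
    return {
        hour for hour in seen
        if hour + ONE_HOUR in seen and now - (hour + ONE_HOUR) >= safety
    }
```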
Events go BRRR, humans go ZZZZ
Well, maybe not “ZZZZ”, but at least the team could focus on some other endeavour more challenging than maintaining thousands of lines of JSON, yaml, and DDL.
As a final overview, here’s the complete workflow of the solution, from start to finish:
The only human management left lies in defining the schema and topic configuration. We quickly became confident enough to let this process run its course automatically (no manual blocker to Continuous Delivery, or big red button to push) in all environments, including production.
By removing all but the essential human interventions in this workflow, as expected, we’ve managed not only to recover and hook into the process all the topics that had evaded our human vigilance (oops), but also to improve the performance, reliability and timeliness of datasets for all teams, while providing a more consistent view of the information system to both producer and consumer teams.
This lets us once again focus on higher-value tasks, such as advising teams on schema structure, onboarding data into more applications (through both machine learning and pure data services), and generally reprocessing this data to make it more directly usable.
Of course, no work is really ever over, and in the future we will still improve this process to enable:
- Inventorying all those datasets in a unique interface to improve data discoverability
- Notifications when a schema is updated with new columns, or a new table is created
- Automatic compaction and optimization of table partition size for better read performance and reduced cost
- Fully automatic GDPR pseudonymisation and anonymisation of datasets
- Normalized access control management
- Reduction of memory footprint for cost reduction and better dynamic scaling
- Replacement of Glue Crawlers with dedicated tools to maintain full ownership of the schema inference logic