Lessons from Building a Feature Store on Flink

Tianshi Zhu
Airwallex Engineering
11 min read · Sep 9, 2021



Airwallex is investing heavily in machine learning (ML) to combat all kinds of risks, for both the business and our customers. To lay a solid foundation for this everlasting battle, we first have to build an ML platform.

This post examines how we are building a feature store at Airwallex: why we chose to build it on Flink, the lessons we have learned, and what we will work on next.

What is a feature store

A feature store is a central place to store curated features for ML pipelines. It manages the entire lifecycle of each feature: an individually measurable property or characteristic of a phenomenon.

A basic feature store contains metadata about each feature, such as its name, version, description, real-time and offline data storage locations, etc. A more advanced feature store also supports user-defined transformation logic, and is able to compute feature values in real time or backfill features in batch.
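As a purely illustrative sketch (the class and field names below are assumptions, not a real schema), the metadata for a single feature might look something like this:

// Illustrative sketch of per-feature metadata in a feature store.
// Field names are assumptions, not an actual schema.
public class FeatureMetadata {
    public String name;                 // e.g. "deposit_count_30d"
    public int version;                 // incremented on every change
    public String description;          // human-readable explanation of the feature
    public String onlineStoreLocation;  // where real-time values are served from
    public String offlineStoreLocation; // where historical values live for training
    public String transformationSql;    // optional user-defined transformation logic
}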

The key benefit of using a feature store is feature discovery and reusability. Different models often share many features in common, and having a feature store where all these features are registered can save data scientists and data engineers a lot of time.

What is Flink

From Flink’s official website:

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

In human language, Flink supports stream and batch processing with internal state. It currently has two major interfaces: the DataStream API and the Table API. The DataStream API includes 20+ transformations and is available in Java, Scala and Python. The Table API works with regular SQL expressions, and can be converted from/to DataStream.
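For example, a minimal program (generic Flink, not our feature store code) that converts a DataStream into a table, runs a SQL query on it, and converts the result back could look like this in recent Flink versions:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Minimal example of converting between the DataStream and Table APIs.
public class TableRoundTrip {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<String> words = env.fromElements("flink", "feature", "store");

        // DataStream -> Table, renaming the single column to "word".
        Table table = tableEnv.fromDataStream(words).as("word");
        tableEnv.createTemporaryView("words", table);

        // Run a regular SQL query against the view.
        Table filtered = tableEnv.sqlQuery("SELECT word FROM words WHERE word <> 'flink'");

        // Table -> DataStream and print the result.
        DataStream<Row> result = tableEnv.toDataStream(filtered);
        result.print();

        env.execute("table-api-round-trip");
    }
}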

Flink can run on YARN, Kubernetes, or standalone. The cluster can run in session mode or per-job mode. In session mode, all Flink jobs run in the same cluster, while in per-job mode each job runs in its own cluster.

Flink also provides a REST endpoint to upload jars, start/stop jobs and get job stats. The same REST endpoint is used by the Flink UI, which shows you the cluster resources, running jobs and job details such as transformation lineage, job stats, uptime, etc.

Why Flink

Compared to another very popular data processing framework, Apache Spark, Flink offers unified stream and batch processing in both the DataStream and the Table API, and stream processing can be handled with very low latency, as shown below. This is huge for us at Airwallex, since we only need to maintain one set of infrastructure.

Latency comparison chart from https://www.ververica.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink

Another reason is Flink's powerful SQL analytics engine and the Blink backend (Alibaba's optimised version of Flink) for the Table API. It makes SQL a first-class citizen in feature engineering, which saves our data scientists from writing a lot of code.

Flink's checkpoint and savepoint design for fault tolerance, along with its exactly-once guarantee within the framework, also makes our lives much easier. We can focus on building the feature store, instead of on how to process data correctly and reliably.

How we build the feature store

At Airwallex, the biggest ML use case is risk management, be it anti-money laundering, fraud detection, or merchant clustering. A lot of these risk management activities happen at the transaction level, which means we need to do real-time scoring with very low (double-digit millisecond) latency and little to no downtime. For training these risk models, we also need a batch system that can process billions of data points quickly.

From the technical side, we want to fully leverage Flink's unified data processing API and minimise the technologies and infrastructure we need to maintain.

That being said, we still want to be able to correct real-time errors accumulated over time, and to avoid being locked in with Flink. So we made a few interesting design choices.

Kappa architecture

Lambda architecture is a data-processing architecture designed to process stream and batch data in parallel, and serve them in a combined view in real time. Kappa architecture is a simplified version that only supports stream processing. By treating batch as a special case of stream processing, Kappa architecture brings a number of advantages:

  • Reduced system complexity
  • Fewer components to maintain
  • Avoiding separate implementations for batch and stream processing, which may lead to data inconsistency

To achieve Kappa architecture and fix accumulated data errors, we have designed the system so that all historical events are kept in Kafka topics. Each Flink job will start consuming from the earliest offset in the Kafka topics. By the time it finishes consuming all historical events, the job will have the correct state to process real-time events. When we want to fix some data errors, we just need to launch a new job and terminate the old one.
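As a sketch of how such a job is wired up with Flink's Kafka connector (the topic name, group id and bootstrap servers below are placeholders, not our actual configuration):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: a Flink job that starts from the earliest Kafka offset so it can
// rebuild its state from all historical events before handling live ones.
public class ReplayFromEarliest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("transaction-events")
                .setGroupId("feature-job-v2")
                .setStartingOffsets(OffsetsInitializer.earliest()) // replay full history
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> events =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "feature-events");
        events.print();

        env.execute("replay-from-earliest");
    }
}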

A thin DSL layer

Before building our own DSL layer, we investigated Apache Beam. Unfortunately, we found it too heavyweight, and its Java interface did not support our desired user experience. As a result, we decided to first build a thin, framework-agnostic DSL layer with Flink support.

As of now, the DSL layer only allows a subset of what is supported in Flink, such as map, filter, union, keyBy, sum, window, leftJoin and SQL. However, combinations of these operators are powerful enough to support all our needs at the moment, and adding new operators won't be too difficult either. The DSL layer also allows us to build backends for other frameworks, such as Apache Spark, Kafka Streams, Apache Beam, and so on.

The DSL follows the functional programming paradigm. A word count example looks like the following:

Stream<Counter> counts = source
    .flatMap(x -> Lists.newArrayList(x.split(" ")), String.class)
    .filter(x -> !x.isEmpty())
    .map(x -> new Counter(x, 1L), Counter.class)
    .keyBy(x -> x.key, String.class)
    .sum((a, b) -> new Counter(a.key, a.count + b.count));

Users can also define transformations through SQL statements, and use other operators before or after them:

Stream<CountDataV1> counts = source.sql(
    "SELECT depositId, COUNT(*) OVER "
        + "(PARTITION BY accountId ORDER BY row_time__ "
        + "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) "
        + "FROM deposit_table",
    "deposit_table",
    CountDataV1.class
).filter(x -> x.count > 0);

Two Flink backends

Once the DSL was defined, we built two Flink backends for it: one for real-time and one for batch. Most of the code is shared between the two backends, with small differences in the watermark strategy, which will be explained later.

When implementing the backends, we found that the DSL layer actually helps abstract away some of the complexities in Flink’s API. For example, leftJoin automatically joins data with point-in-time awareness. The SQL method provides strong type information and converts between DataStream and Table API seamlessly.

Point-in-time Joins

A point-in-time join is one of the most important concepts to understand in order to compute feature values correctly. It means that if we want to compute a feature value for an event that happens at time T1, we should only join data points that happened at or before T1. Otherwise, we are using data from the "future", which won't be available for real-time model inference.

To guarantee this, we associate a notion of time with each data point in both Flink backends, in addition to Flink's own time management. When data points are joined together, we can use point-in-time joins to ensure they do not use data points that only exist at a later point in time. However, this notion is not exposed in the DSL layer, to avoid misuse and unnecessary complexity in the interface.
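To make the rule concrete, here is a minimal, framework-free sketch of the lookup (the class and field names are purely illustrative, not our DSL types): for an event at time T1, only the latest right-side value with a timestamp at or before T1 may be joined.

import java.util.Map;
import java.util.TreeMap;

// Minimal sketch of a point-in-time lookup: for a feature computed at time t,
// only use the reference value whose timestamp is <= t, never a "future" one.
public class PointInTimeLookup {

    // Right-side history keyed by event time, e.g. an account's risk tier over time.
    private final TreeMap<Long, String> rightHistory = new TreeMap<>();

    public void addRight(long eventTime, String value) {
        rightHistory.put(eventTime, value);
    }

    // Returns the latest right-side value known at or before the left event's time,
    // or null if nothing existed yet.
    public String joinAt(long leftEventTime) {
        Map.Entry<Long, String> entry = rightHistory.floorEntry(leftEventTime);
        return entry == null ? null : entry.getValue();
    }
}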

Hybrid watermark strategy

When working with Flink, we found its watermark concept quite hard to understand and use correctly, and the Kappa architecture makes everything even harder.

Before going into details, we need to first understand how Flink infers time. There are two notions of time used in Flink, event time and processing time:

  • Event time comes from an event itself, which could be stored in a field directly or computed from some fields.
  • Processing time is the machine’s system time when Flink receives an event.

In real-time mode, we want each Flink job to first consume historical events using event time and a very large out-of-order window. This helps fix data errors caused by events from different systems arriving in Flink out of order. We also use event time so that historical events do not show up in Flink as if they had just happened in the last few hours or even minutes.

But once we are done processing historical data and start processing real-time data, we want to stop using the out-of-order window, because the window size is the minimum time each event would stay inside Flink. There are two more problems associated with this: one is that the last event would stay in the Flink system forever, because there would be no more events to advance the watermark. The other is that real-time events may be dropped if their event time is earlier than the current maximum event time minus the out-of-order window size.

We spent a lot of time and many attempts on this issue before we finally landed on a hybrid watermark strategy. The strategy first processes historical events with event time and a 90-day out-of-order window. Once it detects an event whose event time is within 10 seconds of the current processing time, it flips a flag, sets the out-of-order window size to zero and switches to processing time.
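A rough sketch of such a generator, using Flink's WatermarkGenerator interface, is shown below. The class name and thresholds are illustrative simplifications, not our production code.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Sketch of a hybrid watermark generator: event time with a large out-of-order
// window while replaying history, then processing time once events catch up.
public class HybridWatermarkGenerator<T> implements WatermarkGenerator<T> {

    private static final long OUT_OF_ORDER_MS = Duration.ofDays(90).toMillis();
    private static final long CATCH_UP_MS = Duration.ofSeconds(10).toMillis();

    private long maxEventTime = Long.MIN_VALUE;
    private boolean caughtUp = false;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxEventTime = Math.max(maxEventTime, eventTimestamp);
        // Flip the flag once an event arrives within 10 seconds of "now".
        if (!caughtUp && System.currentTimeMillis() - eventTimestamp <= CATCH_UP_MS) {
            caughtUp = true;
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (caughtUp) {
            // Real-time phase: drive the watermark with the wall clock,
            // effectively a zero out-of-order window.
            output.emitWatermark(new Watermark(System.currentTimeMillis()));
        } else if (maxEventTime != Long.MIN_VALUE) {
            // Historical phase: event time minus the 90-day out-of-order window.
            output.emitWatermark(new Watermark(maxEventTime - OUT_OF_ORDER_MS));
        }
    }
}

Such a generator can be plugged into a job via WatermarkStrategy.forGenerator(ctx -> new HybridWatermarkGenerator<>()).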

This approach still has its own problems, such as potentially messing up event order within those 10 seconds when it flips the flag. But in machine learning use cases, we can tolerate some degree of data inaccuracy, which is far better than missing data, whether it is dropped or arrives late.

Blue-green deployment

Another thing we found Flink not super effective at is managing job downtime. Whether you want to restart a job with new parameters, or restart a task manager or a job manager, the Flink job will be restarted, which takes at least 60 seconds. We wanted to minimise the downtime of real-time Flink jobs from day one.

As a result, we have built a blue-green deployment system. Each feature's real-time Flink job writes to its own Kafka topic, with a common prefix in the topic name. These jobs send each event to the Kafka topic with the job version in its header. A downstream Flink job, which we call the "VersioningJob", listens to all these feature topics and only outputs the events in each topic that carry the highest version.
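The core of that filtering logic looks roughly like the sketch below, applied after keying the stream by feature topic. The FeatureEvent type and its fields are made up for illustration; the real job reads the version from the Kafka record header.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical sketch of the VersioningJob's filtering step, applied on a
// stream keyed by feature topic.
public class LatestVersionFilter extends RichFlatMapFunction<LatestVersionFilter.FeatureEvent, LatestVersionFilter.FeatureEvent> {

    public static class FeatureEvent {
        public String featureTopic;
        public int jobVersion;
        public String payload;
    }

    private transient ValueState<Integer> maxVersionSeen;

    @Override
    public void open(Configuration parameters) {
        maxVersionSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("max-version", Integer.class));
    }

    @Override
    public void flatMap(FeatureEvent event, Collector<FeatureEvent> out) throws Exception {
        Integer max = maxVersionSeen.value();
        if (max == null || event.jobVersion > max) {
            maxVersionSeen.update(event.jobVersion);
            max = event.jobVersion;
        }
        // Forward only events produced by the highest job version seen so far;
        // output from the old (lower-version) job is silently dropped.
        if (event.jobVersion == max) {
            out.collect(event);
        }
    }
}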

The process to restart a job would look like this:

  • Create a new job with the same config as the old job (but with the version incremented by one)
  • Start the new job and wait for it to catch up, meaning the new job has finished processing all the historical events the old job had processed
  • Stop the old job; at this stage, the VersioningJob has switched to consuming the new job's output instead of the old one's

The whole process may sound complicated, but we have built a feature store UI to make this happen in a few clicks.

Job Management UI

Redis-Kafka dual writes

The VersioningJob writes the feature values to both Redis and Kafka, so both synchronous and asynchronous ML model pipelines can be well served. To query feature values, one can either send an HTTP request to the feature store, which will hit Redis, or listen to the corresponding Kafka output topics for the features of interest. To facilitate debugging, we embed event time and processing time in the feature value.
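As an illustration only (the field names below are assumptions, not our actual schema), a served feature value carries roughly this shape:

// Illustrative shape of a feature value as served from Redis or the output
// Kafka topics. Field names are assumptions, not the actual schema.
public class FeatureValue {
    public String featureName;     // e.g. "deposit_count_30d"
    public int featureVersion;     // version of the job that produced the value
    public String entityKey;       // e.g. an accountId or merchantId
    public double value;           // the computed feature value
    public long eventTimeMs;       // when the underlying event happened
    public long processingTimeMs;  // when Flink computed the value (for debugging)
}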

Flink on Kubernetes

We started with Flink running on VMs, but found it difficult to operate. For example, updating cluster configs was a very manual process. Logs were not collected. Scaling or setting up a cluster in one region and one environment took a few days. What's worse, we have multiple regions and multiple environments, so manually managing all these clusters was a nightmare.

So moving to Kubernetes was eventually a no-brainer for us. Since we are using Google Cloud Platform (GCP), logs in Kubernetes are collected automatically into GCP Logs Explorer. Cluster config is managed through a Helm chart. Scaling or setting up a cluster is just one command. Monitoring is also auto-wired to the company's standard monitoring tools for Kubernetes. In addition, Flink on Kubernetes allows us to run the feature store on any other platform, be it AWS, Azure, AliCloud, etc.

Feature management

To be able to reproduce feature computation, we associate each feature with the jar used for its Flink jobs. We also keep all Flink job configurations in the feature store as metadata. Each feature definition in the feature store is immutable: it can only be duplicated and modified, which also increments the version. The new feature is linked to the old feature to preserve its lineage.
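For illustration only (the names below are assumptions, not the actual metadata schema), a feature definition record ties together the artefact, the job config and the lineage link:

// Illustrative sketch of an immutable feature definition record.
public final class FeatureDefinition {
    public final String name;
    public final int version;               // incremented when duplicated and modified
    public final String jarLocation;        // the jar used for this feature's Flink jobs
    public final String flinkJobConfig;     // serialized job configuration
    public final String previousFeatureId;  // link to the older version for lineage

    public FeatureDefinition(String name, int version, String jarLocation,
                             String flinkJobConfig, String previousFeatureId) {
        this.name = name;
        this.version = version;
        this.jarLocation = jarLocation;
        this.flinkJobConfig = flinkJobConfig;
        this.previousFeatureId = previousFeatureId;
    }
}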

What is working well

With all these improvements, we have been running over 100 features on Flink, in real-time and/or batch, smoothly. The latency for stream processing ranges from a couple of milliseconds to ~150 milliseconds, which easily meets our business needs. The blue-green deployment has also played an important role whenever we want to upgrade jobs, restart task managers or fail over between Flink clusters. Immutable data structures in the feature store help us reason about feature changes more easily. So far, the maintenance cost for the feature store and the Flink clusters is quite low. We can focus most of our efforts on enhancing the feature store and the other components of the machine learning platform.

Next steps

We are still climbing the hill of mastering Flink. During this journey, we have made a lot of mistakes and run into a lot of difficulties. For instance, task managers ran out of metaspace because they kept loading new jars. A batch job failed because of insufficient off-heap memory. A new job couldn't be started because the job manager's blob server stopped responding.

The feature store's interface isn't very user friendly either. Regardless of whether a feature is a simple SQL query or some complicated logic in Java, users have to make a pull request in a code repository and go through a rather lengthy feature publishing process, which involves multiple components and services. We want to simplify the process so that most features can be created within a UI or SDK environment.

We started building the feature store from scratch six months ago, and there is still a whole lot ahead of us. In the next 12 months, we would like to:

  • Enhance the operation of Flink on Kubernetes
  • Build a Kubernetes sidecar to manage Flink jars across all regions
  • Open source the DSL and the Flink backends as a library
  • Build a Python SDK so users can create features within a Jupyter notebook
  • Integrate with the model training and serving platforms


Tianshi Zhu is Head of Risk Engineering at Airwallex, building infrastructure to protect Airwallex and its clients from financial crime risks. linkedin.com/in/zhutianshi/