Powering Streaming Features with Apache Flink at Brex: An Insight into Our Journey

Amy Zheng · Published in Brex Tech Blog
15 min read · Jun 25, 2024

Introduction

At Brex, we’ve embarked on a journey to revolutionize our streaming feature platform using Apache Flink. This blog post aims to provide an in-depth overview of our design, implementation, and the lessons we’ve learned along the way.

What is a streaming feature?

Streaming features are derived from real-time, continuous data streams. They are crucial in scenarios where data is constantly being generated and decisions need to be made in real-time.

Characteristics of Streaming Features:

  1. Real-time Processing: Data is processed as it arrives.
  2. Low Latency: Features are computed and used almost instantaneously.
  3. Dynamic Updates: Features are continuously updated with new data.

At Brex, streaming features are widely used in fraud ML models, for example to detect transaction fraud and account takeovers at login. We mainly focus on windowed aggregations over Kafka events, such as a user's transaction volume over the past 7 days or login locations over the past day.

Our First Attempt: An In-house Streaming Feature Platform

Architecture

Feature Definition

To reduce the feature implementation effort and boost code reusability, we predefined the state update and calculation of a few commonly used aggregation types, such as avg, max, min, sum, count, breakdown. Data scientists could define the streaming feature in a configuration-driven way without worrying about the aggregation logic or feature state management. Here are the main fields in the config:

Feature ID

Unique identifier of a feature — consists of a name and a version number.

Event Source

The event (Kafka topic) this feature aggregates.

Aggregation Key

The group by key for the aggregation, such as customer ID.

Idempotency Key

The idempotency key of the source events, such as transaction ID — we don’t want to aggregate the same transaction twice.

Aggregation Field

The field in the source event that we want to aggregate on, for example: transaction amount.

Aggregation Type

Type of the wanted aggregation, for example: sum.

Window Length

The length of the aggregation window.

[Optional] Bucket Size

This is an optional optimization that is especially beneficial for longer window lengths, since it greatly reduces the size of the state. Details are explained in the State Data Model & State Store section below.
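
To make this concrete, here is a hypothetical feature definition combining the fields above, sketched as a Java record. All names and values are illustrative; the actual config format and field names at Brex are internal and likely differ.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical, simplified representation of a streaming feature definition.
public record StreamingFeatureConfig(
        String featureId,                 // e.g. "user_txn_volume_7d:v1" (name + version)
        String eventSource,               // Kafka topic, e.g. "transactions"
        String aggregationKey,            // group-by key, e.g. "customer_id"
        String idempotencyKey,            // e.g. "transaction_id"
        String aggregationField,          // field to aggregate, e.g. "amount"
        AggregationType aggregationType,  // SUM, AVG, MAX, MIN, COUNT, BREAKDOWN
        Duration windowLength,            // e.g. Duration.ofDays(7)
        Optional<Duration> bucketSize) {  // optional optimization for long windows

    public enum AggregationType { AVG, MAX, MIN, SUM, COUNT, BREAKDOWN }
}
```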

State Data Model & State Store

The data model for the state is simple — we store a list of (timestamp, value) pairs for each aggregation key of each feature, where value is the value of the aggregation field from each event.

We serialize it to a JSON string and store it in Postgres, which is the most commonly used and well supported DBMS at Brex. However, Postgres isn't a good choice for storing state: the high state update rate makes it difficult to perform database upgrades and resizing. This is one of the key reasons Flink outperforms our in-house streaming implementation.

We trim the state both when updating it and when reading it for feature value computation: we take the current timestamp and delete the (timestamp, value) pairs with timestamp < cur_timestamp - window_length.

The state for bucketed window aggregation is different: we store the aggregated value of each bucket, and aggregate across buckets when computing the feature value. For example, to get the 30-day transaction volume per user with a 1-day bucket size, we maintain a list of 1-day buckets and store the sum of the transaction amounts inside each bucket. When computing the feature value, we take the sum over the last 30 complete buckets. The state size can be greatly reduced because we store aggregated values instead of raw (timestamp, value) pairs.
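
A minimal sketch of the two state layouts for a sum aggregation, written as plain Java helpers for illustration (these are not our production classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Raw state: one (timestamp, value) pair per event, trimmed on read/update.
final class RawSumState {
    record Pair(long timestampMs, double value) {}
    private final List<Pair> pairs = new ArrayList<>();

    void add(long timestampMs, double value) { pairs.add(new Pair(timestampMs, value)); }

    double readAndTrim(long nowMs, long windowMs) {
        pairs.removeIf(p -> p.timestampMs() < nowMs - windowMs); // drop expired pairs
        return pairs.stream().mapToDouble(Pair::value).sum();
    }
}

// Bucketed state: one partial sum per bucket, aggregated across complete buckets on read.
final class BucketedSumState {
    private final TreeMap<Long, Double> bucketSums = new TreeMap<>(); // bucketStart -> sum

    void add(long timestampMs, double value, long bucketMs) {
        long bucketStart = timestampMs - (timestampMs % bucketMs);
        bucketSums.merge(bucketStart, value, Double::sum);
    }

    // e.g. windowMs = 30 days, bucketMs = 1 day: sum over the last 30 complete buckets.
    double read(long nowMs, long windowMs, long bucketMs) {
        long currentBucketStart = nowMs - (nowMs % bucketMs);
        return bucketSums.subMap(currentBucketStart - windowMs, true, currentBucketStart, false)
                .values().stream().mapToDouble(Double::doubleValue).sum();
    }
}
```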

State Update

We deployed one consumer group (the Stateful Feature Consumer in the diagram) to handle the idempotency check and state update for all streaming features. It listens to a set of Kafka topics and parses all the streaming feature definitions. Upon receiving an event, for each feature that aggregates on this event, the consumer first checks whether the (feature ID, idempotency key) pair is already in the Redis idempotency key store. If it is, the consumer skips this event for this feature; if not, the consumer reads the previous state, updates it, and writes it back to Postgres.

Note that we didn't deduplicate by message offset, even though that approach is simple and commonly used. The reason is that some internal services or systems can produce duplicate messages with different offsets but the same idempotency key.
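
Put together, the per-event update loop looks roughly like the sketch below. It reuses the hypothetical StreamingFeatureConfig and RawSumState from the sketches above; the Event type, idempotencyStore, and stateStore are likewise hypothetical stand-ins for our event schema, Redis, and Postgres.

```java
// Hypothetical sketch of the Stateful Feature Consumer's per-event handling.
void onEvent(Event event) {
    for (StreamingFeatureConfig feature : featuresAggregatingOn(event.topic())) {
        String dedupKey = feature.featureId() + ":" + event.getString(feature.idempotencyKey());
        // putIfAbsentWithTtl returns false if the key was already present in Redis.
        if (!idempotencyStore.putIfAbsentWithTtl(dedupKey, java.time.Duration.ofDays(7))) {
            continue; // duplicate event for this feature, skip it
        }
        String aggKey = event.getString(feature.aggregationKey());
        RawSumState state = stateStore.read(feature.featureId(), aggKey); // JSON from Postgres
        state.add(event.timestampMs(), event.getDouble(feature.aggregationField()));
        stateStore.write(feature.featureId(), aggKey, state);             // serialize back
    }
}
```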

Feature Aggregation & Feature Serving

Since the feature state is a list of (timestamp, value) pairs, whenever a feature is requested we retrieve the state from Postgres and calculate the aggregated value. Our Feature Service is responsible for this: other internal services or models send gRPC requests to the feature service with the feature ID and key, and the feature service returns the aggregation result.

Bootstrap & Backfill jobs

Our streaming infra team has built a workflow to sink all the Kafka events to Snowflake; thus, we have all the historical events stored in Snowflake. These historical events can be used in streaming feature bootstrap and backfill.

What is a bootstrap job?

When a new streaming feature is deployed, it starts with a blank state by default; only events consumed after the deployment are factored in. Thus, until the feature has been deployed for a full window length, the feature value comes from a partial window, which is incorrect or inaccurate. Bootstrapping the state means we compute the feature states from historical events and upload them to the production state store, so that we don't need to wait a full window length to get the first valid full-window aggregation value.

What is a backfill job?

Backfill also uses historical events, but its output is historical feature values (the aggregated results) rather than feature states (which is what bootstrap produces). This is useful for building a training set for a model or backtesting a policy.

Since we predefined the commonly used aggregation types, we also predefined the bootstrap and backfill jobs for each of them in Spark. A bootstrap/backfill job reads historical events from our Snowflake database, runs on Spark clusters, and then writes either the states to Postgres (for bootstrap) or the feature results to Snowflake (for backfill).

Bottlenecks/Drawbacks

This in-house streaming feature system served us well for a couple of years before we started seeing more and more issues.

Inefficient State Management

We store all the (timestamp, value) pairs for every event within the window length in the state. When the state gets large, reading and updating it becomes very slow. Our analysis showed that the main time-consuming part was serialization/deserialization, which could take hundreds of milliseconds per key. Large states also take more time to read/write from Postgres, as they're stored in TOAST tables when larger than 8 KB. This resulted in high update consumer lag.

Why did we have such large states? We didn't put any constraints on the window length. With longer window lengths, the number of (timestamp, value) pairs grows; we had some features with windows of 1 month or even 1 year! Some other features had a small key space, so the number of events per key was huge, which also caused large state per key.

High Feature Retrieval Latency

As explained in the Feature Aggregation & Feature Serving section, feature retrieval reads the state from Postgres, deserializes it, and computes the aggregated result. With large states, reading and deserializing is slow; thus we saw high feature retrieval latency, with P99 sometimes above 1s.

Scalability & Isolation

Having a single stateful feature consumer group handle ALL streaming features simply doesn't scale as the number of streaming features keeps growing. In particular, a few slow features would cause consumer lag that affected all of the streaming features.

Complicated Chaining Features

A special use case we support at Brex is feature chaining or feature dependency — feature A aggregates on the value of feature B. When the dependency relation gets complicated, these features form a graph. We’d need to implement the correct ordering of feature computation, state updates, as well as the bootstrap/backfill jobs, which basically become DAGs.

Supporting complex chaining features significantly increased the complexity of our codebase. However, only one model actually used chaining features over the past three years; the limited use cases didn't justify the complexity they introduced.

Buggy Kafka Python client

For the first iteration of our streaming pipeline we used Faust, a Python streaming library. We later discovered that this library had several issues; some were fixed by updating the library, but others were not reproducible and would sometimes halt our production pipeline. We eventually migrated to the Confluent Kafka client, and the experience highlighted the need to run a Kafka client that is more robust and performant.

Revolutionizing Our Platform with Apache Flink

There is potential to improve our in-house system and resolve these bottlenecks; however, it would require a fairly large revamp of the fundamental design of our state and feature computation model. While building our own streaming feature system is fun, we're well aware that there are very mature solutions for this problem that are widely adopted in the industry. We decided to stop investing in the in-house platform and stop reinventing the wheel.

Architecture with Flink

Feature Creation

One goal of the new system is to minimize the learning curve and interface differences for data scientists. Since our in-house system is config driven, we decided to follow the same path when designing the new system with Flink.

The feature configs of Flink streaming features look very similar to the in-house feature configs. We use the Flink Kubernetes Operator to deploy the Flink jobs for the streaming features. Besides the feature definition config, we also need to create a Kubernetes deployment YAML file for each feature.

Idempotency Check

In our in-house system we store the seen idempotency keys in Redis; unfortunately, this won't work with Flink. Redis is external to Flink and isn't covered by checkpoints, so when Flink restarts from a checkpoint/savepoint, the replayed events would be filtered out because their idempotency keys are already in Redis, leading to incorrect results.

We could use Flink state to store the idempotency keys instead: key the DataStream by the idempotency key, use a KeyedProcessFunction to record a boolean state for the key, and finally filter the events by that boolean state.

One caveat is how to clean up the state after some time period (e.g., 7 days, equivalent to the TTL we set in Redis); otherwise the state will keep growing with new events. Flink provides StateTtlConfig, with which you can configure a TTL for state. By default, expired values are explicitly removed on read and periodically garbage collected in the background if supported by the configured state backend. It's also possible to configure more fine-grained cleanup strategies that run in the background.
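
A minimal sketch of such a dedup step, assuming the stream is already keyed by the idempotency key (the class, state name, and TTL here are our own illustration, not production code):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits only the first event seen per idempotency key.
// Usage (hypothetical accessor): events.keyBy(e -> e.idempotencyKey()).process(new DedupFunction<>())
public class DedupFunction<T> extends KeyedProcessFunction<String, T, T> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        // Expire the "seen" flag after 7 days, mirroring the TTL we used in Redis.
        StateTtlConfig ttl = StateTtlConfig
                .newBuilder(Time.days(7))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("seen", Boolean.class);
        descriptor.enableTimeToLive(ttl);
        seen = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(T event, Context ctx, Collector<T> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            out.collect(event); // first time we've seen this idempotency key
        }
        // else: duplicate, drop it
    }
}
```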

This blog post provides great details and code examples for the idempotency check or event deduplication!

Feature Aggregation

Flink provides various APIs to define aggregations. We explored 2 potential options to implement window aggregations.

Option 1: Flink Sliding Window

Sliding windows can overlap if the slide size is smaller than the window size. For example, with a slide size of half the window size, each event is assigned to 2 windows; in other words, there are always 2 active windows.

Within each active window, since the window's start and end times are fixed, we don't have to store the (timestamp, value) pairs as in our in-house system; we can simply store the partially aggregated value. For example, for a sum aggregation we only need to store the running sum. As a result, this option has more active windows per key, but each window's state is much smaller.
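
A minimal sketch of Option 1 for a 7-day sum with a 15-minute slide (the TransactionEvent type and its accessors are hypothetical, and dedupedEvents is a DataStream<TransactionEvent> that has already passed the idempotency check):

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Double> windowSums = dedupedEvents
        .keyBy(TransactionEvent::customerId)
        .window(SlidingEventTimeWindows.of(Time.days(7), Time.minutes(15)))
        .aggregate(new AggregateFunction<TransactionEvent, Double, Double>() {
            @Override public Double createAccumulator() { return 0.0; }
            // Only the running sum is kept per window, not the raw (timestamp, value) pairs.
            @Override public Double add(TransactionEvent e, Double acc) { return acc + e.amount(); }
            @Override public Double getResult(Double acc) { return acc; }
            @Override public Double merge(Double a, Double b) { return a + b; }
        });
```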

Pros

  • Simpler to implement compared to option 2
  • Can benefit from new features or future optimizations in the Flink Window API

Cons

  • Feature value won’t be as fresh as our in-house solution. Freshness is limited by slide size. Granular slide size could cause scalability issues — large total state size, more active windows to update upon processing each event, and more data to checkpoint/savepoint.
  • For example, if we want a 7-day aggregation with freshness (slide size) of 1 minute, we'll have 60*24*7 = 10,080 partial windows simultaneously stored in state for each key. Suppose we have 1 million keys, and the state of each window consists of a key (16 bytes), a timestamp (12 bytes), and a scalar long value (8 bytes); the total state size for this feature would be over 350 GB!

Option 2: Process Function + Periodic Watermark

Another option is to implement the aggregation in a ProcessFunction and configure a periodic watermark to trigger the aggregation. The state of the process function is the same as what we store in our in-house streaming feature states: the (timestamp, value) pairs for each event. Upon processing each watermark, we trim the stored records (keeping only records with timestamp >= watermark - window_length), aggregate the (timestamp, value) pairs, and sink the aggregated value.

This basically implements our in-house solution in Flink! Compared to option 1, this option has only 1 active window per key, but the window size is larger.
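A rough sketch of Option 2 for the same 7-day sum, again with the hypothetical TransactionEvent type: the raw pairs live in ListState, and an event-time timer fires as the periodic watermark advances.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class WindowedSumProcessFunction
        extends KeyedProcessFunction<String, TransactionEvent, Double> {

    private static final long WINDOW_MS = Duration.ofDays(7).toMillis();
    private static final long EMIT_INTERVAL_MS = Duration.ofMinutes(1).toMillis();

    private transient ListState<Tuple2<Long, Double>> pairs;

    @Override
    public void open(Configuration parameters) {
        pairs = getRuntimeContext().getListState(new ListStateDescriptor<>(
                "pairs", TypeInformation.of(new TypeHint<Tuple2<Long, Double>>() {})));
    }

    @Override
    public void processElement(TransactionEvent e, Context ctx, Collector<Double> out)
            throws Exception {
        pairs.add(Tuple2.of(e.timestampMs(), e.amount()));
        // Register a timer on the next emit boundary; Flink de-duplicates timers
        // registered for the same timestamp, so this is cheap.
        long nextEmit = (ctx.timestamp() / EMIT_INTERVAL_MS + 1) * EMIT_INTERVAL_MS;
        ctx.timerService().registerEventTimeTimer(nextEmit);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Double> out)
            throws Exception {
        List<Tuple2<Long, Double>> kept = new ArrayList<>();
        double sum = 0.0;
        for (Tuple2<Long, Double> p : pairs.get()) {
            if (p.f0 >= timestamp - WINDOW_MS) { // trim expired pairs
                kept.add(p);
                sum += p.f1;
            }
        }
        pairs.update(kept);
        out.collect(sum); // sink the fresh aggregate downstream
    }
}
```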

Pros

  • We can sink feature aggregations more frequently without increasing the state size. Thus this option can provide fresher feature values.

Cons

  • Cannot make use of potential future optimizations in the Flink Window API
  • More complicated to implement, especially if we want to handle edge cases like late events.

Which Did We Choose?

We decided to go with option 1, as we found its main downside (staleness) wasn't a real concern.

The main downside of option 1 is the sacrifice of feature freshness. We did some analysis on the window state density. Taking the 7-day window as an example, given the average volume of events, if option 1 stores roughly the same amount of state as option 2 (i.e., the same as our in-house system), we could afford to slide roughly every 16.8 minutes. For a 7-day window, ~16.8 minutes of staleness is quite acceptable!

State Store

We define what to store in the state in the aggregation function, and Flink takes care of the storage! Out of the box, Flink bundles these state backends:

  1. HashMapStateBackend
  2. EmbeddedRocksDBStateBackend

If nothing else is configured, the system will use the HashMapStateBackend.

HashMapStateBackend

The HashMapStateBackend holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables that store the values, triggers, etc.

Pros

  • HashMapStateBackend is very fast as each state access and update operates on objects on the Java heap, no (de-)serialization.

Cons

  • State size is limited by available memory within the cluster

EmbeddedRocksDBStateBackend

The EmbeddedRocksDBStateBackend holds in-flight data in a RocksDB database that is (per default) stored in the TaskManager local data directories. Unlike the HashMapStateBackend, which stores Java objects, data is stored as serialized byte arrays defined mainly by the type serializer, so key comparisons are byte-wise instead of using Java's hashCode() and equals() methods.

Pros

  • RocksDB can scale based on available disk space and can hold much larger state than HashMapStateBackend.
  • It is currently the only backend that offers incremental checkpoints, which can dramatically reduce the checkpointing time.
  • Has more predictable latency without being impacted by JVM Garbage Collection

Cons

  • Each state access and update requires (de-)serialization and potentially reading from disk which leads to average performance that is an order of magnitude slower than the memory state backends.

Switching Cost

If we want to switch a state backend later:

Flink 1.13 (and above) unified the binary format of Flink's savepoints, which means you can take a savepoint and then restore from it using a different state backend. Therefore, if you want to switch state backends, first upgrade your Flink version if needed, then take a savepoint, and only after that restore it with a different state backend.

Which Did We Choose?

Since state update latency isn't a big concern (it adds a bit of feature value staleness, but the staleness mostly comes from the window slide size), we decided to go with EmbeddedRocksDBStateBackend for its better scalability and incremental checkpoints.
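
Enabling RocksDB with incremental checkpoints can be done in code, or equivalently via flink-conf.yaml (state.backend: rocksdb, state.backend.incremental: true); a minimal sketch, with an illustrative checkpoint interval:

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// true enables incremental checkpoints, so only changed RocksDB SST files are uploaded.
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.enableCheckpointing(60_000); // checkpoint every 60 seconds (interval is illustrative)
```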

Sink to Online Feature Store

We use the Bahir RedisSink to store the window aggregation results in Redis. We have an online feature store service that provides an API for other services to retrieve feature values given the feature ID and key.
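
A rough sketch of wiring the Bahir RedisSink, assuming the aggregation output has been mapped to (redisKey, value) string pairs; the host, key layout, and Redis command are illustrative:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

// featureValues holds (redisKey, aggregatedValue) pairs,
// e.g. ("user_txn_volume_7d:v1:cust_123", "4200.0").
static void sinkToRedis(DataStream<Tuple2<String, String>> featureValues) {
    FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder()
            .setHost("redis.feature-store.internal") // illustrative host
            .setPort(6379)
            .build();

    featureValues.addSink(new RedisSink<>(redisConf, new RedisMapper<Tuple2<String, String>>() {
        @Override public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.SET);
        }
        @Override public String getKeyFromData(Tuple2<String, String> data) { return data.f0; }
        @Override public String getValueFromData(Tuple2<String, String> data) { return data.f1; }
    }));
}
```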

Sink to Offline Feature Store

Besides sinking the output to Redis, we use a SideOutput to sink the output to a Kafka topic, then use our customized Kafka Snowflake Connector (which handles schema and schema evolution) to store the feature values in Snowflake tables. Later on, data scientists can use these data points for model training.
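
A rough sketch of the side output plus KafkaSink wiring, reusing the hypothetical featureValues stream from the previous sketch (topic, brokers, and the string encoding are illustrative):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

final OutputTag<String> offlineTag = new OutputTag<String>("offline-feature-values") {};

SingleOutputStreamOperator<Tuple2<String, String>> main = featureValues
        .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
            @Override
            public void processElement(Tuple2<String, String> value, Context ctx,
                                       Collector<Tuple2<String, String>> out) {
                out.collect(value);                                // main output -> Redis sink
                ctx.output(offlineTag, value.f0 + "," + value.f1); // side output -> Kafka
            }
        });

KafkaSink<String> offlineSink = KafkaSink.<String>builder()
        .setBootstrapServers("kafka.internal:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("streaming-feature-values")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();

main.getSideOutput(offlineTag).sinkTo(offlineSink);
```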

Bootstrap

Flink allows you to start consuming from a past point in the topic as long as it's within the retention period. When defining the KafkaSource, you can set its starting offsets from a timestamp, e.g. setStartingOffsets(OffsetsInitializer.timestamp(startTimestampMs)) (see the sketch below). Analysis of our existing streaming feature window lengths showed that more than 80% of the features had windows of 7 days or less. We decided to limit the supported window length to at most 7 days, which is the retention period of our Kafka cluster.
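
A minimal sketch of a KafkaSource that starts from a timestamp up to 7 days in the past (broker, topic, and group id are illustrative):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import java.time.Duration;

long startTimestampMs = System.currentTimeMillis() - Duration.ofDays(7).toMillis();

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("kafka.internal:9092")
        .setTopics("transactions")
        .setGroupId("streaming-feature-user-txn-volume-7d")
        // Replay retained history so the first full-window value is available immediately.
        .setStartingOffsets(OffsetsInitializer.timestamp(startTimestampMs))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
```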

We proposed a few workarounds in case longer window aggregations are really needed. The bootstrap problem was also explained very well in this talk by Gregory Fee!

Hybrid Feature with Batch aggregation + Streaming aggregation

Our major argument for NOT supporting long window streaming features is that with long windows, the feature value is unlikely to be drastically changed by a few incoming events.

If both long window aggregation and fresh signals from the most recent events are needed by the model, we could implement 2 features to capture both: 1 batch feature that runs every X hours, and 1 streaming feature with short windows, like 1 day, as long as it’s longer than the batch feature interval.

Flink Hybrid Source

Flink introduced HybridSource in v1.14; it sequentially reads input from heterogeneous sources to produce a single input stream. Bootstrapping a long window feature could be achieved with a HybridSource combining a JDBC source (reading historical events from Snowflake) and a Kafka source (live events). However, when this platform was implemented in 2023, Flink didn't have an out-of-the-box JDBC source for the DataStream API; there is an open item in the backlog: FLIP-239: Port JDBC Connector to FLIP-27&FLIP-143. If your historical events are stored in file storage, or can be read via Flink's FileSource, HybridSource is a great option for bootstrapping!
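
For teams whose history lives in files, a rough sketch of the HybridSource approach (paths and formats are illustrative; kafkaSource is the live source from the Bootstrap section):

```java
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Read historical events from files first, then switch to the live Kafka topic.
FileSource<String> historicalEvents = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(),
                               new Path("s3://my-bucket/historical-events/"))
        .build();

HybridSource<String> hybridSource = HybridSource.builder(historicalEvents)
        .addSource(kafkaSource) // the live KafkaSource shown earlier
        .build();
```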

Increase retention period

Another simple but not cost-effective solution is to increase the retention period of the events that we want to build long window features on. However, this is less ideal since we'd need to wait until all the needed events are retained; it could be acceptable for relatively small window length increases, like from 1 week to 2 weeks.

Migration

Since both our in-house system and the new Flink system are config driven, we’ve built a command line tool to auto generate part of the Flink feature definition from the legacy feature definition, which largely reduces the migration effort.

To make sure Flink generates the expected feature values, besides the unit tests, we also did correctness validation by running the legacy features and Flink features in parallel, sinking both of them to Snowflake, sampling the results and analyzing the differences.

Future work: Backfill

We haven't implemented feature backfill in Flink yet. Ideally, once a DataStream JDBC source is available, we could configure a JDBC source that reads historical events from Snowflake for the Flink job and sink the feature values to Snowflake. An alternative approach is to create one-off jobs that read historical events from Snowflake and publish them to a Kafka topic, which the backfill Flink job can then consume.

Trade-offs

Our in-house system is far from perfect, and Flink isn’t perfect either. We’ve summarized the trade-offs in the tables below.

Pros building on Flink

Cons building on Flink

Lessons Learned

Think about Limitations & Constraints Up-front

During the evolution of our streaming feature platform, we’ve learned to be up-front about limitations and constraints, which helps us make more reasonable choices when designing the system and avoid some maintenance issues in the long run.

Long window streaming features caused a lot of pain. We've created clear guidance and constraints on streaming feature window lengths, with workarounds for long window aggregations as discussed in the Bootstrap section.

Start Simple

Special cases should be handled specially. Uncommon, complicated use cases don't justify the complexity they introduce to the platform.

When supporting the complicated chaining streaming features, our first attempt in the in-house system achieved a simple user interface. However, it greatly increased the complexity of the platform and made it hard to debug and maintain. In the new Flink system, we decided to contain the complexity of chaining features inside their own aggregation functions, without affecting other parts of the platform.

If we see more common use of complex features in the future, we can always make incremental changes to our platform to provide a better interface and reduce the effort of implementing chaining features. However, if we start with a complicated platform, it's much harder to simplify it later.

Try to Avoid Reinventing the Wheel

Migration is painful and time-consuming. Avoiding reinventing the wheel could avoid some migrations.

Work with Industry peers to Share Knowledge

We met with Lyft’s data platform team to understand how they were using Flink. We got a ton of insights from learning about their experiences! When adopting open source solutions, especially the ones widely used by other tech companies, it could be super helpful to partner with or learn from some peers.

Conclusion

Our journey with Flink has been transformative for our streaming feature platform. Despite some limitations, Flink’s robust capabilities have enabled us to build a scalable and efficient streaming feature framework. As we continue to explore and innovate, we look forward to sharing more insights and learnings from our journey with Flink.

Acknowledgment

Brex ML Platform team: Sebastian, Damian, Andre, Bill, Spencer, Surya, Yang

Streaming infra team: Yingying, Jun, Jim
