Cache reuse across DoFns in Beam

Prathap Reddy
Google Cloud - Community
6 min read · Jan 25, 2021

Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines. Pipelines can be written in a variety of language-specific SDKs (Java, Python, etc.) and can be executed on any Beam-compatible runner (Dataflow, Flink, Spark, etc.).

In this blog post we will discuss the following topics:

  1. Lifecycle of a DoFn
  2. Caching data for reuse across DoFn instances
  3. An example illustrating these concepts:
    a) Building an in-memory cache by loading data from an external source (e.g. Cloud Storage)
    b) Reusing cached objects across DoFn instances
    c) Refreshing the cache via an external trigger

DoFn Life Cycle

A DoFn is a user-defined function used with the ParDo transform that is invoked for each input element to emit zero or more elements. A bundle is a collection of elements (i.e. records/messages) processed and committed as a single unit of failure. The division of a collection into bundles is arbitrary and selected by the runner. For example, on Dataflow a streaming runner may prefer to process and commit small bundles in order to achieve low latency, whereas a batch runner prefers larger bundles to reach higher throughput.

Each DoFn class has a process method that is invoked for each element, plus the optional methods shown below to handle initialization and finalization tasks.

  • setup — Invoked once after an instance is created. A good place to initialize transient in-memory resources, such as network connections or a parsed config file.
  • start_bundle — Invoked before each bundle is processed. Some runners create a DoFn instance per bundle, whereas others reuse the same DoFn across bundles. Can be used to initialize any state that applies to the entire bundle.
  • finish_bundle — Invoked after each bundle is processed.
  • teardown — Invoked before the instance is discarded; used for any cleanup.

In summary, the entire flow appears as follows, with a minimal skeleton sketched after the list:

  • Setup
  • Repeatedly process bundles:
    * StartBundle
    * Repeatedly Process Element
    * FinishBundle
  • Teardown
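As a rough sketch (the class name, the logging, and the method bodies are illustrative, not taken from the original article), a Python DoFn wiring all of these hooks together might look like this:

import logging

import apache_beam as beam


class LifecycleDoFn(beam.DoFn):
    # Illustrative DoFn showing where each lifecycle hook fits.

    def setup(self):
        # Called once after the instance is created: initialize transient
        # resources such as network clients or parsed configuration.
        self.config = {"uppercase": True}

    def start_bundle(self):
        # Called before each bundle: initialize per-bundle state.
        self.bundle_count = 0

    def process(self, element):
        # Called for each element: emit zero or more output elements.
        self.bundle_count += 1
        yield element.upper() if self.config["uppercase"] else element

    def finish_bundle(self):
        # Called after each bundle: flush or log per-bundle state.
        logging.info("Processed %d elements in this bundle", self.bundle_count)

    def teardown(self):
        # Called before the instance is discarded: release any resources.
        self.config = None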

Shared Cache

There are scenarios that require shared in-memory data to be reused across DoFn instances to improve space and access efficiency. A few of them include:

  • A machine learning model used for local inference
  • Lookup data loaded from external sources such as Cloud Storage, BigQuery, etc.

As mentioned earlier, Beam allows developers to author pipelines using various supported SDKs, with Java and Python being the popular choices.

In the Java SDK, each worker launches a JVM instance with multiple threads (determined by the number of cores and the job type, i.e. batch vs. streaming). Each thread executes multiple data bundles, either creating one DoFn instance per bundle or reusing the same instance across bundles. A shared cache can be achieved via a singleton instance.

In Python, parallelism is achieved through multiple processes because of the Global Interpreter Lock limitation. Each worker launches one process (with multiple threads) per core and executes one or more bundles, invoking one or more DoFn instances per thread.

Since runners can recycle DoFn instances per bundle and can also have multiple such instances running in parallel on different threads, how do we ensure that a single copy of the cached data is shared across all these threads? Beam 2.24 introduced a Shared class (in the apache_beam.utils.shared module of the Python SDK) to solve exactly this challenge: it lets you manage a single instance of an object shared by multiple threads. Let us demonstrate this feature by building a streaming data enrichment pipeline for a mock retail company.

Example: Shared cache for stream enrichment

Requirements for the pipeline are:

  • Read sales events continuously from a real-time messaging service.
  • Load lookup data such as customer details and product details from a persistent store.
  • Enrich each event with additional information by joining lookup identifiers (e.g. customerid, productid) to the corresponding lookup details.
  • Write the buffered enriched events to persistent storage every x minutes.

Nothing fancy so far, except for the following challenges:

  • Since lookup data changes only every x hours, can the data be cached and shared to improve pipeline performance?
  • How do we notify the pipeline to refresh the cache as soon as new lookup data arrives?

Slowly Changing Lookup Data
Before diving into the pipeline design we need to understand the nature and shape of the data. In our example, lookup data is stored on Google Cloud Storage (BigQuery could be used as well) with the following path format:

gs://bucket/prefix/yyyy/mm/dd/customers/{file1…N}.json
gs://bucket/prefix/yyyy/mm/dd/products/{file1…N}.json

On a daily basis, a separate pipeline generates the latest lookup data and sends a message to Pub/Sub containing the latest base path (i.e. gs://bucket/prefix/yyyy/mm/dd).

Schema
Let us check the schema for a sales event and the corresponding lookup data. The customer and product lookup data share a similar schema, containing key-value pairs spread across different files.

Sales Event: {"Txid":1,"timestamp": "2020-05-02 19:08:24.796892",  "customerid": 100, "productcode": "P023", "qty": 1, "sales": 97.65 }customers Lookup data:  {"key":100, "value": {“name”: “john”, “tier”, “premium” }products Lookup data:  {"key":"P023", "value": {“sku”: “chair”, “category”, “furniture” }Enriched Event (i.e Output): {"Txid":1, "timestamp": "2020-05-02 19:08:24.796892",  "customerid": 100, "productcode": "P023", "qty": 1, "sales": 97.65,  "customertier": "premium”, "productcategory": "furniture" }

Pipeline Design
Our pipeline has two inputs and one output:

  • Read sales events from a Google Cloud Pub/Sub subscription as the primary input.
  • Read the base path of the lookup data from a different Google Cloud Pub/Sub subscription as a side input. In Beam, a side input is an immutable PCollection that can be used as an additional input to a ParDo transform. Runners broadcast the side input to all workers. An important note about side inputs is that they should be small enough to fit in memory.
  • Write the final enriched output to Google Cloud Storage.

Let us walk through the pipeline code step by step:

Step 1: Read the inputs

  1. Sales events are read from a sales Pub/Sub subscription and assigned to a
    fixed window of 15 minutes duration.
  2. The side input base file path is read from a side input Pub/Sub subscription.
    Although the side input is projected over a long window interval, it is processed immediately thanks to a data-driven trigger (see the sketch after the note below).

Be aware that the side input expires at the end of its window, and the main input
is blocked until either a new side input arrives or the entire side input window
duration has passed, whichever comes first. To avoid this, ensure the latest side
input base path is published again at the expiry of each side input window.
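A sketch of this step, with illustrative subscription names, window sizes, and trigger settings (these specifics are assumptions, not taken verbatim from the original pipeline), might look as follows:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import trigger, window

options = PipelineOptions(streaming=True)
sales_subscription = "projects/<project>/subscriptions/sales-events"     # illustrative
path_subscription = "projects/<project>/subscriptions/lookup-base-path"  # illustrative

with beam.Pipeline(options=options) as p:
    # Primary input: sales events assigned to 15-minute fixed windows.
    sales_events = (
        p
        | "ReadSales" >> beam.io.ReadFromPubSub(subscription=sales_subscription)
        | "ParseSales" >> beam.Map(json.loads)
        | "WindowSales" >> beam.WindowInto(window.FixedWindows(15 * 60)))

    # Side input: the latest lookup base path. A long fixed window combined
    # with a data-driven trigger makes each new path available immediately.
    base_path = (
        p
        | "ReadBasePath" >> beam.io.ReadFromPubSub(subscription=path_subscription)
        | "DecodePath" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | "WindowPath" >> beam.WindowInto(
            window.FixedWindows(24 * 60 * 60),
            trigger=trigger.Repeatedly(trigger.AfterCount(1)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING))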

Step 2: Create a Shared Cache and enrich events

  1. Create a shared handle that encapsulates a weak reference to a singleton instance of the shared resource, and pass it in while constructing the DoFn instance.
  2. During execution, along with each event the DoFn also receives the latest side input base path.
  3. Inside the process method, the shared handle is used to acquire the side input data, which works as shown below:
  • On the initial call, since the data is not yet available, the load_sideinput method is used to load the data from an external source. In this example, for each side input type (i.e. customers, products) the data is read from Google Cloud Storage and loaded into a multi-level dictionary, with the top-level key indicating the side input type and the value being a dictionary of the key-value pairs from the corresponding lookup data.

An important point to note here is that a shared handle requires a weak reference to the cached object, but several built-in types such as list and dict do not directly support weak references. This can be addressed by creating a subclass of the built-in type (e.g. _SideInputContainer in our case).

  • On subsequent calls, the same cached object is used instead of loading from the external source.
  • Notice that, in addition to the actual lookup data, we also created a metadata key to preserve the base path. Whenever the base path changes, the cache is refreshed by passing the new base path as a tag to the acquire method, as sketched below.
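A condensed sketch of this step, reusing the names mentioned above (load_sideinput, _SideInputContainer) but with simplified loading and enrichment logic of my own (read_lookup_files and EnrichEventsFn are illustrative, not the article's exact code):

import json

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.utils.shared import Shared


class _SideInputContainer(dict):
    # dict does not support weak references directly, so we subclass it.
    pass


def read_lookup_files(pattern):
    # Illustrative helper: reads newline-delimited JSON records of the form
    # {"key": ..., "value": {...}} from every file matching the pattern and
    # returns a key -> value dictionary.
    lookup = {}
    for metadata in FileSystems.match([pattern])[0].metadata_list:
        with FileSystems.open(metadata.path) as handle:
            for line in handle:
                record = json.loads(line)
                lookup[record["key"]] = record["value"]
    return lookup


def load_sideinput(base_path):
    # Builds the multi-level dictionary: side input type -> lookup dictionary,
    # plus a metadata key preserving the base path used to load it.
    container = _SideInputContainer()
    container["metadata"] = {"base_path": base_path}
    container["customers"] = read_lookup_files(f"{base_path}/customers/*.json")
    container["products"] = read_lookup_files(f"{base_path}/products/*.json")
    return container


class EnrichEventsFn(beam.DoFn):
    def __init__(self, shared_handle):
        self._shared_handle = shared_handle

    def process(self, event, base_path):
        # acquire() returns the process-wide cached container; passing the
        # base path as the tag forces a reload whenever a new path arrives.
        cache = self._shared_handle.acquire(
            lambda: load_sideinput(base_path), tag=base_path)
        customer = cache["customers"].get(event["customerid"], {})
        product = cache["products"].get(event["productcode"], {})
        yield dict(event,
                   customertier=customer.get("tier"),
                   productcategory=product.get("category"))


# Inside the pipeline context from the Step 1 sketch:
shared_handle = Shared()
enriched = sales_events | "Enrich" >> beam.ParDo(
    EnrichEventsFn(shared_handle),
    base_path=beam.pvalue.AsSingleton(base_path))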

Step 3: Write enriched events to Google Cloud Storage
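A minimal sketch of the write step, assuming the enriched PCollection from Step 2 and an illustrative output location (fileio.WriteToFiles supports windowed writes in streaming pipelines):

from apache_beam.io import fileio

# Inside the same pipeline context: serialize the enriched events and write
# one set of files per 15-minute window.
_ = (
    enriched
    | "ToJson" >> beam.Map(json.dumps)
    | "WriteToGCS" >> fileio.WriteToFiles(
        path="gs://bucket/output/enriched",  # illustrative output path
        sink=lambda destination: fileio.TextSink(),
        file_naming=fileio.default_file_naming(prefix="enriched", suffix=".json"),
        shards=1))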

The pipeline DAG appears as shown below:

Conclusion

Thanks for reading, and special thanks to Pablo Estrada for reviewing the article. To summarize, in this blog post we covered the DoFn lifecycle, how to build a cache that is shared across threads, and an approach to refreshing the cache via an external trigger. For more information about Beam and other interesting patterns, refer to the Beam documentation.
