Efficient Data Sharing in Data Science Pipelines on Kubernetes

HE, Tao
Published in cncf-vineyard
6 min read · Jul 19, 2023

Data science pipelines are evolving into complex workflows that consist of thousands of tasks and are scheduled onto large-scale Kubernetes clusters. Many workflow orchestration engines have been developed to handle these complex data analytics pipelines, e.g., Apache Airflow, Kedro, Kubeflow, Argo Workflows, and more. The core idea of these engines is to organize applications into DAGs (directed acyclic graphs) and execute tasks along the dependency chains.

An example of a complex workflow

Programming data science pipelines

The programming interfaces of these workflow engines are fairly user-friendly: you define tasks that take input values and produce results, then connect the individual tasks into pipelines. That’s all.

An example from Kedro shows that defining tasks is as easy as writing ordinary Python functions:

import pandas as pd

# _is_true, _parse_percentage and _parse_money are small helper functions
# defined alongside these tasks (not shown here).


def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and `d_check_complete`,
        `moon_clearance_complete` converted to boolean.
    """
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles


def create_model_input_table(
    shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
    """Combines all data to create a model input table.

    Args:
        shuttles: Preprocessed data for shuttles.
        companies: Preprocessed data for companies.
        reviews: Raw data for reviews.
    Returns:
        model input table.

    """
    # Attach reviews to shuttles, then attach the owning company to each row.
    rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
    model_input_table = rated_shuttles.merge(
        companies, left_on="company_id", right_on="id"
    )
    model_input_table = model_input_table.dropna()
    return model_input_table

And pipelines are composed of individual tasks:

from kedro.pipeline import Pipeline, node, pipeline


def create_pipeline(**kwargs) -> Pipeline:
    # Each node binds a function to named input and output datasets.
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            ...,
        ]
    )
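The dependency chain is implied by the dataset names: a node that consumes `preprocessed_companies` has to run after the node that produces it. Below is a minimal sketch of that idea using Python's standard `graphlib` module. It is not how Kedro itself resolves pipelines, and the third node (`create_model_input_table_node`, with its input and output dataset names) is an assumption, since that part of the pipeline is elided above.

```python
from graphlib import TopologicalSorter

# (node name, input datasets, output datasets).
# The last entry is hypothetical: the original pipeline elides it with "...".
NODES = [
    (
        "create_model_input_table_node",
        ["preprocessed_shuttles", "preprocessed_companies", "reviews"],
        ["model_input_table"],
    ),
    ("preprocess_companies_node", ["companies"], ["preprocessed_companies"]),
    ("preprocess_shuttles_node", ["shuttles"], ["preprocessed_shuttles"]),
]


def execution_order(nodes):
    """Return node names in an order that respects dataset dependencies."""
    # Map every dataset to the node that produces it.
    producer = {out: name for name, _, outputs in nodes for out in outputs}
    # A node depends on the producers of its inputs; raw inputs have no producer.
    graph = {
        name: {producer[i] for i in inputs if i in producer}
        for name, inputs, _ in nodes
    }
    return list(TopologicalSorter(graph).static_order())


ORDER = execution_order(NODES)

if __name__ == "__main__":
    print(ORDER)
```

The two preprocessing nodes have no dependency on each other, so they may appear in either order (or run in parallel), while the node that joins their outputs always comes last.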

Data sharing in the pipeline execution

A task is the unit of execution in a pipeline, whether it is submitted to a local executor, a distributed cluster, or Kubernetes. Workflow orchestration engines provide an effective abstraction, so programmers don’t need to care about how data is passed between two tasks at runtime. But the intermediate data (`preprocessed_companies` and `preprocessed_shuttles` in the above example) still needs to be transferred between tasks. The transfer can be costly, and in distributed settings it is usually handled by remote storage such as S3.

Defining the data catalog for the above example:

preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: s3://kedro-pipeline/data/02_intermediate/preprocessed_companies.pq

preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: s3://kedro-pipeline/data/02_intermediate/preprocessed_shuttles.pq

As data scales, the I/O cost of sharing intermediate data can become dominant. The intermediate data is persisted to S3, but what we actually need is only the final result, model_input_table. The whole computation happens inside the cluster, yet we send the intermediate data to an external storage service (S3) again and again, and finally purge it to save costs.

Can the end-to-end process stay within the cluster, even when we scale out to a distributed cluster?

Can we do better for intermediate data sharing?

The answer is yes. Memory is a more efficient alternative for data sharing.

Memory is widely used for implicit intermediate data sharing. Imagine passing variables between different libraries, for example, between NumPy and PyTorch in Python: the process can be zero-copy. However, it is not as easy when we talk about intermediate data sharing in complex data science workflows, because (1) the sharing can happen between different processes and systems, and (2) the sharing may cross physical nodes, where direct memory sharing no longer works.
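To illustrate what zero-copy means inside a single process, here is a small sketch that uses only the standard library: an `array.array` is exposed through a `memoryview`, so both names refer to the same underlying buffer. This is a stand-in chosen to keep the example dependency-free; with NumPy and PyTorch installed, `torch.from_numpy` shares memory with the source array in a comparable way.

```python
from array import array

# A buffer of doubles owned by one "library".
data = array("d", [1.0, 2.0, 3.0])

# A second consumer gets a view on the same memory; nothing is copied.
view = memoryview(data)

# A write through the view is visible through the original object,
# because both refer to the same underlying buffer.
view[0] = 42.0

if __name__ == "__main__":
    print(data[0])
```

As soon as the consumer lives in another process or on another machine, this kind of in-process view is no longer available, which is the problem the following paragraphs address.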

Can we use memory for data sharing between different worker processes? Yes: the POSIX system call “mmap()” can map the same file or shared memory object into the address spaces of different processes, so that they all see the same memory.
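As a rough sketch of the idea (assuming a POSIX-like system, and not Vineyard's actual implementation), the snippet below maps a temporary file in a parent process, starts a separate Python process that maps the same file and overwrites a few bytes, and then reads the change back through the parent's mapping. The helper name `share_via_mmap` and the child script are made up for this illustration.

```python
import mmap
import os
import subprocess
import sys
import tempfile

SIZE = 4096  # one page is enough for the demo

# Program run in a separate process: map the same file and write in place.
CHILD = """
import mmap, sys
with open(sys.argv[1], "r+b") as f:
    with mmap.mmap(f.fileno(), 0) as m:
        m[:5] = b"child"
"""


def share_via_mmap() -> bytes:
    """Write from a child process and observe it in the parent's mapping."""
    fd, path = tempfile.mkstemp()
    try:
        os.ftruncate(fd, SIZE)  # give the backing file a fixed size
        with mmap.mmap(fd, SIZE) as m:  # shared mapping by default on Unix
            m[:6] = b"parent"
            # The child maps the same file and overwrites the first 5 bytes.
            subprocess.run([sys.executable, "-c", CHILD, path], check=True)
            return bytes(m[:6])
    finally:
        os.close(fd)
        os.remove(path)


if __name__ == "__main__":
    print(share_via_mmap())
```

The parent never reads the file again after the child exits; it sees the child's bytes through its own mapping, because both mappings are backed by the same pages.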

Can we use memory for data sharing in distributed environments? “mmap()” doesn’t work across physical nodes, so remote data access is still needed to transfer the data. But we can optimize the placement of adjacent tasks in complex pipelines to minimize remote data access for best performance. Thanks to its scheduler framework, this can even be implemented on the most prevalent cluster management system: Kubernetes.

Vineyard: a cloud-native data manager

Putting it all together, we built Vineyard, a cloud-native data manager for sharing data through memory in data science pipelines on Kubernetes.

First, it is an object store with put() and get() client interfaces: any Python object can be put to and fetched from the daemon server directly. Python objects are first-class citizens, and the most commonly used data types in the Python ecosystem, e.g., numpy.ndarray, pandas.DataFrame, pyarrow.{Array,RecordBatch,Table}, and pytorch.DataSet, are carefully integrated so that the get() operation can be zero-copy, without any extra cost for I/O and (de)serialization.

At the same time, it is tightly integrated with Kubernetes for data-aware scheduling, placing a downstream task where the upstream task left its intermediate outputs whenever possible. When this data-locality constraint cannot be satisfied, an optimized migration is triggered.
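The placement decision can be pictured with a toy heuristic: run the task on the node that already holds the most input bytes, and migrate whatever is left elsewhere. The sketch below is only an illustration of that idea under made-up names and sizes (`place_task`, `node-a`, `node-b`); it is not the logic of the Vineyard scheduler plugin.

```python
from typing import Dict, List, Tuple


def place_task(
    inputs: List[str],
    object_locations: Dict[str, str],
    object_sizes: Dict[str, int],
    nodes: List[str],
) -> Tuple[str, List[str]]:
    """Pick the node holding the most input bytes; list inputs to migrate."""
    local_bytes = {node: 0 for node in nodes}
    for obj in inputs:
        node = object_locations[obj]
        if node in local_bytes:
            local_bytes[node] += object_sizes[obj]
    # Ties are broken by the order of `nodes`.
    best = max(nodes, key=lambda n: local_bytes[n])
    # Inputs that live elsewhere must be moved to the chosen node.
    to_migrate = [obj for obj in inputs if object_locations[obj] != best]
    return best, to_migrate


# Hypothetical cluster state: where each intermediate object lives and its size.
LOCATIONS = {
    "preprocessed_shuttles": "node-a",
    "preprocessed_companies": "node-b",
    "reviews": "node-a",
}
SIZES = {"preprocessed_shuttles": 800, "preprocessed_companies": 200, "reviews": 100}

CHOSEN, TO_MIGRATE = place_task(
    list(LOCATIONS), LOCATIONS, SIZES, ["node-a", "node-b"]
)

if __name__ == "__main__":
    print(CHOSEN, TO_MIGRATE)
```

In this made-up state, most of the input bytes sit on `node-a`, so the task goes there and only the smaller object has to move.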

Vineyard is open source and is currently a CNCF sandbox project: https://github.com/v6d-io/v6d.

Vineyard in action: data sharing in Kedro pipelines

So, how do we put Vineyard into action? Do we need to rewrite the code once again to adapt to new interfaces? What’s the gain? Can we still benefit from the nice, Pythonic interfaces of these workflow engines?

Vineyard is deeply integrated with commonly used workflow orchestration engines such as Apache Airflow and Kedro. Each plugin can be installed with a one-line command:

  • Airflow: pip install airflow-provider-vineyard
  • Kedro: pip install vineyard-kedro

Do we need to rewrite the code once again to adapt to the new interfaces?

No. You run the pipeline as usual, with just a few extra command-line arguments. Taking Kedro as an example:

  • Create a demo project to get started:
$ kedro new --starter=pandas-iris
  • Trigger an execution with the kedro run command as usual, selecting the Vineyard runner:
$ kedro run --runner vineyard.contrib.kedro.runner.SequentialRunner
[05/25/23 11:45:34] INFO Kedro project iris session.py:355
INFO Loading data from 'example_iris_data' (CSVDataSet)... data_catalog.py:343
INFO Loading data from 'parameters' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split: split_data([example_iris_data,parameters]) -> [X_train,X_test,y_train,y_test] node.py:329
INFO Saving data to 'X_train' (VineyardDataSet)... data_catalog.py:382
INFO Saving data to 'X_test' (VineyardDataSet)... data_catalog.py:382
INFO Saving data to 'y_train' (VineyardDataSet)... data_catalog.py:382
INFO Saving data to 'y_test' (VineyardDataSet)... data_catalog.py:382
INFO Loading data from 'X_train' (VineyardDataSet)... data_catalog.py:343
INFO Loading data from 'X_test' (VineyardDataSet)... data_catalog.py:343
INFO Loading data from 'y_train' (VineyardDataSet)... data_catalog.py:343
INFO Running node: make_predictions: make_predictions([X_train,X_test,y_train]) -> [y_pred] node.py:329
...

That’s all.

What’s the gain?

The gain is significant and grows as the data scales:

Data sharing using Vineyard vs. S3 vs. MinIO

Can we still benefit from the nice and Pythonic interfaces of these workflow engines?

The task and pipeline definitions don’t need any modification to leverage Vineyard for sharing, thanks to the careful integration.

What’s more, the pipeline can be pushed directly to Kubernetes and still leverage Vineyard for data sharing, again without any modification. The pipeline is translated into an Argo workflow, and the Vineyard scheduler plugin manages the placement of tasks to maximize memory-sharing opportunities and boost end-to-end performance.

From the programmer’s perspective, you still write Python, using tensors, data frames, and datasets, and these intermediate results are shared in the most efficient way automatically.

Let’s give it a try!

For more details on getting started, refer to Apache Airflow and Kedro Pipelines.

Hmm... more technical details?

How can complex objects be shared using memory mapping?

You might be curious about how efficient get() and put() operations are implemented for complex (especially nested) and even distributed objects in Vineyard. Please refer to our documentation, and don't hesitate to ask questions on GitHub!

Wait, shared memory is not well supported in Kubernetes.

It is not easy or straightforward to get shared memory right on Kubernetes, especially when considering how shared memory interacts with Kubernetes’s resource allocation and measurement. We will discuss the details under the hood in the next blog post!

We build our pipelines using Spark. Can Vineyard be used for data sharing in the Hadoop ecosystem?

We are working on integration with Hive and Spark so that (1) data from upstream tasks can be analyzed directly using Spark, and (2) results produced by Spark can be consumed directly by downstream tasks implemented in third-party computation systems.

HE, Tao
cncf-vineyard

Open-source enthusiast, focusing on PyData, cloud-native, data-intensive analytics, and graph computing. Maintainer of Vineyard and GraphScope.