Dataset Streaming for ML Pipelines with SceneBox

Pratik Gujjar
Published in SceneBox
8 min read · Nov 17, 2022

TL;DR: SceneBox users can now utilize StreamableSets to stream curated datasets directly from SceneBox cloud to their PyTorch training pipelines — blazingly fast!

Machine learning algorithms have become ubiquitous in modern technology products, and new ones are churned out every day. To keep this ML beast fed, data is collected on the order of petabytes. This is the area where SceneBox thrives, with features such as high-throughput indexing of schema-less data and powerful data curation tools.

As of today, we are proud to present StreamableSets for multi-worker batched loading of data from any remote storage to feed your PyTorch jobs. StreamableSets is a drop-in replacement for dataloader objects in PyTorch scripts. A few lines of code and SceneBox will seamlessly integrate with your pre-existing training pipelines. When paired with our responsive WebApp, StreamableSets becomes a powerful tool in your arsenal.

Below, we will explore how StreamableSets ties into your everyday life as a machine learning engineer, and helps you learn about your data before your ML model does.

Iteration is Key for Machine Learning

A typical machine-learning workflow consists of four steps:

  1. Data Curation
  2. ML algorithm training and validation
  3. Field-testing / Performance metrics
  4. Deployment

We iterate on these steps, experimenting with various datasets and algorithms, and addressing performance gaps in the field.

Although we could get granular very quickly, for the sake of this article we will only expand on step one: Data Curation. Step one can be further split into very involved processes that entire teams of data engineers work through painstakingly with nothing but their Python scripts. These are the steps we find teams taking all too frequently:

  1. Indexing data: Raw data (images, videos, lidars) is usually stored in an object storage and metadata in some variant of a database.
    SceneBox method: Our high-throughput index can ingest your data at speeds averaging 4000 images per minute and all the while, the raw data never leaves your object storage.
  2. Data Cleanup / Annotation: Imagine several rounds of back and forth between data scientists and your various annotators for labels and QA.
    SceneBox method: Full-featured integrations with many leading annotation providers such as Scale, CVAT, and Segments, that take care of these workflows with the click of a button.
  3. Curating Datasets: Usually a very manual process of filtering the data for the required attributes, often done by visually inspecting raw data directly. This can result in several copies of the same data, with every engineer curating their own sets for their algorithms.
    SceneBox method: Our Sets are a logical association of data within our data index. We can sample this data using any combination of queries for metadata attributes available on the data.
  4. Fetch and feed the ML model: As machine learning practitioners, we need to feed our algorithms with the data we just curated. We’ve seen our customers download their data from cloud storage and then write a version of a Data Iterator to yield this data. Data Iterators support batching and also apply input and label transforms to each sample. Such Iterators tend to be custom-developed for every single dataset that they are working on.
    SceneBox method: StreamableSets!
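
To make step 4 concrete, here is a minimal sketch of the kind of hand-written data iterator teams tend to rebuild for every dataset. It is not SceneBox code: the `batched_samples` helper, the in-memory `samples` list, and both transforms are hypothetical stand-ins for data that has already been downloaded.

```python
from typing import Any, Callable, Iterable, Iterator, List, Tuple


def batched_samples(
    samples: Iterable[Tuple[Any, Any]],
    batch_size: int,
    input_transform: Callable[[Any], Any] = lambda x: x,
    target_transform: Callable[[Any], Any] = lambda y: y,
) -> Iterator[Tuple[List[Any], List[Any]]]:
    """Yield (inputs, targets) batches, applying a transform to each sample."""
    inputs, targets = [], []
    for raw_input, raw_target in samples:
        inputs.append(input_transform(raw_input))
        targets.append(target_transform(raw_target))
        if len(inputs) == batch_size:
            yield inputs, targets
            inputs, targets = [], []
    if inputs:  # emit the final, possibly smaller, batch
        yield inputs, targets


# Hypothetical toy data standing in for downloaded images and labels
samples = [([0, 128, 255], "bear"), ([255, 255, 0], "cat"), ([10, 20, 30], "bear")]
label_ids = {"bear": 0, "cat": 1}

batches = list(
    batched_samples(
        samples,
        batch_size=2,
        input_transform=lambda px: [p / 255 for p in px],  # scale pixels to [0, 1]
        target_transform=label_ids.__getitem__,            # label string -> class id
    )
)
```

Even this small version has to handle batching, the trailing partial batch, and both transforms, which is the boilerplate a shared streaming dataloader is meant to remove.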

Streaming Data from Cloud Storage

Sourcing data from the same remote storage to feed machine learning pipelines has many advantages. For example, it prevents the redundant copies of data that engineers inevitably end up creating as they run their experiments. A single, well-built data orchestrator can be shared amongst the team to significantly improve time utilization. Further, we can pack more of these streaming services into fewer computational resources, resulting in improved cost efficiency.

A typical scenario of downloading data from object storage to feed a machine learning job.
With SceneBox, datasets can be streamed directly from cloud storage to compute.

All this said, cloud streaming of data comes with its fair share of difficulties. The first (and possibly the biggest) concern is the network overhead and its implications on throughput on the model training side. There are also issues with shuffling data and secondary transformations that are omnipresent in machine learning pipelines.

With StreamableSets, we venture to address these challenges with a cloud-agnostic API. Any SceneBox Set can be transformed into a StreamableSet on demand. This ensures a one-to-one relationship between the data in the set and its streams. When a request for a StreamableSet arrives, we begin condensing all of the data into a compressed format. One or many shards of such archives are created in parallel in the background. We then use WebDataset to build a PyTorch IterableDataset object that grants us streaming data access to the many shards. To build a dataloader, we finally wrap the IterableDataset with our stream decoding logic and provide an API that directly corresponds to the torch.utils.data.DataLoader interface.
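
The sharding idea can be illustrated with a small, self-contained sketch. It uses only Python's standard `tarfile` module and is not the SceneBox or WebDataset implementation. It assumes the convention that files sharing a basename (for example `0001.jpg` and `0001.json`) form one sample, and the helper names `write_shard` and `stream_shard` are hypothetical.

```python
import io
import json
import tarfile
from typing import Any, Dict, Iterator, List, Tuple


def write_shard(samples: List[Tuple[str, bytes, dict]]) -> bytes:
    """Pack (key, image_bytes, metadata) samples into one compressed tar shard."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        for key, image_bytes, metadata in samples:
            payloads = (
                ("jpg", image_bytes),
                ("json", json.dumps(metadata).encode("utf-8")),
            )
            for suffix, payload in payloads:
                info = tarfile.TarInfo(name=f"{key}.{suffix}")
                info.size = len(payload)
                tar.addfile(info, io.BytesIO(payload))
    return buffer.getvalue()


def stream_shard(shard: bytes) -> Iterator[Dict[str, Any]]:
    """Read a shard front to back, grouping consecutive files by basename."""
    current_key, sample = None, {}
    # Mode "r|gz" treats the archive as a forward-only stream (no seeking)
    with tarfile.open(fileobj=io.BytesIO(shard), mode="r|gz") as tar:
        for member in tar:
            key, _, suffix = member.name.rpartition(".")
            if key != current_key and sample:
                yield sample  # a new basename means the previous sample is complete
                sample = {}
            current_key = key
            sample["key"] = key
            sample[suffix] = tar.extractfile(member).read()
    if sample:
        yield sample


# Hypothetical payloads standing in for real image bytes and metadata
samples = [
    ("0001", b"fake-jpeg-bytes-1", {"label": "smiling"}),
    ("0002", b"fake-jpeg-bytes-2", {"label": "glasses"}),
]
shard = write_shard(samples)
decoded = list(stream_shard(shard))
```

Because the reader only moves forward through the archive, a whole shard can be consumed from a single network response instead of one request per file.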

Timing diagram to illustrate the process of streaming a SceneBox Set

We’ve built the following features into it:

  1. Sequential I/O: We compress and build sequential shards of data for efficient network access of the samples as compared to regular file I/O operations. The capability to shuffle data is preserved by shuffling fetched data in a size-defined buffer and optionally shuffling between shards themselves.
  2. High Throughput Pipelines: StreamableSets iterates over compressed archives of data, providing very efficient access that scales to extremely large datasets. It also provides a PyTorch dataloader object (will be extended to TensorFlow in later versions) that can split data fetch jobs across multiple parallel workers battling any network overhead.
  3. Distributed Shards: Data is split into multiple shards that can themselves be distributed across multiple nodes on the cloud, thus paving the way for multi-node streaming. Partitioned sharding also means that very large datasets can be created, maintained and distributed for training or validation.
  4. Input and Target Transforms: Input and label transforms are an integral part of an ML training / validation job. Any PyTorch transform can be applied to the data from a StreamableSet the same way a user would with their own data iterator in PyTorch: by passing these transforms as arguments to the dataloader constructor.
  5. Ubiquitous Operational Access: The intuitive SceneBox WebApp enables data filtering, dashboarding, enrichments and visualizations. Coupled with SceneBox’s curation tools, access to even niche subsets of data like “All bears in the backyard that are sitting down” is now possible directly from your PyTorch script.
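
Two of the ideas in this list, buffered shuffling over a sequential stream and splitting shards across workers, can be sketched in a few lines. This is an illustration of the general techniques, not the StreamableSets implementation. The function names and shard file names are hypothetical.

```python
import random
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def shuffle_buffer(stream: Iterable[T], buffer_size: int, seed: int = 0) -> Iterator[T]:
    """Approximately shuffle a sequential stream using a bounded buffer."""
    rng = random.Random(seed)
    buffer: List[T] = []
    for item in stream:
        buffer.append(item)
        if len(buffer) >= buffer_size:
            # Swap a random element to the end, then emit it
            idx = rng.randrange(len(buffer))
            buffer[idx], buffer[-1] = buffer[-1], buffer[idx]
            yield buffer.pop()
    # Drain whatever is left once the stream is exhausted
    rng.shuffle(buffer)
    yield from buffer


def shards_for_worker(shards: List[str], worker_id: int, num_workers: int) -> List[str]:
    """Give each worker a disjoint, round-robin slice of the shard list."""
    return shards[worker_id::num_workers]


shard_names = [f"shard-{i:03d}.tar" for i in range(6)]  # hypothetical shard names
worker_shards = [shards_for_worker(shard_names, w, 4) for w in range(4)]
shuffled = list(shuffle_buffer(range(100), buffer_size=16))
```

A larger buffer gives a shuffle closer to uniform at the cost of memory, which is why the buffer size is left as a user-defined parameter in this sketch.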

Usage

In the example below, we’ll see how to source CelebA data that’s hosted on SceneBox in a PyTorch training loop. Specifically, we will show how to use a dataloader for the set StreamingSet, curated with 1,000 images in it. A few lines of code added in place of a torch.utils.data.DataLoader and we are set! SceneBox itself can be installed with a simple pip command:

pip install scenebox
"""
Streamable Sets
"""

from scenebox.clients import SceneEngineClient
import torch
import time

# Instantiate the client
sec = SceneEngineClient(auth_token="d1390598-a1e4-82d9-6922-1780_53ce_49")

# Set id on SceneBox
celeba_image_set_id = "streamingset"

# Create a streamable set
streamable_set = sec.get_streamable_set(set_id=celeba_image_set_id)

# Get the pytorch dataloader for this set
dataloader = streamable_set.pytorch_dataloader(num_workers=4,
                                               batch_size=32,
                                               shuffle=True)

# Training loop
start_time = time.time()
for i, data in enumerate(dataloader):
    images_batch, metadata_batch, annotations_batch = data

    # Your pytorch code here

end_time = time.time()
time_taken = end_time - start_time
num_batches = i + 1
print("Time per batch: {} ms".format((time_taken * 1000) / num_batches))

Let us compare streaming data using SceneBox to other typical random file-access patterns such as:

  1. Sequential GET: Reading images individually from the AWS S3 URLs in a sequence.
"""
Sequential GET
"""

import time
import cv2
import requests
import urllib.request
import numpy as np

from scenebox.clients import SceneEngineClient
from scenebox.constants import AssetsConstants

# Instantiate the client
sec = SceneEngineClient(auth_token="d1390598-a1e4-82d9-6922-1780_53ce_49")

# Set id on SceneBox
celeba_image_set_id = "streamingset"

# Collect asset ids in the set
set_asset_ids = sec.assets_in_set(set_id=celeba_image_set_id)

# Split into batches
batch_size = 32
batched_ids = [set_asset_ids[i:i + batch_size]
               for i in range(0, len(set_asset_ids), batch_size)]

# Image asset manager on SceneBox
images_amc = sec.get_asset_manager(asset_type=AssetsConstants.IMAGES_ASSET_ID)

batch_times = []
for batch_idx, batch in enumerate(batched_ids):
    # Get signed s3 urls for the images in this batch
    image_id__urls_map = images_amc.get_url_in_batch(ids=batch)

    images_batch = {}
    # Fetch each of them in sequence
    start_time = time.time()
    for image_id, image_url in image_id__urls_map.items():
        resp = urllib.request.urlopen(image_url)
        image = np.asarray(bytearray(resp.read()), dtype="uint8")
        image = cv2.imdecode(image, cv2.IMREAD_UNCHANGED)
        images_batch[image_id] = image

    end_time = time.time()
    time_taken = end_time - start_time

    print("Batch {} Time {}".format(batch_idx, time_taken))
    batch_times.append(time_taken)

single_fetch_time = sum(batch_times)/len(batch_times)

print("Average single fetch batch time {} ms".format(single_fetch_time*1000))

  2. Threaded GET: Reading a batch of images using concurrent HTTP requests.

"""
Threaded GET
"""

import time
import cv2
import requests
import urllib.request
import numpy as np

from scenebox.clients import SceneEngineClient
from scenebox.constants import AssetsConstants
from concurrent.futures import ThreadPoolExecutor

# Instantiate the client
sec = SceneEngineClient(auth_token="d1390598-a1e4-82d9-6922-1780_53ce_49")

# Set id on SceneBox
celeba_image_set_id = "streamingset"

# Collect asset ids in the set
set_asset_ids = sec.assets_in_set(set_id=celeba_image_set_id)

# Split into batches
batch_size = 32
batched_ids = [set_asset_ids[i:i + batch_size]
               for i in range(0, len(set_asset_ids), batch_size)]

# Image asset manager on SceneBox
images_amc = sec.get_asset_manager(asset_type=AssetsConstants.IMAGES_ASSET_ID)

batch_times = []
for batch_idx, batch in enumerate(batched_ids):
    # Get signed s3 urls for the images in this batch
    image_id__urls_map = images_amc.get_url_in_batch(ids=batch)
    images_batch = {}

    def fetch_image(image_id, image_url):
        resp = urllib.request.urlopen(image_url)
        image = np.asarray(bytearray(resp.read()), dtype="uint8")
        image = cv2.imdecode(image, cv2.IMREAD_UNCHANGED)
        images_batch[image_id] = image

    start_time = time.time()
    # Leaving the with-block waits for all submitted fetches to finish
    with ThreadPoolExecutor(max_workers=4) as executor:
        for image_id, image_url in image_id__urls_map.items():
            executor.submit(fetch_image,
                            image_id=image_id,
                            image_url=image_url)
    end_time = time.time()
    time_taken = end_time - start_time

    print("Batch {} Time {}".format(batch_idx, time_taken))
    batch_times.append(time_taken)

threaded_time = sum(batch_times)/len(batch_times)
print("Average threaded fetch batch time {} ms".format(threaded_time*1000))

The table below compares StreamableSets with the two methods above for fetching data from the cloud:

With an average time of 61 milliseconds to fetch a batch of 32 images, StreamableSets is 16x faster than threading and 67x faster than sequential HTTP requests. Results were collected on a machine with an AMD Ryzen 9 3900X 12-Core Processor.
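
As a back-of-the-envelope check, the figures quoted above imply the approximate numbers computed below. They are derived only from the stated 61 ms and the rounded 16x and 67x ratios, so they are estimates and not additional measurements.

```python
batch_size = 32
streamable_ms = 61          # reported average time per batch
speedup_vs_threaded = 16    # reported ratio (rounded)
speedup_vs_sequential = 67  # reported ratio (rounded)

# Implied per-batch times for the two baselines
threaded_ms = streamable_ms * speedup_vs_threaded
sequential_ms = streamable_ms * speedup_vs_sequential

# Implied streaming throughput in images per second
images_per_second = batch_size / (streamable_ms / 1000)

print(f"Threaded GET:   ~{threaded_ms} ms per batch")
print(f"Sequential GET: ~{sequential_ms} ms per batch")
print(f"StreamableSets: ~{images_per_second:.0f} images/s")
```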

Conclusion

StreamableSets is the latest addition from us here at SceneBox to your repertoire of machine learning accelerator tools. Users can now use high-performance data streaming to feed their machine learning pipelines directly from SceneBox with almost no change to existing code. We are also working on many new product features directed towards realizing the goal of building and enabling Software That Learns.

I would love to hear your thoughts on this article. Our team at SceneBox is always open to conversations. Please reach out to me on LinkedIn if you would like to take a deeper dive with me.
