Distributed Processing Use Case — Using Multi-instance SageMaker Processing Jobs

Dipak Singha
Thomson Reuters Labs
8 min read · Dec 2, 2022


Introduction

Data processing is a critical step in any Machine Learning project and one of the most resource-intensive parts of an ML workflow. Being able to process data reliably and efficiently is key to delivering quality models.

Data processing involving a small dataset or simple transformations is relatively straightforward, and a simple notebook environment should be sufficient to manage the processing lifecycle.

At TR Labs, we generally deal with large legal datasets that require complex transformations for downstream usage, so SageMaker processing jobs are a natural choice: they provide a fully managed environment that is flexible and scalable enough to suit our time and resource requirements.

In the following sections, we will discuss a specific use case where multi-instance processing jobs are required to extract and transform the data from the source.

Input Data Organization

We will illustrate a scenario where the source data consists of ~15 million documents stored in ~1500 parquet archives and organized into a specific set of 50 collections. The collections vary considerably in both the number of files they contain and their total sizes. The transformed data is also expected to be organized according to the same set of collections. A sample of the source dataset organized into collections is shown below:

Sample storage structure of the source dataset on S3

The distribution of the collections in terms of the number and size of the data files is shown below:

Distribution of the source dataset collections stored on S3

In most cases, for a multi-instance job, we can shard the input data using the s3_data_distribution_type field in the job definition, which automatically divides the dataset into as many shards as there are instances and assigns each shard to a specific instance. However, this default technique does not consider the variations in file sizes or the way the files are organized into collections.

Therefore, in our case, the default shards will be imbalanced in their total sizes, and each collection may get distributed across multiple shards. Naturally, any task that extracts and aggregates relations within the same collection will be very difficult to accomplish.

Splitting a collection across multiple instances can still work if we can process and store the output for each file in the collection independently and do not need to perform aggregations and joins on the data. But it is still sub-optimal, as the default technique does not account for the data volume fed to each instance.
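For reference, this default sharding is configured directly on the processing input. The snippet below is only a minimal sketch: the bucket and prefix placeholders follow the same convention as the step definitions later in this post, and the input name and source path are illustrative.

from sagemaker.processing import ProcessingInput

raw_documents_input = ProcessingInput(
    input_name="raw_documents",
    source=f"s3://{<s3_bucket>}/{<location_prefix>}/source/",
    destination="/opt/ml/processing/input/raw/",
    # The default is "FullyReplicated"; sharding splits the objects under the
    # prefix across the instances of the job by S3 key.
    s3_data_distribution_type="ShardedByS3Key",
)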

Output Data Organization

Our use case requires that the output data is stored in the same collection-based structure as the source data. This can be accomplished in two ways, which are illustrated below:

Scenario 1

The output files are aggregated by collections as shown below:

Target storage structure of the processed output on S3 — Type 1

Scenario 2

The output files are not aggregated, and each output file corresponds to its source counterpart as shown below:

Target storage structure of the processed output on S3 — Type 2

Solution Approaches

The approaches that lead to the different scenarios listed above generally rely on the same sequence of steps, with minor differences. Our use case involves several different transformations of the data, so we split the functional logic into a set of processing steps that form a SageMaker Pipeline. We also add a separate processing step before the real transformations to catalog the dataset based on the number of instances required in the next step. The catalogs are created in such a way that each list of files covers approximately the same total data size. The differences between the solution approaches are explained below:

Scenario 1

As the output files are aggregated by collections, this approach is useful when the data generated by the processing is not very large and a moderate scale-up of the processing job satisfies the time complexity requirements. It reduces the complexity of the downstream processing, as aggregations are no longer required. However, the downstream steps of a multi-instance job can then no longer be optimized based on the size or resource requirements of the aggregated data. This scenario is therefore suitable when processing each dataset is less resource-intensive.

When we create the catalogs, there is a constraint that disallows splitting a collection across multiple lists. The number of groups is therefore strictly limited by the number of collections, and the skew in the cumulative collection sizes limits it further: if we create as many groups as there are collections, the distribution will be sub-optimal.

As we have 50 collections, we create 5–10 catalogs to achieve an optimal distribution. In the subsequent step, where the actual processing of the source documents takes place, we need 5–10 instances with sufficiently large memory (ml.m5.xlarge or larger), as the output of all the datasets processed by each instance is aggregated as well.
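A minimal sketch of this grouping logic is shown below. It is a simple greedy size-balancing heuristic for illustration only; the function name and data shapes are hypothetical and not the actual catalog_datasets.py.

def group_collections(collection_sizes: dict, num_groups: int) -> list:
    """Assign whole collections to groups so that cumulative sizes stay roughly equal.

    collection_sizes maps a collection name to its total size in bytes.
    """
    groups = [{"collections": [], "size": 0} for _ in range(num_groups)]
    # Place the largest collections first, always into the currently lightest group
    for name, size in sorted(collection_sizes.items(), key=lambda kv: kv[1], reverse=True):
        lightest = min(groups, key=lambda g: g["size"])
        lightest["collections"].append(name)
        lightest["size"] += size
    return groups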

The general sequence of the steps in this scenario is shown below:

Data processing pipeline — Scenario 1

Scenario 2

As the output files are not aggregated, this approach is more useful when the processed output is quite large and the processing job needs to be scaled up significantly; the aggregation step, if any, is pushed to the downstream steps. It also allows different steps in a processing pipeline to scale independently and flexibly. This scenario is suitable when processing each dataset is highly resource-intensive.

In this scenario, splitting collections is allowed while creating the catalogs, so the number of groups is limited only by the number of datasets, which is quite large (more than 1500), and in practice by the number of instances we can use.

We create 50–100 catalogs, which meets the time complexity requirements. As there is no requirement to aggregate outputs, we only need 50–100 small instances (ml.t3.medium or ml.t3.large) in the subsequent step, which is much faster than Scenario 1 even with much more intensive processing.
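For Scenario 2, the same greedy heuristic can be applied at file granularity, with each resulting group written out as a manifest for the next step to consume. Again, this is only a sketch with hypothetical function names, not the actual catalog_datasets.py.

import json

def group_files(file_sizes: dict, num_groups: int) -> list:
    """Distribute individual files across num_groups size-balanced groups."""
    groups = [{"files": [], "size": 0} for _ in range(num_groups)]
    for path, size in sorted(file_sizes.items(), key=lambda kv: kv[1], reverse=True):
        lightest = min(groups, key=lambda g: g["size"])
        lightest["files"].append(path)
        lightest["size"] += size
    return groups

def write_group_manifest(relative_paths: list, prefix: str, out_path: str) -> None:
    """Write one group as a manifest: a prefix entry followed by relative file paths."""
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump([{"prefix": prefix}] + sorted(relative_paths), f, indent=2)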

The general sequence of the steps in this scenario is shown below:

Data processing pipeline — Scenario 2

Here at TR Labs, as we process large legal datasets spanning millions of documents, as in this particular use case, we implement and use the approach from Scenario 2, which is discussed below in more detail with respect to the actual implementation.

Implementation Details — Scenario 2

We opted to build and use our own containers for the processing instances and custom processing scripts to accomplish the actual transformations.

Our “catalog_datasets” step, defined as the first step in the SageMaker pipeline, looks like this:

catalog_datasets_processor = sagemaker_factory.create_processor(
    Processor,
    base_job_name=f"{<naming_prefix>}-catalog_datasets",
    image_uri=<image_uri>,
    volume_size_in_gb=16,
    instance_count=1,
    instance_type="ml.t3.medium",
    entrypoint=["python", "catalog_datasets.py"],
)

step_catalog_datasets = ProcessingStep(
    name="catalog_datasets",
    processor=catalog_datasets_processor,
    outputs=[
        ProcessingOutput(
            output_name="us_cases_catalogs",
            destination=f"s3://{<s3_bucket>}/{<location_prefix>}/catalogs/",
            source="/opt/ml/processing/output/catalogs/",
        ),
    ],
    job_arguments=[
        "--max_catalogs",
        f"{<num_shards>}",
        "--date_stamp",
        <source_datestamp>,
        "--split_collections",
        f"{int(<allow_splits>)}",
    ],
    cache_config=CacheConfig(enable_caching=<cache_flag>, expire_after="p7d"),
)

Specifically, this step generates and stores a set of manifest files on S3, and each of these files serves as input to the subsequent processing jobs, whether multi-instance or otherwise. We use manifest files because they have a standard format: the prefix value specifies the common prefix for the file locations listed in the manifest, and the remaining entries are the relative paths to the individual data files.
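For illustration, a catalog manifest produced by the first step might look like the following; the bucket, prefix, and file names are placeholders rather than actual paths.

[
    {"prefix": "s3://<s3_bucket>/<location_prefix>/source/"},
    "collection_01/part-0000.parquet",
    "collection_01/part-0001.parquet",
    "collection_02/part-0000.parquet"
]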

Some downstream steps depend on the catalog manifests to identify the files that a particular instance should process in that step. We therefore pass the location of the manifest files explicitly and parse them inside the job entry-point script for that step. On each instance, the list of datasets to process is identified from the manifest file, and each dataset is downloaded using the boto3 interface, processed, and then deleted to keep resource usage to a minimum.
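Since the catalogs input is distributed with ShardedByS3Key (see the step definition below), each instance receives only its own subset of the manifest files under the configured local input path. A minimal sketch of how the entry-point script might pick them up:

import glob

# Each instance sees only the manifest files assigned to its shard, copied
# under the ProcessingInput destination configured in the step definition.
local_manifests = sorted(glob.glob("/opt/ml/processing/input/catalogs/*"))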

For example, in our case, the step “extract_citations”, which immediately follows the catalog preparation, looks like this:

extract_citations_processor = sagemaker_factory.create_processor(
    Processor,
    base_job_name=f"{<naming_prefix>}-extract_citations",
    image_uri=<image_uri>,
    volume_size_in_gb=16,
    instance_count=<num_shards>,
    instance_type="ml.t3.large",
    entrypoint=["python", "extract_citations.py"],
)
step_extract_citations = ProcessingStep(
    name="extract_citations",
    processor=extract_citations_processor,
    inputs=[
        ProcessingInput(
            input_name="us_cases_catalogs",
            source=step_catalog_datasets.properties.ProcessingOutputConfig.Outputs[
                "us_cases_catalogs"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/input/catalogs/",
            s3_data_distribution_type="ShardedByS3Key",
            s3_data_type="S3Prefix",
            s3_input_mode="File",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="us_cases_citations",
            destination=f"s3://{<s3_bucket>}/{<location_prefix>}/extracts/",
            source="/opt/ml/processing/output/extracts/",
        ),
        ProcessingOutput(
            output_name="us_cases_indices",
            destination=f"s3://{<s3_bucket>}/{<location_prefix>}/indices/",
            source="/opt/ml/processing/output/indices/",
        ),
    ],
    cache_config=CacheConfig(enable_caching=<cache_flag>, expire_after="p7d"),
)

Parsing the manifest files inside the processing instances is required because we pass the location of the catalogs as S3Prefix rather than ManifestFile, since each processing instance needs to be aware of the collection structure of the source data. The helper function that parses a manifest file to produce the global prefix and a dictionary of file paths per collection looks like this:

import os
from typing import Dict, Tuple


def parse_manifest(single_manifest: str) -> Tuple[str, Dict]:
    """Reads the contents of a single manifest file.

    Args:
        single_manifest: A str representing the name of a manifest file.

    Returns:
        A Tuple of a str and a Dict. The str contains the global prefix in the manifest file.
        The Dict maps each collection to the relative paths of its individual data files.
    """
    with open(single_manifest, encoding="utf-8") as f:
        # The manifest is a small JSON array: [{"prefix": ...}, "rel/path1", "rel/path2", ...]
        metadata = eval(f.read())
    # Drop the "s3://<bucket>/" part so the prefix is relative to the bucket
    common_prefix = "/".join(metadata[0]["prefix"].strip("/").split("//")[1].split("/")[1:])
    relative_paths = sorted(metadata[1:])

    # Group the relative paths by their collection (the immediate sub-directory)
    collections: dict = {}
    for datapath in relative_paths:
        subdir, file = os.path.split(datapath)
        if subdir not in collections:
            collections[subdir] = [file]
        else:
            collections[subdir].append(file)
    return common_prefix, collections
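To tie these pieces together, the sketch below shows how an instance might iterate over a parsed manifest with boto3, continuing from the hypothetical local_manifests list above; <s3_bucket> follows the same placeholder convention as the step definitions, and the actual transformation is elided.

import boto3

s3 = boto3.client("s3")

for manifest in local_manifests:
    common_prefix, collections = parse_manifest(manifest)
    for subdir, files in collections.items():
        for file_name in files:
            local_path = os.path.join("/tmp", file_name)
            # The manifest prefix is already relative to the bucket, so the full
            # object key is prefix/collection/file
            s3.download_file(<s3_bucket>, f"{common_prefix}/{subdir}/{file_name}", local_path)
            # ... transform the dataset and write results under /opt/ml/processing/output/ ...
            os.remove(local_path)  # free local storage before the next dataset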

Our processing pipeline involves several steps, each scaled independently to optimize resource requirements. A snapshot of the pipeline is given below with a summary of the tasks accomplished at each individual step.

Summary of tasks performed at each individual step in the pipeline

Additional Considerations

While scaling up the number of instances, we need to ensure that enough IP addresses are available on the private subnets, since AWS only checks the general availability of instances and raises an error about insufficient IPs only at runtime.
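One way to sanity-check the free IPs before launching is sketched below with boto3; the subnet IDs would be the ones attached to the processing job's network configuration.

import boto3

ec2 = boto3.client("ec2")

def available_ips(subnet_ids: list) -> dict:
    """Return the number of unassigned private IPs per subnet."""
    response = ec2.describe_subnets(SubnetIds=subnet_ids)
    return {s["SubnetId"]: s["AvailableIpAddressCount"] for s in response["Subnets"]}

# Every processing instance consumes at least one IP in the chosen subnets, so the
# total free-IP count should comfortably exceed the planned instance count.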

Using multi-processing within a processing instance can make better use of its resources and ultimately reduce the number, or even the size, of the instances required. The choice of multi-processing parameters can be tricky and may require many iterations to reach an optimal configuration; this matters because AWS only offers discrete resource configurations for processing instances. It can also be difficult to detect memory leaks in certain configurations, which can be counter-productive in business-critical steps.
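As an illustration of the idea, per-file work inside an instance can be fanned out over the available vCPUs with the standard library; transform_file here is only a stand-in for the real transformation.

from multiprocessing import Pool, cpu_count

def transform_file(path: str):
    """Stand-in for the actual per-file transformation."""
    ...

def process_files(paths: list, workers: int = None):
    # Leave one core free for the main process and I/O
    workers = workers or max(cpu_count() - 1, 1)
    with Pool(processes=workers) as pool:
        return pool.map(transform_file, paths)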

Finally, it also makes sense to divide the complete processing logic into smaller steps, each of which can be scaled independently according to its resource requirements. A larger number of small instances (suitable for Scenario 2 explained above) can be more time-efficient while keeping costs largely unaffected.

Conclusion

We at TR Labs leverage the AWS SageMaker infrastructure to build optimized data processing pipelines that meet both business and performance requirements. The approaches discussed in the previous sections illustrate the advantages and limitations we need to weigh when processing data at scale. We have found non-aggregated data processing to have distinct advantages: it suits resource-intensive transformations well, is highly scalable, and allows independent and flexible scaling of each step while requiring less expensive resources.
