How to concatenate sharded files on Google Cloud Storage automatically using Cloud Functions

In Apache Beam, when you write out a text file to a blob store using:

beam.io.WriteToText('gs://somebucket/somedir/csv/train')

You will get a number of output shards, like this:

gs://somebucket/somedir/csv/train-00214-of-00262
gs://somebucket/somedir/csv/train-00215-of-00262
gs://somebucket/somedir/csv/train-00216-of-00262

The exact number depends on your dataset, the number of workers, and so on. Often, though, you will want exactly one output file because the downstream software expects a single file.

The easy way out is to tell Beam to write out only one shard:

beam.io.WriteToText(options.output_prefix, num_shards=1)

However, this is very inefficient. The power of a distributed system lies in being able to fully parallelize the work, and forcing everything through a single sink will slow down your Beam pipeline.

I hope the theory is right, but I have my example in GitHub now, so who knows? (cartoon by xkcd)

I recently faced this problem, and solved it using Cloud Functions (code in GitHub).

Write to shards

In your Beam pipeline, specify a number of shards. The “compose” feature of Google Cloud Storage that we are going to use is currently limited to 32 source files, so you don’t want your pipeline producing more than that. At the same time, for efficiency’s sake, you’d like to have more shards than the number of workers in your Dataflow job:

beam.io.WriteToText(options.output_prefix, num_shards=10)
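
For context, here is a minimal sketch of how that write step might sit inside a pipeline. The input path and the pass-through transform are placeholders I made up for illustration; the part that matters is the WriteToText call with num_shards=10:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch only: the input path and the formatting step are placeholders.
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    (p
     | 'Read' >> beam.io.ReadFromText('gs://somebucket/somedir/input.csv')
     | 'FormatCsv' >> beam.Map(lambda row: row)  # replace with real formatting
     | 'WriteCsv' >> beam.io.WriteToText('gs://somebucket/somedir/csv/train',
                                         num_shards=10))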

When this is done, you will have a bunch of files. How do you concatenate them?

Cloud Storage Compose

The naive way to concatenate a bunch of files on GCS is to download them to a VM, concatenate them using Unix `cat`, and upload the result. Don’t do that!

Google Cloud Storage supports a nifty feature called “compose”: it lets you compose a blob out of up to 32 source blobs. You can do this from the command line:

gsutil compose \
    gs://${BUCKET}/somedir/csv/train* \
    gs://${BUCKET}/somedir/csv/full_training_data.csv

No downloading or local storage needed! You can also delete the shards after the compose to avoid clutter. So, an efficient way to create a single output file is to run the Dataflow job, tell it to produce up to 32 shards, and then concatenate the output using the compose functionality.
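
For reference, the same compose-and-clean-up step looks roughly like this with the google-cloud-storage Python client (the bucket and object names below are placeholders matching the paths above):

import google.cloud.storage.client as gcs

# Sketch only: compose the sharded objects into one blob, then delete the shards.
client = gcs.Client()
bucket = client.bucket('somebucket')
shards = list(bucket.list_blobs(prefix='somedir/csv/train-'))  # at most 32
bucket.blob('somedir/csv/full_training_data.csv').compose(shards)
for shard in shards:
    shard.delete()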

Well, you could run the Dataflow job, wait for it to finish and then invoke the compose command. But why do that when you can set up a Cloud Function to do it for you automatically whenever the sharded files show up in the bucket?

Cloud Function to do the compose

I went to the GCP web console, navigated to the Cloud Functions console, created a function that triggers on Create events in my bucket, chose Python 3.7 as my preferred language, and typed the following Python function into main.py:

import google.cloud.storage.client as gcs
import logging


def compose_shards(data, context):
    num_shards = 10
    prefix = 'somedir/csv/train'
    outfile = 'somedir/csv/full_training_data.csv'
    # trigger on the last shard only
    filename = data['name']
    last_shard = '-%05d-of-%05d' % (num_shards - 1, num_shards)
    if prefix in filename and last_shard in filename:
        # verify that all the shards exist (shard names are zero-indexed,
        # e.g. train-00000-of-00010 through train-00009-of-00010)
        prefix = filename.replace(last_shard, '')
        client = gcs.Client()
        bucket = client.bucket(data['bucket'])
        blobs = []
        for shard in range(num_shards):
            sfile = '%s-%05d-of-%05d' % (prefix, shard, num_shards)
            blob = bucket.blob(sfile)
            if not blob.exists():
                # this causes a retry in 60s
                raise ValueError('Shard {} not present'.format(sfile))
            blobs.append(blob)
        # all shards exist, so compose them and delete the pieces
        bucket.blob(outfile).compose(blobs)
        logging.info('Successfully created {}'.format(outfile))
        for blob in blobs:
            blob.delete()
        logging.info('Deleted {} shards'.format(len(blobs)))

I also checked the box that says “retry on failure” and listed the google-cloud-storage package in requirements.txt.
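
If you prefer the command line to the web console, the deployment can be sketched roughly as follows; the bucket is a placeholder and the flags shown are for 1st-gen Cloud Functions, so adjust them to your setup. The requirements.txt file needs just one line:

google-cloud-storage

and the deployment itself would look something like:

gcloud functions deploy compose_shards \
    --runtime python37 \
    --trigger-resource ${BUCKET} \
    --trigger-event google.storage.object.finalize \
    --retry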

What does the code above do? It verifies that it was triggered on a file that has the prefix my Dataflow pipeline was writing to and that this is the last shard.

Now, the last shard is not necessarily the last one to get written out. There could be a race condition, or one of the earlier files could be very large. So, I do a bit of defensive programming and make sure that all 10 shards I expect actually exist. I can do that by constructing a Blob with each expected filename and checking that it exists. If one doesn’t exist, I simply raise an exception. Recall that we set up the function to retry on failure, so it will run again in about 60 seconds if that is the case.

Once I verify that all the shards exist, I can use the API call to compose the blobs into one large blob, and delete the individual blobs.

Tea time

So, now, when I run my Dataflow pipeline, it produces 10 blobs. As soon as the 10th blob hits Google Cloud Storage, the Cloud Function runs and concatenates all 10 blobs into a single one.

Me? I can go get tea while the job runs and the function watches the job.