Processing Large Files through a Unix Pipeline in an AWS Lambda Function
Patrick Yee and Allie Sanzi, Software Engineers at 23andMe
Summary
This article illustrates how to architect an AWS Lambda function, written in Python, to stream input data from an S3 object, pipe the data stream through an external program, and then pipe the output stream to an object in S3.
Introduction
AWS Lambda is a handy serverless computing service for handling unit tasks. However, it has limits that can make it impossible to fit large input and/or output files into its memory or temporary file storage:
- Maximum memory allocated to the runtime is 3GB
- Maximum temporary file storage is 512MB
- The temporary file storage is unencrypted, which does not meet our data security standards
There are numerous blog posts [1][2][3] that talk about utilizing the S3 streaming body API to ingest large input files from S3 without exhausting the memory allocated to the Python runtime of the Lambda function. Those techniques are useful if the processing can be done purely in Python. Nevertheless, in some cases, we need to invoke external programs for processing the streamed data. Examples include:
- encryption and decryption
- compression and decompression
- custom codecs, etc.
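The chunked-read pattern those posts describe can be sketched with an in-memory stand-in for the S3 streaming body (boto3's response['Body'] exposes the same read(n) interface); the io.BytesIO object and the tiny buffer size here are illustrative assumptions, not part of the original posts:

```python
import io

bufsize = 4  # tiny buffer, just to make the chunking visible
stream = io.BytesIO(b'streamed-data')  # stand-in for response['Body'] from S3

# Read the stream in fixed-size chunks instead of loading it all at once.
chunks = []
data = stream.read(bufsize)
while data:
    chunks.append(data)
    data = stream.read(bufsize)

assembled = b''.join(chunks)
```

At no point does the loop hold more than one buffer's worth of data, which is what keeps memory usage flat regardless of object size.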
In those use cases, ideally we can construct a pipeline to stream input data from an S3 object, and then pipe the data stream through an external program, and finally stream the program’s output to another object in S3. As the data is streamed through the pipeline, we do not need to store the whole input (and output) data in memory or temporary file storage.
Implementation
We illustrate the idea with an AWS Lambda function, written in Python, for decrypting large PGP-encrypted files in S3. As there is no pure Python library for PGP decryption, we must use subprocess to invoke an external gpg program.
First, we obtain the input data stream using boto3:
import boto3

s3 = boto3.client('s3')
response = s3.get_object(Bucket=bucket, Key=input_file_key)
in_stream = response['Body']
Next, we set up a subprocess to invoke the gpg program, and prepare pipes to feed data in and get data out:
import subprocess

bufsize = 1024 * 1024  # 1 MiB
proc = subprocess.Popen(['gpg', '--decrypt', '--homedir', homedir,
                         '--batch', '--passphrase', passphrase],
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                        bufsize=bufsize)
Then, we need to prepare for uploading the output data stream to S3. Here comes a small problem: almost all S3 APIs for uploading objects expect us to know the size of the file ahead of time. However, we don't, because we are processing a data pipeline. Luckily, there is an exception: upload_fileobj, which takes a file-like object. For that purpose, we create a named pipe in the file system to provide such a file object:
import os

named_pipe = '/tmp/name_pipe_1'
os.mkfifo(named_pipe)
Now, we are ready to join the pipes together:
def from_input_to_gpg():
    data = in_stream.read(bufsize)
    while data:
        proc.stdin.write(data)
        data = in_stream.read(bufsize)
    # important to close proc.stdin to signal the end of input
    proc.stdin.close()

def from_gpg_to_named_pipe():
    with open(named_pipe, 'wb') as wf:
        data = proc.stdout.read(bufsize)
        while data:
            wf.write(data)
            data = proc.stdout.read(bufsize)

def from_named_pipe_to_output():
    with open(named_pipe, 'rb') as rf:
        s3.upload_fileobj(rf, bucket, output_file_key)
The fun part is to get them to move the data simultaneously:
import threading

threads = [
    threading.Thread(target=from_input_to_gpg),
    threading.Thread(target=from_gpg_to_named_pipe),
    threading.Thread(target=from_named_pipe_to_output),
]
for t in threads:
    t.start()

# when they all finish, clean up and close the external program
for t in threads:
    t.join()

os.unlink(named_pipe)
proc.terminate()
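In production you will likely also want to verify that the external program actually succeeded before trusting the uploaded output. A minimal sketch of the exit-status check, using cat as a locally runnable stand-in for gpg (the command and the payload are illustrative assumptions):

```python
import subprocess

# 'cat' stands in for the gpg process in the article: it echoes stdin to stdout.
proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
out, _ = proc.communicate(b'payload')

# After the pipeline drains, wait() collects the exit status; nonzero means failure.
if proc.wait() != 0:
    raise RuntimeError('external program failed with code %d' % proc.returncode)
```

Calling wait() (rather than terminate() alone) also reaps the child process and surfaces decryption errors that would otherwise pass silently.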
Performance
In our test, we decrypted a 3.6 GB text file using a Python 3.7 runtime configured with 256 MB of memory. It took around 10 minutes.
Summary
This example can easily be extended to pipe the data through a series of external programs to form a Unix pipeline. In summary, with the help of subprocess and threading, we can simultaneously:
- read the data stream from an S3 object
- pipe the data stream to a pipeline of external programs
- write the output data stream from the pipeline to another S3 object
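Chaining a second external program only requires connecting one subprocess's stdout to the next one's stdin. A minimal sketch, using tr and cat as locally runnable stand-ins for real pipeline stages such as gpg and gzip (the commands and the payload are illustrative assumptions):

```python
import subprocess

# Stage 1: 'tr' uppercases its input (stand-in for, e.g., gpg --decrypt).
p1 = subprocess.Popen(['tr', 'a-z', 'A-Z'],
                      stdin=subprocess.PIPE, stdout=subprocess.PIPE)
# Stage 2: 'cat' passes data through (stand-in for, e.g., gzip).
p2 = subprocess.Popen(['cat'], stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()  # let p1 receive SIGPIPE if p2 exits early

p1.stdin.write(b'hello pipeline\n')
p1.stdin.close()   # signal end of input, as in the single-stage example
result = p2.communicate()[0]
p1.wait()          # reap the first stage
```

The feeder and drainer threads from the decryption example stay the same; only the middle of the pipeline grows.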
About the Authors
Patrick Yee is a software engineer at 23andMe who works on genetics platforms. He is passionate about performance optimization on software systems.
Allie Sanzi is a software engineer at 23andMe who works on genetics platforms. She is excited about helping people understand their genetics.
23andMe is hiring! Check out our current openings!