Cloud-scale file transfer
Using NodeJS, Redis, Docker and AWS Lambda
Recently, I had an opportunity to build a system for transferring large numbers of files from one cloud Content Delivery Network onto another. This post describes the key components and the overall architecture.
During the past year, we’ve amassed a sizable video library. Copying that library from one CDN to another would be time-consuming, but it would eventually complete, even if it took a few days. More important than a one-time transfer, however, is that an ideal solution would need to continually transfer content after each newly recorded class. Here, transfer time equates to how quickly fresh content becomes available.
I wish I could say that the transfer operation was simply a matter of invoking an SCP/SFTP file transfer. But there was no such luck. Files in the primary CDN are stored in an object storage container (think S3) and there’s no direct access to a directory structure per se — just a flat bucket with path-like filenames.
Additionally, each video is stored as an HLS (HTTP Live Streaming) stream: a streaming format consisting of multiple manifest files and lots of individual audio and video segments stored as files. A typical 30-minute video can consist of thousands of segments. That's because adaptive streaming requires multiple video streams, one per quality level.
So the task at hand required reliably transferring an entire collection of videos with millions of individual file segments: terabytes of data. In essence, a highly scalable file sync operation.
Naturally, it’s the “reliably transferring” phrase above that requires special attention. File transfers fail due to broken or timed-out network connections. A blind copy just won’t do. Each file needs to be properly tracked and accounted for.
The source files that needed to be copied are already in the cloud, and the destination is AWS S3 for use with CloudFront. That makes both ends well suited to servers that are also running in the cloud. My first consideration was simply to use a Docker Swarm cluster to manage a collection of microservices. Such an approach would rely on multiple container replicas to distribute work across the machine’s available CPU cores. Then I thought, “Why not try lambdas?” After all, file transfer is what’s known in computer science as an embarrassingly parallel problem, given that each file can be transferred independently.
AWS Lambda seemed like an ideal way to perform the transfer at scale. Lambdas offer scalable compute resources without the need for server provisioning and maintenance. Enter the so-called server-less revolution.
Quickly thereafter the dopamine rush hit… whaaaat? I could have thousands of machines simultaneously moving files? Sign me up!
Using Lambdas, the process behaves like a swarm of ants carrying objects back to an ant colony. And given that the files in question originate from our Flywheel Live production studio — I decided on the name FLAnts (FlyLive Ants). Cheesy — I know! Fortunately, I’m not in marketing…
Let’s take a closer look at the underlying process. The first order of business was determining which content needed to be copied. A process called the class-scanner pulls class metadata from our cloud MongoDB and performs basic validation on each class. This is necessary because not all classes stored in the database were actually broadcast. Some are test classes; others are aborted classes (imagine a director yelling “cut!”).
For every validated class, the class-scanner pushes a job onto a Redis job queue.
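Here's a minimal sketch of that scan-and-enqueue step, for illustration only. Several things in it are hypothetical: the class fields (isTest, status, manifestUrl), the validation rule, and the queue key class-crawler:jobs. The redis parameter stands in for any client with an ioredis-style async lpush(key, value) method.

```javascript
// Hypothetical validation rule: the real checks aren't described in the post,
// so isTest, status and manifestUrl are illustrative field names.
function isTransferable(cls) {
  return !cls.isTest && cls.status === 'broadcast' && typeof cls.manifestUrl === 'string';
}

// Push one crawl job per valid class onto a Redis list.
// `redis` is any client exposing an async lpush(key, value), e.g. ioredis.
async function scanClasses(classes, redis, queueKey = 'class-crawler:jobs') {
  let queued = 0;
  for (const cls of classes) {
    if (!isTransferable(cls)) continue; // skip test and aborted classes
    const job = { classId: cls.id, manifestUrl: cls.manifestUrl };
    await redis.lpush(queueKey, JSON.stringify(job));
    queued += 1;
  }
  return queued;
}
```

Serializing jobs as JSON strings keeps them readable by any process type, whatever language it's written in.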
A second process is the class-crawler. The process gets its name from the similar process of web crawling, an underpinning of Internet search engines. The class-crawler pops a job from the Redis queue, reads that class’s HLS manifest file, and crawls it to collect a list of individual HLS segment file names. Each segment’s metadata is used to populate another job queue, which identifies each file that needs to be copied. Each resulting job can be distributed to one of thousands of Lambda instances. It’s also worth noting that the class-crawler doesn’t have to be a single process. On a large multi-core machine, we fire off one per CPU core, and each pulls atomically from the job queue. Amazon offers sports-car-class machines featuring 72 and 128 vCPUs. It is a great time to be in tech!
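The crawl itself can be sketched as follows. This is a simplified take on the HLS playlist format, not the actual class-crawler. In a playlist, every non-blank line that doesn't start with # is a URI. A master playlist lists variant playlists, and each variant playlist lists its segments. The function names and the injected fetchText helper are hypothetical.

```javascript
// Every non-blank line that doesn't start with '#' is a URI: a variant
// playlist in a master playlist, or a segment in a media playlist.
function playlistUris(text, playlistUrl) {
  return text
    .split(/\r?\n/)
    .map((line) => line.trim())
    .filter((line) => line.length > 0 && !line.startsWith('#'))
    .map((uri) => new URL(uri, playlistUrl).href); // resolve relative URIs
}

// Master playlists announce their variants with #EXT-X-STREAM-INF tags.
function isMasterPlaylist(text) {
  return text.includes('#EXT-X-STREAM-INF');
}

// Crawl one class: fetch the master playlist, then each variant playlist,
// and collect every file that has to be copied. fetchText is an injected
// async (url) => string. URIs that appear only inside tag attributes
// (#EXT-X-MEDIA, #EXT-X-MAP, #EXT-X-KEY) are not handled in this sketch.
async function crawlClass(manifestUrl, fetchText) {
  const rootText = await fetchText(manifestUrl);
  if (!isMasterPlaylist(rootText)) {
    return { playlists: [manifestUrl], segments: playlistUris(rootText, manifestUrl) };
  }
  const variants = playlistUris(rootText, manifestUrl);
  const segments = [];
  for (const variantUrl of variants) {
    segments.push(...playlistUris(await fetchText(variantUrl), variantUrl));
  }
  return { playlists: [manifestUrl, ...variants], segments };
}
```

The manifests themselves are returned alongside the segments, because the destination CDN needs them too.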
A third process, called segment-transfer, pops transfer jobs from the Redis queue and dispatches them to lambda instances. The role of the segment-transfer process is to act as an orchestrator, or traffic cop of sorts. Like the class-crawler, segment-transfer can run as multiple processes, one per available CPU core, across multiple machines.
Our fourth and last process is called the aggregator. Its role is to process completed jobs and group them by their respective classes. When all of the segments of a class have been processed, the transfer operation is deemed successful.
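The aggregator's bookkeeping can be sketched as a per-class map of segment results. The class and field names here are hypothetical. Storing results in a Map keyed by segment means a later retry result simply overwrites an earlier failure instead of being double-counted.

```javascript
// Hypothetical per-class progress tracker for the aggregator.
class TransferAggregator {
  constructor() {
    this.classes = new Map(); // classId -> { total, results: Map(segmentIndex -> ok) }
  }

  // Register how many segments the crawler found for a class.
  expect(classId, totalSegments) {
    this.classes.set(classId, { total: totalSegments, results: new Map() });
  }

  // Record one completed job message; returns true once the class is complete.
  record({ classId, segmentIndex, ok }) {
    const entry = this.classes.get(classId);
    if (!entry) throw new Error(`unknown class: ${classId}`);
    entry.results.set(segmentIndex, ok); // a later result overwrites an earlier one
    return this.isComplete(classId);
  }

  // Complete means every segment has reported, successfully or not.
  isComplete(classId) {
    const { total, results } = this.classes.get(classId);
    return results.size === total;
  }

  // Successful means complete with no failed segments.
  succeeded(classId) {
    const { results } = this.classes.get(classId);
    return this.isComplete(classId) && [...results.values()].every(Boolean);
  }
}
```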
The diagram above shows the four process types, which can either run on a large multi-core machine or across a number of EC2 instances. Each connects to a Redis instance for job queuing. The segment-transfer process is shown dispatching lambdas, which in turn copy data from one CDN to another.
The reason for this overall approach is that any one of the four process types can fail and require a restart. That was especially true during the development phase, and it's a good example of designing software with failure in mind, which is vital when building distributed systems.
Message Queuing FTW
As described above, all four processes have a very important feature in common: each relies extensively on message queuing. The class-scanner queues messages; the class-crawler and segment-transfer both dequeue and queue messages; and the aggregator dequeues messages and generates a transfer report.
A key reason for using message queuing is to control the flow of work between processes. Each process is blazing fast, and without that control it’s not uncommon to max out CPU usage and exhaust both memory and socket handles. It’s far better to queue messages and allow processes to consume them as fast as they can, but not faster!
Another significant benefit is that queuing allows for multiple instances of a particular task to participate in the message processing.
Additionally, queuing and the use of multiple process instances allows NodeJS-based applications to take full advantage of multicore machines.
Queues also help decouple applications. In the illustration above, each service type can have instances on the same machine or across networked machines, so long as the queuing service is accessible.
I chose to implement my processes using NodeJS and Hydra. Hydra has built-in message queuing which sits on top of Redis, so implementing the above queuing couldn’t be easier. Here’s an example from the segment-transfer process. I only need to specify which service receives the job (using the to field) and include a bdy (body) object which contains the actual job details.
The aggregator service only needs to dequeue messages when it’s ready:
let message = await this.hydra.getQueuedMessage('aggregator-svcs');
The aggregator is responsible for dequeuing job messages, which contain the result of each lambda instance’s file transfer.
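Dequeuing "when it's ready" amounts to a pull loop. The consumer fetches the next message only after finishing the current one, and it backs off when the queue is empty. This is a sketch with a hypothetical consume function. A call like hydra.getQueuedMessage('aggregator-svcs') could be passed in as dequeue, assuming it resolves to a falsy value when nothing is queued. That assumption is not verified against Hydra's documentation.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Pull-based consumer: one message at a time, with a back-off when idle.
// `dequeue` is assumed to resolve to a falsy value when the queue is empty.
async function consume(dequeue, handle, { idleMs = 250, shouldStop = () => false } = {}) {
  let processed = 0;
  while (!shouldStop()) {
    const message = await dequeue();
    if (!message) {
      await sleep(idleMs); // nothing queued: wait instead of spinning the CPU
      continue;
    }
    await handle(message); // fetch the next message only after this one is done
    processed += 1;
  }
  return processed;
}
```

Because the consumer sets its own pace, a burst of completed transfers just lengthens the queue rather than overwhelming the aggregator.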
I used the queuing features built into Hydra, but you don’t need to use Hydra to do something similar. See this sample gist showing the relevant Redis commands.
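As a rough idea of what such commands look like, here is a common "reliable queue" pattern built from three Redis list commands. It is not necessarily what Hydra does internally. LPUSH enqueues a job. RPOPLPUSH (superseded by LMOVE in Redis 6.2+) atomically moves the oldest job into a processing list. LREM removes the job from that list once it's handled, so a job held by a crashed worker is never lost. The ReliableQueue class and key names are hypothetical, and redis is assumed to be an ioredis-style client exposing these commands as lowercase async methods.

```javascript
// Reliable queue on Redis lists. Jobs stay in `<name>:processing` until acked,
// so work held by a crashed worker can be found and re-queued later.
class ReliableQueue {
  constructor(redis, name) {
    this.redis = redis;
    this.pending = `${name}:pending`;
    this.processing = `${name}:processing`;
  }

  // LPUSH: add a job at the head of the pending list.
  enqueue(job) {
    return this.redis.lpush(this.pending, JSON.stringify(job));
  }

  // RPOPLPUSH: atomically take the oldest job and park it in the processing list.
  // Resolves to { raw, job }, or null when the queue is empty.
  async dequeue() {
    const raw = await this.redis.rpoplpush(this.pending, this.processing);
    return raw === null ? null : { raw, job: JSON.parse(raw) };
  }

  // LREM: remove one copy of the finished job from the processing list.
  ack({ raw }) {
    return this.redis.lrem(this.processing, 1, raw);
  }
}
```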
The segment-transfer service reads transfer jobs from a queue and dispatches them to lambda instances. Also, keep in mind that there are multiple segment-transfer instances (replicas) running.
Rather than attempt a massive lambda launch, each segment-transfer service reads 100 jobs and organizes them into a single batch. That results in 800 lambda invocations on an eight-core machine with eight segment-transfer instances. That works out well, given that by default there’s a maximum of 1000 concurrent lambda invocations per account per region. In practice, I haven’t seen more than 500 simultaneous invocations due to the overhead of processing jobs into batches and launching lambda instances. Running twice the number of segment-transfer replicas per core would change that.
Occasionally, a lambda instance will fail to perform a transfer. Shocking, I know! When that happens, the failure can be attributed to a broken network socket or a timeout. However, this isn’t an issue: because the entire system relies on message queuing, it’s just a matter of re-queuing the failed job.
Each job message contains a retry field which starts at three. Each failed attempt reduces the count by one, and should the count reach zero, the job is marked as a complete failure. This has never happened. The most I’ve seen is a single retry resulting in success.
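One segment-transfer batch can be sketched like this. The sketch pulls up to 100 jobs and invokes one Lambda per job concurrently. The runBatch function is hypothetical. So is invokeTransfer, which stands for an async wrapper around a Lambda invocation (for instance through the AWS SDK's Lambda client) that rejects when a transfer fails. Promise.allSettled is used so that one failed transfer doesn't abort the rest of the batch.

```javascript
// Pull up to `batchSize` jobs, then dispatch them all at once.
// `dequeue` resolves to a job or null; `invokeTransfer` is a hypothetical
// async Lambda wrapper that rejects on failure.
async function runBatch(dequeue, invokeTransfer, batchSize = 100) {
  const batch = [];
  while (batch.length < batchSize) {
    const job = await dequeue();
    if (!job) break; // queue drained: dispatch a partial batch
    batch.push(job);
  }
  // allSettled waits for every invocation, successful or not.
  const results = await Promise.allSettled(batch.map((job) => invokeTransfer(job)));
  return batch.map((job, i) => ({ job, ok: results[i].status === 'fulfilled' }));
}
```

With eight replicas each running one batch of 100, at most 8 × 100 = 800 invocations are in flight, which stays under a 1000-invocation concurrency limit.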
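The retry rule is simple enough to express as a pure function. In this sketch (function names hypothetical), a job starts with retry set to three. Each failure decrements it and re-queues the job, and reaching zero marks the job as a complete failure. That allows three attempts in total.

```javascript
const MAX_RETRIES = 3;

// Create a transfer job message whose retry field starts at three.
function makeJob(classId, segmentUrl) {
  return { classId, segmentUrl, retry: MAX_RETRIES };
}

// Decide the fate of a failed job without mutating it: decrement the retry
// count and re-queue, or mark it as a complete failure at zero.
function onTransferFailure(job, error) {
  const retry = job.retry - 1;
  if (retry > 0) {
    return { action: 'requeue', job: { ...job, retry } };
  }
  return { action: 'failed', job: { ...job, retry: 0, error: String(error) } };
}
```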
This all works exceedingly well. However, re-queuing a message pushes it to the end of its job queue. So at run time, retried jobs don’t complete until much later in the process. I noticed this behavior when I added a web-based visualizer to the aggregation service.
The classical solution is to use a priority queue. A priority queue would have allowed me to re-queue jobs with a higher priority, ensuring those jobs didn’t have to move to the end of the line. Redis supports this with sorted sets and pop-by-score operations such as ZPOPMIN, which is something I may revisit later. Yet another potential solution is to maintain a separate job queue for failed operations and pull from that queue before pulling from the main job queue. It’s a code-complexity trade-off.
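For comparison, here is a sketch of the sorted-set alternative. Lower scores pop first, so re-queued jobs get a score from a lower band than fresh jobs, and timestamps keep rough FIFO order within each band. The PriorityJobQueue class is hypothetical, and so are the band constants. The client is assumed to be ioredis-style, with zadd(key, score, member) and zpopmin(key) resolving to [member, score]; ZPOPMIN itself requires Redis 5.0 or later.

```javascript
// Retry scores (~Date.now()) always sort below fresh scores (1e13 + Date.now()).
const RETRY_BAND = 0;
const FRESH_BAND = 1e13;

class PriorityJobQueue {
  constructor(redis, key) {
    this.redis = redis;
    this.key = key;
  }

  // Note: sorted-set members are unique, so identical job payloads collapse
  // into one entry; real jobs should carry a unique id.
  push(job, { isRetry = false } = {}) {
    const score = (isRetry ? RETRY_BAND : FRESH_BAND) + Date.now();
    return this.redis.zadd(this.key, score, JSON.stringify(job));
  }

  // Pop the lowest-scored job, or null when the set is empty.
  async pop() {
    const [member] = await this.redis.zpopmin(this.key);
    return member === undefined ? null : JSON.parse(member);
  }
}
```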
Instead, I simply maintained an in-process re-queue list and pushed those jobs onto the next batch operation.
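The in-process re-queue list can be sketched as a batch builder that drains its retry list before reading new jobs from the queue. The class name and the injected dequeue function are hypothetical.

```javascript
// Failed jobs wait in memory and go to the front of the next batch, instead
// of returning to the back of the Redis queue.
class BatchBuilder {
  constructor(dequeue, batchSize = 100) {
    this.dequeue = dequeue; // async () => job or null
    this.batchSize = batchSize;
    this.retries = [];
  }

  requeue(job) {
    this.retries.push(job);
  }

  async next() {
    // Retries first, so they don't wait behind the whole queue.
    const batch = this.retries.splice(0, this.batchSize);
    while (batch.length < this.batchSize) {
      const job = await this.dequeue();
      if (!job) break;
      batch.push(job);
    }
    return batch;
  }
}
```

The trade-off is durability. Jobs held only in memory would be lost if the process crashed, unless they also remain recorded in Redis until they complete.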
Visualizing the file transfer
To better see the actual behavior of the overall system, I decided to build a web-based visualizer. The thought of rendering each of those virtual ants (I mean lambdas) as they transferred file segments definitely warranted a side-hack.
I added a dashboard endpoint to the aggregator service to serve up a ReactJS single page app.
One challenge is that the dashboard could end up displaying the status of hundreds of video classes. Rendering that much data would be too slow, even using a virtual DOM.
I quickly realized that each file segment should not be a div but rather a single unicode text character.
Another important optimization used on both the front end and the backend is the use of bit fields. Since a class can have 5000 segments, I simply create a 5000-bit field and further reduce the payload size by sending it from the backend as a hex string. The front-end converts the hex string to a binary string, which is then iterated to render each 0 as ◯ and each 1 as ◉.
Also, once a class video is completely transferred, its segment status display is turned off to further reduce the page-rendering cost. The NPM bitset package was helpful in this regard!
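The encoding round trip can be sketched by hand as follows, without the bitset package mentioned in the post. Each hex digit carries the status of four segments, so 5000 segments fit in 1250 characters. Both function names are hypothetical.

```javascript
// Backend: pack per-segment status (true = transferred) into hex,
// four segments per hex digit, most significant bit first.
function statusToHex(statuses) {
  let hex = '';
  for (let i = 0; i < statuses.length; i += 4) {
    let nibble = 0;
    for (let b = 0; b < 4; b++) {
      // Positions past the end read as undefined and pad with 0 bits.
      nibble = (nibble << 1) | (statuses[i + b] ? 1 : 0);
    }
    hex += nibble.toString(16);
  }
  return hex;
}

// Front-end: expand hex back to a bit string and map each bit to a glyph.
// `count` trims the padding bits added when the length isn't a multiple of 4.
function hexToGlyphs(hex, count) {
  let bits = '';
  for (const digit of hex) {
    bits += parseInt(digit, 16).toString(2).padStart(4, '0');
  }
  return bits.slice(0, count).replace(/0/g, '◯').replace(/1/g, '◉');
}
```

For example, statusToHex([true, false, true, true, false, true]) yields 'b4', and hexToGlyphs('b4', 6) renders ◉◯◉◉◯◉.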
The speed of transferring files using this approach is absolutely staggering. In one of my earlier tests, the system transferred four terabytes of data in two hours and twenty minutes — that’s roughly 523MB per second!
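The 523 MB per second figure presumably treats four terabytes as binary units (4 × 2^40 bytes) and MB as 10^6 bytes. The run time of two hours and twenty minutes is 8,400 seconds. With decimal terabytes (4 × 10^12 bytes), the same run works out to about 476 MB per second.

$$
\frac{4 \times 2^{40}\ \text{bytes}}{(2 \times 3600 + 20 \times 60)\ \text{s}}
= \frac{4.398 \times 10^{12}\ \text{bytes}}{8400\ \text{s}}
\approx 5.236 \times 10^{8}\ \text{bytes/s} \approx 523\ \text{MB/s}
$$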
And that’s nowhere near the maximum potential. Using a larger multi-core machine, or a cluster of multi-core machines, along with a higher concurrency limit for lambda invocations would yield even higher transfer speeds. Granted, there are costs involved, so don’t try this at home.
This post describes one approach, but one which exceeded our initial expectations. Admittedly, the task at hand is almost tailor-made for lambdas, but it also shows how microservices and server-less technologies can be used together. There’s no reason you can’t have both!
Thanks for reading!