Kinesis, Lambda and the Serverless Framework
This article describes an experiment I tried with AWS Lambda, Kinesis Streams and the Serverless Framework. I wanted to see how easily I could run code to solve one problem on hundreds of servers in parallel using the event-driven serverless services of AWS. I wanted to use the Serverless Framework to create something simple that could be deployed, run and stripped down again really quickly.
I had previously been playing with ways to solve the Queens Completion puzzle, and had come up with this:
I don’t think the algorithm I came up with is anything special, but I was still interested to see how well it performs if I run it on cheap AWS services, and how easy it is to do something like this.
The code has been written to:
- take a record from a queue
- process it, and as a result…
- create several more records to be processed or
- output a solution to the problem
If we put those records onto an external queue we can have multiple workers (Lambda functions) processing those records and spread the workload across many CPUs. Will it be able to find decent time all solutions to a 28 x 28 chess board, which so far remains unsolved?
Spoilers
No. I had better results just running it on my own machine, and I think that’s down to the inefficiencies of my code converting stuff data going to/from the queue. But it’s still pretty cool how easy it was to try.
Serverless Framework
So if you don’t know what the Serverless Framework is, among other things, it can simplify the process of deploying code as Lambda functions on AWS (or the equivalent on Azure/GAE/etc.) and setting up the triggers for those functions to be called. See my other article for an explanation on how to deploy a REST web-service with it:
Kinesis Streams
Kinesis Streams are basically queues, but one thing they offer over SQS (Amazon’s simple queue service) is that they can be split into several “shards”. Messages put onto the front of the queue can be spread evenly across these shards. You can then attach consumers to these shards/sub-queues. So you can have one ‘stream’ but choose to give it up to 500 shards and spread the workload in parallel across 500 consumers, reading at a rate of 5 reads per second from each shard, with a maximum of read output of 2MB/sec. You get charged for the time that the shards are deployed at $0.0179 per shard per hour.
You also, at this stage, cannot hook up a Lambda function to an SQS queue, so… Kinesis wins.
I’ll be storing any solutions that my system comes up with in a DynamoDB table, so we end up with something like this:
Kinesis + Lambda in Serverless.yml
Here’s how you can create a Kinesis stream and attach a Lambda function onto the end of it with Serverless. I’ll explain my serverless.yml file. (You can find the whole thing here)
service: queens-completion-lambda-kinesis provider:
name: aws
runtime: nodejs6.10
region: eu-west-2
memorySize: 128
timeout: 300
This tells Serverless that we want to use AWS, Node v6.10 and deploy to EU-West-2 region, we want Lambda instances with 128MB of RAM and they should timeout after 300 seconds (maximum time allowed). The memorySize also determines what CPU we are going to get. AWS Lambda allocates CPU power proportional to the memory (see Configuring Lambda Functions), so although we might not need much memory, we might get better results later if we bump this up.
Privileges
iamRoleStatements: # Grant privilege read from Kinesis work stream
- Effect: Allow
Action:
- kinesis:GetRecords
- kinesis:GetShardIterator
- kinesis:DescribeStream
- kinesis:ListStreams
- kinesis:PutRecord
- kinesis:PutRecords
Resource:
Fn::GetAtt:
- workStream
- Arn
We are going to have a Kinesis stream called “workStream”. To allow our Lambda functions read/write to that stream using the AWS SDK, we need to know the ARN (Amazon Resource Name) for that table. We won’t know that until the table is created, and it is created in the Resources section of this serverless config, so we use the GetAtt (get attribute) function to get it.
# Grant privilege to write to results table
- Effect: Allow
Action:
- dynamodb:PutItem
Resource:
Fn::GetAtt:
- resultsTable
- Arn
For testing this, I also wanted to create a “results” DynamoDB table to catch the solutions that it comes up with. This config gives it the privilege to write to the results table that I’ll create later.
Lambda function
functions:
queensHandler:
handler: lambdaHandler.process
This is my Lambda function. All I’ve done here is declare a new lambda called “queensHandler” and set the process() function exported by lambdaHandler.js as the code for that. Unfortunately, I can’t hook it up to the Kinesis stream here using the Serverless framework, so I have to do it with some direct CloudFormation config below.
Kinesis stream
resources:
Resources:
workStream:
Type: AWS::Kinesis::Stream
Properties:
Name: queensWorkStream
ShardCount: 200
In the resources section of a serverless.yml file we can include some CloudFormation stuff to create our Kinesis stream , the results table, and the mapping from the stream to our Lambda function. Firstly we have the stream. It’s pretty simple. That shard count will determine how many Lambdas we’ll have listening for work. It doesn’t scale automatically, so you’d have to do that yourself if you want it to scale according to the workload or preemptively for busy periods.
DynamoDB table
resultsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: queensResultsTable
AttributeDefinitions:
- AttributeName: key
AttributeType: S
KeySchema:
- AttributeName: key
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 100
Here I’ve created the results table. We won’t be reading from it, so I’ve set the read capacity to the minimum of 1, but we will have many Lambdas writing to it at the same time.
Kinesis -> Lambda mapping
Event:
Type: AWS::Lambda::EventSourceMapping
Properties:
BatchSize: 50
EventSourceArn:
Fn::GetAtt:
- workStream
- Arn
FunctionName:
Fn::GetAtt:
- QueensHandlerLambdaFunction
- Arn
StartingPosition: TRIM_HORIZON
Here’s where I actually attach our new Lambda function to the stream. The BatchSize tells it how many items to collect up and pass to the Lambda function. Although if you only put 10 items on the shard it obviously won’t wait forever until it gets 20. If you want to know more about that mechanism there’s a great article by Alessandro Bologna https://itnext.io/creating-a-blueprint-for-microservices-and-event-sourcing-on-aws-291d4d5a5817
Note that I have to specify the Lambda name as: QueensSolverLambdaFunction in the `GetAtt` (get attribute) function call, despite the fact I called it ‘queensSolver’ above. ¯\_(ツ)_/¯
Code
So that’s the deployment. As for the code, I have my function doing the work that looks like this: processSet(set, onNewSet, onResult)
It takes in some data from the queue, and takes to callbacks for it to provide another set/record to be put onto the queue/stream or provide a result it has found that we can store in the results table. It knows nothing of the queue implementation. Then we have our lambdaHandler.js
file which accepts data from the queue, and converts it to be passed on:
function process(event, context, callback) {
event.Records.forEach(record => {
const set = convert(record)
// Pass data and callback functions to processor
processSet(set, addNewSet, publishResult)
})
}module.exports.process = process
It also contains the two functions that will take output from the function and either put that output onto the queue or store it in the results table. Here’s the code to put a new record onto our Kinesis stream:
function addNewSet(set) {
const key = set.key.toString(16)
const params = {
Data: JSON.stringify({
level: set.level,
key: key,
connections: set.connections.toString(16),
}),
PartitionKey: key,
StreamName: config.KINESIS_WORK_STREAM,
}
// Write record to stream
kinesis.putRecord(params, err => {
if(err) console.error(err, err.stack)
})
}
AWS will base64 encode the Data for you. The Partition Key is the value which will be put through a hash function by AWS to determine which shard the data will end up on, so if you want one shard processor to get all records of a particular type-code (for cache/performance purposes) then you could use that type-code as the partition key. Otherwise, use something unique to spread records across all shards as evenly as possible.
Here’s our function to convert records from the queue back into objects for our process function:
function convert(record) {
// Convert from base64 to Buffer to JSON string
const jsonStr =
Buffer.from(record.kinesis.data,'base64').toString()
// ...to an object
const data = JSON.parse(jsonStr)
// ...to a set we can process.
// Each set has a key (a map of nodes), a level (number of nodes
// in set) and 'connections' (map of mutually connected nodes).
// Convert from hex string to big number
const set = {
level: data.level,
key: new BN(data.key, 16),
connections: new BN(data.connections, 16)
}
return set
}
All the code for this can be found here:
Results
Right, let’s run it…
$ serverless deploy
I created a script called putInitialRecord.js to put a record with a key of 0 onto my stream. This prompts my processSet function to put the initial records for processing onto the queue; In the context of this problem, one record for every square on the chess board.
$ node putInitialRecord
After you are done, clear down everything to avoid any unnecessary costs:
$ serverless remove
Board size: 8, Shards: 50, Batch size: 50, Memory: 128MB
92 solutions found in: 231 seconds
Considering this takes 0.4 seconds on my local machine, that’s pretty disappointing, but hey! I just ran some code on 100 servers at once and it cost me about two cents ($0.0179 per shard per hour). A few seconds after kicking it off I can see there are 50 servers by the fact there are 50 log streams in Cloudwatch -> Logs, each one listing the start/end time and duration for each Lambda execution.
I’ll try varying those parameters:
Board size: 8, Shards: 50, Batch size: 100, Memory: 128MB
92 solutions found in: 223 seconds
Board size: 8, Shards: 50, Batch size: 50, Memory: 256MB
92 solutions found in: 71 seconds
Board size: 8, Shards: 100, Batch size: 50, Memory: 128MB
92 solutions found in: 70 seconds
Nice to see that twice the memory (and therefore twice the CPU power) processes it in the same time as twice as many workers. Even so, it’s still not going to beat 0.4 seconds. But maybe if the problem was bigger, it would better prove the benefit of doing all this in parallel.
One more test
Board size: 10, Shards: 200, Batch size: 50, Memory: 256MB
724 solutions found in: 19 minutes 54 seconds
Locally, this takes 10 seconds. OK, I’m definitely not going to try anything close to a 28 x 28 chessboard. But still… I’m impressed by how easy it is to spin up something like this.
Summary
So I didn’t get any great results for my particular experiment, but it is amazing how easy, cheap and quick it is to spin up that many servers for whatever you want to do. I was using the base-level Lambda instances, but you can get a lot more power if you are willing to pay for it.
Kinesis Streams are a great, simple service for this sort of horizontal scaling. My only costs were just the $0.0179 per hour per shard.
100 of these for half an hour = $0.90, + $0.01 per million queue put-requests
Everything else (Lambda invocations, DynamoDB read/write units) all came within the free tier. Although make sure you bring the system down (serverless remove
) after you are done playing… those DynamoDB units and Kinesis shards do not scale down when you’re not using them.
100 write capacity/units at $0.0008 per write capacity/unit per hour =
$58 a month.
The Serverless Framework simplifies the power of CloudFormation; defining the architecture for your application in one ‘stack’ to be deployed when you choose, and abstracts it to make it easier to deploy where you choose (i.e. Azure or other providers). It gets a lot of its power from the complete control that the AWS SDK can give you; if you want to write code to auto-scale your Kinesis Shards or monitor your billing costs, you can. Let me know if you do anything cool with it.