Ilias Bertsimas
Sainsbury’s Tech Engineering
10 min read · Jul 23, 2018


How we learned to Go in the cloud and not worry about the servers

My colleague Rowena Parsons has already described how we do serverless at Sainsbury's to cut costs and streamline services without worrying about managing the underlying platform and infrastructure.

In this post I will present our journey in Food Commercial to a serverless architecture powering our new generation of services, along with the challenges we faced and the lessons we learned.

The Service

In the Food Commercial family at Sainsbury's we are responsible for everything around food products. Our scope covers MDM, Supply Chain Planning, Supplier Management and New Business Development. We came across the need to develop a new service for one of our most important functions, one that would need to be a Tier 1 service with regard to High Availability, Resilience, Performance and Security, along with a requirement to keep costs in check. For corporate reasons I can't go into the specifics of the service, but I can give a generic overview. The service consumes data from files; the data has to be processed, stored and then sent over a REST API based on the date. The data volume can vary widely from day to day. The data becomes available once a day but within a wide time window, and the processing needs to be completed within a set amount of time from the moment we receive the data. As we are cloud first, and specifically AWS first, that is where the service would need to run.

Initial Design

Service design was approached in a very classic way in the beginning. The design ended up with EC2 instances in a VPC and RDS as the database of choice. The service was expected to be written in either Java or Python. Several issues were raised:

  • How the service would handle High Availability and Resilience
  • How we would size the instances for such a wide-ranging volume
  • The complexity of the solution for what is supposed to be a simple service

The above points raised questions:

How do we recover mid-file processing across instances?

We get a single file that we have to start processing, and autoscaling would be difficult to calculate, so how do we scale for a load that is unknown?

Can we scale up in time and efficiently, or do we need fewer, bigger instances running 24/7?

All failure scenarios would have to be handled and coordinated across instances; how much complexity would that add?

Suffice to say, the answers, or the lack of them, indicated this wasn't the best course of action for our service. So back to the drawing board we went.

New Design

A new design was needed that could meet our requirements and be more cloud native. We took a hard look at the services at our disposal on AWS. It became very obvious to us that we needed to reduce complexity by handing some of it back to our trusted AWS partner, and so we did.

The new design dropped EC2 instances in favour of AWS Lambda: a serverless, fully managed by AWS, "run your code and worry about nothing else" solution. Or so it would seem.

Our Delta to their Lambda

We chose AWS Lambda as it answered a lot of the questions the previous design couldn't. AWS Lambdas have proper error handling: even if they fail mid-execution for no apparent reason, the invoking actor is able to catch that and act accordingly. We don't have to right-size instances; at most we choose the amount of memory for the Lambda, which is correlated internally to the compute resources, and then we pay exactly for the GB-seconds we used. You can also, in theory, launch as many Lambdas as you need at a whim (an AWS account per-region concurrent execution limit exists). All of this reduces complexity on our end and has our trusted AWS partner take care of the details, while at the same time we only pay for what we really use, which from one day to the next can vary from a 30 second execution to a full hour or more.

As with everything, there are tradeoffs with AWS Lambda too. A Lambda can only run for up to 5 minutes; beyond that, your code execution ends abruptly. So how do you handle that?

There are two fronts on which you can handle the hard execution limit of Lambdas. First, you make sure in your code that you don't exceed it inside each Lambda; this requires using the context object passed to you, which carries the remaining time until the timeout. Then you need to make sure you can wrap up your processing in a clean way before that timeout is reached.
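
Here is a minimal sketch of that first front (the names, the work units and the 10 second safety margin are illustrative, not taken from our production code): the Go Lambda runtime sets the deadline on the context it hands to your handler, so you can check the remaining time before picking up the next unit of work.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/aws/aws-lambda-go/lambda"
)

// safetyMargin is how much time we want to leave ourselves for a clean wrap-up.
const safetyMargin = 10 * time.Second

func handler(ctx context.Context) error {
    deadline, ok := ctx.Deadline()
    if !ok {
        return fmt.Errorf("no deadline set on context")
    }
    for _, unit := range []string{"chunk-1", "chunk-2", "chunk-3"} { // hypothetical work units
        if time.Until(deadline) < safetyMargin {
            fmt.Println("approaching the Lambda timeout, stopping cleanly")
            return fmt.Errorf("ran out of time, remaining work must be rescheduled")
        }
        fmt.Println("processing", unit)
    }
    return nil
}

func main() {
    lambda.Start(handler)
}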

Let's take a step back and talk about the design we came up with to achieve all that. Our requirements cover volumes that can be processed in a couple of seconds through to volumes that may need an hour or more, and we needed a way to handle all of that. We came up with a supervisor-worker model along with pipelining of the processing. How does that work? Let's take a look at the following diagram.

What happens is that we have a supervisor Lambda that reads the file metadata from the S3 bucket and decides how to slice the file into chunks, with byte ranges representing the work chunks. The supervisor also decides whether it can process the whole file in one cycle, based on an algorithm that takes multiple parameters into account. If it can't, it decides the cut-off for this cycle and, once the cycle has finished processing, pipelines the rest of the work chunks to a new invocation of itself. By doing that we start the timeout clock again, and we repeat the process until the whole file has been processed.
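
A rough sketch of those two ideas follows. This is not our production supervisor; the names (chunk, sliceFile, pipelineRemaining), the chunk size, the cut-off and the bucket, key and ARN values are all hypothetical. The point is that the supervisor only needs the object's metadata to cut it into byte-range work chunks, and that any chunks that don't fit into the current cycle can be handed to a fresh asynchronous invocation of the same function.

package main

import (
    "encoding/json"
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/lambda"
    "github.com/aws/aws-sdk-go/service/s3"
)

// chunk is a hypothetical work unit: a byte range within the file.
type chunk struct {
    Start int64 `json:"start"`
    End   int64 `json:"end"`
}

const chunkSize = 5 * 1024 * 1024 // 5 MB per work chunk, an illustrative value

// sliceFile reads only the object's metadata and cuts it into byte-range chunks.
func sliceFile(svc *s3.S3, bucket, key string) ([]chunk, error) {
    head, err := svc.HeadObject(&s3.HeadObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
    })
    if err != nil {
        return nil, err
    }
    size := *head.ContentLength
    var chunks []chunk
    for start := int64(0); start < size; start += chunkSize {
        end := start + chunkSize - 1
        if end >= size {
            end = size - 1
        }
        chunks = append(chunks, chunk{Start: start, End: end})
    }
    return chunks, nil
}

// pipelineRemaining asynchronously re-invokes the supervisor with the chunks
// that did not fit into this cycle, restarting the timeout clock.
func pipelineRemaining(svc *lambda.Lambda, supervisorARN string, remaining []chunk) error {
    payload, err := json.Marshal(remaining)
    if err != nil {
        return err
    }
    _, err = svc.Invoke(&lambda.InvokeInput{
        FunctionName:   aws.String(supervisorARN),
        InvocationType: aws.String(lambda.InvocationTypeEvent), // async, fire and forget
        Payload:        payload,
    })
    return err
}

func main() {
    sess := session.Must(session.NewSession())
    chunks, err := sliceFile(s3.New(sess), "my-bucket", "my-file.csv")
    if err != nil {
        fmt.Println("error slicing file:", err)
        return
    }
    fmt.Printf("file sliced into %d work chunks\n", len(chunks))
    cutoff := 4 // hypothetical cut-off: process the first few chunks now, pipeline the rest
    if len(chunks) > cutoff {
        if err := pipelineRemaining(lambda.New(sess), "arn_to_the_supervisor", chunks[cutoff:]); err != nil {
            fmt.Println("error pipelining remaining chunks:", err)
        }
    }
}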

Looking at what the supervisor does in a cycle to get the file processed: based on an algorithm, it decides how many workers to invoke and splits the work chunks across them. The workers then either complete all of their work chunks before the timeout limit, or they abort the ones not done and return them to the supervisor for completion in the next cycles of the pipeline.

In our case, where we process files, cutting the file into multiple chunks gives us small units of work whose processing we can halt cleanly. The second front is exactly this ability to cut the file, assign the work chunks and handle the remaining ones if a worker ends without completing all of its chunks. To achieve that functionality we need concurrent processing of our work chunks.

Let’s Go

At Sainsbury's we have used Go a lot across the estate; when other organizations were doing microservices in Scala, we were doing them in Go. I also switched all my DevOps tooling development to Go a couple of years back, as I was surprised by its performance, by how easy concurrency is and how integrated it is into the fabric of the language, and by the amazing cross-platform compilation, which lets you share your tools by compiling for most major architectures and platforms, knowing they will just work without you having to worry about dependencies and versions.

With concurrency being a big requirement of our solution, and Go having just become an officially supported language on AWS Lambda, it was a match made in heaven.

In our supervisor we make extensive use of goroutines with waitgroups and channels to launch the workers, emulating an asynchronous invoke while actually doing a Request-Response invoke, so that we can keep track of each worker, get back the necessary information (work chunks not completed, failed worker Lambdas) and act accordingly.

package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "log"
    "sync"

    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    servicelambda "github.com/aws/aws-sdk-go/service/lambda"
    _ "github.com/lib/pq" // required to load the drivers
)

const maxBatchSize = 5000 // maxBatchSize we can only process this many records per lambda
const maxWorkers = 3      // maxWorkers we want to limit concurrent workers

var workerARN = "arn_to_a_lambda"
var db *sql.DB
var lambdaCaller *servicelambda.Lambda

func main() {
    db, _ = sql.Open("postgres", "user=user password=password host=localhost dbname=db1")
    lambdaCaller = servicelambda.New(session.New())
    lambda.Start(processRecords)
}

// processRecords selects unprocessed record IDs and splits them across worker lambdas
func processRecords(ctx context.Context) error {
    var wg = &sync.WaitGroup{}
    processIDs := make([]int, 0, maxBatchSize*maxWorkers)
    rows, err := db.QueryContext(ctx, "SELECT id FROM table WHERE processed = 'N' LIMIT $1", maxBatchSize*maxWorkers)
    if err != nil {
        return err
    }
    defer rows.Close()
    for rows.Next() {
        var id int
        if err := rows.Scan(&id); err != nil {
            continue
        }
        processIDs = append(processIDs, id)
    }
    wChan := make(chan int, maxWorkers)
    for i := 0; i < maxWorkers; i++ {
        start := i * maxBatchSize
        if start >= len(processIDs) {
            break // fewer records than workers can take, nothing left to hand out
        }
        end := start + maxBatchSize
        if end > len(processIDs) {
            end = len(processIDs)
        }
        wg.Add(1)
        payload := processIDs[start:end]
        jsonPayload, _ := json.Marshal(payload)
        invReq := &servicelambda.InvokeInput{
            FunctionName:   aws.String(workerARN),
            InvocationType: aws.String(servicelambda.InvocationTypeRequestResponse),
            Payload:        jsonPayload,
        }
        go invokeWorker(invReq, wg, wChan)
    }
    wg.Wait()
    close(wChan)
    retries := 0
    for fw := range wChan {
        retries += fw
    }
    if retries > 0 {
        return fmt.Errorf("some batches need retrying") // returning an error informs the event calling this lambda it needs to retry
    }
    fmt.Println("batches complete")
    return nil
}

// invokeWorker does a Request-Response invoke of a worker lambda and reports failures back on the channel
func invokeWorker(req *servicelambda.InvokeInput, wg *sync.WaitGroup, c chan<- int) {
    defer wg.Done()
    res, err := lambdaCaller.Invoke(req)
    if err != nil {
        log.Println("ERROR invoking lambda : " + err.Error())
        c <- 1
    }
    if res != nil {
        if res.FunctionError != nil && *res.FunctionError == "Handled" {
            log.Print("ERROR handled error during worker lambda run, invoking retry")
            c <- 1
        }
        if res.FunctionError != nil && *res.FunctionError == "Unhandled" {
            log.Print("FATAL unhandled error during worker lambda run, invoking retry")
            c <- 1
        }
    }
}

Our worker Lambdas make similar use of goroutines with waitgroups and channels to drive the reading from S3, parsing, transformation and database inserting of data in a massively concurrent fashion. Most importantly, in the workers they are used together with the context that provides the execution deadline. When the deadline is reached, any goroutines still not finished clean up by aborting any uncommitted database transactions, to get back to a consistent state, and return their work chunk information to the main routine, which in turn returns all of it in the Lambda response to the supervisor's original invocation request.

package main

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    "github.com/aws/aws-lambda-go/lambda"
    _ "github.com/lib/pq" // required for the drivers
)

var db *sql.DB

func main() {
    // open the database connection (connection details as in the supervisor, omitted in the original excerpt)
    db, _ = sql.Open("postgres", "user=user password=password host=localhost dbname=db1")
    lambda.Start(handler)
}

func handler(ctx context.Context) error {
    rowsFound, rowsProcessed := 0, 0
    defer func(found, processed *int) {
        fmt.Printf("Found %d processed %d\n", *found, *processed)
    }(&rowsFound, &rowsProcessed)
    // create a new context timeout using the time until the deadline minus 10 seconds to give us clean up time
    deadline, _ := ctx.Deadline()
    var cancel func()
    ctx, cancel = context.WithTimeout(ctx, time.Until(deadline.Add(-10*time.Second)))
    defer cancel() // cancel the context when we leave this function call
    queue := make(chan int, 100000)
    done := make(chan error)
    go getRecords(ctx, queue, &rowsFound)
    go processRecords(ctx, queue, done, &rowsProcessed)
    // wait for either the timeout or the worker to finish
    select {
    case <-ctx.Done():
        fmt.Println("lambda about to time out, cleaning up, waiting for worker to finish")
        <-done
        return fmt.Errorf("lambda timed out")
    case err := <-done:
        if err != nil {
            fmt.Println("worker completed with : ", err.Error())
            return err
        }
        fmt.Println("worker completed")
        return nil
    }
}

// getRecords streams unprocessed record IDs onto the queue
func getRecords(ctx context.Context, queue chan<- int, count *int) {
    defer close(queue) // always close so the worker knows no more records are coming
    fmt.Println("starting getting records")
    rows, err := db.QueryContext(ctx, "SELECT id FROM table WHERE processed_ind='N' LIMIT 100000")
    if err != nil {
        fmt.Println("error getting records : ", err.Error())
        return
    }
    defer rows.Close()
    for rows.Next() {
        var id int
        _ = rows.Scan(&id)
        *count += 1
        queue <- id
    }
    fmt.Println("completed getting records")
}

// processRecords consumes IDs from the queue inside one transaction, committing on a clean
// finish and rolling back if the context deadline is reached
func processRecords(ctx context.Context, c <-chan int, done chan<- error, count *int) {
    tx, _ := db.BeginTx(ctx, nil)
    stmt, _ := tx.PrepareContext(ctx, "UPDATE table SET processed_ind='Y' WHERE id=$1")
    defer stmt.Close()
    for {
        select {
        case <-ctx.Done():
            fmt.Println("timing out, killing worker and rolling back transaction")
            *count = 0
            tx.Rollback()
            done <- fmt.Errorf("timed out")
            return
        case id, open := <-c:
            if !open {
                fmt.Println("worker channel closed, stopping worker and committing changes")
                tx.Commit()
                done <- nil
                return
            }
            r, err := stmt.Exec(id)
            if err != nil {
                continue
            }
            n, _ := r.RowsAffected()
            *count += int(n)
        }
    }
}

The complete solution

Up to now we have focused on a single stage of the processing, to examine how we handled the challenges of running variably sized data processing in AWS Lambda with a 300 second maximum execution time, and how we put the amazingly well-done concurrency available in Go to good use in fulfilling our requirements. However, this isn't the complete solution; in reality we have three distinct stages of processing that make up our service.

As we can see in the diagram, the whole process gets triggered by the file landing in S3, which launches the first-stage supervisor. The first stage handles the parsing, transformation, filtering and inserting of the data into the RDS database.

Next, the second stage is responsible for doing some processing based on the newly inserted data and storing the results in a second RDS database. It follows the same supervisor-worker model as the first stage, and so does the third stage.

In the third stage we pick up the data from the second RDS database that is to be sent to a REST API, form the request payloads, and do our error handling and retrying until all of it is done.
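
As an illustration of how the first stage gets kicked off, here is a sketch (not our actual handler) of a supervisor subscribed to the bucket's object-created events; the bucket and key of the new file arrive in the S3 event payload.

package main

import (
    "context"
    "fmt"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

// handler is invoked by the S3 ObjectCreated event that starts the pipeline.
// Slicing the file and invoking the workers, as shown earlier, would hang off this.
func handler(ctx context.Context, event events.S3Event) error {
    for _, record := range event.Records {
        bucket := record.S3.Bucket.Name
        key := record.S3.Object.Key
        fmt.Printf("new file s3://%s/%s, starting first stage supervisor\n", bucket, key)
        // ... slice the file into work chunks and invoke the workers here ...
    }
    return nil
}

func main() {
    lambda.Start(handler)
}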

Lessons learned

Although being stateless is great, sometimes it is useful to keep state between Lambda invocations, for example the number of times we have retried when our processing involves a third-party component outside of our control that may not be performing as expected or may not be available at all.

Make sure to read the very extensive AWS Lambda documentation and understand how things work for Go, because otherwise you might miss behaviour like global variable state being kept across executions for certain periods of time.
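
A minimal sketch of what that looks like in practice (illustrative only, with a hypothetical callThirdParty stand-in): a package-level variable survives across invocations that land on the same warm execution environment, which is handy for something like a retry counter, but it is per container and lost whenever Lambda recycles the environment, so it is no substitute for durable state.

package main

import (
    "context"
    "fmt"

    "github.com/aws/aws-lambda-go/lambda"
)

// retryCount lives outside the handler, so it is kept across invocations
// that reuse this warm execution environment.
var retryCount int

func handler(ctx context.Context) error {
    if err := callThirdParty(); err != nil {
        retryCount++
        fmt.Printf("third party failed, attempt %d on this container\n", retryCount)
        return err // returning an error lets the event source retry
    }
    retryCount = 0
    return nil
}

// callThirdParty is a hypothetical stand-in for the external dependency
func callThirdParty() error {
    return nil
}

func main() {
    lambda.Start(handler)
}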

Always keep in mind the unique properties of the AWS Lambda invocation types: for example, the async Event type has a lower payload size limit and, by default, two retries of the exact same request on failure. Handle this in your code, or switch to RequestResponse where it makes sense; in some cases, like S3 Events, you don't have a choice due to the nature of the trigger.
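
For reference, this is how the two invocation types look from the caller's side when invoking through the SDK (a sketch reusing the article's placeholder ARN; the payload is made up):

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/lambda"
)

func main() {
    svc := lambda.New(session.Must(session.NewSession()))
    payload := []byte(`{"work":"chunk"}`)

    // Async Event invoke: fire and forget, lower payload limit,
    // and AWS retries the same request on failure by default.
    _, err := svc.Invoke(&lambda.InvokeInput{
        FunctionName:   aws.String("arn_to_a_lambda"),
        InvocationType: aws.String(lambda.InvocationTypeEvent),
        Payload:        payload,
    })
    fmt.Println("event invoke:", err)

    // RequestResponse invoke: the call blocks until the function returns,
    // so the caller sees errors and decides how to retry.
    out, err := svc.Invoke(&lambda.InvokeInput{
        FunctionName:   aws.String("arn_to_a_lambda"),
        InvocationType: aws.String(lambda.InvocationTypeRequestResponse),
        Payload:        payload,
    })
    if err == nil && out.FunctionError != nil {
        fmt.Println("function returned an error:", *out.FunctionError)
    }
}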

If you are doing concurrent processing in Lambda, make sure to allocate enough memory even if you don't need it, as memory is the resource that determines how much compute you get. Of course, it is best to test with your own code to see where you start hitting diminishing returns, rather than using the maximum available outright.

Many thanks to Thomas Smith for the code excerpts.
