How to build an Event Pipeline Part 2: Transforming records using Lambda Functions

Bahadir Cambel
Big Data WAL · Feb 20, 2018

“Modern geometric neon bridge architecture at night on walkway, High Trestle Trail Bridge” by Tony Webster on Unsplash

Note: Check out our latest Serverless Smart Radio series

In Part I of How to build an Event Pipeline, we set up a basic pipeline to store all incoming events in an S3 bucket. In this article we will look at how to apply transformations on the fly with AWS Lambda functions.

Existing Architecture

Amazon Kinesis Firehose calls the Lambda function in batches, so there is less overhead and latency per event.

Rather than changing the original Firehose, I created a separate one to apply transformations. As you may recall from the last article, there was a step in the wizard that asked whether we would like a transformation applied to the ingested data, and we opted out. Now let’s go back to the wizard and opt in for a transformation.

Once the stream is selected and a name is given, the next step is the transformation step.

Transformation Step

It’s time to implement a Lambda function, which we don’t have yet. For the sake of simplicity, let’s use the one Amazon already provides. Personally, I went ahead and tried to write my own function in Clojure and failed miserably a couple of times before figuring out what the schema should look like. So please stick with the example below, and start modifying it to match your needs/language/stack choices afterwards.

Click the Create new button and select Kinesis Firehose Process Record Streams as source from the list.

It’s crucial to obey the format of the returned records. The callback is called with { “records”: <json_results> }, which is created between lines 6–10.

For more information about the structure, read https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

Please also bear in mind that Kinesis expects the data to be base64 encoded again, so whatever you want to return, generate the JSON first and then base64 encode it. In the example above, on line 22, the record.data received from Amazon Kinesis Firehose is already base64 encoded, so we will see the data as it was. One small modification to the example is to add a newline character to the end of the data, so that each JSON record is separated by a line feed.
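That decode, modify, re-encode cycle can be sketched as a small helper (a hypothetical function name, not from the blueprint):

```javascript
// Decode the incoming base64 payload, append a newline so each JSON
// record lands on its own line in S3, then base64 encode it again
// before returning it to Firehose.
function addTrailingNewline(base64Data) {
  const payload = Buffer.from(base64Data, 'base64').toString('utf8');
  return Buffer.from(payload + '\n', 'utf8').toString('base64');
}
```

Inside the handler you would set `data: addTrailingNewline(record.data)` instead of passing the payload through untouched.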

The lower part of the screen shows the code that will be used to create the function. Once the function is created, you will be able to change it however you want.

Once the function is created you will see the Lambda function’s details page. Now let’s do a test run to see our Lambda function in action.

In the top right corner of the Lambda function page, click the dropdown and choose Configure test events, which brings you to a page containing a full-blown Kinesis Firehose event. As you can see, the data is base64 encoded, and there is metadata about the Kinesis Firehose stream plus the record details from the Kinesis stream. When Kinesis Firehose reads multiple events from Kinesis, the records key will contain multiple items, each of which looks like the sample below.

Data contains the text “Hello, this is a test 123.”
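If you want to reproduce that payload locally, the data field is just the UTF-8 text, base64 encoded (a sketch, not the console's exact event JSON):

```javascript
// Build a record payload like the console's sample test event:
// the data field is the plain text, base64 encoded.
const text = 'Hello, this is a test 123.';
const data = Buffer.from(text, 'utf8').toString('base64');

// Decoding the data field gives back the original text.
const decoded = Buffer.from(data, 'base64').toString('utf8');
```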

Once you have created the test, click the Test button on the screen and let the Lambda function run!

Since this was the initial run, the duration was 32ms, but the next run took only 0.43ms! So don’t count the first runs. Also note that AWS always bills Lambda duration rounded up to the next 100ms bucket.
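In other words, both runs above cost the same (this is a sketch of the rounding rule as it worked at the time of writing; `billedDurationMs` is a hypothetical helper, not an AWS API):

```javascript
// Lambda billing rounds each invocation's duration up to the
// next 100ms bucket, so 0.43ms and 32ms both bill as 100ms.
function billedDurationMs(actualMs) {
  return Math.ceil(actualMs / 100) * 100;
}
```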

As you may have noticed, I made a small update to the data section and appended “Cg==” to the end of the string, which, as I mentioned, adds the newline character to the end. So go ahead and make a change to your function, update the timeout from 3 seconds to something over 1 minute, and hit the Save button afterwards. Also take a look at the settings lower on the screen to configure Memory / Concurrency / Role / Logging / DLQ, etc.
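Why “Cg==”? It is simply the base64 encoding of a single newline character, which you can verify in one line (note that concatenating base64 chunks like this only decodes cleanly when the preceding chunk encodes a byte count divisible by 3, i.e. has no `=` padding):

```javascript
// "Cg==" is the base64 encoding of one newline character (0x0A).
const newline = Buffer.from('Cg==', 'base64').toString('utf8');
```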

Also don’t forget that all application logs are saved to CloudWatch Logs. If you encounter any issues, head over to https://console.aws.amazon.com/cloudwatch/home and click the name of your function in the list.

nodejs-example is registered as a log group

Let’s go back to the Transformation step in Kinesis Firehose. The Lambda function will now be available in the dropdown (hit the refresh icon otherwise) and you can pick the version. Although I haven’t mentioned versioning, all the subsequent saves we did targeted and updated $LATEST. If you want multiple versions, check Actions / Publish new version at the top right of the Lambda function details page; for now we won’t dive into that.

In the next step we will select where we would like our data to land.

Select an S3 destination and prefix as you wish. While you’re developing and starting out, I highly suggest opting in for the Source record S3 backup option.

Configure Settings

Leave the other options as they are, or lower the buffer interval to 60 seconds if you want to see results quickly. Create the IAM role by clicking the button, then create a new role or attach an existing one. If you have any issues, check whether you have the correct permissions set for the S3 folders to be written, then click the Create Delivery Stream button!

And you will see the files start appearing within a couple of minutes, in the format <bucket>/<folder>/<YYYY>/<mm>/<dd>/<HH>/<firehose-name>-<worker>-<date>-<time>-<uuid>

e.g.

/nodejs2018/02/20/20/transforming-stream-1-2018-02-20-20-54-03-ed924b56-8d18-435a-a0d1-f72df7c4d12d
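Note that the date path is appended directly after your prefix, which is why a prefix of “nodejs” without a trailing slash yields “nodejs2018/02/…”. A sketch of how that key prefix is built (`firehoseKeyPrefix` is a hypothetical helper for illustration; Firehose uses the UTC delivery hour):

```javascript
// Firehose writes objects under <prefix><YYYY>/<MM>/<DD>/<HH>/,
// appending the UTC date path directly after the configured prefix.
function firehoseKeyPrefix(prefix, date) {
  const pad = (n) => String(n).padStart(2, '0');
  return `${prefix}${date.getUTCFullYear()}/${pad(date.getUTCMonth() + 1)}/` +
         `${pad(date.getUTCDate())}/${pad(date.getUTCHours())}/`;
}
```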

If you want to send data, head over to the last section of Part 1 of the article and check the samples.

Notes:

I think you should follow me on Twitter: https://twitter.com/bahadircambel
