Append Newline to Amazon Kinesis Firehose JSON Formatted Records with Python and AWS Lambda

Amiri McCain
Published in Analytics Vidhya
5 min read · Jan 28, 2021

Guanajuato, Mexico (photo by the author)

Simple, right? Yet this proved to be elusive, at least for me, and for longer than I would care to admit.

If you want to skip the scenic route and get straight to the solution, scroll down to the code below (better yet, Ctrl+F “kinesis-fh-json-newline”). Otherwise, enjoy the stroll.

The Scenario. I am using Amazon DynamoDB (DDB) Streams. An AWS Lambda function takes those stream records and pushes them to Kinesis Firehose, and Firehose then delivers the data to S3. It works beautifully for getting my records into S3 as JSON strings, since Firehose helps control the flow of data.

Data pipeline without Firehose Transformation

So far, my (partial) data pipeline works great, with one exception. When the JSON formatted records are saved to file, they are saved serially: every record is written on a single line, one right after the other. When you open the file in Notepad, you see the records daisy-chained together along the top of the file. I don’t want the data stored like that, as it is not intuitive and will be difficult to process later.

I want each record to end with a newline so that the records are stacked vertically, in a column so to speak, rather than strung horizontally along one row.

JSON Records: This is how Kinesis Firehose saves each record to file.

I downloaded the file shown in the image above from my S3 bucket. The file illustrates how Kinesis Firehose saves each of my JSON records to file. I was surprised by two things: 1) Kinesis Firehose didn’t have a simple “checkbox” to add a newline for me, and 2) I could not easily find a Python solution out there.

In Comes base64 Encoding/Decoding

It was not as simple as converting the DDB newImage to a string, adding + "\n", and then converting it back to a dictionary or JSON in my first Lambda. I would get either an error (when I used json.loads()) or nothing would happen at all (when I used ast.literal_eval()), so I suspect the client.put_record() call for Firehose was stripping out the newline. Firehose transformation records are base64 encoded, and to get this working correctly I had to import base64 in Python, decode the record to a UTF-8 string, append the newline, and then base64-encode it again. See the kinesis-fh-json-newline.py code for the details.
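To make that round trip concrete, here is a minimal sketch of the decode, append, re-encode dance (the sample record payload is made up for illustration):

```python
import base64

# A made-up record payload, base64-encoded the way Firehose delivers it
record_data = base64.b64encode(b'{"id": 1}').decode('utf-8')

# Decode to a UTF-8 string, append the newline, then re-encode for Firehose
payload = base64.b64decode(record_data).decode('utf-8') + '\n'
encoded = base64.b64encode(payload.encode('utf-8')).decode('utf-8')

# Decoding again shows the newline survived the round trip
print(base64.b64decode(encoded))  # b'{"id": 1}\n'
```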

Here is how I did it:

  1. Create a new Lambda and use the kinesis-fh-json-newline.py code below, or use the Node.js version below.
  2. When you create your Kinesis Firehose stream, enable “Transform source records with AWS Lambda” (select “Enabled”). If you have already created your stream, you can still enable this feature by editing your existing stream.
  3. Select your newly created Lambda function for the transformation. This transformation Lambda could do a bunch of other things as well, such as ETL. In my case, I only needed to add a newline at the end of each record so that every entry appears on a separate line when I open the file in a text editor.

Below is the tested solution code for both Python and Node.js that I used for that second Lambda:

Python solution to add a newline:
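Here is a minimal sketch of what such a transformation Lambda looks like, following the Firehose record-transformation contract: each incoming record carries a recordId and a base64-encoded data field, and each outgoing record must echo the recordId along with a result status and the re-encoded data.

```python
import base64

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        # Decode the base64-encoded record payload to a UTF-8 string
        payload = base64.b64decode(record['data']).decode('utf-8')
        # Append a newline so each JSON record lands on its own row in S3
        payload = payload + '\n'
        # Re-encode the transformed payload back to base64 for Firehose
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        })
    return {'records': output}
```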

Node.js solution to add a newline:


Adding a Newline in the First Lambda, Before Kinesis Firehose

I was able to get this working in the first Lambda (“Stream processing” in the data pipeline image above) as well, rather than using the Kinesis Firehose transform source records feature. I did this by taking the newImage from DynamoDB Streams and, in this order: encoding, decoding, adding a newline (“\n”), encoding, and decoding again. There’s probably a much cleaner way. However, I chose to go with the transform source records feature using the second Lambda function, as it seems cleaner to me at this time.

In any case, the single Lambda solution looks like this:

Transform source records with AWS Lambda

After creating your new Lambda in Python, go to your Kinesis Data Firehose delivery stream and edit your stream. Enable “Source record transformation” in the “Transform source records with AWS Lambda” section, select your newly created Lambda from the list, and finally save your changes. That should do it!

Firehose Transformation Lambda

Your new data pipeline flow should look like this:

Data pipeline with Firehose transformation for adding a newline.

Retest your Lambda function and voilà! Each record should now end with a newline and be in an expected, intuitive format.

JSON Records: With newline at the end of each record.

I’d like to hear your thoughts. Do you ever have the need to save records serially? Is there a simpler solution that I am missing? I’m sure there is! Comments, feedback, and suggestions are always welcome.

Future articles. In a future article or series of articles, I will detail completing this entire data pipeline so that we can get the data from DynamoDB into Amazon RDS such that it is ready for some data analytics using Amazon QuickSight.

If you enjoyed the article or the solution helped you, please clap your hands. You can clap up to 50 times, thank you.
