Handling Spaces in Column Names During Kinesis Firehose JSON-Parquet Data Transformation

Engineering@ZenOfAI · Published in ZenOf.AI · Jun 24, 2020

Parquet is an open-source file format for the Hadoop ecosystem that stores nested data structures in a flat, columnar layout. Compared to a traditional row-oriented approach, Parquet is more efficient in terms of both storage and query performance. A common industry pattern is to store Parquet files in S3 and query them with Athena, since Parquet is well suited to this workload and performs better than other storage formats for analytical queries. You can read more about it here.

However, Parquet does not support spaces in column names, which becomes an issue if you are using Kinesis Firehose to stream log data. Logs are typically in JSON format, and a common practice is to convert these JSON logs to Parquet while writing to S3 so the log data can be queried in Athena. JSON keys are mapped to column names, so if any key contains a space, the conversion fails. You can use a transformation Lambda to handle those spaces by replacing them with underscores, as in the example below.
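For example, a raw log event and its transformed counterpart might look like this (the keys and values are illustrative):

Original log event:   {"request id": "abc-123", "status code": 200, "user agent": "curl/7.64.1"}
After the Lambda:     {"request_id": "abc-123", "status_code": 200, "user_agent": "curl/7.64.1"}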

We shall look at a setup where CloudWatch Logs are streamed directly to Kinesis Firehose (follow Example 3, point 12, to create the subscription filter); a boto3 sketch of the same step is shown below.
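If you prefer to script that step, a rough boto3 equivalent is below. The log group name, filter name, Firehose ARN, and IAM role ARN are placeholders you would replace with your own, and the role must allow CloudWatch Logs to put records into the delivery stream.

import boto3

logs = boto3.client('logs', region_name='us-east-1')
logs.put_subscription_filter(
    logGroupName='/my/application/log-group',    # placeholder log group
    filterName='stream-to-firehose',             # placeholder filter name
    filterPattern='',                            # empty pattern forwards every log event
    destinationArn='arn:aws:firehose:us-east-1:123456789012:deliverystream/example-stream',
    roleArn='arn:aws:iam::123456789012:role/CWLtoFirehoseRole'
)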

Here, the log format can be simple JSON or the CloudWatch embedded metric format, as in the example below.
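For instance, an embedded metric format payload with spaces in its names might look like the following (the names and values are illustrative). Note that the offending names appear both as top-level keys and inside _aws.CloudWatchMetrics, which is why the Lambda below transforms both:

{
  "_aws": {
    "Timestamp": 1592979200000,
    "CloudWatchMetrics": [
      {
        "Namespace": "MyApplication",
        "Dimensions": [["service name"]],
        "Metrics": [{"Name": "request latency", "Unit": "Milliseconds"}]
      }
    ]
  },
  "service name": "checkout",
  "request latency": 120
}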

Creating the transformation Lambda:

Create a Lambda function named ‘cloudwatch_logs_processor_python’ with the following code, set the runtime to Python 2.7, and set the timeout to 5 minutes 20 seconds. Python 2.7 is used because the code is an improved version of an existing Lambda blueprint written for that runtime.

Note: This Lambda handles only data sent by CloudWatch Logs to Firehose; if your source is different, you may need to tweak the code. Spaces in column names are replaced with underscores. Read through the comments in the code to understand the functionality.

lambda_function.py

"""
For processing data sent to Firehose by Cloudwatch Logs subscription filters.
Cloudwatch Logs sends to Firehose records that look like this:{
"messageType": "DATA_MESSAGE",
"owner": "123456789012",
"logGroup": "log_group_name",
"logStream": "log_stream_name",
"subscriptionFilters": [
"subscription_filter_name"
],
"logEvents": [
{
"id": "01234567890123456789012345678901234567890123456789012345",
"timestamp": 1510109208016,
"message": "log message 1"
},
{
"id": "01234567890123456789012345678901234567890123456789012345",
"timestamp": 1510109208017,
"message": "log message 2"
}
...
]
}
The data is additionally compressed with GZIP.The code below will:1) Gunzip the data
2) Parse the json
3) Set the result to ProcessingFailed for any record whose messageType is not DATA_MESSAGE, thus redirecting them to the
processing error output. Such records do not contain any log events. You can modify the code to set the result to
Dropped instead to get rid of these records completely.
4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
transformations on the log events.
5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that
this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent
method.
6) Any additional records which exceed 6MB will be re-ingested back into Firehose.
"""import boto3
import StringIO
import gzip
import base64
import json

def transform_metrics(cloudwatchmetrics):
    # Replace spaces with underscores in the dimension and metric names of an
    # embedded metric format payload's CloudWatchMetrics block.
    newcloudwatchmetricslist = []
    for index, cloudwatchmetric in enumerate(cloudwatchmetrics):
        newcloudwatchmetric = {
            "Namespace": cloudwatchmetric['Namespace'],
            "Dimensions": [],
            "Metrics": []
        }
        # print(cloudwatchmetric['Namespace'])
        for key, value in cloudwatchmetric.items():

            if key == 'Dimensions':
                for eachlist in value:
                    newlist = []
                    for eachDimension in eachlist:
                        newlist.append(eachDimension.replace(' ', '_'))
                    newcloudwatchmetric['Dimensions'].append(newlist)

            if key == 'Metrics':
                newmetricslist = []
                for eachMetric in value:
                    newmetric = {}
                    newmetric['Name'] = eachMetric['Name'].replace(' ', '_')
                    newmetric['Unit'] = eachMetric['Unit']
                    newmetricslist.append(newmetric)
                newcloudwatchmetric['Metrics'] = newmetricslist
        newcloudwatchmetricslist.append(newcloudwatchmetric)
    return newcloudwatchmetricslist


def transform_record(log_json):
    # Recursively replace spaces with underscores in every key of the log JSON.
    log_transformed = {}
    for k, v in log_json.items():
        if isinstance(v, dict):
            v = transform_record(v)
        log_transformed[k.replace(' ', '_')] = v

    # Embedded metric format payloads also carry names inside _aws.CloudWatchMetrics.
    if '_aws' in log_transformed:
        transformedmetrics = transform_metrics(log_transformed['_aws']['CloudWatchMetrics'])
        log_transformed['_aws']['CloudWatchMetrics'] = transformedmetrics

    return log_transformed

def transformLogEvent(log_event):
    """Transform each log event.

    Parses the event's message as JSON, replaces spaces in its keys with
    underscores, and appends a newline to the result.

    Args:
        log_event (dict): The original log event. Structure is {"id": str, "timestamp": long, "message": str}

    Returns:
        str: The transformed log event.
    """
    log_str = log_event['message'].encode('utf-8')
    log_json = json.loads(log_str)
    log_transformed = json.dumps(transform_record(log_json))
    log_unicode = unicode(log_transformed, "utf-8")
    return log_unicode + "\n"

def processRecords(records):
    for r in records:
        data = base64.b64decode(r['data'])
        striodata = StringIO.StringIO(data)
        with gzip.GzipFile(fileobj=striodata, mode='r') as f:
            data = json.loads(f.read())
        recId = r['recordId']
        """
        CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
        They do not contain actual data.
        """
        if data['messageType'] == 'CONTROL_MESSAGE':
            yield {
                'result': 'Dropped',
                'recordId': recId
            }
        elif data['messageType'] == 'DATA_MESSAGE':
            data = ''.join([transformLogEvent(e) for e in data['logEvents']])
            data = base64.b64encode(data)
            yield {
                'data': data,
                'result': 'Ok',
                'recordId': recId
            }
        else:
            yield {
                'result': 'ProcessingFailed',
                'recordId': recId
            }

def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_record_batch throws for whatever reason, response['xx'] will error out, adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedPutCount'] > 0:
        for idx, res in enumerate(response['RequestResponses']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue
            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])

        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
            putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))

def putRecordsToKinesisStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_records throws for whatever reason, response['xx'] will error out, adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_records(StreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_records succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedRecordCount'] > 0:
        for idx, res in enumerate(response['Records']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue
            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])

        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecords to Kinesis stream, retrying. %s' % (errMsg))
            putRecordsToKinesisStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))

def createReingestionRecord(isSas, originalRecord):
    if isSas:
        return {'data': base64.b64decode(originalRecord['data']), 'partitionKey': originalRecord['kinesisRecordMetadata']['partitionKey']}
    else:
        return {'data': base64.b64decode(originalRecord['data'])}


def getReingestionRecord(isSas, reIngestionRecord):
    if isSas:
        return {'Data': reIngestionRecord['data'], 'PartitionKey': reIngestionRecord['partitionKey']}
    else:
        return {'Data': reIngestionRecord['data']}

def handler(event, context):
    isSas = 'sourceKinesisStreamArn' in event
    streamARN = event['sourceKinesisStreamArn'] if isSas else event['deliveryStreamArn']
    region = streamARN.split(':')[3]
    streamName = streamARN.split('/')[1]
    records = list(processRecords(event['records']))
    projectedSize = 0
    dataByRecordId = {rec['recordId']: createReingestionRecord(isSas, rec) for rec in event['records']}
    putRecordBatches = []
    recordsToReingest = []
    totalRecordsToBeReingested = 0

    for idx, rec in enumerate(records):
        if rec['result'] != 'Ok':
            continue
        projectedSize += len(rec['data']) + len(rec['recordId'])
        # 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
        if projectedSize > 6000000:
            totalRecordsToBeReingested += 1
            recordsToReingest.append(
                getReingestionRecord(isSas, dataByRecordId[rec['recordId']])
            )
            records[idx]['result'] = 'Dropped'
            del(records[idx]['data'])

        # split out the record batches into multiple groups, 500 records at max per group
        if len(recordsToReingest) == 500:
            putRecordBatches.append(recordsToReingest)
            recordsToReingest = []

    if len(recordsToReingest) > 0:
        # add the last batch
        putRecordBatches.append(recordsToReingest)

    # iterate and call putRecordBatch for each group
    recordsReingestedSoFar = 0
    if len(putRecordBatches) > 0:
        client = boto3.client('kinesis', region_name=region) if isSas else boto3.client('firehose', region_name=region)
        for recordBatch in putRecordBatches:
            if isSas:
                putRecordsToKinesisStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            else:
                putRecordsToFirehoseStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            recordsReingestedSoFar += len(recordBatch)
            print('Reingested %d/%d records out of %d' % (recordsReingestedSoFar, totalRecordsToBeReingested, len(event['records'])))
    else:
        print('No records to be reingested')

    return {"records": records}

Save it.
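Before wiring the function into Firehose, you can sanity-check it locally with a hand-built event. The sketch below assumes the code above is saved as lambda_function.py and that you are running Python 2.7, the same runtime as the Lambda; the stream ARN, record id, and log message are made-up values.

test_locally.py

import base64
import gzip
import json
import StringIO

from lambda_function import handler

# A fake CloudWatch Logs payload whose log message has spaces in its keys.
payload = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "log_group_name",
    "logStream": "log_stream_name",
    "subscriptionFilters": ["subscription_filter_name"],
    "logEvents": [
        {"id": "0", "timestamp": 1510109208016,
         "message": json.dumps({"request id": "abc-123", "status code": 200})}
    ]
}

# CloudWatch Logs gzips the payload before it is base64-encoded for the Lambda.
buf = StringIO.StringIO()
with gzip.GzipFile(fileobj=buf, mode='w') as f:
    f.write(json.dumps(payload))

event = {
    "deliveryStreamArn": "arn:aws:firehose:us-east-1:123456789012:deliverystream/example-stream",
    "records": [{"recordId": "1", "data": base64.b64encode(buf.getvalue())}]
}

result = handler(event, None)
print(base64.b64decode(result['records'][0]['data']))
# Should print the message with underscores, e.g. {"status_code": 200, "request_id": "abc-123"}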

Enable Transform source records with AWS Lambda:

On your Firehose delivery stream, enable Transform source records with AWS Lambda and select the Lambda function created above; a boto3 sketch of the same change is shown below.
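If you would rather script it, something along the lines of the following boto3 call should attach the processor. The stream name and Lambda ARN are placeholders, and this assumes the stream already uses the extended S3 destination that record format conversion requires.

import boto3

firehose = boto3.client('firehose', region_name='us-east-1')
desc = firehose.describe_delivery_stream(DeliveryStreamName='example-stream')['DeliveryStreamDescription']

firehose.update_destination(
    DeliveryStreamName='example-stream',
    CurrentDeliveryStreamVersionId=desc['VersionId'],
    DestinationId=desc['Destinations'][0]['DestinationId'],
    ExtendedS3DestinationUpdate={
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [{
                'Type': 'Lambda',
                'Parameters': [{
                    'ParameterName': 'LambdaArn',
                    'ParameterValue': 'arn:aws:lambda:us-east-1:123456789012:function:cloudwatch_logs_processor_python'
                }]
            }]
        }
    }
)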

Note: Make sure that the schema of the Athena reference table used for record format conversion has the column names as they will appear after the spaces are replaced with underscores. Otherwise, the transformed Parquet files will have null-value columns.

Once this is set up, the delivery stream invokes the transformation Lambda on each incoming batch of records.

To check whether the data is transformed properly, download a transformed Parquet file from S3 and inspect it with this online Parquet viewer (or locally, as in the sketch below). If the data is transformed without any loss, build a table in Athena using a Glue crawler, load the partitions, and query your Parquet log data.
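If you prefer to inspect the file locally instead of using the online viewer, a quick check with pandas works as well; the bucket and key below are placeholders, and pyarrow (or fastparquet) needs to be installed.

import boto3
import pandas as pd

s3 = boto3.client('s3')
s3.download_file('my-log-bucket', 'firehose-output/sample.parquet', 'sample.parquet')

df = pd.read_parquet('sample.parquet')   # requires pyarrow or fastparquet
print(df.columns)   # column names should contain underscores, not spaces
print(df.head())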

I hope this was helpful. Thank you!

This story is authored by Koushik. He is a software engineer specializing in AWS Cloud Services.

Originally published at http://blog.zenof.ai.
