End-to-end Pipeline Using AWS Services Part 1 — Data Ingestion with IoT Core and DynamoDB

Yunus Emrah Uluçay
Aug 24, 2022

This series contains three parts. Our scenario is to get data from an IoT device every 3 minutes and send it to DynamoDB. Once the data is in DynamoDB, we create a trigger using the Lambda service, and Lambda triggers a Glue job. Glue reads that data, transforms it, and saves it to S3. After Glue finishes its job, Lambda removes the data from DynamoDB.

In this part we generate data as if we had an IoT device, then send that data to DynamoDB through IoT Core.

End-to-end Pipeline Part 1 Workflow

1-DynamoDB (Create Table)

Go to the DynamoDB console, then choose Create table.

In Create table, set Table name = weather_table, Partition key = sample_time, and Sort key = device_id. Select Number as the type for both keys, keep everything else at its default, and choose Create table.

Creation of DynamoDB Table
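
For readers who prefer code over the console, the same table can be described as boto3 `create_table` parameters. This is only a sketch: the function names are made up for illustration, and `BillingMode` is an assumption, since the article simply keeps the console defaults.

```python
# Sketch: the console steps above expressed as boto3 create_table parameters.

def build_table_definition(table_name="weather_table"):
    """Return keyword arguments for DynamoDB's CreateTable call."""
    return {
        "TableName": table_name,
        "KeySchema": [
            {"AttributeName": "sample_time", "KeyType": "HASH"},   # partition key
            {"AttributeName": "device_id", "KeyType": "RANGE"},    # sort key
        ],
        "AttributeDefinitions": [
            {"AttributeName": "sample_time", "AttributeType": "N"},  # Number
            {"AttributeName": "device_id", "AttributeType": "N"},    # Number
        ],
        "BillingMode": "PAY_PER_REQUEST",  # assumption, not from the article
    }


def create_weather_table(region_name=None):
    """Create the table. Needs boto3 and AWS credentials, so it is not called here."""
    import boto3  # imported lazily so the definition can be inspected offline

    client = boto3.client("dynamodb", region_name=region_name)
    return client.create_table(**build_table_definition())


if __name__ == "__main__":
    # Print the definition only; call create_weather_table() yourself when ready.
    print(build_table_definition())
```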

A sample of the message that we will send:

{
  "temperature": 28,
  "humidity": 80,
  "barometer": 1013,
  "wind": {
    "velocity": 22,
    "bearing": 255
  }
}

2-IoT Core Arrangement

Go to the IoT Core console. In the left pane, under Message Routing, click Rules, then Create rule -> Rule name = weather_data_rule. Click next.

For the SQL statement:

SELECT temperature, humidity, barometer, wind.velocity AS wind_velocity, wind.bearing AS wind_bearing FROM 'device/+/data'

For the database entry we don't want a nested structure, so the statement flattens the wind object into wind_velocity and wind_bearing. Click next.
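
The flattening happens inside the IoT rule engine, but it can be illustrated locally. The sketch below mimics the SELECT clause on the sample message; `apply_rule_select` is a hypothetical helper, not part of any AWS SDK.

```python
# Local illustration of what the rule's SELECT clause does to a message.

def apply_rule_select(message):
    """Keep the selected fields and flatten the nested wind object."""
    return {
        "temperature": message["temperature"],
        "humidity": message["humidity"],
        "barometer": message["barometer"],
        "wind_velocity": message["wind"]["velocity"],  # wind.velocity AS wind_velocity
        "wind_bearing": message["wind"]["bearing"],    # wind.bearing AS wind_bearing
    }


sample = {
    "temperature": 28,
    "humidity": 80,
    "barometer": 1013,
    "wind": {"velocity": 22, "bearing": 255},
}
print(apply_rule_select(sample))
```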

Action 1 = DynamoDB, Table name = weather_table, Partition key = sample_time, Partition key type = NUMBER, Partition key value = ${timestamp()}, Sort key = device_id, Sort key type = NUMBER, Sort key value = ${cast(topic(2) AS DECIMAL)}, Write message data to this column = device_data. You can create a new IAM role for this action, then click create.
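
To make the substitution templates concrete, here is a rough local model of the item this action produces: `${timestamp()}` becomes the current time in milliseconds, `topic(2)` picks the second segment of the topic, and the selected fields go into the device_data column. The helper names are hypothetical, and `int()` only approximates the rule engine's `cast(... AS DECIMAL)`.

```python
import time


def topic_segment(topic, index):
    """Return the 1-based segment of an MQTT topic, like topic(n) in the rule SQL."""
    return topic.split("/")[index - 1]


def build_item(topic, flat_payload, now_ms=None):
    """Model the item shape written by the DynamoDB action (approximation)."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)  # stands in for ${timestamp()}
    return {
        "sample_time": now_ms,                      # partition key
        "device_id": int(topic_segment(topic, 2)),  # sort key from the topic
        "device_data": flat_payload,                # column holding the message data
    }


item = build_item(
    "device/2/data",
    {"temperature": 28, "humidity": 80, "barometer": 1013,
     "wind_velocity": 22, "wind_bearing": 255},
)
print(item["device_id"], sorted(item))
```

Publishing to device/2/data therefore yields device_id = 2, which is why the topic filter uses `+` in the middle position.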

To test (optional): in the left pane of the IoT Core console, go to Test -> MQTT test client -> Subscribe to a topic -> Topic filter = device/+/data and Subscribe. After subscribing, click Publish to a topic -> Topic name = device/2/data, Message payload =

{ "temperature": 28, "humidity": 80, "barometer": 1013, "wind": { "velocity": 22, "bearing": 255 }}

and publish the message. Then go to DynamoDB and check that the item arrived.
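
The check can also be done from Python instead of the console. The sketch below scans a few items with boto3 and converts the `Decimal` numbers that the boto3 resource API returns into plain ints and floats. The function names are illustrative, and `fetch_items` needs boto3 plus AWS credentials, so it is defined but not called here.

```python
from decimal import Decimal


def to_plain(value):
    """Recursively convert DynamoDB Decimal values to int or float."""
    if isinstance(value, Decimal):
        return int(value) if value == value.to_integral_value() else float(value)
    if isinstance(value, dict):
        return {key: to_plain(inner) for key, inner in value.items()}
    if isinstance(value, list):
        return [to_plain(inner) for inner in value]
    return value


def fetch_items(table_name="weather_table", limit=10):
    """Scan up to `limit` items from the table (requires AWS access)."""
    import boto3  # imported lazily; only needed when actually querying

    table = boto3.resource("dynamodb").Table(table_name)
    response = table.scan(Limit=limit)
    return [to_plain(item) for item in response["Items"]]


# Offline demonstration of the conversion on an item shaped like ours.
raw = {"sample_time": Decimal("1661299200000"), "device_id": Decimal("2"),
       "device_data": {"temperature": Decimal("28")}}
print(to_plain(raw))
```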

Go to the IoT Core console. In the left pane, go to All devices -> Things and click Create thing. Choose Create a single thing and click next. Set Thing name = weather_data_thing, keep the other settings at their defaults, and click next. Choose Auto-generate a new certificate (recommended) and click next. If you don't have a policy, click create policy -> Policy name = admin, Policy action = "*", Policy resource = "*" and create (this wide-open policy is for testing only). After this, choose the policy and create the thing.

Download all the certificates. Move them to a folder named certificates and rename AmazonRootCA1.pem to root.pem.

In the left pane of IoT Core, go to Settings and copy the Endpoint. We will use this endpoint in our Python script.

Create a Python file to send data to DynamoDB via IoT Core.
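
Outside of testing, a narrower policy is usually preferable to "*". As a hedged sketch, the function below builds an IoT policy document that only allows connecting as the client ID and publishing to the topic that the script in this article uses (testDevice and device/1/data). The region and account ID are placeholders you would have to supply, and `build_device_policy` is a hypothetical helper.

```python
import json


def build_device_policy(region, account_id, client_id="testDevice",
                        topic="device/1/data"):
    """Return an IoT policy allowing one client to connect and publish to one topic."""
    prefix = f"arn:aws:iot:{region}:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iot:Connect",
                "Resource": f"{prefix}:client/{client_id}",
            },
            {
                "Effect": "Allow",
                "Action": "iot:Publish",
                "Resource": f"{prefix}:topic/{topic}",
            },
        ],
    }


# REGION and ACCOUNT_ID are placeholders, not real values.
print(json.dumps(build_device_policy("REGION", "ACCOUNT_ID"), indent=2))
```

The printed JSON can be pasted into the policy editor in place of the "*" entries.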

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import time as t
import numpy as np
import json
import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
# Define ENDPOINT, CLIENT_ID, PATH_TO_CERTIFICATE, PATH_TO_PRIVATE_KEY, PATH_TO_AMAZON_ROOT_CA_1, MESSAGE, TOPIC, and RANGE
ENDPOINT = "endpoint url"
CLIENT_ID = "testDevice"
PATH_TO_CERTIFICATE = "certificates/...24-certificate.pem.crt"
PATH_TO_PRIVATE_KEY = "certificates/...-private.pem.key"
PATH_TO_AMAZON_ROOT_CA_1 = "certificates/root.pem"
TOPIC = "device/1/data"
#RANGE = 20
# Connect to IoT Core over MQTT with TLS client certificates (port 8883)
myAWSIoTMQTTClient = AWSIoTPyMQTT.AWSIoTMQTTClient(CLIENT_ID)
myAWSIoTMQTTClient.configureEndpoint(ENDPOINT, 8883)
myAWSIoTMQTTClient.configureCredentials(PATH_TO_AMAZON_ROOT_CA_1, PATH_TO_PRIVATE_KEY, PATH_TO_CERTIFICATE)
myAWSIoTMQTTClient.connect()
#for i in range(3):
print('Begin Publish')
# Build one random reading in the same shape as the sample message
MESSAGE = {"temperature": np.random.randint(25,30),
           "humidity": np.random.randint(75,85),
           "barometer": np.random.randint(1010,1040),
           "wind": {"velocity": np.random.randint(20,26), "bearing": np.random.randint(250,270)}}
# Publish with QoS 1
myAWSIoTMQTTClient.publish(TOPIC, json.dumps(MESSAGE), 1)
print("Published: '" + json.dumps(MESSAGE) + "' to the topic: '" + TOPIC + "'")
#t.sleep(180)
print('Publish End')
myAWSIoTMQTTClient.disconnect()

I named this script device1.py and placed it in the same directory as the certificates folder. It is for testing purposes; in the other parts we will change it a little.

|
--- device1.py
--- certificates/

Fill in ENDPOINT, PATH_TO_CERTIFICATE and PATH_TO_PRIVATE_KEY. Run the script from the CLI with "python3 device1.py".

The script above is for testing this part. The code below is the final version.
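
A wrong certificate path is an easy mistake at this step, so it can help to verify the files before connecting. The following is a small optional sketch using only the standard library; `missing_files` is a hypothetical helper, and you would pass it the three path constants from the script.

```python
from pathlib import Path


def missing_files(paths):
    """Return the paths (as strings) that do not point to an existing file."""
    return [str(path) for path in map(Path, paths) if not path.is_file()]


if __name__ == "__main__":
    # Only root.pem has a fixed name in the article; add your own
    # PATH_TO_CERTIFICATE and PATH_TO_PRIVATE_KEY values to this list.
    missing = missing_files(["certificates/root.pem"])
    if missing:
        print("Missing files:", ", ".join(missing))
    else:
        print("All certificate files found.")
```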

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import time as t
import numpy as np
import json
import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
# Define ENDPOINT, CLIENT_ID, PATH_TO_CERTIFICATE, PATH_TO_PRIVATE_KEY, PATH_TO_AMAZON_ROOT_CA_1, MESSAGE, TOPIC, and RANGE
ENDPOINT = "endpoint url"
CLIENT_ID = "testDevice"
PATH_TO_CERTIFICATE = "certificates/...853d-certificate.pem.crt"
PATH_TO_PRIVATE_KEY = "certificates/...853d-private.pem.key"
PATH_TO_AMAZON_ROOT_CA_1 = "certificates/root.pem"
TOPIC = "device/1/data"
#RANGE = 20
# Connect to IoT Core over MQTT with TLS client certificates (port 8883)
myAWSIoTMQTTClient = AWSIoTPyMQTT.AWSIoTMQTTClient(CLIENT_ID)
myAWSIoTMQTTClient.configureEndpoint(ENDPOINT, 8883)
myAWSIoTMQTTClient.configureCredentials(PATH_TO_AMAZON_ROOT_CA_1, PATH_TO_PRIVATE_KEY, PATH_TO_CERTIFICATE)
myAWSIoTMQTTClient.connect()
# Publish three random readings, one every 3 minutes
for i in range(3):
    print('Begin Publish')
    MESSAGE = {"temperature": np.random.randint(25,30),
               "humidity": np.random.randint(75,85),
               "barometer": np.random.randint(1010,1040),
               "wind": {"velocity": np.random.randint(20,26), "bearing": np.random.randint(250,270)}}
    # Publish with QoS 1
    myAWSIoTMQTTClient.publish(TOPIC, json.dumps(MESSAGE), 1)
    print("Published: '" + json.dumps(MESSAGE) + "' to the topic: '" + TOPIC + "'")
    t.sleep(180)
print('Publish End')
myAWSIoTMQTTClient.disconnect()
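
The publish loop can also be written so that it is testable without an AWS connection. This is one possible variation, not the script used in this series: it takes the publish function as a parameter, uses the standard library's `random` instead of NumPy, and skips the pause after the last message. All function names here are hypothetical.

```python
import json
import random
import time


def make_reading(rng=random):
    """Build one fake reading; upper bounds are exclusive, as with np.random.randint."""
    return {
        "temperature": rng.randrange(25, 30),
        "humidity": rng.randrange(75, 85),
        "barometer": rng.randrange(1010, 1040),
        "wind": {
            "velocity": rng.randrange(20, 26),
            "bearing": rng.randrange(250, 270),
        },
    }


def publish_readings(publish, topic, count=3, interval_s=180, sleep=time.sleep):
    """Publish `count` readings, pausing between them but not after the last one."""
    sent = []
    for i in range(count):
        payload = json.dumps(make_reading())
        publish(topic, payload, 1)  # QoS 1, same argument order as the SDK's publish
        sent.append(payload)
        if i < count - 1:
            sleep(interval_s)
    return sent


if __name__ == "__main__":
    # Dry run: print instead of publishing, and do not actually wait.
    publish_readings(lambda topic, payload, qos: print(topic, payload),
                     "device/1/data", sleep=lambda seconds: None)
```

With a connected client, the call would look like `publish_readings(myAWSIoTMQTTClient.publish, TOPIC)`.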

Final:

Running script and output
Result in DynamoDB table.

Thank you for reading. If you have any advice or questions, please don't hesitate to comment. And if you found it useful, please don't hesitate to like it :)

References:

https://aws.amazon.com/premiumsupport/knowledge-center/iot-core-publish-mqtt-messages-python/

https://docs.aws.amazon.com/iot/latest/developerguide/iot-ddb-rule.html
