AWS Greengrass on Raspberry Pi: Adding Time-Series Databases

Gabe Ibarra
TensorIoT
Published in
7 min readFeb 13, 2018

This post builds off of a setup described in our previous post: “AWS Greengrass on Raspberry Pi: Creating Core and Node Devices”. We’ll demonstrate the features and implementation of time-series databases Graphite and InfluxDB on Raspberry Pi. The Lambda script on the Greengrass core will be updated to save data to these DBs.

Intro

IoT systems generate a lot of telemetry (tele = remote, metron = measure) data, which by its nature is time-series.

The IoT Edge will need a storage solution optimized for the type of data/computation happening in IoT applications:

  • Resiliency - In a remote network of IoT devices it is easy to imagine the network occasionally losing connectivity to the cloud.
  • Networking Efficiency - The intent in a Greengrass setup is to save on networking costs communicating with the cloud by computing on the edge.

Time-series databases are optimized to index data by time or time ranges. An IoT application like a sensor, or the general IT monitoring metrics associated with managing our devices all generate time-series data that works well in this architecture.

Graphite

Graphite has a couple parts:
1. Carbon — listens for data events and writes them + aggregates them.
It’s constrained by Python (issues with threads) so implementations need to evolve into using Carbon relays and Carbon caches. Possible complete alternatives are graphite-ng, Riemann
2. Whisper db files — InfluxDB is a valid replacement because it has compatible protocol with Carbon. Also, Whisper project is going to be replaced by Ceres someday. Another alternative is using a log reader.
3. Graphite-web — provides web graphs using Django + wsgi server (apache or nginx). It might not be appropriate to run thisweb server on a pure Greengrass core device, unless it helps with troubleshooting. Maybe the core could send the data to a more appropriate local machine that could host the internal web server for the local IT.

Install

Note: Carbon only works with Python 2 at the moment.
ssh into Pi.
Check pip version: pip --version
If pip version is > 7.0.0 you need this--no-binary=:all: flag:

sudo -s 
apt-get install libssl-dev libffi-dev
pip install — no-binary=:all: https://github.com/graphite-project/whisper/tarball/masterpip install — no-binary=:all: https://github.com/graphite-project/carbon/tarball/master

I was installing on default python 2.7 with an older pip version so mine was:

pip install https://github.com/graphite-project/whisper/tarball/masterpip install https://github.com/graphite-project/carbon/tarball/master

Configure Carbon:

pushd /opt/graphite/conf
cp carbon.conf.example carbon.conf
cp storage-schemas.conf.example storage-schemas.conf

Add a new route to storage-schemas.conf:

# pi1 routes
[pi1_topics_30s_for_10days]
pattern = ^pi1\.
retentions = 30s:10d

That’s a 30 second metric to match our 30 second publishes.

Start Carbon

python /opt/graphite/bin/carbon-cache.py start

[Optional]: I got a Python traceback about Twisted / SSL. I needed to update some of my python tools:
Exit sudo -s then sudo -i
pip install upgrade pip
Go back to sudo -s
pip install --upgrade pyopenssl

Check the logs for additional issues:
/opt/graphite/storage/log/carbon-cache/carbon-cache-a/console.log
You should see something about CarbonReceiverFactory starting on port-2004
We didn’t set up storage-aggregation.conf yet so the warnings are fine there.

Send test data:

First try running a quick test script locally to see if it worked.
I took a quick example from here:
https://github.com/graphite-project/carbon/blob/master/examples/example-pickle-client.py

Leave sudo, and in home directory create testcarbon.py

import pickle
import random
import socket
import struct
import time
CARBON_SERVER = '127.0.0.1'
CARBON_PICKLE_PORT = 2004
message = 'pi1.temptopic'
sock = socket.socket()
sock.connect((CARBON_SERVER, CARBON_PICKLE_PORT))
for i in range(6):
now = int(time.time())
data = random.randint(0, 100)
print("Sending %s of 6: data=%s" % (i + 1, data))
tuples = [(message, (now, str(data)))]
package = pickle.dumps(tuples, 1)
header = struct.pack('!L', len(package))
sock.sendall(header + package)
time.sleep(30) if i < 5 else print "finished"
sock.shutdown(socket.SHUT_RDWR)
sock.close()

After you run the script, check the running log: /opt/graphite/storage/log/carbon-cache/carbon-cache-a/listener.log
and see a file got created for this new topic at /opt/graphite/storage/whisper/pi1/temptopic.wsp

For some reason the whisper bin files didn’t install for me, so grab the relevant ones from github:

sudo curl -O https://raw.githubusercontent.com/graphite-project/whisper/master/bin/whisper-fetch.py

That repo is a good reference in case you also need some other utililty scripts like whisper-resize.py in the future).
Finally run the utility to do a quick query of the last 6 minutes of items. You should see 12 rows since we set our storage by 30 second intervals.

python /opt/graphite/bin/whisper-fetch.py — from=$(date +%s -d “-6 min”) /opt/graphite/storage/whisper/pi1/temptopic.wsp

Output: (Timestamp, data)

1510361940 None
1510362000 None
1510362060 3.0
1510362120 None
1510362180 None

Use the Greengrass Core’s Lambda to handle Carbon

Note: Carbon opened port 2003 / 2004 on my pi to the network — You might want to firewall them and keep our communications to the secure Greengrass events.

Update your local dev machine version of the lambda corelambda.py w/ some code similar to the script above:

import greengrasssdk
import json
import pickle
import socket
import struct
import time
CARBON_SERVER = '127.0.0.1'
CARBON_PICKLE_PORT = 2004
def handler(event, context):
client = greengrasssdk.client('iot-data')
client.publish(
topic='group1_core/topic_1',
qos=0,
payload=json.dumps(event))

tuples = [('pi1.topic_1', (
int(time.time()),
event['test_data']))]
package = pickle.dumps(tuples, 1)
header = struct.pack('!L', len(package))

sock = socket.socket()
sock.connect((CARBON_SERVER, CARBON_PICKLE_PORT))
sock.sendall(header + package)
sock.shutdown(socket.SHUT_RDWR)
sock.close()

(It’s likely we’d want to remove the client.publish of this granular data, and create a new process that publishes aggregated data later. Or add something like if start of a new hour: then publish aggregated data)

Now we need to redeploy this new Lambda to our Greengrass core. Here’s a script that goes through that process based on the setup from the last post. You would run this on your dev machine or your server that’s administrating your Greengrass group.

import boto3
import pprint
import time
import zipfile
from io import BytesIO # this is python3.
# ...You’d use `from StringIO import StringIO in 2.`
FILENAME = ‘path/to/corelambda.py’
SENSOR_ARN = ‘arn:aws:iot:us-west-2:123123123123:thing/Pi1’ # Update this with your ARN for the sensor device
# create a zip of new code:
buffer = BytesIO()
with zipfile.ZipFile(buffer, ‘w’) as lambda_zip:
lambda_zip.write(FILENAME, 'corelambda.py')
buffer.seek(0)
# upload to lambda
lamb = boto3.client('lambda')
new_lambda = lamb.update_function_code(FunctionName='gg1', ZipFile=buffer.read(), Publish=True)
arn = new_lambda['FunctionArn']

# Update Greengrass
gg = boto3.client('greengrass')

group = gg.list_groups()['Groups'][0] # I only have one group
id = group['Id']
gg.reset_deployments(GroupId=id)

fun = gg.list_function_definitions()['Definitions'][0]
fun_version = gg.create_function_definition_version(
FunctionDefinitionId=fun['Id'],
Functions=[{
'Id': '1',
'FunctionArn': arn,
'FunctionConfiguration': {
'Executable': "corelambda.handler",
'MemorySize': 128000,
'Timeout': 3
}
}]
)
# need to update the subscriptions with the new function ARN version
sub = gg.list_subscription_definitions()['Definitions'][0]
sub_version = gg.create_subscription_definition_version(
SubscriptionDefinitionId=sub['Id'],
Subscriptions=[{
'Id': '1',
'Source': SENSOR_ARN,
'Subject': 'pi1/topic_1',
'Target': arn
}, {
'Id': '2',
'Source': arn,
'Subject': "group1_core/topic_1",
'Target': "cloud"
}]
)

old_group = gg.get_group_version(GroupId=id, GroupVersionId=group['LatestVersion'])
group_kwargs = old_group['Definition']
group_kwargs.update({'GroupId': id,
'SubscriptionDefinitionVersionArn': sub_version['Arn'],
'FunctionDefinitionVersionArn': fun_version['Arn']})
group_version = gg.create_group_version(**group_kwargs)

deployment = gg.create_deployment(
DeploymentType='NewDeployment',
GroupId=id,
GroupVersionId=group_version['Version']
)
print('Started final deployment step, printing the status in 10 seconds')
pp = pprint.PrettyPrinter()
time.sleep(11)
pp.pprint(gg.get_deployment_status(
DeploymentId=deployment['DeploymentId'],
GroupId=id
))

After deploy, go back to Pi ssh and run the sensor.py script. Let it finish or stop it after it sends a few messages.

Finally, check the output of pi1.topic_1 in our whisper files:

python /opt/graphite/bin/whisper-fetch.py — from=$(date +%s -d “-6 min”) /opt/graphite/storage/whisper/pi1/topic_1.wsp

[Optional] Aggregation setup

i.e. create min, max, & avg metric files
Create /opt/graphite/conf/aggregation-rules.conf w/ these lines:

pi1.temptopic.avg (60) = avg pi1.temptopic
pi1.temptopic.min (60) = min pi1.temptopic
pi1.temptopic.max (60) = max pi1.temptopic

The format is new.topic.path (aggregation interval) = operator topic

Start the aggregator:
python /opt/graphite/bin/carbon-aggregator.py start
The aggregator is on port 2024 so go back to the old test script testcarbon.py and change the port to 2024.
The service updates carbon-cache after the aggregation period is done (60 seconds is what we set).

Run testcarbon.py and after 60 seconds into the script you should see there’s a new dir
/opt/graphite/storage/whisper/temptopic that has ‘avg.wsp’, ‘min.wsp’, ‘max.wsp’
Check the output:

python /opt/graphite/bin/whisper-fetch.py — from=$(date +%s -d “-6 min”) /opt/graphite/storage/whisper/pi1/temptopic/avg.wsp

InfluxDB

The Whisper query API wasn’t very robust. It seems worth trying InfluxDB since it has a CLI and SQL query syntax. (It also has clustering tools which might be an interesting experiment in a group of devices)
This can actually replace graphite, and in this section I’m using InfluxDB alone. However one benefit of using them both together is the graphite setup can automatically create our graphite DB in influx… but that setup requires installing Go.

Install

See Debian instructions:
https://docs.influxdata.com/influxdb/v1.3/introduction/installation/#installation

For my Pi it looked like:

curl -sL https://repos.influxdata.com/influxdb.key | sudo apt-key add -source /etc/os-releaseecho “deb https://repos.influxdata.com/debian jessie stable” | sudo tee /etc/apt/sources.list.d/influxdb.listsudo apt-get update && sudo apt-get install influxdb
sudo systemctl start influxdb

Edit /etc/influxdb/inxludb.conf
In practical settings, we’ll want to lock this down (secure admin user and create roles, https, firewall port 8086).
For now uncomment the following items and set them:
reporting-disabled = true

Use the updated config:
influxd -config /etc/influxdb/influxdb.conf

Test the influx shell:

influx
> CREATE DATABASE group1;
> CREATE RETENTION POLICY “10 days” ON “group1” DURATION 10d REPLICATION 1;
> SHOW DATABASES;

Install the python client module:
pip install influxdb
Here’s a test script testinflux.py:

from datetime import datetime
from influxdb import InfluxDBClient
client = InfluxDBClient(‘127.0.0.1’, 8086, None, None, ‘group1’)
for i in range(5):
data = random.randint(0, 100)
print("Sending %s of 6: data=%s" % (i + 1, data))
json_body = [dict(
measurement='pi1',
tags={'topic': 'temptopic'},
time=datetime.utcnow().isoFormat('T'), # "2009-11-10T23:00:00Z",
fields={'value': data}
)]
client.write_points(json_body)
time.sleep(5) if i < 4 else print('printing query results:')

for row in client.query("select * from pi1")[('pi1', None)]:
print(row)

In that script I used ‘pi1’ as a table, and tags to make a [sub-index?] for topics.

As with all database solutions, this quickly raises the need for more research on optimizing table structure.

Aggregation

To aggregrate in InfluxDB, use “Continuous Queries” in the CLI. Example of aggregating our inserts above into 1 minute average:

CREATE CONTINUOUS QUERY “pi1_1m_avg”
ON group1
BEGIN
SELECT mean(value) as value
INTO “avg.pi1”
FROM pi1
GROUP BY time(1m), topic
END

This will apply to new incoming metrics. Run the test script to generate the new metrics then check the result:
select * from "avg.pi1"

Summary

We demonstrated a simple implementation of both Graphite and InfluxDB on a Greengrass core device. In the Graphite tutorial we demonstrated how to deploy a new Lambda version to your Greengrass devices using a script. We didn’t provide the same script for InfluxDB here, but a similar process can be used to add a Lambda for an InfluxDB process.

--

--

Gabe Ibarra
TensorIoT

Entrepreneur w/ background in Software Engineering. I love learning & sharing knowledge. My channel features technical content. Follow at twitter.com/gabeibarra