The Sync-Reflector

Greg Cooper
Dwelo Research and Development
May 11, 2020

This is part 3 of a series of posts about how Dwelo software uses Twilio Sync as its IoT communications platform. If you missed part 2, you can find it here.

Recap

In the previous posts of this series we described how Dwelo uses Twilio Sync to manage state synchronization for all of the IoT devices that are found in Dwelo’s smart apartments. We shared how sensor state is represented in the Sync cloud by using Sync maps, which are dictionaries of key/value pairs that record the state of each IoT device. We also shared how we use those maps to represent commands to the devices in the form of desired state.

Now, in this post, we will share what we do with those state changes once the Dwelo cloud receives them from Sync.

Getting Notified

Twilio Sync notifies connected clients about changes to Sync maps using a publish-subscribe model. And Twilio provides libraries so that developers can create clients that subscribe to map updates from the Sync cloud.

However, connecting every client directly to the Sync cloud and subscribing for updates would be costly (for example, one connection for every instance of the Dwelo iOS and Android apps in use by our customers), so Dwelo chose instead to have a single entity in the Dwelo cloud receive all Sync notifications via a webhook.

We created a service that receives the Sync map updates from the Twilio cloud, and then publishes the updates internally for our clients to consume.

The Sync-Reflector Service

The sync-reflector service was created to provide a mirror of all activity that occurs in the Twilio Sync cloud.

We use the sync-reflector to:

  1. Publish all Sync events to our own pubsub system (AWS SNS) that other Dwelo services can subscribe to.
  2. Report all Sync activity to our analytics services.

The service is built from a handful of AWS components: an API Gateway endpoint, a Lambda function, an SNS topic, and a Kinesis Firehose.

When the Sync cloud has a change to report, it publishes the event through a webhook to an API Gateway. The gateway then passes the update event to a lambda function for processing.
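To make the flow concrete, the webhook body is form-encoded; once parsed, a map-item update looks roughly like the following. The field names are the ones used by the handlers shown below; the values are placeholders, and the actual Twilio payload carries additional fields.

{
    "EventType": "map_item_updated",
    "MapUniqueName": "device-uid",
    "ItemKey": "DoorLocked",
    "ItemData": "{...}",
    "DateCreated": "2020-05-11T00:00:00Z"
}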

Webhook Authentication

The lambda first authenticates that the event came from the Twilio Sync cloud, using a helper library from Twilio. The helper library computes an HMAC-SHA1 hash of the incoming request, with the Dwelo auth token as the key, and compares the result against the X-Twilio-Signature header on the request.
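
Conceptually, the validation just recomputes the signature Twilio attached to the request. A simplified sketch of what the helper does under the hood (not Dwelo code, and abridged from Twilio's documented scheme):

import base64
import hashlib
import hmac

def compute_twilio_signature(auth_token, url, params):
    # Twilio signs the full request URL with each POST parameter's key and
    # value appended in sorted key order, HMAC-SHA1s that string with the
    # account auth token, and base64-encodes the digest.
    payload = url + "".join(key + params[key] for key in sorted(params))
    digest = hmac.new(
        auth_token.encode("utf-8"),
        payload.encode("utf-8"),
        hashlib.sha1
    ).digest()
    return base64.b64encode(digest).decode("utf-8")

In practice the lambda delegates this check to Twilio's RequestValidator: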

import logging
from urllib.parse import parse_qsl, urlunparse

from twilio.request_validator import RequestValidator

logger = logging.getLogger()

# TWILIO_AUTH_TOKEN is the Dwelo account's Twilio auth token,
# loaded from configuration.
validator = RequestValidator(TWILIO_AUTH_TOKEN)


def lambda_handler(event, context):
    body_validator = TwilioBodyValidator(
        event['headers'],
        event['requestContext'],
        event['body'],
        validator
    )
    if body_validator.validate():
        logger.info(f"Request validated. Event: {event}")


class TwilioBodyValidator(object):
    """ Validate that the request came from the Twilio cloud
    using our authorization token
    """

    def __init__(self, headers, req_context, body_data, validator):
        # Rebuild the full URL that Twilio signed when it sent the webhook.
        self._uri = urlunparse((
            headers["X-Forwarded-Proto"],
            headers["Host"],
            req_context["path"],
            "", "", ""
        ))
        self._twilio_sig = headers["X-Twilio-Signature"]
        self._body_data = body_data
        self._validator = validator

    def validate(self):
        # The webhook body is form-encoded; parse it into a dict so the
        # validator can recompute the signature over the same parameters.
        validation_event = dict(
            parse_qsl(
                self._body_data,
                keep_blank_values=True
            )
        )
        return self._validator.validate(
            self._uri,
            validation_event,
            self._twilio_sig
        )

Once the event source has been authenticated, the lambda publishes the event to an SNS topic, and also puts the event record into a Kinesis Firehose for our analytics engine to consume.

SNS Publication

When publishing to SNS, the lambda packages a set of message attributes that SNS subscribers can filter on. The attributes are:

  • MapUniqueName (i.e. the uid of the device)
  • EventType (e.g. map_item_created, map_item_updated)
  • ItemKey (e.g. DoorLocked, HeartBeat, Temperature)

import json
import boto3
from urllib.parse import parse_qsl

sns = boto3.client('sns')

parsed_event = dict(parse_qsl(event['body']))

# Make each of these attributes in the Sync map a
# filterable attribute for the SNS message.
SYNC_ATTRIBUTES = ['MapUniqueName', 'EventType', 'ItemKey']
attributes = dict()
for attr in SYNC_ATTRIBUTES:
    attributes[attr] = {
        'DataType': 'String',
        'StringValue': parsed_event[attr]
    }

# SNS_TOPIC_ARN is the ARN of the Sync events topic, from configuration.
sns.publish(
    TopicArn=SNS_TOPIC_ARN,
    Message=json.dumps(parsed_event),
    MessageAttributes=attributes
)

Lambdas that subscribe to the SNS topic can attach a filter policy built from the attributes above, so that they receive only a pre-filtered stream of events to process.

For example, if a service wanted to be notified of all battery level events, so that it could send a notification email to users when the reported level was low, the service would register this filter:

{
  "ItemKey": [
    "BatteryLevel"
  ]
}

This would cause that service to receive events from all devices in the system that reported BatteryLevel. Other services might want to subscribe to map_item_created events so that they can take action whenever a new device is created in the system.
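
As a sketch of how such a subscription could be registered with boto3 (the topic and function ARNs below are placeholders, not Dwelo's):

import json
import boto3

sns = boto3.client('sns')

sns.subscribe(
    TopicArn="arn:aws:sns:us-west-2:123456789012:sync-events",  # placeholder
    Protocol="lambda",
    Endpoint="arn:aws:lambda:us-west-2:123456789012:function:battery-notifier",  # placeholder
    Attributes={
        # Only deliver messages whose ItemKey attribute is BatteryLevel.
        "FilterPolicy": json.dumps({"ItemKey": ["BatteryLevel"]})
    }
)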

Analytics

Knowing how customers use their devices is a must-have capability for a large-scale IoT system. At Dwelo, every light that turns off and every door lock that locks is carefully recorded in our analytics engine.

After the events are reported to and processed by our sync-reflector service, they are put into a Kinesis Firehose. The Firehose bundles the events together and delivers them to our data warehouse, an Amazon Redshift cluster. Once in Redshift, the events are available for queries.
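
Handing an event to the Firehose from the reflector lambda is a single call. A minimal sketch, assuming the raw form-encoded webhook body is forwarded as-is (which is what the transformation lambda below expects when it calls parse_qsl) and using a placeholder stream name:

import boto3

firehose = boto3.client('firehose')

def send_to_firehose(raw_body):
    # Append a newline so individual events stay separable after
    # Firehose batches them together.
    firehose.put_record(
        DeliveryStreamName="sync-events",  # placeholder stream name
        Record={"Data": (raw_body + "\n").encode("utf-8")}
    )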

As each record travels through the analytics engine, we mutate it into a format that takes advantage of the columnar storage properties of Redshift. Besides normalizing field names, this also includes extracting elements from nested data that we might want to search on later and putting those elements into their own fields. For example, door unlock events are mutated so that the identifier for the user code that unlocked the door is extracted and available for queries.

import base64
import json
from urllib.parse import parse_qsl


def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        input_event = decode_record_data(record["data"])
        mutated_event = create_mutated_event(
            record["recordId"],
            input_event
        )
        output_record = create_output_record(
            record["recordId"],
            mutated_event
        )
        output.append(output_record)
    return {"records": output}


def decode_record_data(record_data):
    # Firehose hands us the record data base64-encoded; decode it and
    # parse the form-encoded Sync event into a dict.
    event = base64.b64decode(record_data).decode("utf-8")
    event = dict(parse_qsl(event))
    return event


def create_mutated_event(record_id, input_event):
    mutated = dict(
        map_name=input_event.get("MapUniqueName"),
        update_timestamp=input_event.get("DateCreated"),
        event_type=input_event.get("EventType"),
        item_key=input_event.get("ItemKey"),
        raw_item_data=input_event.get("ItemData", "")
    )
    # Break apart the raw item data further. This data will vary
    # from one record type to another.
    ...
    return mutated


def create_output_record(record_id, mutated_event):
    output_event = json.dumps(mutated_event)
    output_record = dict(
        recordId=record_id,
        result="Ok",
        data=base64.b64encode(output_event.encode("utf-8")).decode("utf-8")
    )
    return output_record

Putting it Together

Storing our data lake in S3 gives us tremendous flexibility in manipulating the data, as well as in controlling its lifetime. It also gives us excellent replayability into our data warehouse. When we first developed our analytics solution we chose AWS Redshift to leverage its proximity to the data source and to take advantage of its out-of-the-box pipeline. More recently we migrated to Google BigQuery, which is better suited to the large analytical queries we need to run, and our existing S3 infrastructure made that move relatively painless.

In Conclusion

With Twilio Sync as our source, and using some simple AWS components, we created a highly available pub/sub messaging service that has configurable streams for all of our sensor state changes, as well as a robust and highly scalable data warehousing solution! These pipelines have enabled Dwelo to receive and process more than 5M sensor events daily with very little configuration or investment in infrastructure.

Check out the 4th and final post of this series, where we describe our service for handling all of the IoT device state that is received by the Dwelo cloud.
