AWS IoT Analytics + Sagemaker / Quicksight : la solution IoT à adopter ?

Daniel Zhang
neoxia
Published in
7 min readOct 21, 2019

IoT Analytics est un service permettant de prendre en main avec une grande facilité et rapidité un projet analytique lié à des objets connectés. Ce service, proposé par Amazon Web Services (AWS) depuis 2018, rassemble tous les composants importants à l’analyse de données IoT en un, s’occupant à la fois de l’ingestion et de la mise à disposition des données aux autres services. Toutefois, IoT Analytics n’est pas un service qui permettra de gérer et de monitorer tout un parc d’objet connectés.

Les solutions cloud proposées par Google et Microsoft impliquent de connaître un ensemble de service divers et variés pour mettre en place un projet IoT. Par exemple, Google Cloud Platform nécessite de connaître le fonctionnement de Cloud Pub/Sub et de Dataflow pour s’occuper de l’ingestion des données et de BigQuery pour le stockage des données.

À l’inverse, Amazon nous fournit un service tout-en-un qui apporte un gain d’efficacité certain face à ces deux principaux concurrents ; moins de temps est donc passé sur l’ingestion des données, pour se concentrer sur l’analyse.

Le temps d’apprentissage nécessaire à l’utilisation du service est très courte. Néanmoins, si vous possédez un bagage conséquent sur Azure ou sur Google Cloud Platform, les solutions sur ces deux plateformes peuvent également être envisagées comme alternatives à IoT Analytics.

Comment fonctionne IoT Analytics ?

IoT Analytics est un service très simple d’utilisation ; il est découpé en quatre sous-services que sont : channel, pipeline, datastore et dataset.

Fig. 1 : La chaîne de services IoT Analytics

Un “channel” sert de point d’entrée au service ; les données entrantes sont enregistrées pour des traitements ultérieurs et sont envoyées dans la pipeline. Lors de la création d’un channel, il est possible de définir un topic MQTT (protocole pub/sub utilisé principalement pour des transports de messages de faible taille, notamment pour les microcontrôleurs) depuis lequel récupérer les données.

Une “pipeline” effectue les transformations nécessaires sur les données brutes reçues par un channel pour les rendre utilisables lors de l’analyse. Nous pouvons notamment retenir la possibilité d’ajouter ou de retirer des attributs ou de transformer certains attributs à l’aide d’une fonction Lambda.

Fig. 2 : Calcul d’un nouvel attribut temperatureCelsius

La figure 2 montre comment il est possible d’utiliser la pipeline pour créer un attribut temperatureCelsius à partir de l’attribut temperature d’après la relation mathématique donnée dans la case Formula.

À noter : les données en sortie de pipeline doivent être au format JSON. Il faut donc effectuer une conversion de type si les données entrantes ne sont pas au bon format (par exemple en CSV).

Un “datastore” va enregistrer les données traitées par une pipeline dans un bucket Amazon S3. Néanmoins, il n’est pas requis de connaître les détails S3 pour se servir d’un datastore, IoT Analytics prenant en main la configuration du stockage.

Enfin, le “dataset” vous permettra de générer des vues matérialisées au format CSV via une requête SQL.

Quelles sont les inconvénients d’utiliser IoT Analytics ?

IoT Analytics, comme tout service, n’est pas sans inconvénient.

Un des problèmes majeurs de ce service est la complication qu’apporte une migration de données d’une source existante vers IoT Analytics. Bien que le service autorise l’import par batch, il ne peut se faire que par appel de l’API ou l’usage d’un SDK, qui imposent tous deux une limite au niveau de la fréquence d’appel et de la quantité de données par batch.

Les deux exemples de code qui suivent sont des fonctions Lambda permettant l’import d’un fichier CSV depuis S3 vers IoT Analytics à l’aide de la SDK Python.

MAX_RECORDS_PER_REQUEST = 500
MAX_REQUESTS_PER_SECOND = 2def lambda_handler(event, context):
bucket = event["bucket"]
key = event["key"]
channel_name = event["channel_name"]
stream_name = event["stream_name"]
kinesis_client = boto3.client("kinesis")
s3_client = boto3.client("s3")
query = "SELECT COUNT(*) FROM s3object s"
in_serialize = {
"CSV": {
"FileHeaderInfo": "NONE",
"RecordDelimiter": "\n",
"FieldDelimiter": ","
}
} out_serialize = { "JSON": { "RecordDelimiter": "\n" } }
r = s3_client.select_object_content(Bucket=bucket, Key=key, Expression=query, ExpressionType='SQL', InputSerialization=in_serialize, OutputSerialization=out_serialize) for event in r['Payload']:
if 'Records' in event:
count = json.loads(event['Records']['Payload'])['_1']
data = {"bucket": bucket, "key": key, "channel_name": channel_name} step = 100000
jobs = [{'start': step * i, 'end': step * (i + 1), **data} for i in range(count // step)]
if count % step != 0: jobs += [{'start': count - count % step, 'end': count, **data}]
for request_jobs in chop(MAX_RECORDS_PER_REQUEST, jobs):
records = [{"Data": json.dumps(job), "PartitionKey": key} for job in request_jobs]
put_records(kinesis_client, stream_name, records)
return "Launcher sent key {} with {} lines into {}".format(key, count, stream_name)# 1 kinesis shard can ingest at most 1000 records per second
# We limit the rate to ensure we do not go over
@RateLimiter(max_calls=MAX_REQUESTS_PER_SECOND, period=1)
def put_records(kinesis_client, stream_name, records):
kinesis_client.put_records(StreamName=stream_name, Records=records)MESSAGES_PER_REQUEST = 100
MAX_REQUESTS_PER_SECOND = 1000
HEADER = ("device_id", "measure", "timestamp")
def lambda_handler(event, context):
record = event["Records"][0]
job_input = json.loads(base64.b64decode(record["kinesis"]["data"]))
bucket = job_input["bucket"]
key = job_input["key"]
channel_name = job_input["channel_name"]
start = job_input["start"]
end = job_input["end"]
s3_client = boto3.client("s3")
lines = s3_client.get_object(Bucket=bucket, Key=key)["Body"].iter_lines()
cur = 0
while cur < start:
next(lines)
cur += 1 to_append = []
while cur < end:
to_append.append(next(lines))
cur += 1
content = b'\n'.join(to_append).decode('utf-8')
serialized_rows = serialize_rows(content)
messages = generate_messages(serialized_rows)
num_requests = 0
iot_analytics_client = boto3.client("iotanalytics")
for messages_batch in chop(MESSAGES_PER_REQUEST, messages):
send_batch_put_message(iot_analytics_client, channel_name, list(messages_batch))
num_requests += 1 return "{} batchPutMessage requests sent for {}".format(num_requests, key)
@RateLimiter(max_calls=MAX_REQUESTS_PER_SECOND, period=1)
def send_batch_put_message(iot_analytics_client, channel_name, messages_batch):
iot_analytics_client.batch_put_message(channelName=channel_name, messages=messages_batch)
def serialize_rows(file_contents):
reader = csv.DictReader(StringIO(file_contents), fieldnames=HEADER)
return (row for row in reader)
def generate_messages(serialized_rows):
for messageId, row in enumerate(serialized_rows):
yield {"payload": json.dumps(row), "messageId": str(messageId)}

Les autres inconvénients proviennent de la rigidité du service face aux besoins que nous pourrions avoir. Par exemple, bien que le “dataset” soit construit à partir d’une requête SQL, celle-ci ne permet pas de mettre en relation les données de plusieurs “datastore” directement sur la plateforme et aussi facilement qu’avec les jointures habituellement disponibles en SQL. Cela s’explique probablement par l’utilisation de la technologie S3 SELECT pour permettre l’usage de SQL sur des fichiers non ingérés. Ainsi, certaines des fonctionnalités proposées par IoT Analytics ne sont en réalité que des surcouches de services Amazon déjà existants.

Malgré ces défauts, IoT Analytics est à même de s’adapter à beaucoup de situations, un atout renforcé par sa compatibilité avec d’autres services comme Sagemaker et Quicksight.

Comment Sagemaker opère-t-il avec IoT Analytics ?

Sagemaker est un service servant à entraîner et à déployer des modèles de machine learning avec des algorithmes built-in proposés par Amazon. En outre, il sert à héberger des notebooks Jupyter pour les data scientists qui souhaiteraient effectuer une analyse plus poussée sur les données et pour créer des modèles customisés.

Cette solution serverless propose de créer des instances avec des capacités de traitement plus ou moins élevées, allant aujourd’hui jusqu’à 64 vCPU, 16 Tesla K80 (unité graphique) et 732 Go de RAM pour une instance donnée.

L’environnement des notebooks Jupyter met fortement en avant Python comme langage de programmation. Amazon propose justement une SDK Python, Boto 3, permettant une interaction très complète avec chacun des services AWS. Bien évidemment, Boto 3 comprend des méthodes pour faire appel au service IoT Analytics, telle que la récupération des données depuis un dataset.

Sagemaker met également à disposition de nombreux modèles de notebooks pré-écrits sur lesquels s’appuyer. On peut notamment trouver des exemples de modèle de maintenance prévisionnelle, de détection d’anomalies, d’apprentissage par renforcement, etc. Bien que Sagemaker ne propose donc pas d’utiliser les données IoT Analytics de façon native, la SDK Python d’Amazon permet une interaction très facile.

Pourquoi utiliser Quicksight ?

Quicksight est à Amazon ce que DataStudio est à Google et PowerBI à Microsoft : un outil de visualisation de données et d’aide à la prise de décision permettant de créer aisément des graphiques et dashboards. Les tableaux de bord créés peuvent ensuite être partagés avec des collaborateurs.

Quicksight propose un connecteur natif avec différentes sources de données, dont les “dataset” de IoT Analytics. L’outil est capable d’agréger des séries temporelles, le type de données le plus souvent récupéré depuis des objets connectés. On peut ajouter que la version entreprise de Quicksight intègre des fonctionnalités de machine learning utilisées à des fins de prévision et de déte4ction d’anomalies. L’interopérabilité entre les différents services d’Amazon est, ici aussi, ce qui constitue la force du couple IoT Analytics/Quicksight.

Fig. 3 : Un tableau de bord Quicksight

Conclusion

IoT Analytics est aujourd’hui la solution phare proposée par Amazon pour permettre l’analyse de données provenant d’objets connectés ; elle fait toutefois l’impasse sur l’analyse en temps réel. Mais pour tous les projets assez flexibles pour accepter une analyse avec quelques minutes de délai, le service se révèle particulièrement facile d’utilisation. L’intégration quasi-parfaite avec d’autres services Amazon tels que Quicksight ou Sagemaker en fait une solution de choix pour le design d’une solution IoT dans le cloud.

Fig. 4 : Exemple d’architecture proposée

--

--