Streaming data with a Raspberry Pi

Using Python, Apache Kafka and Confluent Cloud

Ana Escobar
Towards Data Engineering
7 min read · Apr 24, 2023


Photo by Michael Smith — Speed of Light

Real-time systems have become increasingly popular in recent years, driven by the growing demand for high-performance, responsive and reliable systems. They are designed to process data and respond to events in real-time or near real-time, providing immediate feedback and control. They are used in a wide range of applications, from industrial automation and control systems to self-driving cars and video games. With the advent of new technologies such as the Internet of Things (IoT) and the increasing use of cloud computing, real-time systems are becoming even more prevalent and essential to our daily lives.

One of the most popular streaming platforms is Apache Kafka. It is an open-source distributed streaming platform that is widely used for building real-time data pipelines and streaming applications. It is popular because of its high throughput, low latency, scalability, fault tolerance, and ease of integration with other systems. For these reasons, it is the platform we chose for our project. The whole design of the project can be seen in the following picture:

As an overview, we developed a system that serves two use cases:

  • Predictive maintenance
  • Health device monitoring

We generate real-time data from two Windows laptops and a Raspberry Pi, which contains device and sensor measurement information. The data is sent to two different Kafka topics, where it is consumed and used to make predictions on whether a device is stressed. The results are stored in an AWS DynamoDB table along with the measurements. If the prediction is positive, a notification is sent to the user via a Telegram Bot, and the Bot asks for feedback to improve the accuracy of the model by updating the prediction in the DynamoDB table. Users can also monitor device information by asking the Telegram Bot for the latest information stored in the DynamoDB table.

In this post, we will focus on the data ingestion (producer) using the Raspberry Pi into a Kafka topic.

Libraries

confluent-kafka

When working on an aarch64 system such as a Raspberry Pi (rather than a Windows machine), you might encounter incompatibility errors when installing the confluent-kafka library, since no prebuilt binary wheels are provided for aarch64. You will need to compile it yourself, which requires you to first build and install librdkafka from source. Follow these steps:

sudo apt-get install -y libssl-dev zlib1g-dev gcc g++ make
git clone https://github.com/edenhill/librdkafka
cd librdkafka
./configure --prefix=/usr
make
sudo make install
pip install confluent-kafka==1.9.2

If after this installation you try to run the code and get the following error: Undefined Symbol: rd_kafka_producev, it is most likely because an older librdkafka version from the Debian packages is still installed and shadows the one you just built (typically the old library lives in /usr while the freshly built one ends up in /usr/local, which is not picked up automatically). You can check which packaged versions are installed by running dpkg -l | grep librdkafka.

To solve this issue, remove the previously installed deb packages:

sudo apt-get purge librdkafka1 librdkafka-dev
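As an optional sanity check (not part of the original steps), you can confirm from Python which librdkafka version the confluent-kafka binding is actually linked against:

import confluent_kafka

# Version of the Python client and of the underlying librdkafka C library
print(confluent_kafka.version())     # e.g. ('1.9.2', ...)
print(confluent_kafka.libversion())  # should match the version you built from source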

vcgencmd

This library will help us read our Raspberry Pi's sensor information, which will be sent to the Kafka topic.

pip install vcgencmd~=0.1.1
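As a quick check that the library works on your Pi, you can read a couple of values from a Python shell; this mirrors what the Raspberry class further below does:

import vcgencmd

vcg = vcgencmd.Vcgencmd()
print(vcg.measure_temp())        # GPU temperature in degrees Celsius
print(vcg.measure_clock('arm'))  # ARM clock frequency in Hz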

jsonschema

We will need this library installed to be able to import the JSONSerializer class from confluent_kafka.schema_registry.json_schema. JSON Schema is a declarative language for annotating and validating JSON documents; in our case, it ensures the quality of the data produced by the clients.

pip install jsonschema==4.17.1

Gathering data from the Raspberry Pi using the vcgencmd library

Using the vcgencmd library, we can obtain the sensor values as follows:

import subprocess

import vcgencmd


class Raspberry(object):
    """
    This class defines a Raspberry Pi object with its attributes in order to monitor its health
    """

    def __init__(self):
        """
        This function creates a new Raspberry Pi instance if it doesn't exist yet
        """
        my_cmd = vcgencmd.Vcgencmd()

        self._device = "raspberry"
        self._gpu_temp_celsius = my_cmd.measure_temp()

        # The kernel exposes the CPU temperature in millidegrees Celsius
        cpu = subprocess.Popen(['cat', '/sys/class/thermal/thermal_zone0/temp'],
                               stdout=subprocess.PIPE).communicate()[0]
        self._cpu_temp_celsius = int(cpu) / 1000
        self._frequency_arm_hz = my_cmd.measure_clock('arm')
        self._frequency_core_hz = my_cmd.measure_clock('core')
        self._voltage_core_v = my_cmd.measure_volts('core')
        self._memory_arm_bytes = my_cmd.get_mem('arm')
        self._memory_gpu_bytes = my_cmd.get_mem('gpu')

        # 'vcgencmd get_throttled' prints a string like 'throttled=0x0'
        throttled_out = subprocess.Popen(['vcgencmd', 'get_throttled'], stdout=subprocess.PIPE).communicate()
        throttled_value = throttled_out[0].decode().split('=')[1]
        self._throttled = throttled_value.split('\n')[0]

    @property
    def json(self):
        """
        This function returns a json document with all the Raspberry Pi information
        :return: json document
        """
        return {
            "device": self._device,
            "GPU_temp_celsius": self._gpu_temp_celsius,
            "CPU_temp_celsius": self._cpu_temp_celsius,
            "frequency_arm_hz": self._frequency_arm_hz,
            "frequency_core_hz": self._frequency_core_hz,
            "voltage_core_v": self._voltage_core_v,
            "memory_arm_bytes": self._memory_arm_bytes,
            "memory_gpu_bytes": self._memory_gpu_bytes,
            "throttled": self._throttled
        }

    def __str__(self):
        return str(self.json)
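As a quick test before wiring anything to Kafka, you can run the class directly on the Pi and print one snapshot of the measurements (the guard below is just for illustration and assumes the class lives in its own file):

if __name__ == "__main__":
    print(Raspberry())  # __str__ returns the JSON document as a string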

Topic Schema

In this Python file we define the JSON Schema for the topic, describing the sensor information we want to send.

"""This file represents the topic 'raspberry' schema for data validation purposes"""
schema = """
{
"description": "Raspberry Pi measurements",
"properties": {

"device": {
"description": "",
"type": "string"
},
"GPU_temp_celsius": {
"description": "",
"type": "number"
},
"CPU_temp_celsius": {
"description": "",
"type": "number"
},
"frequency_arm_hz": {
"description": "",
"type": "number"
},
"frequency_core_hz": {
"description": "",
"type": "number"
},
"voltage_core_v": {
"description": "",
"type": "number"
},
"memory_arm_bytes": {
"description": "",
"type": "number"
},
"memory_gpu_bytes": {
"description": "",
"type": "number"
},
"throttled": {
"description": "",
"type": "string"
}
},
"required": [
"device",
"GPU_temp_celsius",
"CPU_temp_celsius",
"frequency_arm_hz",
"frequency_core_hz",
"voltage_core_v",
"memory_arm_bytes",
"memory_gpu_bytes",
"throttled"
],
"title": "RaspberryPi",
"type": "object"
}
"""

Kafka producer

The Kafka producer is responsible for publishing data or messages to a Kafka cluster. A producer sends messages to Kafka topics, which are logical channels or categories for organising and partitioning data within the Kafka cluster. The producer is designed to be scalable and fault-tolerant, meaning it can handle large volumes of data and automatically recover from failures.

Note that in the following code, the environment variables (os.environ[...]) will need to be filled with your Kafka cluster information. You can register for Confluent Cloud for free and take advantage of the $400 of free credit offered to new users.
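For reference, these are the environment variables the producer below expects. The values here are placeholders for illustration only, not real endpoints or credentials; in practice you would export them in your shell or service configuration rather than setting them in code:

import os

# Placeholder values; use your own Confluent Cloud bootstrap server,
# cluster API key/secret and Schema Registry credentials.
os.environ.setdefault("KAFKA_BROKER_SETTINGS", "<bootstrap-server>:9092")
os.environ.setdefault("KAFKA_CLUSTER_KEY", "<cluster-api-key>")
os.environ.setdefault("KAFKA_CLUSTER_SECRET", "<cluster-api-secret>")
os.environ.setdefault("KAFKA_SCHEMA_ENDPOINT", "https://<schema-registry-endpoint>")
os.environ.setdefault("SCHEMA_USERNAME", "<schema-registry-api-key>")
os.environ.setdefault("SCHEMA_PASSWORD", "<schema-registry-api-secret>")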

from uuid import uuid4
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from generators.kafkaproducer.raspberry_sensor_schema import schema as schema_rasp_str
import os
import logging


def sensor_to_dict(data, ctx):
    """
    Function to transform a Sensor object into a dictionary
    :param data: Sensor
    :param ctx: SerializationContext
    :return: dictionary representation of the sensor data
    """
    return dict(data)


def delivery_report(err, msg):
    """
    Callback invoked on delivery of a produced message to a kafka topic
    :param err: delivery error, if any
    :param msg: the produced message
    :return:
    """
    if err is not None:
        logging.info(f"Delivery failed for Sensor record {msg.key()}: {err}")
        return
    logging.info(
        f'Sensor record {msg.key()} successfully produced to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')


class KafkaProducer:
    """
    KafkaProducer class to send records to the Kafka cloud cluster
    """

    def __init__(self):
        """
        Initialization of the class KafkaProducer configuring the broker and cloud settings from Confluent Cloud
        """
        conf = {
            'bootstrap.servers': os.environ["KAFKA_BROKER_SETTINGS"],
            'security.protocol': 'SASL_SSL',
            'sasl.mechanisms': 'PLAIN',
            'sasl.username': os.environ["KAFKA_CLUSTER_KEY"],
            'sasl.password': os.environ["KAFKA_CLUSTER_SECRET"]
        }

        schema_registry_conf = {
            'url': os.environ["KAFKA_SCHEMA_ENDPOINT"],
            'basic.auth.user.info': f"{os.environ['SCHEMA_USERNAME']}:{os.environ['SCHEMA_PASSWORD']}"
        }
        schema_registry_client = SchemaRegistryClient(schema_registry_conf)
        self.json_serializer = JSONSerializer(schema_rasp_str, schema_registry_client, sensor_to_dict)
        self.string_serializer = StringSerializer('utf_8')
        self.producer = Producer(conf)

    def produce(self, topic_name: str, value):
        """
        Function to send a record to the kafka cluster
        :param topic_name: topic name
        :param value: the value record to be sent to the kafka topic
        :return:
        """
        logging.info(f" --- Producing records to topic {topic_name} --- ")
        self.producer.produce(topic=topic_name, key=str(uuid4()), value=value)

        logging.info("\nFlushing records...")
        self.producer.flush(30)  # send the data
        logging.info(f'Produced to topic {topic_name}')

    def produce_json(self, topic_name: str, data):
        """
        Function to produce a json document to a kafka topic
        :param topic_name: name of the topic
        :param data: the value record to be sent to the kafka topic
        :return:
        """
        logging.info(f" --- Producing record to topic {topic_name} --- ")
        self.producer.poll(0.0)

        self.producer.produce(topic=topic_name,
                              key=self.string_serializer(str(uuid4()),
                                                         SerializationContext(topic_name, MessageField.KEY)),
                              value=self.json_serializer(data,
                                                         SerializationContext(topic_name, MessageField.VALUE)),
                              on_delivery=delivery_report)

        logging.info("\nFlushing record...")
        self.producer.flush()

        logging.info(f'Produced json encoded record to topic {topic_name}')

Main file

This file acts as the application's main entry point and is responsible for producing the sensor data to your Kafka topic every 5 seconds.

Note that in the following code, the Kafka topic (os.environ['TOPIC_NAME']) will need to be set to the name of the topic you created in your Kafka cluster.

import os
import time

# Adjust these imports to match your project layout
from Raspberry import Raspberry
from KafkaProducer import KafkaProducer


def data_push():
    """
    This function sends a json message from the Raspberry Pi device to the specified Kafka topic
    :return: None
    """
    topic = os.environ['TOPIC_NAME']
    raspberry = Raspberry().json
    KafkaProducer().produce_json(topic_name=topic, data=raspberry)


def main():
    """
    Main method of the file. Infinite loop to produce records to the kafka topic
    :return: None
    """
    while True:
        data_push()
        time.sleep(5)


if __name__ == "__main__":
    main()

After this, you will have a real-time system built with your Raspberry Pi that sends its sensor data to a Kafka topic every 5 seconds. You can easily visualise the data flow in your Confluent Cloud account.

You can also stress your Raspberry Pi so that it produces data while the device is under load. This information will be needed if you want to develop a machine learning model that predicts whether a device is about to become stressed.
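A minimal sketch of how you could generate that load from Python is shown below; the duration and the choice of one busy-loop process per core are arbitrary, and a tool such as stress-ng would work just as well. Run it alongside the producer so that the "stressed" measurements also end up in the topic.

import multiprocessing
import time


def burn(seconds: int = 120):
    """Busy-loop to keep one CPU core at roughly 100% for a while."""
    end = time.time() + seconds
    while time.time() < end:
        pass


if __name__ == "__main__":
    # One worker per core so the whole CPU heats up and may get throttled
    workers = [multiprocessing.Process(target=burn) for _ in range(multiprocessing.cpu_count())]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()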

Next steps

  • Set up a consumer (AWS DynamoDB)
  • Create the ML models based on the already produced data
  • Create the notification and monitoring system using Telegram Bot

The whole project was developed by Amr Jazouli, Abdelaziz Imgharne and me. This post leaves out some parts of the original project to keep it simple, but if you're curious, you can find the whole code and further documentation in the following GitHub repository: https://github.com/escobarana/streaming_monitoring_system

Do you like this content?

Subscribe to my medium page and be the first to get notified whenever I publish a new one!

Follow me on LinkedIn for daily insights about Software & Data Engineering 🫰🏻
