MQTT — Part II: Smart City Design with MQTT

Onur Dündar
Jul 31, 2018


In my first MQTT article, I covered the pub/sub messaging pattern and explained in detail how MQTT handles messages over the network.

“Beautiful skyline of Tokyo with orange and blue lights illuminating its streets” by Pawel Nolbert on Unsplash

Now, I want to build a sample design that shows MQTT in action, assuming I am creating a smart city with a set of sensors and applications.

Smart City Requirements and Design

Writing about the theory was easy; now let’s put MQTT to work in an IoT application: a smart city.

I store all source code for the smart city in my GitHub repository.

Let me start listing the requirements of our smart city by asking some very basic questions.

What do I want to do?

I want to know about:

  • air pollution levels: NO2, SO2
  • traffic congestion data, or traffic speed at the location.
  • temperature, wind (speed and direction) and humidity data of a city.

How can we sense environmental values and collect them?

For each type of data we need a sensor that can measure it and convert the measurement into a digital, human-readable value.

Below is a list of sensors that come up first in a Google search:

“A needle gauge stating that the water is currently cold” by Kelly Sikkema on Unsplash

How will sensors send digital data, and how will we observe sensor data at the source?

Sensors cannot send data over a network by themselves: they only sense the real world. Everything beyond reading a value requires computing and networking.

I need another device or hardware module that can connect to the sensors through their I/O ports (I2C, SPI, etc.), implement MQTT over TCP/IP to send data, and run software applications.

Basically, I need a gateway. Depending on your computing and storage requirements, this can be a platform such as a Raspberry Pi, an UP2 board or even an Intel NUC.

All of the listed devices can run an OS and connect to the sensors and to the internet.

What do we require from a gateway?

Gateway requirements change with the requested features and with possible future requirements of a large-scale system. For now, briefly:

Photo by Fancycrave on Unsplash
  • Ethernet, TCP/IP connection
  • Remote connection to gateway over IP
  • Over-the-air updates
  • A custom Linux kernel that can use open source libraries to implement the security stack and install certificates
  • Secure boot, secure system access
  • I/O connections like SPI, I2C, GPIO
  • Asynchronous reading of sensor data and sending it over the network

In real life, more hardware details should be considered as well:

  • Power consumption of device (including battery options)
  • Cost of every piece of hardware and the expected number of installations
  • Cooling requirements, ambient temperature
  • Enclosure requirements for secure installation at remote locations

How should we develop a system that provides continuous data?

I am not building a production system; I want a working prototype of a smart city. Therefore, I will only list the development steps I will take for my client system.

Photo by Chris Ried on Unsplash
  1. Implement device drivers from the sensor specifications to read digital values from the sensors (in my sample code, I simulate readings randomly). For device driver development, you can use the Linux kernel headers to access the registers of the connected devices, start them and read values into the system.
  2. Read sensor data asynchronously. On Linux, you can run concurrent threads or tasks that run continuously without affecting each other, or you can run a separate application for each sensor.
  3. Send (publish) sensor data asynchronously over the network to the MQTT broker. I will use the Eclipse Paho C++/Python library to implement the MQTT connection and the publish/subscribe events.
  4. Set intervals for sending sensor data. For example, you can send data every minute or every hour, or only when it changes. This should be decided at design level. For example, temperature data can be sent whenever it changes beyond a given range, since we want more accuracy there. For air pollution, we would collect NO2 and SO2 data every minute to see their changes over time more accurately, and so on.
  5. Set up security-related libraries and certificates to create a secure connection with the server. The OpenSSL libraries should be installed on your Linux distribution.
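Step 4 leaves the choice between fixed intervals and publishing on change as a design decision. Below is a minimal sketch of a publish-on-change rule. It assumes a hypothetical `should_publish` helper that is not part of the repository. The 0.5 threshold and the 10-minute heartbeat are illustrative values; the heartbeat lets subscribers tell a quiet sensor from a dead one.

```python
from typing import Optional


def should_publish(value: float,
                   last_value: Optional[float],
                   last_sent: Optional[float],
                   now: float,
                   threshold: float = 0.5,
                   max_silence: float = 600.0) -> bool:
    """Decide whether a new reading is worth publishing (hypothetical helper).

    Publish when nothing has been sent yet, when the reading moved by at
    least `threshold` since the last published value, or when `max_silence`
    seconds have passed without a publish (a heartbeat).
    """
    if last_value is None or last_sent is None:
        return True
    if abs(value - last_value) >= threshold:
        return True
    return now - last_sent >= max_silence


print(should_publish(21.2, 21.0, 0.0, 30.0))  # False: small change, heartbeat not due yet
print(should_publish(21.6, 21.0, 0.0, 30.0))  # True: changed by more than the threshold
```

A client would keep the last published value and time per topic and call the helper with `time.monotonic()` as `now`. When it returns False, the client skips the publish.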

In summary: I will use Linux (Ubuntu 16.04) with standard libraries for the drivers and the implementation, and the Eclipse Paho library for MQTT connections.

How are we going to manage client connections?

Photo by rawpixel on Unsplash

This is a question of MQTT broker requirements. We need a server that accepts incoming connections, receives messages and delivers them to the right clients.

The MQTT broker implementation requires:

  • MQTT packet identification
  • User authentication
  • Accepting TCP/IP connections on ports 1883 (plain MQTT) and 8883 (MQTT over TLS).

I don’t need to reinvent the wheel, so I will go with Eclipse Mosquitto: “Eclipse Mosquitto is an open source (EPL/EDL licensed) message broker that implements the MQTT protocol versions 3.1 and 3.1.1. Mosquitto is lightweight and is suitable for use on all devices from low power single board computers to full servers.”

I should also plan for fast system and configuration updates on all clients:

  • Create an OS image, download it to the client device, flash it and restart the device.
  • Make lighter updates to the client configuration files, for example to set intervals or to stop publishing for a certain sensor.

How do we deal with data? Smart City Services

A complete smart city system would generate a huge amount of data.

Let’s roughly estimate the storage required for a single client for one day. Assume that each sensor generates a ‘double’ value of 8 bytes. We have temperature, humidity, traffic speed, wind speed, wind direction (in degrees), NO2 level and SO2 level: 7 values.

7 x 8 = 56 bytes

If we collect data every minute:

56 x 60 x 24 = 80640 bytes per day ≈ 0.08 MB, which is about 2.4 MB per month (30 days) and about 29 MB per year for each client.

This is the raw data to be stored. Using fixed units (Celsius, km/h, etc.) means that the units themselves do not need to be stored.

If I decide to run thousands of nodes and decrease the data transfer interval to 30 seconds, the storage and network bandwidth requirements grow accordingly. Halving the interval alone doubles them.

I also can’t ignore the need for a timestamp on each value, to know when it was measured. It can be stored as an ‘unsigned long’, which is also 8 bytes on 64-bit Linux (a fixed-size uint64_t avoids differences on 32-bit devices). Any additional information will enlarge the data further.
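The arithmetic above is easy to redo for other scenarios with a small helper. This sketch uses a hypothetical `storage_bytes` function. It assumes 8-byte values and, optionally, one 8-byte timestamp stored next to every value:

```python
def storage_bytes(values_per_record, value_size=8, timestamp_size=8,
                  interval_s=60, days=1, clients=1):
    """Raw storage in bytes; timestamp_size bytes are stored next to every value."""
    records_per_day = 24 * 60 * 60 // interval_s
    record_size = values_per_record * (value_size + timestamp_size)
    return record_size * records_per_day * days * clients


# the figures from the text: 7 doubles per minute, no timestamps, one client
print(storage_bytes(7, timestamp_size=0))            # 80640 bytes per day
print(storage_bytes(7, timestamp_size=0, days=30))   # 2419200 bytes, about 2.4 MB per month
print(storage_bytes(7, timestamp_size=0, days=365))  # 29433600 bytes, about 29 MB per year
# with an 8-byte timestamp per value, a 30-second interval and 1000 clients
print(storage_bytes(7, interval_s=30, days=365, clients=1000))  # 117734400000 bytes, about 118 GB per year
```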

Client devices like the Raspberry Pi use flash storage, which can be more expensive per byte than the HDDs on the server side.

Photo by Tobias Fischer on Unsplash

Another constraint is how often you need to access historical data. If the data lives on the edge devices, every access requires a remote request to the device, which adds to the network bandwidth requirements.

Storage on the edge device is also more vulnerable and has no backup. Storing data on the server side is therefore much better in terms of storage cost, ease of access, network bandwidth between server and clients, and so on.

Therefore, I will store sensor data on the server side in an SQL database with the following schema.

I decided to go with MariaDB, since my system might scale later. (It won’t, actually; I am just pretending it can. I also wanted to promote an open source project and to create scripts that initialize the database in a Docker image.)
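The schema itself is not reproduced in this text. Purely as an illustration, and not the schema from the repository, a single table whose columns mirror the topic levels and the timestamp<TAB>value payload used later by the clients could look like the sketch below. The table name `sensor_data` and the helper `store_message` are hypothetical. Python's built-in `sqlite3` stands in for MariaDB so the sketch runs without a database server. On MariaDB, the DDL would use MariaDB's own dialect (for example `AUTO_INCREMENT`, `DOUBLE` and `DATETIME`).

```python
import sqlite3

# Hypothetical table layout for illustration only; not the repository's schema.
SCHEMA = """
CREATE TABLE IF NOT EXISTS sensor_data (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    system_name TEXT NOT NULL,
    city        TEXT NOT NULL,
    street      TEXT NOT NULL,
    sensor_type TEXT NOT NULL,
    data_type   TEXT NOT NULL,
    value       REAL NOT NULL,
    measured_at TEXT NOT NULL
)
"""


def store_message(conn, topic, payload):
    """Split a system/city/street/sensor_type/data_type topic and a timestamp<TAB>value payload into one row."""
    system_name, city, street, sensor_type, data_type = topic.split('/')
    measured_at, value = payload.split('\t')
    conn.execute(
        "INSERT INTO sensor_data (system_name, city, street, sensor_type,"
        " data_type, value, measured_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
        (system_name, city, street, sensor_type, data_type, float(value), measured_at),
    )
    conn.commit()


conn = sqlite3.connect(':memory:')  # stand-in for a MariaDB connection
conn.execute(SCHEMA)
store_message(conn, 'hypo_city/city_01/street_01/temperature/celcius',
              '2018-07-31 12:00:00\t28.7')
print(conn.execute("SELECT city, data_type, value FROM sensor_data").fetchall())
# [('city_01', 'celcius', 28.7)]
```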

How do we utilize the smart city system?

At this step, you need to decide how you will utilize the generated sensor data. What kinds of applications and services will your system have? A municipality, for example, could serve APIs that provide such information for non-commercial use cases. Some sample services:

  • Maps application, showing information live on map.
  • APIs to give unrestricted access to historical data for researchers.
  • APIs to give restricted access only to live data for innovative solutions for city.

Services can be expanded as new use cases and requests come to the smart city system.

The data server can implement a REST API to query certain data sets and return, for example, JSON responses.
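Below is a sketch of the JSON side of such an endpoint. It assumes a hypothetical `GET /readings?sensor_type=...&start=...&end=...` request. The row layout and the `readings_to_json` name are illustrative, and a real service would do the filtering in the SQL query rather than in Python:

```python
import json
from datetime import datetime


def readings_to_json(rows, sensor_type, start, end):
    """Build the JSON body for a hypothetical /readings request.

    rows: iterable of (city, street, sensor_type, value, measured_at) tuples,
    with measured_at as a datetime. Keeps rows of the requested type in [start, end).
    """
    selected = [
        {"city": city, "street": street, "value": value,
         "measured_at": measured_at.isoformat()}
        for city, street, kind, value, measured_at in rows
        if kind == sensor_type and start <= measured_at < end
    ]
    return json.dumps({"sensor_type": sensor_type, "count": len(selected),
                       "readings": selected})


rows = [
    ("city_01", "street_01", "temperature", 28.7, datetime(2018, 7, 31, 12, 0)),
    ("city_01", "street_02", "air_pollution", 87.3, datetime(2018, 7, 31, 12, 0)),
    ("city_01", "street_01", "temperature", 29.1, datetime(2018, 8, 1, 12, 0)),
]
print(readings_to_json(rows, "temperature", datetime(2018, 7, 31), datetime(2018, 8, 1)))
```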

I think this much detail is enough for a blog article. I wanted to go over the details to see how hard such a system can be and how we can solve its problems.

Now it is time to get hands-on. I want to start with my MQTT broker.

Smart City MQTT Broker

“A local interchange with purple lights and city buildings in the background” by Denys Nevozhai on Unsplash

Let’s start by deploying our MQTT broker with Docker, using Eclipse Mosquitto as described above.

I wanted to create a Docker image with all configurations. The following Dockerfile installs all the required packages, including MariaDB and Mosquitto. However, the rest of the configuration has to be done manually (you can extend the Dockerfile with scripts and config files to automate that process as well).

Please find all files in: https://github.com/odundar/mqtt-smartcity

# Ubuntu 16.04 (xenial) base image, matching the xenial MariaDB repository added below
FROM ubuntu:16.04

LABEL maintainer="Onur Dundar <onur.dundar1@gmail.com>"

# run update to get the latest package lists from the repositories
RUN apt-get update

# install the mosquitto broker and command line clients into the image
RUN apt-get install -y mosquitto mosquitto-clients

# install openssl
RUN apt-get install -y openssl

# install mariadb
#https://downloads.mariadb.org/mariadb/repositories/#mirror=ulakbim&distro=Ubuntu&distro_release=xenial--ubuntu_xenial&version=10.3

RUN apt-get install -y software-properties-common
RUN apt-key adv --recv-keys --keyserver hkp://keyserver.ubuntu.com:80 0xF1656F24C74CD1D8
RUN add-apt-repository 'deb [arch=amd64,arm64,i386,ppc64el] ftp://ftp.ulak.net.tr/pub/MariaDB/repo/10.3/ubuntu xenial main'
RUN apt-get update

# noninteractive setup skips the root password prompt (set it later)
RUN DEBIAN_FRONTEND=noninteractive apt-get install -y mariadb-server

# ports need to be exposed in order to access Mosquitto and MariaDB services remotely
EXPOSE 1883 8883 3306

Mosquitto Configuration

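  • Build the image in the folder that contains the Dockerfile, then start a container from it and configure the Mosquitto service inside:
$ docker build -t mqtt_broker .
$ docker run -ti -p 1883:1883 -p 8883:8883 -p 3306:3306 mqtt_broker:latest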
  • Create password authentication for mosquitto:
$ mkdir /etc/mosquitto/passwords/ && touch /etc/mosquitto/passwords/mosquitto_pwd
$ mosquitto_passwd -U /etc/mosquitto/passwords/mosquitto_pwd
$ mosquitto_passwd -b /etc/mosquitto/passwords/mosquitto_pwd street1 simple_password
$ mosquitto_passwd -b /etc/mosquitto/passwords/mosquitto_pwd street2 simple_password
$ mosquitto_passwd -b /etc/mosquitto/passwords/mosquitto_pwd subscriber1 simple_password
$ mosquitto_passwd -b /etc/mosquitto/passwords/mosquitto_pwd subscriber2 simple_password

Edit the `/etc/mosquitto/mosquitto.conf` file and add the following lines to allow only password-authenticated connections:

allow_anonymous false
password_file /etc/mosquitto/passwords/mosquitto_pwd

Now, let’s configure TLS for a secure connection.

TLS (Transport Layer Security) encrypts the MQTT packets transferred over the network. Only the broker and the client at the two ends of the connection can read them, and the certificates let the client verify that it is talking to the genuine broker.

OpenSSL has already been installed, so the following commands should work:

$ cd /etc/mosquitto/certs
  • Generate the certificate authority (CA) key and certificate. Give the CA a Common Name different from the server’s, since a CA certificate and a server certificate with identical subjects can make OpenSSL treat the server certificate as self-signed:
$ openssl req -new -x509 -days 3650 -extensions v3_ca -keyout ca.key -out ca.crt
  • Generate the server key without a passphrase:
$ openssl genrsa -out server.key 2048
  • Create a certificate signing request for the server. When prompted for the Common Name, enter the host name that clients connect to. In my case, this was localhost (127.0.0.1):
$ openssl req -out server.csr -key server.key -new
  • Sign the request with the CA to create the server certificate:
$ openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 3650
  • Leave the files in the certs folder.

Copy ca.crt to your client.

$  docker cp <containerid>:/etc/mosquitto/certs/ca.crt <destination folder>

Add the following lines to the /etc/mosquitto/mosquitto.conf file:

listener 8883
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
tls_version tlsv1
  • Restart service
$ service mosquitto restart

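This is a basic configuration; for production, read more about TLS and OpenSSL configuration. In particular, TLS 1.0 (tls_version tlsv1) is outdated. Prefer tlsv1.2 and configure the clients to match. The required commands are also described in the mosquitto-tls manual page.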
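If the broker’s `tls_version` is raised to `tlsv1.2`, the Python clients also need to stop pinning TLS 1.0. Instead of using the version-specific `ssl.PROTOCOL_TLSv1` constant, which Python’s `ssl` module deprecates, a client can build its own context. Below is a sketch with `make_tls_context` as a hypothetical helper name:

```python
import ssl


def make_tls_context(cafile=None):
    """Client TLS context that verifies the broker against our CA and refuses anything below TLS 1.2."""
    # create_default_context() already requires a valid certificate and checks the host name
    context = ssl.create_default_context(cafile=cafile)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


context = make_tls_context()  # pass '../ca.crt' on a real client
print(context.verify_mode == ssl.CERT_REQUIRED, context.check_hostname)  # True True
```

On a client, `client.tls_set_context(make_tls_context('../ca.crt'))` would then replace the `tls_set()` call. `tls_set_context()` is part of the paho-mqtt client API.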


Sensor Simulator

Before going further, I would like to introduce a Python module that asynchronously generates sensor values from a normal distribution using numpy.

See: sensor_simulator.py file

I wanted a basic module in which one can start tasks that generate random values.

The code below is part of the SensorSim class and generates random Celsius temperature values.

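import asyncio

import numpy as np


class SensorSim:
    # latest simulated value and the flag that keeps its generator running
    celcius = 0.0
    cel_running = False

    async def __celcius_temperature__(self, mean=21.5, std_dev=3.5, interval=10):
        # set the flag on the instance (not a local variable) so other code can stop the loop
        self.cel_running = True
        while self.cel_running:
            # draw a new reading from a normal distribution
            self.celcius = np.random.normal(loc=mean, scale=std_dev)
            await asyncio.sleep(interval)
        # once cel_running is False the coroutine simply returns; stopping or closing the
        # shared loop here would also end the other sensors' tasks, and closing a running
        # loop raises RuntimeError

    def start_celcius_temperature(self, mean=21.5, std_dev=3.5, interval=10):
        loop = asyncio.get_event_loop()
        # schedule the generator as a task on the current event loop
        loop.create_task(self.__celcius_temperature__(mean, std_dev, interval))
        print('{} started'.format(__name__))
        # if no loop is running yet, this call blocks here and runs the loop forever
        if not loop.is_running():
            loop.run_forever()
        return True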

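To use this class, import the module, call the start method and read values from celcius. Note that the start method calls loop.run_forever() when no event loop is running yet, so calling it from plain top-level code blocks at that point. Calling it from inside a coroutine lets it return immediately: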

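import asyncio
from sensor_simulator import SensorSim

async def main():
    sim = SensorSim()
    sim.start_celcius_temperature(mean=28.7, std_dev=3.5, interval=10)  # loop already running: only schedules the task
    await asyncio.sleep(1)  # let the simulator produce a first value
    print(sim.celcius)

asyncio.run(main())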

There are much better simulators, but this basic one, which I implemented for myself, is enough to show value generation and reading.

In a real-life case, the values will be read from the sensor device, but the rest of the service will be the same. (I might write a third part using real sensors.)
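The SensorSim class repeats one start method and one running flag per sensor. The generalized variant below is sketched under my own assumptions rather than taken from the repository. It runs one parameterized coroutine per sensor and stops all of them with a shared `asyncio.Event`. `run_sensor` is a hypothetical name, and `random.gauss` replaces numpy so that the sketch only needs the standard library:

```python
import asyncio
import random


async def run_sensor(readings, name, mean, std_dev, interval, stop):
    """Store a normally distributed value in readings[name] every `interval` seconds until stop is set."""
    while not stop.is_set():
        readings[name] = random.gauss(mean, std_dev)
        await asyncio.sleep(interval)


async def main():
    readings = {}
    stop = asyncio.Event()
    # (name, mean, std_dev) per simulated sensor; values borrowed from the publisher example
    sensors = [('celcius', 28.7, 3.5), ('no2_level', 87.3, 0.5), ('so2_level', 122.1, 0.5)]
    tasks = [asyncio.create_task(run_sensor(readings, name, mean, std, 0.1, stop))
             for name, mean, std in sensors]
    await asyncio.sleep(0.35)  # let each sensor produce a few values
    stop.set()                 # every coroutine exits after its current sleep
    await asyncio.gather(*tasks)
    return readings


readings = asyncio.run(main())
print(sorted(readings))  # ['celcius', 'no2_level', 'so2_level']
```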

Smart City — Client (Publisher)

First, I built a Python prototype that mocks a client sending data to the MQTT broker.

I used the Paho package to connect to the MQTT server and send messages.

Python Script

I suggest looking at the Jupyter notebook I used for prototyping before writing the actual scripts that I plan to run on the client.

There, I explored the basic methods and features I needed to implement the simulator and the actual publisher.

I created a config model object that keeps the client configuration and is meant to be filled by reading a config file. A config file is the easiest way to control such applications remotely, since many parameters affect the application’s behavior.

class ClientConfig():
    '''
    Client Connection Configurations
    '''
    SystemName = "hypo_city"

    # city id and street name
    CityName = "city_01"
    StreetName = "street_01"

    def GetClientId(self):
        return self.CityName + "_" + self.StreetName

    # define the server IP and port to be used
    MQTTServer = "127.0.0.1"
    TSLServerPort = 8883
    ServerPort = 1883

    Username = 'street1'
    Password = 'simple_password'

    QoS = 0
    Retain = False

    def parse_config_file(self, file_name=''):
        """
        A config file can be used to parse data

        :param file_name:
        :return:
        """

        return True

    def topic_prefix(self):
        return self.SystemName + "/" + self.CityName + "/" + self.StreetName
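`parse_config_file` is still a stub. One way to fill it in, sketched with the standard `configparser` module, is an INI file that also carries the per-sensor intervals and on/off switches mentioned in the update requirements earlier. The file layout, the keys and `parse_client_config` are all hypothetical:

```python
import configparser

# Hypothetical config file content; section and key names are illustrative only.
EXAMPLE_CONFIG = """
[client]
city_name = city_01
street_name = street_01
mqtt_server = 127.0.0.1
tls_port = 8883
qos = 0

[intervals]
# seconds between publishes per sensor group
temperature = 60
air_pollution = 60
traffic = 30

[enabled]
temperature = yes
air_pollution = yes
traffic = no
"""


def parse_client_config(text):
    """Parse connection settings, publish intervals and per-group on/off switches."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    client = parser['client']
    return {
        'city_name': client.get('city_name'),
        'street_name': client.get('street_name'),
        'mqtt_server': client.get('mqtt_server'),
        'tls_port': client.getint('tls_port'),
        'qos': client.getint('qos'),
        'intervals': {name: parser.getint('intervals', name) for name in parser['intervals']},
        'enabled': {name: parser.getboolean('enabled', name) for name in parser['enabled']},
    }


settings = parse_client_config(EXAMPLE_CONFIG)
print(settings['intervals'])  # {'temperature': 60, 'air_pollution': 60, 'traffic': 30}
print(settings['enabled'])    # {'temperature': True, 'air_pollution': True, 'traffic': False}
```

Inside `parse_config_file`, `parser.read(file_name)` would replace `read_string()`, and the parsed values would be copied onto the class attributes.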

Then, I create a task that reads all the sensor readings and builds a custom topic for each of them. This process could be generalized further, but there was no need for that at this time.

# imports used by the publisher script (ssl is needed by tls_set() in __main__)
import asyncio
import datetime
import ssl

import paho.mqtt.client as mqtt

from sensor_simulator import SensorSim


async def publish_message(mqtt_client, topic_prefix='', qos=0, retain=False):
    """
    Start the sensor simulators and publish their latest readings every 10 seconds.

    :param mqtt_client: connected paho MQTT client used for publishing
    :param topic_prefix: system/city/street prefix prepended to every topic
    :param qos: MQTT quality of service level
    :param retain: whether the broker keeps the last message of each topic
    :return: never returns; runs until the event loop is stopped
    """
    print('{} started'.format(__name__))

    sim = SensorSim()

    # the event loop is already running here, so each start_* call only schedules a task
    sim.start_celcius_temperature(mean=28.7, std_dev=3.5, interval=10)
    sim.start_fahrenheit_temperature(mean=88.3, std_dev=9.5, interval=10)
    sim.start_no2_level(mean=87.3, std_dev=0.5, interval=10)
    sim.start_so2_level(mean=122.1, std_dev=0.5, interval=10)
    sim.start_wind_direction(mean=123.8, std_dev=65.7, interval=10)
    sim.start_wind_speed(mean=15.8, std_dev=8.5, interval=10)
    sim.start_traffic_speed(mean=0.0, std_dev=25, interval=10)

    while True:
        utc_time = datetime.datetime.now(datetime.timezone.utc)

        # topic suffix -> latest simulated reading
        readings = {
            "/temperature/fahrenheit": sim.fahrenheit,
            "/temperature/celcius": sim.celcius,
            "/air_pollution/so2_level": sim.so2_level,
            "/air_pollution/no2_level": sim.no2_level,
            "/air_condition/wind_speed": sim.wind_speed,
            "/traffic/traffic_speed": sim.traffic_speed,
            "/air_condition/wind_direction": sim.wind_direction,
        }

        for suffix, value in readings.items():
            topic = topic_prefix + suffix
            # message structure: timestamp \t data
            message = '{}\t{}'.format(utc_time, value)
            publish_info = mqtt_client.publish(topic, payload=message, qos=qos, retain=retain)

            # rc reports whether the message was queued for sending; is_published() is a
            # method and, for QoS 1/2, only returns True once the broker has acknowledged it
            if publish_info.rc == mqtt.MQTT_ERR_SUCCESS:
                print("Topic:{}, Message:{}".format(topic, message))
            else:
                print("Message Not Published")

        await asyncio.sleep(10)

The code above starts the sensor simulator. In real-life cases, you would instead read certain memory locations, volatile variables or sensor registers. In my case, I just read from the simulator.

Finally, I need to initialize the MQTT client and set up the connection used to send the messages. Before that, we need to define callback functions for the MQTT client.

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

# The callback for when the client disconnects from the broker.
def on_disconnect(client, userdata, rc):
    print("Disconnected with result code " + str(rc))

# The callback for when a publish() has completed transmission to the broker; mid is the message id.
def on_publish(client, userdata, mid):
    print("Message Published {}".format(mid))
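The `rc` passed to `on_connect` is the CONNACK return code, so printing its meaning makes broker problems easier to spot. With `allow_anonymous false` on the broker, a 4 or 5 points at the username/password settings. Below is a sketch with the codes from the MQTT 3.1.1 specification (paho-mqtt also offers `mqtt.connack_string(rc)` for the same purpose):

```python
# CONNACK return codes defined by MQTT 3.1.1 (section 3.2.2.3)
CONNACK_CODES = {
    0: "Connection accepted",
    1: "Refused: unacceptable protocol version",
    2: "Refused: identifier rejected",
    3: "Refused: server unavailable",
    4: "Refused: bad user name or password",
    5: "Refused: not authorized",
}


def on_connect(client, userdata, flags, rc):
    # rc is an int in paho-mqtt 1.x callbacks
    reason = CONNACK_CODES.get(rc, "Unknown return code")
    print("Connected with result code {} ({})".format(rc, reason))


on_connect(None, None, {}, 4)  # prints: Connected with result code 4 (Refused: bad user name or password)
```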

I put the client creation and connection code in the __main__ block, so that running the script starts the publisher.

if __name__ == '__main__':
    cfg = ClientConfig()

    if not cfg.parse_config_file():
        exit(-1)

    # Create Client Object (paho-mqtt 1.x API; paho-mqtt 2.x also expects a callback_api_version argument)
    client = mqtt.Client(client_id=cfg.GetClientId())

    # assign callback methods
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect

    # trust the broker certificate signed by our CA (ca.crt copied from the broker container)
    client.tls_set(ca_certs='../ca.crt', tls_version=ssl.PROTOCOL_TLSv1, cert_reqs=ssl.CERT_REQUIRED)

    # set username password
    client.username_pw_set(cfg.Username, password=cfg.Password)

    # Connect to MQTT Broker on the TLS port
    client.connect(cfg.MQTTServer, cfg.TSLServerPort, keepalive=60)

    # run the paho network loop in a background thread so CONNACK, keepalive pings
    # and acknowledgements are processed while asyncio runs the publisher task
    client.loop_start()

    # asyncio.run() starts a new event loop and runs the never-ending publisher on it
    asyncio.run(publish_message(mqtt_client=client, topic_prefix=cfg.topic_prefix(), qos=cfg.QoS, retain=cfg.Retain))

MQTT Publisher Native Service with C++ Implementation

TODO

Smart City — Client (Subscriber)

Before going further, please look at the Jupyter notebook I used to understand the subscriber’s properties.

Python Script

The subscriber implementation is similar to the publisher’s, except for how the topic is determined.

I just updated the Config class with different fields. A more comprehensive config is needed to manage variables easily and to avoid hard-coded values, which are really bad practice.

However, I will update the code from time to time. For now, I just want a proof of concept, so this is essentially the same Config.

# imports used by the subscriber script (ssl and paho are needed in __main__)
import ssl

import paho.mqtt.client as mqtt


class ClientConfig():
    '''
    Client Connection Configurations
    '''
    SystemName = "hypo_city"

    def GetClientId(self):
        return 'TemperatureServer'

    # topic levels: systemname/cityname/streetname/sensortype/datatype
    # '+' matches any city and any street, so this receives every temperature reading
    subs_topic = SystemName + '/+/+/temperature/+'

    def GetSubscribedTopic(self):
        # return the class attribute defined above
        return self.subs_topic

    # define the server IP and port to be used
    MQTTServer = "127.0.0.1"
    TSLServerPort = 8883
    ServerPort = 1883

    Username = 'subscriber1'
    Password = 'simple_password'

    QoS = 0
    Retain = False

    def parse_config_file(self, file_name=''):
        """
        A config file can be used to parse data

        :param file_name:
        :return:
        """
        # TODO: read settings from file_name
        return True
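Each `+` wildcard in `subs_topic` matches exactly one topic level, and `#` (only allowed as the last level) matches any number of remaining levels. The broker does this matching itself, and paho-mqtt exposes the same check as `paho.mqtt.client.topic_matches_sub()`. The simplified `topic_matches` below is only a sketch of the rule. It assumes a valid filter and ignores the special handling of topics that start with `$`:

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic name matches a subscription filter (simplified)."""
    filter_levels = topic_filter.split('/')
    topic_levels = topic.split('/')
    for i, level in enumerate(filter_levels):
        if level == '#':
            # multi-level wildcard: matches the parent level and everything below it
            return True
        if i >= len(topic_levels):
            return False
        if level != '+' and level != topic_levels[i]:
            # '+' matches any single level; anything else must match exactly
            return False
    # without '#', the number of levels must be equal
    return len(filter_levels) == len(topic_levels)


print(topic_matches('hypo_city/+/+/temperature/+',
                    'hypo_city/city_01/street_01/temperature/celcius'))    # True
print(topic_matches('hypo_city/+/+/temperature/+',
                    'hypo_city/city_01/street_01/air_pollution/no2_level'))  # False
```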

Again, callback methods need to be implemented so that the Paho library can do its job.

# Define callback functions for Subscriber
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))


def on_disconnect(client, userdata, rc):
    print("Disconnected with result code " + str(rc))

The subscriber has one callback fewer and one more. There is no on_publish; instead, there is an on_message callback, since this client receives messages, and it lets us decide what to do with each one.

I want to do a really complex thing: print it to the console.

def on_message(client, userdata, msg):
    # payload format published by the clients: "<timestamp>\t<value>"
    message = msg.payload.decode("utf-8")

    message_data = message.split('\t')
    # topic format: system/city/street/sensor_type/data_type
    topic_data = msg.topic.split('/')

    write_message(topic_data, message_data)


def write_message(topic, message):
    # expect 5 topic levels plus a timestamp and a value; ignore anything else
    if len(topic) >= 5 and len(message) >= 2:
        timestamp_utc = message[0]
        data = message[1]
        print('System: {}\tCity: {}\tStreet: {}\tSensorType: {}\tDataType: {}\tSensorData: {}\tTimeStamp: {}'
              .format(topic[0],
                      topic[1],
                      topic[2],
                      topic[3],
                      topic[4],
                      data,
                      timestamp_utc))

That’s it, actually. I also defined the same kind of main function here to start the subscription.

if __name__ == '__main__':
    cfg = ClientConfig()

    if not cfg.parse_config_file():
        exit(-1)

    # Create Client Object (paho-mqtt 1.x API; paho-mqtt 2.x also expects a callback_api_version argument)
    client = mqtt.Client(client_id=cfg.GetClientId())

    # assign subscriber callback methods
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect

    client.tls_set(ca_certs='../ca.crt', tls_version=ssl.PROTOCOL_TLSv1, cert_reqs=ssl.CERT_REQUIRED)

    # set username password
    client.username_pw_set(cfg.Username, password=cfg.Password)

    # Connect to MQTT Broker
    client.connect(cfg.MQTTServer, cfg.TSLServerPort, keepalive=60)

    # temperature readings of every street in every city: system_name + '/+/+/temperature/+'
    subscribe_topic = cfg.GetSubscribedTopic()

    client.subscribe(subscribe_topic, qos=cfg.QoS)

    # blocking network loop: dispatches on_message for every received message
    client.loop_forever()
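Subscribing right after `connect()` works, but `loop_forever()` reconnects automatically after a dropped connection. Because paho uses a clean session by default, the broker has forgotten the subscription by then, so paho’s own examples subscribe inside `on_connect` instead. Below is a sketch of that variant. `SUBSCRIBE_TOPIC` is a hypothetical constant standing in for `cfg.GetSubscribedTopic()`, and `RecordingClient` is only a stand-in for trying the callback without a broker:

```python
SUBSCRIBE_TOPIC = 'hypo_city/+/+/temperature/+'


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    if rc == 0:
        # runs again after every automatic reconnect, so the subscription is restored
        client.subscribe(SUBSCRIBE_TOPIC, qos=0)


class RecordingClient:
    """Tiny stand-in for paho's Client that only records subscribe() calls."""

    def __init__(self):
        self.subscriptions = []

    def subscribe(self, topic, qos=0):
        self.subscriptions.append((topic, qos))


fake = RecordingClient()
on_connect(fake, None, {}, 0)
print(fake.subscriptions)  # [('hypo_city/+/+/temperature/+', 0)]
```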

MQTT Subscriber Native Service with C++ Implementation

TODO

Database (MariaDB) Configurations

TODO
