MQTT With PYTHON — Part 3

Ashiq KS
7 min readDec 1, 2018

--

A series on the lightweight protocol MQTT with the Python programming language.

This part is the continuation and last of the series on MQTT with python. The earlier posts are Part 1 and Part 2 where we discussed on MQTT and its usage with Python language. All the code files are available on this GitHub repository.

In this part, we will see how to publish values read from dummy sensors and subscribe using Python. Instead of using real-world sensors we will read data from a CSV file to take the complexity out of this post.

We will publish our data and subscribe to the data using Google cloud.

In this post, we will create some dummy data from a slurry field. The data are from the imaginary level sensor and methane gas detector. The level sensor finds the distance between the sensor and the base. The methane gas detector detects the percentage of methane gas over the atmosphere the detector is present.

Let’s start off with creating a configuration file, slurry_config.py, which creates our topics needed for both publishing and subscribing.

#slurry_config.pyslurry_name = "slurry001"
topic_format = "slurry/{}/{}"
level_topic = topic_format.format(slurry_name, "level")
methane_topic = topic_format.format(slurry_name, "methane")

We created a slurry field name, slurry_name, on which we create the topic names for the level sensor and gas detector to publish values.

Now we create another code file, slurry_sensors_emulator.py to publish data from the sensors, in our case, it is from the CSV file, slurry_data.csv.

#slurry_sensors_emulator.pyfrom config import *
import paho.mqtt.client as mqtt
import time
import csv
def on_connect(client, userdata, flags, rc):
print("Result from connect: {}".format(mqtt.connack_string(rc)))
# Check whether the result form connect is the CONNACK_ACCEPTED connack code
if rc != mqtt.CONNACK_ACCEPTED:
raise IOError("Couldn't establish a connection with the MQTT server")
def publish_value(client, topic, value):
result = client.publish(topic=topic, payload=value, qos=2)
return result
if __name__ == "__main__":
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.on_connect = on_connect
client.connect(host="broker.hivemq.com", port=1883)
client.loop_start()
print_message = "{}: {}"

while True:
with open('slurry_data.csv') as csvfile:
reader=csv.reader(csvfile)
for row in reader:
level_value = float(row[0])
methane_value = float(row[1])
print(print_message.format(level_topic, level_value))
print(print_message.format(methane_topic, methane_value))
publish_value(client, level_topic, level_value)
publish_value(client, methane_topic, methane_value)
time.sleep(1)

client.disconnect()
client.loop_stop()

We started off with importing necessary modules and files. Note that we have imported the ‘csv’ package to handle CSV files.

Then, we define the on_connect callback function which needs to be called when this client is connected to the server/broker.

def on_connect(client, userdata, flags, rc):
print("Result from connect: {}".format(mqtt.connack_string(rc)))
# Check whether the result form connect is the CONNACK_ACCEPTED connack code
if rc != mqtt.CONNACK_ACCEPTED:
raise IOError("Couldn't establish a connection with the MQTT server")

In this function, we also check if the connection request has been properly served, if not we raise an IOError.

Next, we create a function to enable sensors to publish data.

def publish_value(client, topic, value):
result = client.publish(topic=topic, payload=value, qos=2)
return result

This function takes in the current client, client, the topic to which the data are published, topic, and the value, value, that is to be published.

The client.publish function takes in the topic, the actual message, payload, and the quality of service parameter, qos, as parameters.

if __name__ == "__main__":
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.on_connect = on_connect
client.connect(host="broker.hivemq.com", port=1883)
client.loop_start()
print_message = "{}: {}"

Then we created the main function, created an MQTT client, passed on our on_connect callback function to client.on_connect method.

We connected to the Hive broker using the port 1883. Then we started the loop for publishing the data.

We created a placeholder for print the messages published as print_message = "{}: {}"

while True:
with open('slurry_data.csv') as csvfile:
reader=csv.reader(csvfile)
for row in reader:
level_value = float(row[0])
methane_value = float(row[1])
print(print_message.format(level_topic, level_value))
print(print_message.format(methane_topic, methane_value))
publish_value(client, level_topic, level_value)
publish_value(client, methane_topic, methane_value)
time.sleep(1)

After starting while loop, we read the CSV data off using the csv.reader function. As it reads one row for an iteration we have to assign the values to correct labels.

Then we print the values we read from the CSV data and publish the data to the correct topics using our publish_value function.

We sleep for a second because we publish data only in the interval of one second.

After publishing all the data we disconnect from the connection using the client.disconnect method and stoop the loop by client.loop_stop method.

Now we will get into our subscriber client.

Create a file called slurry_monitor.py as follows:

#slurry_monitor.pyfrom slurry_config import *
import paho.mqtt.client as mqtt
import time
import json
mqtt_host = "broker.hivemq.com"class Slurry:
active_instance = None

def __init__(self, level, methane_value):
self.level = level
self.methane_value = methane_value
Slurry.active_instance = self
def on_connect_mosquitto(client, userdata, flags, rc):
print("Result from Mosquitto connect: {}".format(
mqtt.connack_string(rc)))
# Check whether the result form connect is the CONNACK_ACCEPTED connack code
if rc == mqtt.CONNACK_ACCEPTED:
# Subscribe to a topic filter that provides all the sensors
sensors_topic_filter = topic_format.format(slurry_name, "+")
client.subscribe(sensors_topic_filter)
def on_subscribe_mosquitto(client, userdata, mid, granted_qos):
print("I've subscribed")
def print_received_message_mosquitto(msg):
print("Message received. Topic: {}. Payload: {}".format(msg.topic, str(msg.payload)))
def on_level_message_mosquitto(client, userdata, msg):
print_received_message_mosquitto(msg)
Slurry.active_instance.level = float(msg.payload)
def on_methane_message_mosquitto(client, userdata, msg):
print_received_message_mosquitto(msg)
Slurry.active_instance.methane_value = float(msg.payload)
if __name__ == "__main__":
slurry = Slurry(level=0, methane_value=0)
mosquitto_client = mqtt.Client(protocol=mqtt.MQTTv311)
mosquitto_client.on_connect = on_connect_mosquitto
mosquitto_client.on_subscribe = on_subscribe_mosquitto
mosquitto_client.message_callback_add(level_topic, on_level_message_mosquitto)
mosquitto_client.message_callback_add(methane_topic, on_methane_message_mosquitto)
mosquitto_client.connect(host=mqtt_host, port=1883)
mosquitto_client.loop_forever()

We import the necessary modules and packages, then assign the broker

pubnub_mqtt_server_host = "mqtt.pndsn.com"
mqtt_host = "broker.hivemq.com"
_id = slurry_name
pubnub_topic = slurry_name

We use ‘mqtt.pndsn.com’ as the Pubnub server/broker and ‘broker .hivemq.com’ as the MQTT server/broker.

Then we create a Slurry class to represent the data as follows:

class Slurry:
active_instance = None

def __init__(self, level, methane_value):
self.level = level
self.methane_value = methane_value
Slurry.active_instance = self

In the constructor, we created variables for level and methane values

and pass in the values at the time of instantiation.

We have a class variable active_instance set to None. It is to represent the current instance to be used on callback functions.

Then we assign the current instance self to the class variable Slurry.active_instance.

Now, as usual, we will create our on_subscribe_mosquitto callback function to be called upon connection with the server/broker.

def on_connect_mosquitto(client, userdata, flags, rc):
print("Result from Mosquitto connect: {}".format(
mqtt.connack_string(rc)))
# Check whether the result form connect is the CONNACK_ACCEPTED connack code
if rc == mqtt.CONNACK_ACCEPTED:
# Subscribe to a topic filter that provides all the sensors
sensors_topic_filter = topic_format.format(slurry_name, "+")
client.subscribe(sensors_topic_filter)

And then subscribe to the topics we are interested in. Interestingly our effective topic has become ‘slurry/slurry001/+’. You might what this ‘+’ means, instead our topic names are ‘slurry/slurry001/level’ and ‘slurry/slurry001/methane’. What exactly ‘+’ here does is it acts as a placeholder or variable which means whatever the name that comes at the + position becomes a new topic to which our client will be subscribed to.

For eg, say we have subscribed to the topic ‘‘slurry/slurry001/+’, then if our server/broker receives a message from ‘slurry/slurry001/level’, then we are subscribed to that topic too, but not to ‘slurry/slurry002/level’ or slurry/slurry003/level’, because we haven’t subscribed to ‘slurry/slurry002/+’ and ‘slurry/slurry003/+’. Here the ‘+’ is a wildcard in the convention of naming a topic.

The ‘+’ is helpful to us when we need to subscribe to a large number of topics having the same prefixes or suffixes.

Then we create on_subscribe_mosquitto callback function upon mosquitto subscribing to the topics and we have a helper template function, print_message to print the messages into the command prompt.

def on_subscribe_mosquitto(client, userdata, mid, granted_qos):
print("I've subscribed")
def print_message(msg):
print("Message received. Topic: {}. Payload: {}".format(msg.topic, str(msg.payload)))

Now we have new callback function to serve upon receiving messages to the respective functions.

def on_level_message_mosquitto(client, userdata, msg):
print_message(msg)
Slurry.active_instance.level = float(msg.payload)
def on_methane_message_mosquitto(client, userdata, msg):
print_message(msg)
Slurry.active_instance.methane_value = float(msg.payload)

These functions print their respective messages and save to instance’s variables using the Slurry class’ variable active_instance. We will see how these functions are called later.

if __name__ == "__main__":
slurry = Slurry(level=0, methane_value=0)
mosquitto_client = mqtt.Client(protocol=mqtt.MQTTv311)
mosquitto_client.on_connect = on_connect_mosquitto
mosquitto_client.on_subscribe = on_subscribe_mosquitto
mosquitto_client.message_callback_add(level_topic, on_level_message_mosquitto)
mosquitto_client.message_callback_add(methane_topic, on_methane_message_mosquitto)
mosquitto_client.connect(host=mqtt_host, port=1883)
mosquitto_client.loop_forever()

We instantiate a new Slurry object slurry.

Now we will create a mosquitto client and assign the callback we created.

The message_callback_add method takes in two parameters, the topic name and callback function to be executed upon receiving a message on the passed in topic name.

We create the connection and start the loop thread for the mosquitto client too.

Now we will test our programs. First of all, we will subscribe by running slurry_monitor.py. Following is the output from the command prompt.

Let’s publish the data from our CSV file by running slurry_sensors_emulator.py. We can see the data are being published to the mosquitto server.

Switching to our client, we can see our client starts receiving messages from the publisher as follows.

That’s it. With few lines of code, we have set up a demo of publisher and subscriber in MQTT. This is the last part of the MQTT series. Part 1 and Part 2 are available predecessors.

Please feel free to comment below to let me know your suggestions.

--

--