Connecting Node-Red and MQTT Broker
MQTT data broker with Kafka and NodeRed.Originally published at jugsi.blogspot.com.
n this simple exercise we want to connect Node-Red edge device to a local MQTT broker. Here the steps
- Install Node-Red
- Install MQTT broker (mosquitto)
- Build your first flow
- Install Node CPU and update the flow
- Connect MQTT broker
- Subscribe to topic
Installing Node-Red
sudo npm install -g --unsafe-perm node-red
the we can start node-red and point your browser to http://127.0.0.1:1880/
node-red
Installing Mosquitto
To install and start Mosquitto follow the link https://mosquitto.org/download/
for Macos
brew install mosquitto
brew services start mosquitto
Testing Mosquitto
Start the subscriber
mosquitto_sub -t topic/#
the try to publish something
mosquitto_pub -t topic/state -m "Hello World"
Start our first flow
Build the following flow:
On node timestamp set an interval of 1 sec. On node function write the following code:
// Create a Date object from the payload
var date = new Date(msg.payload);
// Change the payload to be a formatted Date string
msg.payload = date.toString();
// Return the message so it can be sent on
return msg;
At the end you will get an amazing message about the current timestamp.
Click on deploy.
Installing CPU node
Install the cpu “node contrib”:
cd ~/.node-red
npm install node-red-contrib-cpu
and restart node-red.
Update the flow
Update the previous flow as follow and add MQTT node
Connect MQTT broker
for the two MQTT nodes (violet):
- server: localhost
- port: 1883
- topic: topic/state.cpu or topic/state.ping
Deploy the flow!
Subscribe the two topics
Start the subscriber
mosquitto_sub -t topic/#
and here our results
Configure Kafka to connect MQTT
Now we want to connect Kafka to MQTT broker.
Install Kafka
From https://kafka.apache.org/downloads download and unzip kafka, then set the $KAFKA_HOME environment.
Configure Kafka-MQTT Connector
From https://github.com/evokly/kafka-connect-mqtt
git clone https://github.com/evokly/kafka-connect-mqtt
cd kafka-connect-mqtt
./gradlew clean jarcp build/libs/kafka-connect-mqtt-*.jar $KAFKA_HOME/libs/
cp build/output/lib/org.eclipse.paho.client.mqttv3-1.0.2.jar $KAFKA_HOME/libs/
Configure mqtt.properties
In $KAFKA_HOME/config create the following mqtt.properties file
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
kafka.topic=mqtt
mqtt.client_id=mqtt-kafka-123456789
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
mqtt.server_uris=tcp://localhost:1883
mqtt.topic=topic/#
we assume that MQTT server broker is running locally and the kafka topic is called mqtt.
Start
Start Kafka and Zookeper
./zookeeper-server-start.sh ../config/zookeeper.properties &
./kafka-server-start.sh ../config/server.properties
Create the topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mqtt
Start the connector
./connect-standalone.sh ../config/connect-standalone.properties ../config/mqtt.properties
Start the subscriber
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mqtt --from-beginning