Create a MQTT server using Functional Programming paradigm

We have severals libraries to help us with routes and redirect the request for the right function (aka controller) in http servers, but and about MQTT?

In MQTT we receive an topic with payload and we should be able to figure out whats this request is about and handle this request with the right function.

server.on('published', (packet, client, cb) => {
// Whats to do with the package ?
});

The question that I will try to answer is how handle MQTT request with functional programming paradigm.

The processor

The first step is create a module that process the request ..

server.on('published', (packet) => {
processor(packet);
});

The processor will figure out what the packet is about and redirect to the right handler.

// processor.js
export default function processor(packet, client) {
const resolve = R.cond([
[COND, EXEC]
]);
resolve(packet);
}

The idea is simple, COND is a function that return true or false , if truethe EXEC function will be executed. See more about R.cond in Ramda docs.

resolve is a function that will return whatever EXEC return

So, for example ..

const paket = {
topic: 'world'
}
const resolve = R.cond([
[() => true, (p) => 'hello ' + p.topic]
]);
resolve(packet); // hello world

As you can see, is a array of arrays, so we can have multiples conditions..

const resolve = R.cond([
[() => true, (p) => 'hello ' + p.topic],
[OTHER_FN, OTHER_EXEC],
[ONE_MORE_FN, ONE_MORE_EXEC],
// ...
]);

The request

Using this approach we can handle request very easier, all we need to do is create javascript modules as follow..

// processRequestExample.js
export function execute(packet) {
// Process packet and return a valida packet or null
}
export function match(packet) {
// Return true if execute should be execute
}
export default [match, execute];

And import this module on processor.js

import processRequestExample from './processRequestExample'
const resolve = R.cond([
processRequestExample
]);

The publish

For last but not less import, if our resolve return an packet we need to publish the response. Let's change our processor.js ..

export default function processor(packet, publish) {
const resolve = R.cond([
processRequestExample,
]);
  const publishIfConditionalResolved = R.pipe(
resolve,
R.when(R.identity, publish),
);
  publishIfConditionalResolved(packet);
}
R.when resolve the second function if the first one return true.
In this case, if resolve return a value, publish will be called with this value.

And our server,

server.on('published', (packet, client, cb) => {
const onPublish = (packetResp) => {
server.publish(packetResp, client, cb);
};
processor(packet, onPublish);
});

This makes easier handle MQTT request and answer this request if necessary.

More real situation

Let’s create an sum calculator, will be able to sum two numbers from topic,

  • When receive a packet from topic calc/{num1},{num2}
  • With the payload { "command": "sum" }
  • Should return {result} with sum of {num1} + {num2}

#1 Add handle to our processor

import sumCalc from './sumCalc';
export default function processor(packet, publish) {
const resolve = R.cond([
sumCalc
]);
  // ...
}

#2 Implement sumCalc

We have to figure out if the request should be resolve by our module match and if so we need extract and sum the numbers from the topic.

match function

The match function job is answer true if the packet should be resolve by this module or false if not.

To do that, we need validate the topic and payload

  • To validate the topic, we can use an simple regex
  • To validate the payload we can use ajv that validate the payload against JSON schema.

I will comment the code to make easier to understand

// Topic regex validator
const matchChannel = /^calc\/(\d+)\,(\d+)/im;
// JSON Schema validator
const ajv = new Ajv();
const payloadRequestValidator = ajv.compile({
type: 'object',
properties: {
command: {
type: 'string',
pattern: '^sum$',
},
},
required: ['command'],
});
export function match() {
// Validate topic against regex
const topicLens = R.lensProp('topic');
const isTopicValid = R.pipe(R.view(topicLens), R.test(matchChannel));
  // Get JSON
const payloadLeans = R.lensProp('payload');
const payloadView = R.view(payloadLeans);
const utf8String = prop => prop.toString('utf-8');
const getPayload = R.pipe(R.over(payloadLeans, utf8String), payloadView, JSON.parse);
  // Check JSON schema
const isPayloadValid = R.pipe(R.tryCatch(getPayload, R.F), payloadRequestValidator);
  // If is a valid schema and valid topic, return true
return R.both(isPayloadValid, isTopicValid);
}
A lot of Ramda commands here, please check the docs for better understand :) ramdajs.com/docs

execute function

Now we need implement the execute function that should

  1. Extract n1 and n2 from topic
  2. Sum numbers
  3. Return a packet with result
export function execute() {
// Used to extract numbers
const matchNumbers = R.match(matchChannel);
const topicProp = R.lensProp('topic');
const viewTopic = R.view(topicProp);
const transformToBuffer = obj => new Buffer.from(JSON.stringify(obj));
  // Convert property n1 and n2 to int
const transformations = { n1: parseInt, n2: parseInt };
const extractAndSum = R.pipe( // #1
viewTopic,
matchNumbers,
R.zipObj(['all', 'n1', 'n2']),
R.pick(['n1', 'n2']),
R.evolve(transformations),
R.values,
R.sum,
transformToBuffer
);
  const payloadProp = R.lensProp('payload');
const setPayload = R.set(payloadProp);
return R.chain(setPayload, extractAndSum);
}

If you pay attention on #1 we ..

  1. Get the topic value in viewTopic
  2. Extract numbers using regex in matchNumbers
  3. Regex return an array like ['calc/2,3', '2', '5'] , so we convert to a object {all: 'calc/2,3', n1: '2', n2: '5'} in zipObj
  4. Return a object with n1 and n2 only in pick
  5. Transform properties n1 and n2 using transformations in evolve
  6. Return array with the values only, {n1: 2, n2: 5} will be [2,5] in values
  7. Sum the array in sum
  8. Convert to buffer result in transformToBuffer

Export default

You should be notice that match and execute don't have any params, the reason is because we don't need, we can export with R.call

export default [R.call(match), R.call(execute)];

You can find the full code on my github.

Feel free to ask me any question. :)


Like this? Please click ❤️ bellow so other people can find it.