NoSQL2MQTT, a Coreflux Flux Asset

Paulo Mota
Coreflux Blog
Published in
6 min readJan 28, 2022

1. The Purpose of this Flux Asset

Coreflux’s NoSQL2MQTT, as the name may imply, is an asset focused on linking a NoSQL (non-relational) database to your MQTT pipeline.

Its primary purpose is to register a set of intended topics from a given MQTT Broker into the asset’s NoSQL MongoDB-like database. These values can then be consulted at any time, by conducting queries on the database according to the intended parameters. The resulting data is delivered in the same JSON format, being then available for processing using tools or a language of your own choosing, like C#, Python, among many others.

This asset can also be used as a backup of specific topics or the entirety of an MQTT Broker, as it can be configured to re-write all last saved values on start-up.

2. The Target Protocol

In this case we are dealing with a storage/retrieval mechanism, instead of a specific communication protocol.

By definition, a NoSQL (“non-SQL” or “non-relational”) database provides a mechanism for storage and retrieval of data that differs from the tabular relations used in relational databases. This is perfect for the tree-like structure of MQTT, and the string / JSON format in which its messages are transported.

3. Installation

  • There must be an MQTT Broker available (whether Coreflux’s MQTT Broker / Coreflux MQTT WebSockets Broker Flux Asset), to which the values will be written.
  • To install this Flux Asset, open the Coreflux interface, and locate the NoSQL2MQTT asset (check if you have available slots for the installation, this information is displayed on the asset box, in the “used slots / total available slots” format)
  • Click the “Install” button, which will prompt the opening of the configuration overlay
  • Configure the NoSQL2MQTT Flux Asset to be installed (retrieve the settings for the target MQTT Broker)
  • Start the newly installed NoSQL2MQTT Flux Asset
  • After installation, the instance window for this asset installation will display the “Standby” status
  • Before the asset is actually usable, you will need to configure the topics you wish to backup. On Coreflux versions below 1.2.0, this will be done through the asset’s XML file. Navigate to Coreflux’s installation folder, and locate the appropriate asset folder within the “instances” folder.
  • Open it. The XML section to alter is the following (on section 5 of this article, below, you will find an extensive explanation of how to carry out this configuration properly).
  • Once configured, save the file and return to the Coreflux Hub window. To begin execution of the NoSQL2MQTT Flux Asset, press the Play button, and if successful, the instance status will change to “Running”.
  • If for some reason the inputted configuration fails to execute (or permission to run is not confirmed by the system), the asset will try to enter a run state but return to a halted state when failing to do so, displaying an error popup (on the GUI Hub) or message (on the command-line Hub)

4. Glossary of Parameters

  • Instance ID (String)

Name identifier attributed to this specific instance of the NoSQL2MQTT Flux Asset application (which can either be defined by the user or randomly assigned if null, at the time of creation of that specific instance)

  • MQTT Broker Server IP / Address (IP Address)

IP element of the address where the gathered and generated values should be written (if you have a local MQTT Broker Flux Asset installed and bound to any address, you can either use your specific network IP or localhost address)

  • MQTT Broker Server TCP Port (Int)

Port element of the address where the gathered and generated values should be written (if you have a local MQTT Broker Flux Asset installed, you can either use it, inputting the specific port in which it is running)

  • MQTT Topic for Queries (String)

The MQTT topic on which your query requests should be publish. The NoSQL2MQTT Flux Asset will monitor this topic, and attempt to conduct a search using the given parameters.

  • MQTT Topic for Results (String)

If the query conducted on the above query topic returns valid results, this is the topic on which the resulting response should be published, for you to further consume the data.

5. Configuring the Link Arrays

To store topics on the database, at least one of these elements must be created on the configuration file, under the “links” configuration. This element can change the behavior of how a certain topic is saved to the database, as a 1-to-1 or as an event registry of changes. The properties of the link object are described on the following paragraphs:

5.1. PARAMETERS:

  • mqttTopic

The MQTT topic to serve as the data source. This address has 2 wildcards that let this service identify what should be used as the target collection name and how to identify the documents to be written (optional):
— collection: topic path wildcard to be matched with DB collection.
— docId: topic path wildcard to be matched with BD document ID inside the provided collection.

  • restoreOnBootup

Can have value: “true” or “false”. If true and “docId” wildcard is used the service will try to restore data from DB to the MQTT server target topic on startup (after it restarts).

  • collection

An array of collections used on “collection” wildcard and creates a collection on DB with the same name for each provided name.

  • docId

An array of “docId” to replace the wildcard and match with the document ID on the database. This is an optional configuration as it is not needed if the desired behavior is to save all events from the topic.

All records on the database will have these two properties available:

createdAt: the timestamp when the record was created

updatedAt: the timestamp when the record was last updated

5.2. EXAMPLES:

If you want to register all objects published under a topic…

The following will create two collections, namely object1 and object2

If you want to write over the same document with the given ID…

The following will create one collection object3 with two documents, namely doc1 and doc2.

Also, on startup of the service this documents will be read from the database and published under same topics because the setting restoreOnBootup is set to true.

6. Conducting a Query

The service supports data queries, to retrieve data stored on the database using the same structure as we would on a LiteDB project.

In this chapters you can find some queries for desired scenarios (JSON objects are formatted, but they can be sent as one compressed line).

6.1. SUPPORTED OPERATORS:

In order to allow for a more complex filtering of information, the service allows the use of multiple operators. Currently, the following ones are supported:

  • and

Joins two conditions that both must resolve to true, its value is an array of BSON object.

  • or

Joins two conditions that one of them must resolve to true, its value is an array of a BSON object.

  • lte

Field to compare must be lower or equal than value. Its an object with two properties: field and value.

  • gte

Field to compare must be greater or equal than value. Its an object with two properties: field and value.

  • lt

Field to compare must be lower than value. Its an object with two properties: field and value.

  • gt

Field to compare must be greater than value. Its an object with two properties: field and value.

6.2. EXAMPLES:

Getting all records from collection “JSONEvents” from date and time “2019–07–26 15:20:30”:

Getting all records from collection “JSONEvents” for day “2019–07–26” of “typeID” equal to 2:

--

--

Paulo Mota
Coreflux Blog

Cognition and behaviour in increasingly technological societies