Close the hatch, it’s going to rain!

Published in Flock. Community Blogs · Aug 17, 2021

By Julius van Dis & Niels Simonides

Introduction

Mid-2020, Flock. received the keys to its new office space on the top floor of an old building in the Utrecht city center. The office comes with a shared rooftop terrace, whose hatch is susceptible to letting rain in, with water damage as a result. As software engineers, we pay more attention to our code than to the weather, so we thought: “Wouldn’t it be great to receive a notification in advance whenever it starts raining while the hatch is still open?”.

We created office monitoring, a platform capable of just that: rule-based alerting with input from sensor measurements, combined with external factors like weather changes. To achieve this goal, we defined several sub-goals:

  1. Gain insight into the state of the office, for example: which doors and windows are open, what are the temperature and humidity, has there been any movement lately?
  2. Create a dashboard that shows the latest state of the office.
  3. Work with technology and frameworks unknown to us and learn something new.
  4. Provide Flock. with a platform that is not only useful but also extensible to other use cases that have something to do with “monitoring the office”.

Because the possibilities above are virtually limitless, and to scope the first version of our product, we limited our MVP to our initial idea: send a notification when rain is imminent and the hatch is still open.

The rest of this blog is structured as follows: It starts with a high-level overview of the solution and an explanation of the technical choices. Subsequently, we describe the components needed to go from sensor measurement to a front end. In addition, we discuss how alerting was set up, and finally, we provide some conclusions and thoughts about further work.

Architectural overview / what we built

Our main use case is sending an alert when rain is imminent and the hatch is still open, but our vision was to build a more generic platform to which we can connect all kinds of sensors and other events that have something to do with the office.

In the diagram below you’ll see a high-level overview of the whole architecture:

It all starts in the office, where we’ve placed sensors in different spots. These sensors send “state updates”, which are received by the Zigbee gateway plugged into the Raspberry Pi. From there, all events are published to a GCP Pub/Sub topic to which our back end is subscribed. The back end processes all state events and saves them to GCP Datastore. When a front end is connected, the back end also pushes state updates to it. Furthermore, the back end evaluates the state events against a rule-based alerting component and invokes a Signal messaging API to send out notifications, which warn us when something happens.

Technical choices

Once we knew, on a high level, what we wanted to build, we could start selecting technologies to make it happen. We chose a mix of “boring” technologies we already had experience with (Spring Boot, Kotlin, Google Cloud Platform) and some “intriguing” ones we were less familiar with, to broaden our knowledge and learn something new. The latter are elucidated below:

RSocket for client/server communication

To ensure the front end can show device updates in near real time, we decided to use RSocket, an application protocol providing Reactive Streams semantics.

The reactive paradigm is already common in many frameworks and libraries, such as RxJS, Reactor, or Akka Streams. Usually, however, it ends at the boundary of one application and resumes at the boundary of the next. RSocket bridges this gap: its architecture supports streaming semantics over the network layer.

A typical client-server setup runs the RSocket protocol on top of a WebSocket connection, allowing for full-duplex communication. In the reactive world, the subscriber is always in control of how many elements it can receive, something that is not possible with alternatives like SSE (server-sent events) or plain WebSockets.

Although backpressure support is complete overkill for our use case, we still wanted to explore it from an educational perspective. Want to know more? Check out this talk by Oleh Dokuka.

Svelte as front end framework

Again from a learning standpoint, we chose Svelte as the framework to build the front end with. Paraphrased from their website:

“Svelte is a radical new approach to building user interfaces. Svelte shifts work that typically would happen in your browser (by React, Angular, etc.) into a compile step that happens when you build your app. It converts your application into pure vanilla JavaScript, just like Babel converts ES2016+ to ES5. You write your components using HTML, CSS and JavaScript (plus a few extra bits), and during your build process Svelte compiles them into tiny standalone JavaScript modules.”

Reading this sparked our interest, so we examined it further. It turned out that besides a new approach to front end applications, Svelte also contains a concept called stores for handling state in a reactive manner, which seemed to fit our sensor-events use case very well and looked much simpler than alternatives like Redux.

Kotlin Coroutine flows for reactive streams in the back end

Some Kotlin Coroutines components were already familiar territory, but Flow was not one of them. Flow is an implementation of the reactive streams specification built into Kotlin’s Coroutines library. Because of the reactive nature of a Flow, and because it is part of the Kotlin ecosystem with which we already had great experience, we decided to give it a try.

The magical journey of an event

Let’s dive into a single measurement of a device — a contact sensor — and follow that state update, from the physical moment contact is broken, to the front end showing the new state of the device, and our rule-based alerting engine validating whether a text message needs to be sent.

The devices

To measure a change you need a sensor, one that is capable of transferring its measurements to a gateway so they can ultimately be stored in a persistence layer. Some of the common protocols used in IoT and home automation are Z-Wave, Zigbee, Wi-Fi, Bluetooth Low Energy, and LoRa. We decided to start with a single protocol and chose Zigbee.

To check whether the rooftop hatch is open or closed, a contact sensor from Aqara was the perfect fit. To demonstrate how easy it is to add additional devices to the platform, we also bought a temperature sensor to measure the temperature of the refreshments in our refrigerator, as well as some smart sockets to remotely turn devices in the office on and off. Our focus, however, was on the contact sensor, because knowing the state of the hatch is vital.

The Raspberry Pi

With our devices installed, we needed to get their state updates into the cloud, where our back end runs. To get there, we placed a Raspberry Pi 3B in the office, on which we mounted a Zigbee gateway module from Phoscon. This module takes care of pairing with new devices, handles the cryptographic operations when sending messages, and takes care of the housekeeping parts of the protocol.

Next up, we installed Zigbee2MQTT, a great piece of software that converts Zigbee messages to MQTT, a more readable and more commonly used standard in home automation.

Our MQTT messages are forwarded to an MQTT broker, which decouples clients from being directly connected to one another and can broadcast messages to multiple clients if needed. We had hoped to use the MQTT bridge available in Google Cloud, which, unfortunately, we did not get working after two days of experimenting.

Instead, we took our loss and decided to run an MQTT broker locally on the Pi, and we wrote a small Kotlin app, mqtt-2-queue-connector. This application subscribes itself to the MQTT broker and replicates all messages, as-is, to a Pub/Sub topic in GCP. By leaving the actual connection logic to spring-integration-mqtt for the MQTT client and to spring-cloud-gcp-starter-pubsub for the connection with Pub/Sub, the code of this app could be kept to an absolute minimum.
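The essence of such a connector fits in a few lines. Below is a minimal sketch, assuming a local Mosquitto broker on the Pi and illustrative topic names:

```kotlin
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate
import org.springframework.context.annotation.Bean
import org.springframework.integration.annotation.ServiceActivator
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter
import org.springframework.stereotype.Component

@SpringBootApplication
class Mqtt2QueueConnectorApplication {

    // Subscribe to all Zigbee2MQTT topics on the broker running on the Pi.
    @Bean
    fun mqttInbound() = MqttPahoMessageDrivenChannelAdapter(
        "tcp://localhost:1883",   // the local MQTT broker
        "mqtt-2-queue-connector", // our MQTT client id
        "zigbee2mqtt/#"           // every device topic
    ).apply { setOutputChannelName("mqttMessages") }
}

@Component
class PubSubRelay(private val pubSubTemplate: PubSubTemplate) {

    // Replicate each MQTT payload, as-is, to the Pub/Sub topic in GCP.
    @ServiceActivator(inputChannel = "mqttMessages")
    fun relay(payload: String) {
        pubSubTemplate.publish("device-events", payload)
    }
}
```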

We started with our Zigbee contact sensor breaking contact and sending a Zigbee message to the Zigbee gateway on the Raspberry Pi. There it is converted into an MQTT message and published to the MQTT broker. Our mqtt-2-queue-connector app then receives the state update from the broker and replicates it to Pub/Sub. At that point, our back end, listening to Pub/Sub, is notified.

The back end

The back end is a Kotlin Spring Boot application. It consists of two parts: event ingestion and serving data to the front end. As a boundary between those two parts, there are a DeviceStateEventBus and a Datastore; more on those later. Let’s kick off with the event ingestion part.

Event ingestion

As seen in the previous section, the MQTT connector pushes state updates to a GCP Pub/Sub topic, from where the back end can pick them up. We make use of the spring-cloud-gcp-starter-pubsub dependency to integrate with Pub/Sub. For this, we’ve implemented a straightforward EventTopicSubscriberService.

It has the simple task of forwarding all state updates to the DeviceStateService and acknowledging the Pub/Sub messages, or negatively acknowledging them in case of an exception.
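Sketched out, with an illustrative subscription name, it looks roughly like this:

```kotlin
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate
import org.springframework.stereotype.Service
import javax.annotation.PostConstruct

@Service
class EventTopicSubscriberService(
    private val pubSubTemplate: PubSubTemplate,
    private val deviceStateService: DeviceStateService,
) {
    @PostConstruct
    fun subscribe() {
        pubSubTemplate.subscribe("device-events-subscription") { message ->
            try {
                // Forward the raw state update, then confirm receipt.
                deviceStateService.handleStateUpdate(message.pubsubMessage.data.toStringUtf8())
                message.ack()
            } catch (e: Exception) {
                message.nack() // negative acknowledgement: Pub/Sub will redeliver
            }
        }
    }
}
```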

The DeviceStateSaveService is responsible for mapping the state update to an internal model, saving it to the GCP Datastore and informing the rest of the application of a new state update. The latter happens by publishing it to an internal event bus.
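In outline, with illustrative repository and mapping names:

```kotlin
import org.springframework.stereotype.Service

@Service
class DeviceStateSaveService(
    private val repository: DeviceStateRepository, // backed by GCP Datastore
    private val eventBus: DeviceStateEventBus,
) {
    fun handleStateUpdate(raw: String) {
        val state = raw.toDeviceState() // map to the internal model
        repository.save(state)          // append the historical fact
        eventBus.publish(state)         // inform the rest of the application
    }
}
```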

The goal of saving the state updates in the Datastore is to be able to provide historical data. Since a state update can be seen as a “historical fact” it can just be appended to the collection of events, hence our choice for a document-oriented database. The goal of the DeviceStateEventBus is to provide a live feed to connected clients.

At this point, there is a Datastore with historical data and an EventBus with live data. Let’s move to the data serving part and see how this data can be retrieved.

Serving data to clients

When a client connects to the back end an RSocket connection is opened. This is handled by the spring-boot-starter-rsocket dependency. Because this dependency has Project Reactor support it’s possible to use a Flux for in- and outbound messages. However, with the kotlinx-coroutines-reactor dependency, it’s even possible to use a Kotlin Coroutines Flow:
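```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.stereotype.Controller

@Controller
class DeviceStateController(private val subscriptionHandler: SubscriptionHandler) {

    // Condensed sketch; the route and class names are illustrative. Invoked once per session.
    @MessageMapping("flock-monitor")
    fun monitor(commands: Flow<FlockMonitorCommand>): Flow<FlockMonitorMessage> =
        subscriptionHandler.handle(commands.map { it.body })
}
```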

It has a single parameter, Flow<FlockMonitorCommand>, through which all commands from a specific client enter the application, and it returns a Flow<FlockMonitorMessage>: the tunnel back to the client, carrying the requested data.

It’s important to realise that this method is called only once per “session” and that both the command and message flows live as long as the client is connected.

Each inbound command is mapped to its corresponding FlockMonitorCommandBody (the map in the snippet above), and the resulting Flow is passed to the SubscriptionHandler, which processes it further.

The SubscriptionHandler’s goal is to keep track of the commands a client has requested. We wanted the experience to be idempotent, so clients don’t have to keep track of what they have already requested, while still guaranteeing that each message is delivered exactly once. This reduces the complexity in the front end.

As mentioned before, both the in- and outbound flows live for the whole session and represent a single client. This means we can keep track of all requested commands inside the body of the Flow. This is great because it provides a single-threaded illusion; imagine the hassle of keeping track of the subscriptions in some shared context: thread safety, concurrent modification, blocked threads, and so on. The Flow solves all of these problems for us.

The SubscriptionHandler keeps track of all requested commands in the activeStreams variable. The key of this map is an identifier that is unique per command type. By checking whether the key is present, we can determine if a command was already requested; if so, we return emptyFlow(). Otherwise, we forward the command to the CommandDispatcher, which returns a Flow with the requested messages. All these specific sub-flows, each carrying only one type of data, are merged, and their messages are emitted in the main flow: our tunnel back to the client.
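Condensed, the handler looks something like this (the key derivation and names are illustrative):

```kotlin
import kotlinx.coroutines.flow.*

class SubscriptionHandler(private val commandDispatcher: CommandDispatcher) {

    fun handle(commands: Flow<FlockMonitorCommandBody>): Flow<FlockMonitorMessage> {
        // Lives inside the flow of one session, so no state is shared between clients.
        val activeStreams = mutableMapOf<String, FlockMonitorCommandBody>()
        return commands.flatMapMerge { command ->
            if (activeStreams.containsKey(command.key)) {
                emptyFlow() // already requested: idempotent, deliver-once behaviour
            } else {
                activeStreams[command.key] = command
                // The dispatcher returns a sub-flow with one type of data,
                // which is merged into the main flow back to the client.
                commandDispatcher.dispatch(command)
            }
        }
    }
}
```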

The CommandDispatcher’s responsibility is to forward each command to a specific Executor.
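In essence, it is a single when expression (assuming a sealed command hierarchy; names are illustrative):

```kotlin
import kotlinx.coroutines.flow.Flow
import org.springframework.stereotype.Component

@Component
class CommandDispatcher(
    private val devicesExecutor: DevicesCommandExecutor,
    private val deviceStatesExecutor: DeviceStatesCommandExecutor,
) {
    // One branch per command type; adding a feature means adding a branch and an executor.
    fun dispatch(command: FlockMonitorCommandBody): Flow<FlockMonitorMessage> =
        when (command) {
            is DevicesCommand -> devicesExecutor.execute(command)
            is DeviceStateCommand -> deviceStatesExecutor.execute(command)
        }
}
```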

These executors sit at the end of the serving part; they fetch the specific data requested by a command and return it in a Flow. Here is the DevicesCommandExecutor, condensed to its essence:
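```kotlin
import kotlinx.coroutines.flow.*

// Condensed sketch; the actual class contains more mapping logic.
class DevicesCommandExecutor(private val repository: DeviceRepository) {

    fun execute(command: DevicesCommand): Flow<FlockMonitorMessage> =
        flow { emit(DeviceListMessage(repository.getDevices())) } // one message with all known devices
}
```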

The DeviceStatesCommandExecutor first fetches the requested history from the Datastore and then subscribes to the live state updates for the requested deviceId:
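```kotlin
import kotlinx.coroutines.flow.*

// Condensed sketch; repository and message names are illustrative.
class DeviceStatesCommandExecutor(
    private val repository: DeviceStateRepository, // the Datastore-backed history
    private val eventBus: DeviceStateEventBus,     // the live feed
) {
    fun execute(command: DeviceStateCommand): Flow<FlockMonitorMessage> =
        flow {
            emitAll(repository.statesFor(command.deviceId)) // historical facts first
            emitAll(eventBus.subscribe(command.deviceId))   // then everything that happens from now on
        }.map { state -> DeviceStateMessage(state) }
}
```

Note that a real implementation needs some care here: an update that arrives between reading the history and subscribing to the bus would be missed.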

The benefit of this architecture is that everything “above” the Executors is generic, and the Executors themselves are specific to one data type. If we want to extend the application, we simply have to add a new Command and an Executor, and that’s it!

The front end

The front end visualises what happened in the office in the last hours and displays its current state. To do this, it needs to retrieve all events from the back end, so it sets up an RSocket connection. All requests and responses to and from the back end are sent through this connection:
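```typescript
import { RSocketClient, JsonSerializer, IdentitySerializer } from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';

// Condensed sketch; the URL and MIME types are illustrative.
const client = new RSocketClient({
  serializers: { data: JsonSerializer, metadata: IdentitySerializer },
  setup: {
    keepAlive: 60000,                                 // ping interval in ms
    lifetime: 180000,                                 // drop the connection after this much silence
    dataMimeType: 'application/json',
    metadataMimeType: 'message/x.rsocket.routing.v0', // Spring routing metadata
  },
  transport: new RSocketWebSocketClient({ url: 'ws://localhost:8080/rsocket' }),
});
```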

The client by itself doesn’t do much. The RSocket-JS implementation uses an Rx-inspired “Flowable” API to interact with it.

To connect, we call the connect method; in the onComplete callback of the subscription we provide a request channel, a Flowable through which we send commands to the back end. For the inbound messages from the back end, we create a response channel, which is also a Flowable.
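Hooked up, it looks roughly like this, with commands and responseSubscriber being the two Flowables sketched below:

```typescript
client.connect().subscribe({
  onComplete: socket => {
    // Full duplex: we push commands in, the back end streams messages out.
    socket.requestChannel(commands).subscribe(responseSubscriber);
  },
  onError: error => console.error('RSocket connection failed', error),
});
```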

At this point, there is a full-duplex connection to the back end and there are two Flowables that act as the handles for in- and outbound messages.

RSocket allows for reactive streams over the network. The Reactive Streams specification states that the subscriber must signal its demand via the Subscription.request(n) function; this tells the Publisher how many elements it is allowed to send:
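```typescript
import { ISubscription } from 'rsocket-types';

// Condensed sketch; `messageSink` is our own handler object.
let subscription: ISubscription;

const responseSubscriber = {
  onSubscribe: (sub: ISubscription) => {
    subscription = sub;
    subscription.request(100); // enough credits to receive the event history
  },
  onNext: (payload: any) => {
    messageSink.next(payload.data); // hand the element to the MessageSink
    subscription.request(1);        // then ask for exactly one more
  },
  onComplete: () => console.log('stream completed'),
  onError: (error: Error) => console.error(error),
};
```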

The onSubscribe function is called first; here we request 100 elements from the back end. Each element arrives in the onNext function and is passed to the MessageSink. Afterwards, we request one more element.

Note that you shouldn’t use this implementation in a real production application. After the first 100 elements (the event history in our case), it doesn’t perform well, because after each element the publisher has to wait for a new request, adding an extra round trip per element. It would be better to keep a generous pool of credits and replenish it only when it drops below a certain threshold.

Until this point, we only discussed the front end’s inbound stream. Conceptually, the outbound stream works the same way, only here the front end is the Publisher and the back end is the Subscriber. In this direction, the back end creates the Subscription and signals its demand, and the front end only sends a command when the back end has requested one:

The Flowable created here represents the stream of commands to the back end. The back end’s demand comes in through the request(n: number) function of the Subscription we pass to subscriber.onSubscribe. In its implementation, we add the newly requested number to the credits we still have left; correspondingly, we subtract one credit whenever we send a command to the back end:
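```typescript
import { Flowable } from 'rsocket-flowable';

// Condensed sketch; FlockMonitorCommand is our own command type.
let credits = 0;                         // demand signalled by the back end
const queue: FlockMonitorCommand[] = []; // commands waiting for demand
let commandSubscriber: any;

const commands = new Flowable(subscriber => {
  commandSubscriber = subscriber;
  subscriber.onSubscribe({
    request: (n: number) => {
      credits += n; // add the newly requested number to what was left
      drain();
    },
    cancel: () => {},
  });
});

function sendCommand(command: FlockMonitorCommand) {
  queue.push(command);
  drain();
}

function drain() {
  while (credits > 0 && queue.length > 0) {
    commandSubscriber.onNext({ data: queue.shift() }); // routing metadata omitted for brevity
    credits -= 1;                                      // sending one command costs one credit
  }
}
```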

At this point, the bottom data layer and all RSocket protocol necessities are implemented; the front end can signal demand to, and receive demand from, the back end.

Now let’s add a layer with some business logic on top of the connection. The RSocket connection discussed above is managed by a global event bus. All messages to and from the back end pass through this object, and outgoing commands are buffered until the connection is initialised:
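```typescript
// Condensed sketch of the global event bus; MessageHandler maps inbound
// messages into Svelte stores (see below).
class EventBus {
  private buffer: FlockMonitorCommand[] = [];
  private connected = false;

  constructor(private messageHandler: MessageHandler) {}

  // Components call this; commands are buffered until the connection is up.
  send(command: FlockMonitorCommand) {
    if (this.connected) {
      sendCommand(command); // the credit-aware send from the sketch above
    } else {
      this.buffer.push(command);
    }
  }

  // Invoked once the RSocket connection has been established.
  onConnected() {
    this.connected = true;
    this.buffer.forEach(sendCommand);
    this.buffer = [];
  }

  // Every inbound message from the response subscriber ends up here.
  receive(message: FlockMonitorMessage) {
    this.messageHandler.handle(message);
  }
}

export const eventBus = new EventBus(new MessageHandler());
```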

The MessageHandler in the code above is a simple mapper that maps incoming state updates and stores them in a Svelte store. The front end components that display data access the EventBus through a singleton and use it to request a given type of data from the back end. For the “devices list” component, that looks like this:
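```svelte
<script>
  import { onMount } from 'svelte';
  import { eventBus } from './eventBus';
  import { devices } from './stores';

  // Sketch: the command shape and store names are illustrative.
  onMount(() => eventBus.send({ type: 'DevicesListSubscription' }));
</script>

<ul>
  {#each $devices as device}
    <li>{device.name}</li>
  {/each}
</ul>
```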

The component simply sends a DevicesListSubscription command when it mounts in the browser. The command is sent through the RSocket connection to the back end. The response messages are handled by the MessageHandler and stored in a Svelte store. The component binds to that store, and Svelte does the rest!

Rule-based Alerting

Now that we can propagate a state update from a contact sensor all the way to the front end in a browser, we have all the ingredients to start working on our primary goal: sending out messages whenever the hatch is open and rain is imminent.

To achieve this we need the following parts:

  • Rule evaluation: determine the current aggregated state of the system (hatch + weather forecast)
  • Alert evaluation: determine whether to send out messages
  • Sending alerts: integrate with a Signal API to broadcast alerts.

Rule evaluation

We start by combining the latest sensor state with a weather forecast into an aggregated state for our rule: “Alert when the hatch is opened and it’s about to rain”.

A piece of configuration determines the rule type, the contact sensors to keep track of, and some information about alert messages to be sent.
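In outline, such a rule configuration could look like this (property names are illustrative):

```kotlin
import java.time.Duration

data class RuleConfig(
    val type: RuleType,           // e.g. RAIN_CHECK, selects the evaluator
    val deviceIds: List<String>,  // the contact sensors to keep track of
    val alerts: List<AlertConfig>,
)

data class AlertConfig(
    val timeToDeadline: Duration, // send when rain is expected within this window
    val message: String,          // the text to broadcast via Signal
)
```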

For each rule, an evaluator instance of the corresponding type is created; it subscribes itself to the internal event bus for sensor updates, as well as to an additional bus with weather forecast updates. The following method evaluates the rule on any change:
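```kotlin
import kotlinx.coroutines.flow.*

// Condensed sketch; the update wrapper and aggregate operations are illustrative.
fun evaluateRule(
    sensorUpdates: Flow<SensorUpdate>,
    weatherUpdates: Flow<WeatherForecast>,
): Flow<RainCheckSensorData> =
    merge(
        sensorUpdates.map { Update.Sensor(it) },
        weatherUpdates.map { Update.Weather(it) },
    ).scan(RainCheckSensorData.initial()) { aggregate, update ->
        when (update) {
            is Update.Sensor -> aggregate.withContactState(update.value) // open/closed sensors
            is Update.Weather -> aggregate.withForecast(update.value)    // earliest expected rain
        }
    }
```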

The scan function here is a Flow operator: a reduce with an initial value that also emits every intermediate result. Since the method subscribes to an indefinite flow of both state updates and weather updates, it returns an equally indefinite flow of aggregated RainCheckSensorData values, showing which sensors have been opened and the earliest time it is going to rain.

To give a simple example of what scan does:
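```kotlin
// Accumulate every element into a growing list, emitting each intermediate step.
flowOf(1, 2, 3).scan(emptyList<Int>()) { list, value -> list + value }
```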

will produce: [], [1], [1, 2], [1, 2, 3].

Alert evaluation

The aggregated state is used to determine whether an alert needs to be sent out. A list of alerts can be configured, each with its own time-to-deadline.

Every time the weather forecast is updated (and has changed), or the hatch is opened or closed, the evaluateAlerts function is triggered. To ensure we don’t miss a deadline, a timed update is triggered every 5 minutes as well. The reasoning: a forecast could tell you it will start raining in 35 minutes. If you have an alert set up for 30 minutes, it won’t fire at that moment, yet at the 30-minute mark you do want that alert to go out. Hence the timed updates, which ensure the alerts are checked at least every 5 minutes.

To ensure the correct alert is sent, some conditions are checked. We want to send the alert with the lowest time-to-rain that we have not sent out yet. The method below does just that: figure out which alert (if any) should be sent.
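A sketch of that selection, reusing the illustrative AlertConfig from before:

```kotlin
fun alertToSend(
    state: RainCheckSensorData,
    alerts: List<AlertConfig>,
    alreadySent: Set<AlertConfig>,
): AlertConfig? {
    val timeToRain = state.timeToRain ?: return null // no rain forecast: nothing to do
    if (state.openSensors.isEmpty()) return null     // hatch closed: nothing to do
    return alerts
        .filter { it.timeToDeadline >= timeToRain }  // deadline reached or passed
        .filterNot { it in alreadySent }             // never send the same alert twice
        .minByOrNull { it.timeToDeadline }           // the lowest time-to-rain first
}
```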

Sending alerts

When the time comes to send out an alert, a Signal API integration does the actual work. The implementation of that API is out of scope for this blog, but the source code can be found on GitHub. It provides us with a very simple API, while taking care of the complex end-to-end encryption Signal is known for.

Conclusion

Our MVP is running in our office; now we have to wait until it rains… One of the notable things we learned from building this project is a deeper understanding of how backpressure works. We already knew that the subscriber is always in charge of how many elements it receives, but because we had to implement this ourselves on the front end, we now understand the inner workings far better than if we had just used it. It was also just cool to see (through the logging statements) that the front end controls the message rate inside the back end.

A wow-moment was when we discovered that we could just store the state of a client, a map with active subscriptions, in a Kotlin Coroutines Flow. This single-threaded illusion made it so easy to implement without having to worry about concurrency.

The Store feature in Svelte made it easy to handle the state in our front end. Compared with, for example, a React/Redux stack, we would have needed considerably more code to reach the same result. Our goal was not to build an enterprise-grade application, so we didn’t dive into how to test the stores. We’ve declared the stores as singleton variables, which might not be sufficient once tests are added; we would probably need a dependency-injection mechanism in that case.

After using React for a while, managing side effects and state with hooks becomes the status quo. When you start using Svelte, you begin to question this: “Why do I need a useEffect hook to perform side effects?” For us, Svelte made it clear that what you take for granted in React is not always the simplest way to do things. Using Svelte for this project gave us a different angle on front end projects.

This project is far from finished. We’ve built a platform with only one use case; many more features can be added. Here are some of the other ideas we have and might implement in one of the following Flock. projects:

  • Temperature sensors on the central heating and the air conditioning: to send a notification when somebody forgets to shut them down at night or during the weekend.
  • Moisture sensors in the soil of our plants, so they don’t die.
  • Keeping track of our fridge supply (especially the beer): with an old smartphone camera it’s possible to take regular photos of the stock, and a machine learning model could be trained to count it. This way we should never run out.
  • Currently, the platform is a one-way stream, but there are also Zigbee devices that can receive commands. This would pave the way for real automation: why send an alert if you can also close the hatch with a robot arm, or shut down the central heating with an automated radiator valve?
