Implementing a real-time detection algorithm with Lambda functions and DynamoDb streams (Detecting Paris’ locked bicycle stations 3/5)

Jean Baptiste Muscat · Published in CodeX · 10 min read · Sep 20, 2021

Photo by Andrew Gook on Unsplash

This series of articles is about me spending way too much time trying to solve a niche problem (detecting locked bicycle stations in Paris, see Part 1) while learning how to use the AWS Serverless stack. To find the other articles, skip to the bottom of the page.

Detecting locked stations

In Part 2, I created several Lambda functions and DynamoDB tables to fetch and store the content of each Velib station, every minute. I also computed the last time a station was active (i.e. a bike had been rented from or returned to it). With this data, I will try to detect locked stations.

A very simple algorithm would be to consider any station that has not been active for a given amount of time as locked. Station A has not seen any traffic for 3 hours, so it must be locked, right?

This raises the question: is the activity of the stations evenly distributed throughout the day, and are all stations equal?

Nope.

Box plot of the stations’ activity per hour on a typical working day. For example, at 7 PM one of the stations reached 34 bikes rented or returned while the median for the 1,500 stations was only 2.5.
Most of the stations see between 20 and 40 bikes rented during a single day. But a few can reach 200 and above.

This was kind of expected: the traffic is totally different at 4 AM and at 4 PM. Even worse, on a single day, some stations can see almost 400 rentals and returns, while about 60 stations see fewer than 20 (and a handful of them have a daily average traffic close to 0).

With this level of disparity, a simple hard-coded threshold would not be effective. It would either create false positives if set too low or miss locked stations if set too high.

In search of a better metric

If a single time threshold is not effective, maybe I could compute a custom time threshold for each station, based on its past activity. But this seems complicated.

Let’s take a step back. Instead of focusing on the time since the last activity, why not focus on the number of bikes that should have been rented or returned during this time period? Put another way: station A is inactive for X minutes, and usually, 20 bikes should have been rented or returned during this time, so it may be locked.

We can still have a very basic threshold (20 bikes). But this threshold will be reached more quickly for the more “active” stations and will take more time for the less active ones. The disparity of activity amongst the stations is built into this metric.

Computing the missing activity

It’s 2:34 PM and a station seems to have been inactive since 1:07 PM. How can I know what level of activity should have occurred in those 87 minutes?

  • #1 Looking at yesterday’s data: I could load the content of the station for the same time frame yesterday and count the number of bikes rented or returned. But today is Monday, and yesterday was Sunday, so is Sunday’s traffic really representative? And what if the station was already having trouble at the same time yesterday? That would show no traffic at all. Using a single day as a reference is not safe.
  • #2 Using the average rate: I could easily compute an average rate (number of bikes seen / minute) for each station, and just multiply that by 87 minutes. But, as we have seen earlier, the traffic varies quite a lot during the day (and the week) so this method is not accurate.
  • #3 Using the average rate on the same weekday and timeslot: Instead of loading yesterday’s content, I could look at the average traffic for the same weekday and the same time frame. This seems good, but the traffic at a given station can evolve over time. Maybe it’s more used in the summer, maybe a new mall opened right next to it. An average rate would take a long time to adjust.
  • #4 Using the rolling median rate on the same weekday and timeslot: Instead of using an average, I can use the median over the last four weeks (sketched just below). That way my expected traffic will quickly adjust to a new usage pattern on a station. This will also discard the more “exceptional” days and correctly represent the “true” expected traffic.
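
To make option #4 concrete, here is a minimal sketch in Python (the language and the fetch_activity helper are purely illustrative, not the actual code) of the expected activity for a timeslot, computed as the median over the last four weeks:

```python
import statistics
from datetime import datetime, timedelta

def expected_activity(station_id: str, slot: datetime, fetch_activity) -> float:
    """Median activity for this station on the same weekday and timeslot
    over the last four weeks. fetch_activity(station_id, timeslot) is a
    hypothetical accessor returning the number of bikes rented or returned
    during that timeslot."""
    samples = [
        fetch_activity(station_id, slot - timedelta(weeks=w))
        for w in range(1, 5)  # same weekday and time, 1 to 4 weeks ago
    ]
    return statistics.median(samples)
```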

Optimization

Let’s go with solution #4.

I ingest the stations’ content each minute so I’ll perform a “locked station detection” every minute for each station that seems inactive.

It’s 2:34 PM and a station seems to have been inactive since 1:07 PM. I need to find the median traffic occurring in this 87-minute window. That means I’ll need to fetch ~87 minutes * 4 weeks = 348 items from my StationsContent table. That’s a lot! Worse, the longer a station stays inactive, the more items I need to fetch.

If you recall from the previous part, I can only use 25 RCU for this whole application. With a 60-second accumulation period for the burst and a 230 KB item size, it means I can realistically fetch: 25 RCU * 60 s * 4 KB / 230 KB ≈ 26 items each minute. And that’s if I use all my RCUs for this single table.

How can I avoid doing this many read operations?

Compounding

Instead of recomputing the whole “window” each time, I could compute the missing activity for the last tick (minute) and compound it. In short, for each station that has no activity on a given tick, I can compute the missing activity just for this minute and add it to a “missing activity” state in another table. So, at a given moment, I’m only interested in knowing the median traffic for a single minute, not for a window of possibly several hours, and that means only fetching 4 items. The problem is that I would need to keep a month of minute-by-minute history in the StationsContent table, and that would eat a big chunk of my free 25 GB.
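
As an illustration of this compounding idea, here is a hedged sketch (the state store and the median_for_minute helper are hypothetical stand-ins, not the real tables):

```python
def on_inactive_tick(station_id: str, minute, median_for_minute, state: dict) -> float:
    """Called for each station that shows no activity during the current
    minute: add the expected activity for this single minute to the
    station's accumulated "missing activity"."""
    # median_for_minute(station_id, minute) would return the median traffic
    # for this exact minute over the last four weeks (only 4 items to fetch).
    expected = median_for_minute(station_id, minute)
    state[station_id] = state.get(station_id, 0.0) + expected
    return state[station_id]
```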

Precomputing

But do I really need minute-by-minute accuracy for the expected traffic? I could summarize the traffic of each station in fixed 10-minute windows, using a kind of Statistics table. That would give me enough accuracy and divide the amount of data I need to store long-term by 10 (even more, as I would only store the activity for each timeslot, and not the other attributes that I have in StationsContent).

I can also precompute the median for each timeslot. This could prove useful later if I want to display the expected activity of a given station: I’ll simply need to look into the MedianActivity table.
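
To give an idea of the shape of those precomputed items, here is how a 10-minute timeslot key could be derived (the attribute names are illustrative, not the actual schema):

```python
from datetime import datetime

def timeslot_key(station_id: str, ts: datetime) -> dict:
    """Key of a statistics item: one item per station and per weekday/10-minute
    timeslot, e.g. station 42, Monday, 14:30-14:40."""
    slot_start = ts.replace(minute=(ts.minute // 10) * 10, second=0, microsecond=0)
    return {
        "stationId": station_id,
        "timeslot": slot_start.strftime("%a-%H:%M"),  # e.g. "Mon-14:30"
    }
```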

This seems more complicated, but it’s the kind of tradeoff I need to make if I want to keep the cost down. I’m constrained by the free tier limitations (read/write operations and table size), but I can create as many tables as I want, so I’m taking advantage of that to store the intermediary computations in dedicated tables instead of making more expensive calls over larger data.

Note that the StationsState table contains the current state of each station. To more easily follow how stations change state during the day, I’ll create another table (StationStateChanges) into which I’ll insert an item each time a station’s state changes.

Implementation using DynamoDb stream and EventBridge

The implementation will require three new tables (StationStatistics, MedianActivity and StationsState) and three new functions (ComputeStatistics, ComputeMedianActivity and ComputeStationsState).

I want to run ComputeStatistics and ComputeStationsState each time a new item is added to the StationsContent table. So I will rely on DynamoDB Streams. This allows a Lambda function to be triggered each time an item is added, updated, or removed from a DynamoDB table.

To set it up using CloudFormation/SAM, I can no longer use the AWS::Serverless::SimpleTable type from SAM. I need to switch to the more complex AWS::DynamoDB::Table that exposes a StreamSpecification attribute. Then, I have to declare a DynamoDB event on my function.
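
The relevant part of the template could look roughly like this (a hedged sketch: the resource types, StreamSpecification and DynamoDB event are standard SAM/CloudFormation, but the resource names, key schema and capacity values are illustrative):

```yaml
StationsContentTable:
  Type: AWS::DynamoDB::Table
  Properties:
    AttributeDefinitions:
      - AttributeName: stationId
        AttributeType: S
      - AttributeName: timestamp
        AttributeType: S
    KeySchema:
      - AttributeName: stationId
        KeyType: HASH
      - AttributeName: timestamp
        KeyType: RANGE
    ProvisionedThroughput:
      ReadCapacityUnits: 5
      WriteCapacityUnits: 5
    StreamSpecification:
      StreamViewType: NEW_IMAGE  # the consumers only need the freshly inserted item

ComputeStatisticsFunction:
  Type: AWS::Serverless::Function
  Properties:
    Handler: compute_statistics.handler
    Runtime: python3.9
    Events:
      StationsContentStream:
        Type: DynamoDB
        Properties:
          Stream: !GetAtt StationsContentTable.StreamArn
          StartingPosition: LATEST
          BatchSize: 10
```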

The Lambda function will receive the event as a parameter when it is triggered, and I can access the object that has just been inserted in the StationsContent table. After that, I’ll just need to get the statistics corresponding to the timeslot, update them with the new event’s content, and store them back.
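
A minimal handler sketch in Python (the attribute names, the StationStatistics schema and the single UpdateItem shortcut are assumptions, not the actual code):

```python
import boto3
from datetime import datetime

dynamodb = boto3.resource("dynamodb")
stats_table = dynamodb.Table("StationStatistics")  # hypothetical table name

def handler(event, context):
    """Triggered by the StationsContent DynamoDB stream."""
    for record in event["Records"]:
        if record["eventName"] != "INSERT":
            continue
        # The newly inserted item, in DynamoDB's attribute-value format.
        new_image = record["dynamodb"]["NewImage"]
        station_id = new_image["stationId"]["S"]
        activity = int(new_image["activity"]["N"])                 # assumed attribute
        ts = datetime.fromisoformat(new_image["timestamp"]["S"])   # assumed attribute
        # Derive the 10-minute timeslot this event belongs to, e.g. "Mon-14:30".
        slot = ts.replace(minute=(ts.minute // 10) * 10, second=0, microsecond=0)
        timeslot = slot.strftime("%a-%H:%M")
        # Add this minute's activity to the timeslot's running total
        # (a single UpdateItem keeps the write cost low).
        stats_table.update_item(
            Key={"stationId": station_id, "timeslot": timeslot},
            UpdateExpression="ADD #act :a",
            ExpressionAttributeNames={"#act": "activity"},
            ExpressionAttributeValues={":a": activity},
        )
```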

The median is updated independently, every 10 minutes. So I can use an EventBridge rule (like in part 2) for that.
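
The 10-minute schedule itself is a single Schedule event in the SAM template (sketch; resource and handler names are illustrative):

```yaml
ComputeMedianActivityFunction:
  Type: AWS::Serverless::Function
  Properties:
    Handler: compute_median_activity.handler
    Runtime: python3.9
    Events:
      Every10Minutes:
        Type: Schedule
        Properties:
          Schedule: rate(10 minutes)
```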

Automatic retries

By default, if your Lambda function fails while processing a DynamoDB Stream event, this event will be retried again and again. That’s perfect if your problem is transient, as the retry is likely to succeed after a few attempts.

But if your problem is permanent (let’s say you pushed a new, buggy version of your function), then the retries will happen again and again. Worse, the failed events will pile up. In my case, with one event every minute, if I take one hour to fix a fresh bug, about 60 events will be waiting to be retried. And that can wreak havoc on my finely-tuned DynamoDB tables, with their carefully chosen RCUs and WCUs.

Luckily, I have two parameters at my disposal on the DynamoDB event of my AWS::Serverless::Function resource: MaximumRecordAgeInSeconds and MaximumRetryAttempts.
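
Both go on the function’s DynamoDB event; for instance (the values here are just an example):

```yaml
Events:
  StationsContentStream:
    Type: DynamoDB
    Properties:
      Stream: !GetAtt StationsContentTable.StreamArn
      StartingPosition: LATEST
      # Drop events older than one hour instead of retrying them forever...
      MaximumRecordAgeInSeconds: 3600
      # ...and retry a failing batch at most twice.
      MaximumRetryAttempts: 2
```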

Monitoring

I’m starting to have a few functions that run all day long. At the moment, the only way to find out whether they are working is by looking at the logs and at each Lambda metric in the web console. It would be better if I could be notified (by mail, for example) whenever a function fails.

To do that, I can set up a few CloudWatch alarms. For example, here is an alarm that will ring when the FetchStationsContent function fails at least twice in the past 5 minutes (the Velib API can be a little flaky, so I’m allowing it to fail occasionally).

Of course, instead of setting it up in the console, I could also define the alarm in the CloudFormation/SAM template.

Note: you need an SNS topic to send your alarm notifications to. I find it quicker to create it via the web console, so I’ll provide it as a parameter in my CloudFormation/SAM template.
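
Here is a sketch of what that could look like in the template, with the SNS topic ARN passed as a parameter (the resource names are assumptions; the metric, threshold and period follow the description above):

```yaml
Parameters:
  AlarmTopicArn:
    Type: String
    Description: ARN of the (manually created) SNS topic receiving alarm notifications

Resources:
  FetchStationsContentErrorsAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmDescription: FetchStationsContent failed at least twice in the last 5 minutes
      Namespace: AWS/Lambda
      MetricName: Errors
      Dimensions:
        - Name: FunctionName
          Value: !Ref FetchStationsContentFunction  # hypothetical logical name
      Statistic: Sum
      Period: 300
      EvaluationPeriods: 1
      Threshold: 2
      ComparisonOperator: GreaterThanOrEqualToThreshold
      TreatMissingData: notBreaching
      AlarmActions:
        - !Ref AlarmTopicArn
```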

Bad weather and holidays

After a few weeks, the application was behaving well. I could see my Statistics and Median tables filling up, and I could look at StationStateChanges to see stations being detected as locked and their state changing back as soon as activity resumed.

Then, it rained. A lot. And I saw almost a fifth of the stations being detected as locked in a single afternoon. I went to the station nearest to my flat, which had not seen any traffic for the past 3 hours and had been marked as locked. I was soaked but I was able to rent a bike without any problem.

So what happened? You’ve already guessed it: nobody wants to rent a bike in the pouring rain. But my application is not able to differentiate between a “legitimate” slowdown in activity and an individual station problem.

A week later, we had a public holiday in France. And something similar occurred in the morning: a bunch of stations saw very little traffic between 7 AM and 9 AM, while it was usually their busiest time slot.

Thinking about it, I was also worried about the impact of the bi-annual daylight saving time change.

How can I account for the weather, the public holidays, and the daylight saving time change?

  • I could use a weather API (for example https://openweathermap.org/current) and change my threshold when it’s raining or when there is a lot of wind or it’s very cold.
  • I could find some kind of API for the French public holidays (for example https://www.abstractapi.com/holidays-api) and use “Sunday’s” median activity for those days.
  • I could hardcode the daylight saving time changes for the next 10 years and maybe apply a coefficient the days after a change.

This seems tedious. Each new kind of problem requires a specific data source and a specific modification to my algorithm. I don’t like it. I would prefer a single, global solution.

What do all those problems have in common? The overall network is impacted, with the traffic being lower or higher than usual. So, why not use the overall traffic change as an input?

I can compute a “network activity ratio”: the network’s global activity divided by the expected median traffic at a given time. I can then use this global ratio in my algorithm.

So my detection formula has to be updated to take this ratio into account.
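
The original post shows the before/after formulas as images, which are not reproduced here. As a rough reconstruction of the idea (an assumption about the exact form, based on the description above), the check would go from comparing the raw missing activity to a threshold, to weighting each timeslot’s expected activity by the network ratio:

```latex
% Before: station s is flagged as locked when its accumulated missing activity
% exceeds a fixed threshold T.
\sum_{t \,\in\, \text{inactive window}} \operatorname{median}_{\text{4 weeks}}\!\bigl(\text{activity}_{s,t}\bigr) \;>\; T

% After: each expected timeslot activity is scaled by the network activity ratio
% r(t) = observed network activity / expected network activity at time t.
\sum_{t \,\in\, \text{inactive window}} r(t)\cdot \operatorname{median}_{\text{4 weeks}}\!\bigl(\text{activity}_{s,t}\bigr) \;>\; T
```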

With this updated algorithm, my detection is more robust. Precisely measuring false positives or negatives is beyond my reach (I don’t have the time to check each of the 1,500 stations every hour), but I’m quite happy with the results.

The core of the application is now finished, but I’m missing a frontend to display the status of each station.

See you in part 4!

  • Part 1: Choosing the AWS serverless stack for a prototype
  • Part 2: The backbone of a serverless app: Lambda functions and DynamoDb tables
  • Part 3: Implementing a real-time detection algorithm with Lambda functions and DynamoDb streams
  • Part 4: Creating a serverless API and hosting a frontend with S3
  • Part 5: Performance tuning for a Lambda-based API
