Real-time air quality monitoring and alerting – Part 3 - Bringing it all together!
In this final part of the series on building your own air quality alerting system, I'll look at how we can take the data we're receiving from our sensor, along with the Azure Anomaly Detector model we've trained, and turn them into an alerting system using Azure Functions and Twilio's SMS API.
From left to right on the flow diagram below we have:
- IoT Hub with its Event Hub compatible endpoint
- Azure Function with an Event Hub trigger, writing to a Cosmos DB SQL API Collection
- Cosmos DB stores the document and publishes a Change Feed notification
- Azure Function with a Cosmos DB trigger (using the Change Feed) reads all records from the Collection and calls the Anomaly Detector API
- If an anomaly is found, the Azure Function calls Twilio's SMS API and sends an alert to a registered mobile phone, then disables further alerts for a period.
Anomaly Detector - batched events required
When I originally designed my solution, I was hoping I could simply trigger an Azure Function from new events on an Azure IoT Hub (using its Event Hub compatible endpoint), but it turned out that the way Anomaly Detector works meant this approach wasn't viable.
As I covered in my last post, when you define and train a new Anomaly Detector model instance you specify a numeric value for the slidingWindow property (in my case I set it to the minimum of 28). When you later use the same model instance for inferencing you must supply at least the same number of events as the value of slidingWindow (that is, 28 events).
In theory it's possible to achieve what I want by using the Azure Functions Event Hub Trigger in conjunction with the Offset property of the events from the Event Hub, but it turns out the trigger for a single event can't be used this way. Any solution to work around this limitation quickly becomes complicated and fraught with potential problems.
So how do we fix this?
The proposed data and alerting pipeline
While I could use Azure Stream Analytics for part of my final solution, there is no free tier for this service, so I am going to solve the problem using my preferred technology - Azure Functions.
Thankfully, I also have access to a nice juicy free tier for Azure Cosmos DB, which gives me up to 25 GB of storage (more than I will ever use!) and 1,000 Request Units per second (RU/s).
I am going to use Cosmos DB as my event store and use its time-to-live (TTL) capability to limit the events I store to only what I need for inferencing. As I am sending data from my device every two minutes, and I need a minimum of 28 events, I need at least 56 minutes of data stored. Let's round that up to 60 minutes and set a TTL of 3,600 seconds on the Collection, so any record older than 3,600 seconds expires automatically. This makes using Cosmos DB simple: I can query all the records the Collection contains, knowing it holds only the data I need for inferencing with Anomaly Detector.
An additional benefit of selecting Cosmos DB is that I can use the Azure Functions Cosmos DB trigger to kick off a new inferencing call whenever a record is inserted. This trigger uses the Cosmos DB Change Feed, which is an awesome way to do event-driven programming. The Azure Function driven by this capability will retrieve all the records from Cosmos DB, submit a request to our Anomaly Detector API and, based on the response, send a request to Twilio's SMS API to alert recipients that an unexpected event has occurred. I'd love to have used the Azure Communication Services SMS service, but at the time of writing it wasn't available in Australia.
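To give a feel for how the Change Feed trigger is wired up, here is a minimal sketch of a Cosmos DB triggered Function in C#. The database, collection and lease names match the containers created later in this post, but the method body and the 'CosmosDBConnection' setting name are illustrative assumptions rather than the repository's actual code. Note the LeaseCollectionName - this points at the leases collection the trigger uses to track its position in the Change Feed.

```csharp
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class ChangeFeedTriggerSketch
{
    // Fires whenever documents are inserted or updated in 'myairdata'.
    // 'CosmosDBConnection' is an app setting holding the account connection string.
    [FunctionName("CheckForAdverseConditions")]
    public static void Run(
        [CosmosDBTrigger(
            databaseName: "myairdatadb",
            collectionName: "myairdata",
            ConnectionStringSetting = "CosmosDBConnection",
            LeaseCollectionName = "leases")] IReadOnlyList<Document> changes,
        ILogger log)
    {
        // Each invocation receives the batch of changed documents.
        log.LogInformation($"Change Feed delivered {changes.Count} document(s).");
    }
}
```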
Deploy our Cosmos DB instance
You can use the Azure CLI or Cloud Shell to deploy the correctly configured Cosmos DB instance.
# Create Cosmos DB Account with Free tier in a single Region
az cosmosdb create --name 'myairdatacosmos' \
--resource-group 'myresourcegroup' \
--enable-free-tier true \
--locations regionName='myazureregion' \
--default-consistency-level Session
# Create Database
az cosmosdb sql database create --name 'myairdatadb' \
--account-name 'myairdatacosmos' \
--resource-group 'myresourcegroup'
# Create air quality data collection
az cosmosdb sql container create --name 'myairdata' \
--ttl 3600 \
--partition-key-path '/SensorName' \
--account-name 'myairdatacosmos' \
--database-name 'myairdatadb' \
--resource-group 'myresourcegroup'
# Create alert control flag collection
az cosmosdb sql container create --name 'alertcontroller' \
--ttl 1200 \
--partition-key-path '/id' \
--account-name 'myairdatacosmos' \
--database-name 'myairdatadb' \
--resource-group 'myresourcegroup'
# Create Azure Functions Change feed leases collection
az cosmosdb sql container create --name 'leases' \
--partition-key-path '/id' \
--account-name 'myairdatacosmos' \
--database-name 'myairdatadb' \
--resource-group 'myresourcegroup'
It's important to note that the partition key I am using for the air quality data collection (myairdata above) will result in a single 'hot' partition, as all the data I receive will have the same value for the SensorName field. In large-scale systems this would be a really bad idea, but as I will have at most ~30 items in my collection I am not overly concerned about this.
I am also provisioning a second collection that will be used to control how often alerts are sent to recipients (once every 20 minutes, or 1,200 seconds - the TTL I set) - more on this later - and a third (leases) used by the Azure Functions Cosmos DB trigger to track its position in the Change Feed.
Storing our event data in Cosmos DB
The first part of the solution is to use an Azure Function configured to receive new events from the Event Hub endpoint of the IoT Hub and insert them into Cosmos DB.
You can view the Azure Function ProcessDeviceEvent on GitHub. There is nothing special to it - the Function receives one or more events in a batch, reads the body out of each event and writes it to an in-memory structure that is then batch inserted into the Azure Cosmos DB collection by way of a Cosmos DB Output Binding. All I need is some configuration information, and with three lines of code I can achieve what I want. Given how infrequently this Function will fire (once every two minutes) and the small size of the payload, I don't have concerns around cost. Cold starts from a Consumption plan are also entirely OK.
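For illustration, here is a minimal sketch of what such a Function can look like using the in-process Event Hub trigger and a Cosmos DB output binding. The connection setting names ('IoTHubConnection', 'CosmosDBConnection') are assumptions for this sketch - the real implementation lives in the repository linked above. The IAsyncCollector means the binding takes care of batching the writes for us.

```csharp
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Newtonsoft.Json;

public static class ProcessDeviceEventSketch
{
    // Triggered by batches of events from the IoT Hub's Event Hub
    // compatible endpoint ('messages/events' is the built-in endpoint name);
    // documents are written to Cosmos DB via the output binding.
    [FunctionName("ProcessDeviceEvent")]
    public static async Task Run(
        [EventHubTrigger("messages/events", Connection = "IoTHubConnection")] EventData[] events,
        [CosmosDB(databaseName: "myairdatadb", collectionName: "myairdata",
            ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> readings)
    {
        foreach (var eventData in events)
        {
            // The event body is the JSON payload produced by the field gateway.
            var json = Encoding.UTF8.GetString(
                eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            await readings.AddAsync(JsonConvert.DeserializeObject(json));
        }
    }
}
```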
A sample of a record written into Cosmos DB is shown below (the 'id' and '_*' fields are auto-created on insert).
{
"SensorName": "PurpleAir-XXX",
"ReadingTime": "2022-08-15T05:26:41+00:00",
"Latitude": -00.000000,
"Longitude": 00.000000,
"PressureMillibars": 993.75,
"Humidity": 27,
"Temperature": 22,
"Pm25ChannelA": 0,
"Pm25ChannelB": 0,
"Pm10ChannelA": 0,
"Pm10ChannelB": 0,
"id": "efc1b6b0-XXXX-XXXX-XXXX-39f348a4c5cb",
"_rid": "NOTREAL==",
"_self": "dbs/NOTREAL/colls/NOTREAL/docs/NOTREAL/",
"_etag": "2a028e98-XXXX-XXXX-XXXX-62f9d9120000",
"_attachments": "attachments/",
"_ts": 1660541202
}
Draining the Event Hub
An important stop on our journey is realising that we should be careful not to immediately enable the last component in our pipeline, the CheckForAdverseConditions Function. Depending on how long you have been collecting events in your Event Hub, you may have a substantial number of new records written into Cosmos DB when you first deploy the ProcessDeviceEvent Function. The great news is that with a TTL of just 60 minutes, after one hour we can enable the final Function and not worry about the volume of data it will return on each call.
As all of our Function code lives in a single Function App deployment, the first time you deploy you should disable the CheckForAdverseConditions Function by adding an additional attribute in the code as shown below.
[Disable()]
[FunctionName("CheckForAdverseConditions")]
Once the records are in place in your Cosmos DB database, you can remove the Disable attribute and redeploy the Function, which will cause it to kick in and read all records from Cosmos DB each time a new record is written.
Check for anomalous state and alert
Once our Cosmos DB air quality data is being populated, we are ready to enable the CheckForAdverseConditions Function. You can view its source on GitHub.
This Function is triggered by new records in Cosmos DB and will do a few things:
- Read the contents of the alert control flag collection using a Cosmos DB Input Binding
- Read all records from the Cosmos DB air quality data Collection (it will only have ~ 30 records) using a Stored Procedure
- Convert the returned Cosmos DB data into the right request format to send to Azure Anomaly Detector
- Read the result from the Anomaly Detector
- If an anomaly is found, and alerts are not temporarily disabled, use Twilio to send an SMS to a configured mobile number
- If an alert was sent, disable alerts for a period by inserting a flag record into Cosmos DB. The TTL on that collection (1,200 seconds in my script above) determines when the next alert can be sent. A sketch of this gating logic follows the list.
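To make the alert throttling concrete, here is a hedged sketch of that gating logic in C#. The detectAnomalyAsync delegate, the AlertFlag type and the parameter names are assumptions for illustration - the repository code is the source of truth - but the Twilio calls use the standard Twilio .NET helper library, and the flag document relies on the alertcontroller container's 1,200 second TTL to re-enable alerting.

```csharp
using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Twilio;
using Twilio.Rest.Api.V2010.Account;
using Twilio.Types;

// Hypothetical flag document; Cosmos DB deletes it when the container TTL expires.
public class AlertFlag { public string id { get; set; } }

public static class AlertGateSketch
{
    // 'alertFlags' is a Container for the 'alertcontroller' collection.
    // 'detectAnomalyAsync' stands in for the Anomaly Detector inferencing call.
    public static async Task CheckAndAlertAsync(
        Container alertFlags,
        Func<Task<bool>> detectAnomalyAsync,
        string accountSid, string authToken, string fromNumber, string toNumber)
    {
        bool alertsEnabled;
        try
        {
            // If this read succeeds, an un-expired flag exists: stay quiet.
            await alertFlags.ReadItemAsync<AlertFlag>("alert-sent", new PartitionKey("alert-sent"));
            alertsEnabled = false;
        }
        catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
        {
            alertsEnabled = true; // flag expired (or never set), alerts allowed
        }

        if (alertsEnabled && await detectAnomalyAsync())
        {
            TwilioClient.Init(accountSid, authToken);
            await MessageResource.CreateAsync(
                body: "Possible adverse air quality event detected - check local conditions.",
                from: new PhoneNumber(fromNumber),
                to: new PhoneNumber(toNumber));

            // Re-arm the throttle: the container's 1,200 second TTL deletes this
            // flag, and its absence is what re-enables alerting.
            await alertFlags.CreateItemAsync(new AlertFlag { id = "alert-sent" });
        }
    }
}
```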
Once all is said and done, here's what you would see on your mobile phone when your implementation determines there was an anomalous value in the data. At this point you can open the Paku app (or the PurpleAir website) on your device, check local conditions, and determine your course of action accordingly.
Wrapping things up
Phew! We made it! It's been three quite involved posts, but in summary we did the following:
- Built a .NET Web API that we containerised and ran as a field gateway for our IoT sensor, forwarding events to Azure IoT Hub (see post 1).
- Used Azure Stream Analytics to extract event data from the IoT Hub and transform it so we could use it to train an Azure Anomaly Detector model (see post 2).
- Used Azure Functions, Cosmos DB, our trained Anomaly Detector model and Twilio to tell us if a potential adverse air quality event has occurred (this post).
So, what is the total cost of operating this setup? Ideally as close to zero as possible! :) As we covered in the second post, Stream Analytics has no free tier, so it's by far the biggest expense - the longer you leave it running, the more it costs. I ran mine for a few weeks and the cost was ~$30 (AUD). I only needed to do this once (or periodically), so it's not an ongoing expense. Beyond this, given the low number of events and their small size, I am unlikely to generate excessive costs. If I were worried, I could always put a Cost Alert in place for peace of mind.
Here's a breakdown of services and the cost structure / tier I've used:
- Azure IoT Hub - Free tier - 8,000 messages per day up to 0.5 KB per message.
- Azure Cognitive Services Anomaly Detector API - Free tier (multivariate detection, which is in preview, is also free at the time of writing).
- Azure Cosmos DB - Free tier - 1,000 RUs and 25 GB storage per month.
- Azure Function App - Free grant - 400,000 GB-s and 1 million executions per month. Currently $0 cost.
- Azure Storage - standard pricing for LRS - about 4 cents (AUD) per GiB. This is forecast to cost me 83 cents (USD) this month, which might be high due to my deployments and data prep with Stream Analytics.
- Azure Stream Analytics - standard tier with no autoscale - only used to prepare data to train the Anomaly Detector model. In the month I used the service (it ran for ~4 weeks) it cost me just under $19 (USD).
- Application Insights - this is associated with the Azure Function App and is currently reporting no costs.
- Twilio SMS API - $6 per month for an Australian mobile number with a cost per message of just under 5 cents.
I'm hoping you've found this an interesting journey and one that shows it is possible to build practical solutions on Azure that won't break the bank.
Happy Days! 😎
PS - if you're looking for the source code from each post, here are the quick links:
- Containerised IoT Gateway Web API (post 1): https://github.com/sjwaight/AirQualityAzureIoTGateway
- Stream Analytics Job for data prep (post 2): https://github.com/sjwaight/AirQualityStreamAnalytics
- Alerting pipeline Azure Functions (this post): https://github.com/sjwaight/AirQualityServerlessAlerts