Enrichment Nirvana — using $lookup with Atlas Stream Processing

kennygorman
Published in MongoDB
4 min read · May 24, 2024

How using $lookup makes enriching streams of data a no-brainer

Photo by Jurian Kersten on Unsplash

Enriching streams using data persisted in databases is a key capability for anyone building stream processors. It’s used across a wide range of use cases: enriching streams of manufacturing data with historical aggregations, fetching customer details to enrich streams of e-commerce actions, and so on. These use cases all work the same way: the stream isn’t the whole story, and enriching it with data from another source completes it.

The notion of a join may not be familiar to MongoDB users, as it's not a concept traditionally associated with the database. However, the ability to perform a logical join has been in MongoDB for years, since version 3.2. The capability is provided by the $lookup aggregation stage. With Atlas Stream Processing, we tackled the most common, and frankly simplest, use cases first by adding the ability to enrich streams using $lookup. At the top of our list of concerns was that joining data is a source of developer frustration: it’s complicated and messy to visualize how the join works and why the results look the way they do. The MongoDB interface was already good, but could we adapt it to work with streaming systems?

A simple example

Let’s take a simple example to illustrate how it works. Say we are building an application for solar farm maintenance workers. The application must show a stream of panel performance numbers (the event stream) alongside each panel's manufacturer and model (persisted in a database). This helps workers understand underperforming panels and find the technical details, warranty, and support paths for each panel type.

For this example, we can use the built-in sample_stream_solar data source. For each message, we will perform a lookup on the solar_devices collection using the source key (localField) device_id and lookup key (foreignField) panel_id. The result is specified to return as the panel_detail key.

// create a source using the sample_stream_solar source
let s = { '$source': { connectionName: 'sample_stream_solar' } }

// now let's create a lookup from the database
let l = {
  '$lookup': {
    from: { connectionName: 'Cluster0', db: 'demo', coll: 'solar_devices' },
    localField: 'device_id',
    foreignField: 'panel_id',
    as: 'panel_detail'
  }
}

// create a processor with these two stages
let p = [s, l]

// look at the results
sp.process(p)

{
  device_id: 'device_3',
  group_id: 3,
  timestamp: '2024-05-24T14:46:53.359+00:00',
  max_watts: 250,
  event_type: 0,
  obs: {
    watts: 170,
    temp: 24
  },
  _ts: ISODate("2024-05-24T14:46:53.359Z"),
  panel_detail: [
    {
      _id: ObjectId("6650a6ca277691637d5d34e6"),
      panel_id: 'device_3',
      panel_type: 'Panasonic',
      panel_model: 'PNS-400-EVP132GL'
    }
  ]
}
{
  device_id: 'device_1',
  group_id: 2,
  timestamp: '2024-05-24T14:46:53.359+00:00',
  max_watts: 250,
  event_type: 0,
  obs: {
    watts: 121,
    temp: 14
  },
  _ts: ISODate("2024-05-24T14:46:53.359Z"),
  panel_detail: [
    {
      _id: ObjectId("6650a66a277691637d5d34e4"),
      panel_id: 'device_1',
      panel_type: 'Panasonic',
      panel_model: 'PNS-EVPV400HK'
    }
  ]
}

Now we have a data stream (max_watts, obs.watts, obs.temp) that we have enriched with data from the collection (panel_detail.panel_type, panel_detail.panel_model). Technically speaking, it’s a left outer join (so it still returns events where there is no match in the lookup collection), and we used an equality match with a single collection.
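
To make the join semantics concrete, here is a minimal sketch in plain JavaScript that mimics what an equality-match $lookup does to each event. It is not how Atlas Stream Processing implements the stage: the lookupEquality helper and the in-memory arrays are hypothetical stand-ins for the stream and the solar_devices collection, and device_9 is a made-up id used only to show an unmatched event.

```javascript
// Hypothetical in-memory stand-in for the solar_devices collection.
const solarDevices = [
  { panel_id: 'device_1', panel_type: 'Panasonic', panel_model: 'PNS-EVPV400HK' },
  { panel_id: 'device_3', panel_type: 'Panasonic', panel_model: 'PNS-400-EVP132GL' }
];

// Mimics an equality-match $lookup for a single event (left outer join):
// the event is always returned, and `as` receives an array of every match.
function lookupEquality(event, coll, localField, foreignField, as) {
  const matches = coll.filter((doc) => doc[foreignField] === event[localField]);
  return { ...event, [as]: matches };
}

const events = [
  { device_id: 'device_1', obs: { watts: 121, temp: 14 } },
  { device_id: 'device_9', obs: { watts: 80, temp: 19 } } // made-up id with no match
];

const enriched = events.map((e) =>
  lookupEquality(e, solarDevices, 'device_id', 'panel_id', 'panel_detail')
);

console.log(JSON.stringify(enriched, null, 2));
```

The unmatched event still comes through, just with an empty panel_detail array. That is the left outer join behavior described above.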

Pro Tip

You might notice that in the example the enriched data is returned as a nested element (an array of matching documents). To promote the nested document to the top level, use $replaceRoot with $mergeObjects, then use $project to remove the nested key.


// the same lookup as above
let l = {
  '$lookup': {
    from: { connectionName: 'Cluster0', db: 'demo', coll: 'solar_devices' },
    localField: 'device_id',
    foreignField: 'panel_id',
    as: 'panel_detail'
  }
}

// move the data returned from $lookup to the root level
let r = {
  $replaceRoot: { newRoot: { $mergeObjects: [ { $arrayElemAt: [ "$panel_detail", 0 ] }, "$$ROOT" ] } }
}

// remove the data we don't need anymore
let pr = {
  $project: { panel_detail: 0, _id: 0, panel_id: 0 }
}

// run it with the source (s) from the first example
sp.process([s, l, r, pr])


// This would output a document that looks like this:

{
  _ts: ISODate("2024-05-24T14:46:53.359Z"),
  device_id: 'device_1',
  group_id: 2,
  timestamp: '2024-05-24T14:46:53.359+00:00',
  max_watts: 250,
  event_type: 0,
  obs: {
    watts: 121,
    temp: 14
  },
  panel_type: 'Panasonic',
  panel_model: 'PNS-EVPV400HK'
}
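
For intuition about what those two stages do to a single document, here is a rough plain-JavaScript equivalent. It is a sketch under stated assumptions, not the engine's implementation: flatten is a hypothetical helper, and the _id value is a placeholder string rather than a real ObjectId.

```javascript
// Mimics $replaceRoot + $mergeObjects + $project for one enriched event.
function flatten(event) {
  // $arrayElemAt: ["$panel_detail", 0] takes the first match; an empty array
  // yields no value, which $mergeObjects skips (modeled here as {}).
  const first = event.panel_detail[0] ?? {};
  // $mergeObjects: later objects win, so the event's own fields take precedence.
  const merged = { ...first, ...event };
  // $project: { panel_detail: 0, _id: 0, panel_id: 0 }
  const { panel_detail, _id, panel_id, ...rest } = merged;
  return rest;
}

const matched = flatten({
  device_id: 'device_1',
  max_watts: 250,
  panel_detail: [
    { _id: 'placeholder-id', panel_id: 'device_1', panel_type: 'Panasonic', panel_model: 'PNS-EVPV400HK' }
  ]
});

// An event with no match keeps only its own fields.
const unmatched = flatten({ device_id: 'device_9', max_watts: 250, panel_detail: [] });

console.log(matched, unmatched);
```

Because "$$ROOT" comes last in the $mergeObjects array, a field present in both the event and the looked-up document keeps the event's value. Swap the order if the collection should win instead.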

Going further

The $lookup syntax supports more sophisticated capabilities, like correlated subqueries using a nested pipeline, iterating over arrays, and more. We could introduce a $lookup that works at window boundary time (it runs when windows close vs. message by message). Should we? We want to hear from you.
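
As an illustration of the correlated-subquery form, here is a sketch of the standard MongoDB aggregation syntax using let and a nested pipeline. It expresses the same join as the earlier example but returns only the two fields we care about. This is an assumption-laden example: it uses the plain collection-name form of from, the variable name dev is arbitrary, and the article does not confirm which of these options Atlas Stream Processing accepts.

```javascript
// Standard MongoDB $lookup with a correlated sub-pipeline (illustrative only).
const lookupWithPipeline = {
  $lookup: {
    from: 'solar_devices',
    // expose the event's device_id to the sub-pipeline as $$dev
    let: { dev: '$device_id' },
    pipeline: [
      // correlated match: compare the collection's panel_id to the event's device_id
      { $match: { $expr: { $eq: ['$panel_id', '$$dev'] } } },
      // return only the fields we want in the enriched event
      { $project: { _id: 0, panel_type: 1, panel_model: 1 } }
    ],
    as: 'panel_detail'
  }
};

console.log(JSON.stringify(lookupWithPipeline, null, 2));
```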

Join the conversation on the MongoDB Community Forums. Just select ‘Atlas Stream Processing’ as the category.

Lastly, check out the GitHub repo for this example. It will guide you through running the complete example yourself.

kennygorman
MongoDB

Product Management @mongodb, Previous Co-Founder @eventadorlabs & @objectrocket. Early @paypal. Views are my own.