Dynamic Data Routing in Atlas Stream Processing

kennygorman · Published in MongoDB · Apr 23, 2024


As streaming data practitioners, we often encounter challenges with data velocity and complexity. One particularly tricky problem is routing streams of data to different destinations dynamically. Data routing is a common need across industries, whether data must stay within a particular geographic region, be clustered to match a query pattern, or be routed as a cost-saving measure. The use case is prevalent in IoT, banking, manufacturing, and many other sectors. In essence, they all require a traffic cop that sends data to destinations based on rules evaluated at run time.

That responsibility often falls to the stream processing system.

Atlas Stream Processing addresses this need by routing data with a dynamic expression, a capability called Dynamic Data Routing. The data itself describes the desired destination, and the processor routes each document based on a simple configuration.

Take this bit of data as an example:

{ customerId: 911, region: 'USEAST', type: 'premier' },
{ customerId: 924, region: 'EUWEST3', type: 'starter' }

Customer 911 is in US-EAST, and customer 924 is in EU-WEST3. Based on those values, we would like to route these customers to different MongoDB collections.

For example, the following creates a simple processor that routes based on two keys: the region and the type. (If you aren't familiar with Atlas Stream Processing, check out this post and then come back.)

// Create a source stage. In this case, the data is a simple document array.
let source = {
  '$source': {
    documents: [
      { customerId: 911, region: 'USEAST', type: 'premier' },
      { customerId: 924, region: 'EUWEST3', type: 'starter' }
    ]
  }
}

// Create a merge stage. Notice the variable substitutions for db and coll:
// each document's type and region fields determine where it lands.
let sink = {
  '$merge': {
    into: {
      connectionName: 'MyAtlasDatabaseCluster01',
      db: '$type',
      coll: '$region'
    }
  }
}

// Assemble the processor, and run it.
let processor = [source, sink]
sp.process(processor)
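
Note that sp.process() runs the pipeline in the foreground, which is handy for experimenting. To keep a router like this running continuously, you can register the same pipeline as a named stream processor instead. Here's a minimal sketch; the processor name customerRouter is illustrative, not from the post:

// A minimal sketch, reusing the same source and sink variables as above.
// Register the pipeline as a named processor ('customerRouter' is illustrative).
sp.createStreamProcessor('customerRouter', [source, sink])

// Start it so it runs continuously in the background.
sp.customerRouter.start()

// Inspect runtime statistics, and stop it when you're done.
sp.customerRouter.stats()
sp.customerRouter.stop()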

This writes all the customers to the cluster MyAtlasDatabaseCluster01, with the database chosen by each document's type and the collection named by its region. The resulting data is organized like this:

> use starter
switched to db starter
> db.EUWEST3.findOne()
{
  _id: ObjectId("6626b34c83f4d38dc2b209ef"),
  _stream_meta: { source: { type: 'generated' } },
  _ts: ISODate("2024-04-22T18:58:20.549Z"),
  customerId: 924,
  region: 'EUWEST3',
  type: 'starter'
}
> use premier
switched to db premier
> db.USEAST.findOne()
{
  _id: ObjectId("6626b34c83f4d38dc2b209f0"),
  _stream_meta: { source: { type: 'generated' } },
  _ts: ISODate("2024-04-22T18:58:20.549Z"),
  customerId: 911,
  region: 'USEAST',
  type: 'premier'
}

Dynamic expressions can be used in the $merge stage or in the $emit stage (see the documentation for each), so this technique works for MongoDB clusters as well as Apache Kafka topics. It's important to point out that any expression can be used as long as it evaluates to a string, which opens up a myriad of capabilities.
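
For instance, here is a rough sketch of dynamic routing to Kafka topics with $emit; the connection name MyKafkaCluster and the topic-naming convention are assumptions for illustration, not from the post:

// A sketch of dynamic topic routing with $emit.
// 'MyKafkaCluster' is a hypothetical Kafka connection in the registry.
let kafkaSink = {
  '$emit': {
    connectionName: 'MyKafkaCluster',
    // Any expression that evaluates to a string works: this sends
    // customer 911 to the 'customers-useast' topic, for example.
    topic: { '$concat': ['customers-', { '$toLower': '$region' }] }
  }
}

// Reuse the same document-array source from above.
sp.process([source, kafkaSink])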

To try this yourself, log into MongoDB Atlas, create a Stream Processing Instance, connect via the MongoDB Shell, and use this post as an example. Don’t forget to reference the docs for help along the way, and if you get stuck, hit us up in the Community Forums.
