MongoDB Atlas Stream Processing — Your First Steps

kennygorman · Published in MongoDB · Apr 15, 2024

Atlas Stream Processing is a remarkably powerful tool for processing streams of data within MongoDB Atlas. It leverages the MongoDB aggregation language and does not require additional tools like the Kafka Connector, Debezium, Apache Flink, or other data movement tools. It’s simple to get started and easy to grow.

Let’s dive into a super quick primer on your first steps with Atlas Stream Processing.

What does it do?

Atlas Stream Processing processes streams of data flowing to or from Apache Kafka and MongoDB collections. It’s great for capturing a continuous stream of change events from MongoDB collections, or for taking massive streams of Apache Kafka data and continuously aggregating them into MongoDB collections. In every case there is a source of data, some processing of that data, and then a sink for the data. Stream processors are developed and run via the MongoDB Shell or tools like VS Code with the MongoDB extension.
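
Put another way, every stream processor is an aggregation pipeline bracketed by a source and a sink. Here is a rough sketch of that shape; the stage contents are left as placeholders and are filled in over the rest of this post:

// The general shape of every stream processor: a source first, any number
// of processing stages in the middle, and a sink last.
let pipeline = [
  { $source: { /* where the data comes from */ } },
  { $match:  { /* zero or more processing stages */ } },
  { $merge:  { /* where the data goes */ } }
]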

The anatomy of a stream processor

The first thing to understand is that stream processors are written using the MongoDB aggregation language. Atlas Stream Processing has a few new stages that help with stream processing (see the docs links below). The next important thing to grok is that stream processors aren’t like regular queries that run and exit — they run continuously. Data is continuously being pushed through the aggregation pipeline from source to sink.

Stream processors have the following properties:

  • They are processing pipelines defined using the MongoDB aggregation language.
  • Pipelines are an array of ordered stages.
  • A source is the first stage, and a sink is the last.
  • Any number of other stages process data in between.
  • Pipelines are evaluated by Atlas Stream Processing commands issued from the MongoDB Shell, VS Code, or other tools and drivers coming in the future.

Here are a couple of important references for the MongoDB aggregation language:

  1. Practical MongoDB Aggregations reference
  2. The MongoDB aggregation language docs
  3. The Atlas Stream Processing docs

A stream processor is made up of a few simple parts and, generally speaking, has the following form:

// A processor is simply an array of stages and can be defined
// in pieces using variables in the shell.
// This is an empty processor definition, and it doesn't define anything yet.
let processor = []

// A source must be the first stage in any processor. It contains the
// source definition from the connection registry. Connections are
// defined as a document; in this case, it's empty. More on the
// connection registry later.
let source = {$source:{}}

// Add a source to the processor. The definition has one element in the array
// and it's a source of data.
processor = [source]
// or, this is valid too
processor[0] = source

// At this point, the processor definition is already useful. We can use it to
// inspect data from the source in the shell via sp.process(). The results are
// returned continuously to the shell.
sp.process(processor)

// A sink must be the last stage. It can send data to a MongoDB collection
// via $merge, or to a Kafka topic via $emit. The connections that $merge and
// $emit write to are also defined in the connection registry.
let sink = {$merge: {}}

// This processor takes data from a source and sends it to a sink without
// any processing.
processor = [source, sink]
// or
processor[1] = sink

// A processor can be made up of any number of different stages in
// the pipeline. These stages are aggregation language stages similar
// to any other aggregation query, things like $match, $addFields, etc..
processor = [source, stageA, stageB, stageC, ..., sink]

// Lastly, processors can be saved with a name for future reference and
// started and stopped. Processors that are started go into the background
// and run continuously.
sp.createStreamProcessor('myProcessor', processor);
sp.myProcessor.start();
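
Putting those pieces together, a complete saved processor might look like the sketch below. The connection names ('kafkaProd' and 'atlasCluster') and the field names are hypothetical; connections come from the connection registry, which is covered next.

// A minimal end-to-end sketch. 'kafkaProd' and 'atlasCluster' are
// hypothetical connection registry entries, and 'amount' is a made-up field.
let src = { $source: { connectionName: 'kafkaProd', topic: 'orders' } }
let big = { $match: { amount: { $gte: 100 } } }
let out = {
  $merge: {
    into: { connectionName: 'atlasCluster', db: 'sales', coll: 'bigOrders' }
  }
}

sp.createStreamProcessor('bigOrders', [src, big, out])
sp.bigOrders.start()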

The connection registry

The MongoDB Atlas Stream Processing Connection Registry is a roster of definitions for your sources and sinks. It contains specifics about how and where to connect to other systems or clusters and the credentials for doing so. For example, a Kafka source on Confluent Cloud would specify the broker's address and the SASL credentials. Connection Registry entries are created using the MongoDB Atlas UI and referenced by logical name in the MongoDB Shell or VS Code plugin.

For example, take this connection:

The Connection Registry panel is used to add connections in MongoDB Atlas. Yay, Dark Mode!

Once created, this connection can be referenced in the shell like this:

// Find the connections in the connection registry I have access to
sp.listConnections()
[
  { name: 'developmentKafkaCluster', type: 'kafka' }
]

// Create a source using this connection using the pattern shown above
let source = {
  $source: {
    connectionName: 'developmentKafkaCluster',
    topic: 'sensorReadings'
  }
}
let processor = [source]

// inspect the data in the Kafka topic named sensorReadings.
sp.process(processor)

// We can continue to build our stream processor iteratively testing
// it as needed using sp.process()
processor = [........]
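
For example, a next iteration might add a $match stage to narrow the topic down before wiring up a sink. The field and value below are hypothetical:

// Hypothetical next iteration: filter the topic to a single sensor and
// inspect the result before adding a sink.
let onlyOne = { $match: { sensorName: 'sensor-42' } }
processor = [source, onlyOne]
sp.process(processor)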

Some handy tricks

Two final tricks make creating and debugging processors in Atlas Stream Processing more enjoyable, and both are special types of sources.

The sample source is a convenient built-in source that continuously emits a stream of sample solar generation data, useful for learning, demoing, or just debugging. This source is configured in the connection registry like any other source:

Adding the Sample Stream Solar source

And can be used in a stream processor like this:

// Create a source from the sample stream solar connection
let s = {$source: {connectionName: "sample_stream_solar"}}
let processor = [s]
// run it
sp.process(processor)

// the output looks like this
{
  device_id: 'device_5',
  group_id: 2,
  timestamp: '2024-04-11T16:12:33.868+00:00',
  max_watts: 250,
  event_type: 0,
  obs: {
    watts: 146,
    temp: 25
  },
  _ts: ISODate("2024-04-11T16:12:33.868Z"),
  _stream_meta: {
    sourceType: 'generated'
  }
}
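
Because the sample stream never stops, it’s also a nice way to try out windowing. Here’s a rough sketch of a tumbling window that averages watts per device over 10-second intervals; see the Atlas Stream Processing docs for the full set of window options:

// Average watts per device over 10-second tumbling windows of the
// sample solar stream, reusing the source defined above.
let win = {
  $tumblingWindow: {
    interval: { size: NumberInt(10), unit: 'second' },
    pipeline: [
      { $group: { _id: '$device_id', avgWatts: { $avg: '$obs.watts' } } }
    ]
  }
}
sp.process([s, win])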

Lastly, a source can also be an array of documents defined without a Connection Registry entry. This is your debugging best friend. It works like this:

// Create a source using a documents array. This is a finite array of two
// messages
let s = {
  $source: {
    documents: [ { sensorName: 'test01' }, { sensorName: 'test02' } ]
  }
}
let processor = [s]
sp.process(processor)
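
For instance, you can sandwich a stage you’re unsure about between the static documents and sp.process() to confirm it does what you expect; the $addFields stage below is purely illustrative:

// Test a stage against the static documents before pointing it at real data.
// The added field is purely illustrative.
let tag = { $addFields: { env: 'debug' } }
sp.process([s, tag])
// each emitted document should now carry env: 'debug'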

Wrapping it up

Those are the fundamentals of Atlas Stream Processing. To get started, log into MongoDB Atlas, create a Stream Processing Instance, connect via the MongoDB Shell, and try these examples yourself. Don’t forget to reference the docs above for help along the way, and if you get stuck, hit us up in the Community Forums!
