Hands On: Realtime Streaming MongoDB into S3 using Spark QueueStream and Scala Observer

Eduardo Rost
Published in ilegra
3 min read · Sep 1, 2019

Nowadays we have tons of ways to do almost everything, but in my case it was very difficult to find a solution for the problem I was facing, so I want to share with you what I did to solve mine. I won't explain each technology in detail, just focus on the solution, but I will link to what you need to know if you want more information about each one.

My problem seemed simple enough: stream changes from a MongoDB collection and send them to S3. But it wasn't as simple as I thought, and I will explain everything I went through.

Here I will use Scala, but you can do this with other technologies, Python for example.

Here is my sbt code (the build tool for Scala) configuring all the dependencies I needed.
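It looked roughly like this; a sketch, where the exact versions are assumptions from around the time of writing, so adjust them to your build:

```scala
// build.sbt - versions are assumptions, pin them to whatever your environment uses
name := "mongo-to-s3-stream"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.7.0",  // watch() / change streams
  "org.apache.spark"  %% "spark-core"         % "2.4.4",
  "org.apache.spark"  %% "spark-sql"          % "2.4.4",
  "org.apache.spark"  %% "spark-streaming"    % "2.4.4",
  "org.apache.hadoop" %  "hadoop-aws"         % "2.7.7",  // s3a:// filesystem
  "net.liftweb"       %% "lift-json"          % "3.4.0"   // object -> JSON string
)
```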

At this point I needed to capture the changes that occur in MongoDB, so I came across the watch() function from the MongoDB driver, which "opens a change stream cursor on the collection". Here is my code for this:
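A sketch of that subscription; the connection string and the database/collection names are placeholders, and process() is the method I explain later:

```scala
import com.mongodb.client.model.changestream.{ChangeStreamDocument, FullDocument}
import org.mongodb.scala.{Document, MongoClient, Observer}

// Change streams need MongoDB 3.6+ running as a replica set.
val client     = MongoClient("mongodb://localhost:27017")
val collection = client.getDatabase("mydb").getCollection("mycollection")

val watcher = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP)
// To restart from a known change: watcher.resumeAfter(token)

watcher.subscribe(new Observer[ChangeStreamDocument[Document]] {
  override def onNext(change: ChangeStreamDocument[Document]): Unit = {
    // carries the token, document, operationType, removed/updated fields, timestamp
    process(change) // enqueue the change for Spark Streaming (shown later)
  }
  override def onError(e: Throwable): Unit = e.printStackTrace()
  override def onComplete(): Unit = println("change stream completed")
})
```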

Note: watch() is async, so if I just run this code, nothing happens, because it registers the subscription and then the program ends. To better understand this behaviour, take a look at observables.

The onNext() method receives all the information I needed (token, document, operationType, removedFields, updatedFields, timestamp). The token is the id of the change; with it, you can resume the watcher from that point using watcher.resumeAfter(token). I will explain the process() method later.

At this point I really needed something to keep the program running.

Spark Streaming was my answer, but how? Calm down, I will explain how I linked Spark Streaming with the subscriber.

Here is my Spark configuration; note the options to optimize writing to S3.
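Something along these lines; treat the S3A credentials and committer settings as assumptions about my setup rather than required values:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setAppName("mongo-to-s3-stream")
  .setMaster("local[*]")
  .set("spark.hadoop.fs.s3a.access.key", sys.env.getOrElse("AWS_ACCESS_KEY_ID", ""))
  .set("spark.hadoop.fs.s3a.secret.key", sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", ""))
  .set("spark.hadoop.fs.s3a.fast.upload", "true")                           // buffered multipart uploads
  .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") // avoid slow renames on S3

val spark = SparkSession.builder().config(conf).getOrCreate()
```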

I used a queue of RDDs as a stream; the code below shows how to create a streaming context based on an in-memory queue:
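Assuming the spark session from the previous step and the sendS3 method shown next, the sketch looks like this:

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc      = new StreamingContext(spark.sparkContext, Seconds(60)) // 60-second batches
val rddQueue = new mutable.Queue[RDD[String]]()

// Each batch interval Spark dequeues an RDD from rddQueue and hands it to sendS3.
ssc.queueStream(rddQueue).foreachRDD { rdd =>
  if (!rdd.isEmpty()) sendS3(rdd)
}

ssc.start()
ssc.awaitTermination() // this is what keeps the program alive for the observer
```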

Here, the streaming context is configured on top of rddQueue, so every 60 seconds it takes the data inside the queue, iterates over the elements and processes them, in my case with sendS3(rdd).

sendS3(rdd) was responsible for sending the RDD to S3. I used coalesce(1) on the dataframe to generate just one file in S3 (remember that Spark works in a distributed way and will save n files without coalesce(1)) and append mode so it doesn't override the files already in the folder. I used spark.read.json(rdd) to make Spark infer the schema from the JSON strings inside the RDD.
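A minimal sketch of it, with a placeholder bucket path:

```scala
import org.apache.spark.rdd.RDD

def sendS3(rdd: RDD[String]): Unit = {
  val df = spark.read.json(rdd)   // infer the schema from the JSON strings
  df.coalesce(1)                  // one output file per micro-batch
    .write
    .mode("append")               // don't override what is already in the folder
    .json("s3a://my-bucket/mongo-changes/")
}
```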

At this point I had the MongoDB subscriber and the streaming context with the S3 sender done, but separate. So how can we link them? Here is my ace in the hole.

NOTE: I used lift-json to convert the object to a JSON string.

I created an RDD from the JSON string and enqueued it in rddQueue; in my case, I check whether the queue already has an element, and if it does, I dequeue that RDD and union it with the new one, so there is only one RDD in the queue. Remember that the process method is called asynchronously, so each element changed in MongoDB generates one RDD.
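A sketch of that process() method, assuming the spark session and rddQueue from the earlier snippets; the Change case class is just a hypothetical payload holding the fields I cared about:

```scala
import com.mongodb.client.model.changestream.ChangeStreamDocument
import org.mongodb.scala.Document
import net.liftweb.json.{DefaultFormats, Formats, Serialization}

// Hypothetical shape for what gets written to S3; pick the change-stream fields you need.
case class Change(operationType: String, document: String, timestampSeconds: Long)

implicit val formats: Formats = DefaultFormats

def process(change: ChangeStreamDocument[Document]): Unit = {
  val payload = Change(
    change.getOperationType.getValue,
    Option(change.getFullDocument).map(_.toJson()).getOrElse("{}"),
    Option(change.getClusterTime).map(_.getTime.toLong).getOrElse(0L)
  )
  val json   = Serialization.write(payload)              // lift-json: object -> JSON string
  val newRdd = spark.sparkContext.parallelize(Seq(json))

  // Keep a single RDD in the queue: union the new change with whatever is waiting.
  rddQueue.synchronized {
    if (rddQueue.isEmpty) rddQueue.enqueue(newRdd)
    else rddQueue.enqueue(rddQueue.dequeue().union(newRdd))
  }
}
```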

Now we have everything working together: Spark Streaming watching a queue and sending to S3 every 60 seconds, and MongoDB watching for changes and enqueueing them for Spark Streaming.

So, that's the solution. I hope it helps you guys.

Best regards
