Depths of Big Data Platform at Getir: Streaming

Seleme Topuz
Published in Getir
5 min read · May 13, 2019

The project reflects MongoDB changes on Redshift in the big data platform in near real-time.

Phases

1) Data Ingestion

  • Since version 3.6, MongoDB has offered a feature called change streams, which lets you capture all the changes that happen in a collection (and, since version 4.0, in a whole database or cluster). You can think of it as the modern version of tailing the MongoDB oplog. Change events can be streamed through the watch function available on the MongoDB client.
  • A producer consumes MongoDB’s change streams and produces records for AWS Kinesis. Once the change events are in Kinesis, a consumer can read them. Our consumer triggers an AWS Lambda function that saves the change events to AWS S3.
  • Our data ingestion pipeline can scale both vertically and horizontally to keep up with the changes.
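The producer side of this phase can be sketched roughly as follows. This is a minimal sketch under assumptions, not the production code: the function names, the choice of the document `_id` as partition key, and all connection parameters are hypothetical, and `run_producer` assumes the `pymongo` and `boto3` packages are installed.

```python
import json


def to_kinesis_record(change_event: dict, dumps=json.dumps) -> dict:
    """Build the Data/PartitionKey arguments for one Kinesis record.

    Using the document _id as partition key (an assumption of this sketch)
    keeps all changes of one document on the same shard, in order.
    """
    doc_id = str(change_event["documentKey"]["_id"])
    return {
        "Data": dumps(change_event).encode("utf-8"),
        "PartitionKey": doc_id,
    }


def run_producer(mongo_uri: str, db: str, collection: str, stream_name: str) -> None:
    """Tail a collection's change stream and forward each event to Kinesis."""
    # Imported lazily so the helper above works without these packages.
    import boto3
    from bson import json_util
    from pymongo import MongoClient

    kinesis = boto3.client("kinesis")
    coll = MongoClient(mongo_uri)[db][collection]
    # Only document-level events carry a documentKey.
    pipeline = [{"$match": {"operationType": {
        "$in": ["insert", "update", "replace", "delete"]}}}]
    # full_document="updateLookup" asks MongoDB to attach the current
    # document to 'update' events, not only the changed fields.
    with coll.watch(pipeline, full_document="updateLookup") as stream:
        for event in stream:
            # json_util.dumps serializes BSON types (ObjectId, dates) as JSON.
            record = to_kinesis_record(event, dumps=json_util.dumps)
            kinesis.put_record(StreamName=stream_name, **record)
```

Scaling horizontally would then mean running one such producer per collection and adding Kinesis shards as the event volume grows.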

2) Data Transformation

  • Periodically, a streaming job runs and collects the data in S3 in minibatches.
  • Data in S3 is in JSON format, which has to be converted to a SQL format that Redshift accepts.
  • Some metadata, such as schemas and table names, has already been written by the ETL job. The streaming job uses this metadata to shape the JSON data into SQL.

3) Load

  • Because the data in each minibatch is very small, it can be written to Redshift quickly, which is why the whole process can be called near real-time.

In the pipeline

  • After change events are written to S3, the streaming job picks them up and uses a forced-schema model to transform them into a Spark SQL format that Redshift accepts. Then it updates Redshift with the data.
  • Even though Hadoop’s big data support is not required in this step, Spark is needed, and Hadoop YARN’s Web UI gives us a convenient way to monitor jobs. We can see which jobs have failed and why.

Transformation of the data

  • Building the streaming application around minibatches has an advantage. Say the streaming job runs every 3 minutes. A document in MongoDB can be updated many times in that period. Instead of updating the corresponding row in Redshift many times, I simply ignore the earlier change events and use only the latest one, which gives me the last updated version of the document.
  • My other approach is to put change events of type ‘insert’, ‘replace’ and ‘update’ in the same bucket and ‘delete’ events in another. That is because ‘insert’, ‘replace’ and ‘update’ events all give me the whole document, so they can be treated the same way for data ingestion.
  • Another notable point I discovered was the difference in BSON types. In ETL, the mongo-spark connector converts MongoDB’s BSON-typed objects into their Spark SQL counterparts using a mapping in Scala. The streaming case is quite different because the change events are in JSON format, and the BSON types are converted to a special JSON representation. For example, ‘Date’ BSON-typed objects are converted to JSON like {“$date”: <timestamp>}. To overcome this problem, I had to write a recursive function that walks through all data schemas, finds these BSON type conversions and replaces them with what we want.
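The three ideas above (keep only the latest event per document, split upserts from deletes, and recursively unwrap the JSON form of BSON types) can be sketched in plain Python. This is a hedged illustration, not the job's actual code: the function names are hypothetical, only the `$date`, `$oid` and `$numberLong` wrappers are handled, and `$date` is assumed to hold epoch milliseconds.

```python
from datetime import datetime, timezone
from typing import Any

UPSERT_TYPES = {"insert", "replace", "update"}


def normalize_bson(value: Any) -> Any:
    """Recursively replace extended-JSON wrappers with plain Python values."""
    if isinstance(value, dict):
        keys = set(value)
        if keys == {"$oid"}:
            return value["$oid"]
        if keys == {"$numberLong"}:
            return int(value["$numberLong"])
        if keys == {"$date"}:
            raw = normalize_bson(value["$date"])
            # Assumption: numeric $date values are epoch milliseconds.
            if isinstance(raw, (int, float)):
                return datetime.fromtimestamp(raw / 1000, tz=timezone.utc)
            return raw  # leave other representations untouched
        return {k: normalize_bson(v) for k, v in value.items()}
    if isinstance(value, list):
        return [normalize_bson(v) for v in value]
    return value


def collapse_minibatch(events: list) -> tuple:
    """Keep the last event per document, then split upserts from deletes.

    Events must be in arrival order: a later event for the same _id
    replaces any earlier one.
    """
    latest = {}
    for event in events:
        doc_id = str(normalize_bson(event["documentKey"]["_id"]))
        latest[doc_id] = event
    upserts = [
        normalize_bson(e["fullDocument"])
        for e in latest.values()
        if e["operationType"] in UPSERT_TYPES
    ]
    deletes = [
        doc_id for doc_id, e in latest.items()
        if e["operationType"] == "delete"
    ]
    return upserts, deletes
```

With this shape, a document that is inserted and then updated three times within one minibatch produces a single upsert, and a document whose last event is a ‘delete’ only appears in the delete bucket.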

Streaming Benchmark

During streaming, we apply a number of transformations that differ from those in ETL. That may naturally cause the data to be altered.

The streaming benchmark, written in Python 3 like many other parts of the Big Data Platform, prepares a set of ‘id’s that are guaranteed to belong to documents moved to Redshift by both ETL and streaming. This way we do not miss the streamed documents, which may be a much smaller set than those moved with ETL.

We have set up performance criteria that we evaluate in the benchmark results. They are computed after the MongoDB-sourced and Redshift-sourced data have each been transformed and prepared for comparison.
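The id selection and comparison step could look roughly like the sketch below. The function names and the "match rate" criterion are hypothetical, chosen only for illustration, and rows are assumed to be already transformed into comparable dictionaries keyed by ‘id’.

```python
def common_ids(etl_ids, streaming_ids) -> set:
    """Ids of documents that reached Redshift through both ETL and streaming."""
    return set(etl_ids) & set(streaming_ids)


def compare_rows(mongo_rows: dict, redshift_rows: dict, ids) -> dict:
    """Compare already-transformed rows id by id.

    Returns how many rows were compared, the share that matched exactly
    (a hypothetical criterion) and the ids whose rows differ.
    """
    differing = sorted(
        i for i in ids if mongo_rows.get(i) != redshift_rows.get(i)
    )
    compared = len(ids)
    matched = compared - len(differing)
    return {
        "compared": compared,
        "match_rate": matched / compared if compared else 1.0,
        "differing_ids": differing,
    }
```

Restricting the comparison to the common ids keeps the result from being dominated by documents that only ETL has moved so far.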

The streaming benchmark should also be run in an environment with Spark. However, Hadoop is not required in this case, because the data being compared is small and we do not need any distributed computing along the way. The Spark dependency comes from the MongoDB transformer, which requires the Spark SQL engine to extract DataFrames from MongoDB data.

Recovery System

During development of the streaming job, I discovered an annoying problem: the type of a field can change, so that the streamed field no longer follows the schema generated by ETL.

This problem can be overcome by running ETL again, since it infers the schema types from scratch and resolves the field to the correct type.

For example, ETL can label the type of a field as ‘integer’. However, MongoDB is a NoSQL database with no enforced schema, so it allows double-typed values in that same field. When such a double-typed value shows up in the streaming job, the job fails during the transformation step.
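To make the failure concrete, here is a small sketch of a strict type check against an ETL-inferred schema. The schema layout, the type names and the `check_types` function are assumptions made for this illustration; the real job fails inside the Spark transformation rather than in a helper like this.

```python
# Hypothetical mapping from schema type names to accepted Python types.
ACCEPTED = {
    "integer": (int,),
    "double": (int, float),
    "string": (str,),
}


def check_types(document: dict, schema: dict) -> None:
    """Raise TypeError if a value does not fit the type ETL inferred."""
    for field, expected in schema.items():
        value = document.get(field)
        if value is None:
            continue  # missing or null fields are loaded as NULL
        if type(value) not in ACCEPTED[expected]:
            raise TypeError(
                f"field {field!r}: expected {expected}, "
                f"got {type(value).__name__}"
            )
```

With a schema of `{"quantity": "integer"}`, a streamed document carrying `{"quantity": 2.5}` raises the error, while `{"quantity": 3}` passes.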

The recovery system automatically detects when a streaming job fails, runs the ETL job, and then resumes running the streaming jobs. This is not a perfect solution, since the data is not updated for a longer period while the ETL runs.
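The control flow of such a recovery step can be sketched as a small wrapper. This is an assumption-laden illustration: `run_with_recovery`, its parameters and the retry limit are hypothetical, and the two jobs are passed in as plain callables so the logic stays independent of Spark.

```python
from typing import Callable


def run_with_recovery(
    run_streaming: Callable[[], None],
    run_etl: Callable[[], None],
    max_recoveries: int = 1,
) -> int:
    """Run a streaming job; if it fails, rerun ETL and try again.

    Returns the number of ETL recoveries that were needed. If the job
    still fails after max_recoveries ETL runs, the error is re-raised.
    """
    recoveries = 0
    while True:
        try:
            run_streaming()
            return recoveries
        except Exception:
            if recoveries >= max_recoveries:
                raise  # ETL did not fix the problem, surface the error
            run_etl()  # regenerates the schema with fresh type inference
            recoveries += 1
```

Capping the number of recoveries avoids looping forever on failures that a fresh ETL run cannot fix, such as an unreachable Redshift cluster.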

Streaming is the modern way of moving data in near real-time, compared to bulk ETL. However, many problems still emerge from the nature of streaming data. One may argue that streaming is not as safe and reliable as ETL. In my opinion, however, the risk can be reduced, and the overhead is worth it because you get your data in near real-time.
