Depths of Big Data Platform at Getir: Streaming

Seleme Topuz
May 13

Phases

1) Data Ingestion

  • MongoDB, with version 3.6, released a feature called change streams, enabling you to acquire all the changes that happen in a collection, database or cluster. You can think of this as the modern version of tailing MongoDB's oplog. The change events can be streamed through the watch function available on the MongoDB client.
  • A producer consumes Mongo's change streams and produces entries for AWS Kinesis. Once the change events are in Kinesis, one can write a consumer to read those changes. We have a consumer that triggers an AWS Lambda function to save those change events to AWS S3. A minimal sketch of the producer side appears after this list.
  • Our data ingestion pipeline can scale both vertically and horizontally to keep up with the changes.
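
As a rough illustration of the producer side, here is a sketch. It assumes the MongoDB Java sync driver and the AWS SDK for Java v1; the connection string, database, collection and stream names are placeholders, and the real producer at Getir may be structured differently (retries, resume tokens and batching are omitted).

```scala
import com.mongodb.client.MongoClients
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.bson.Document
import java.nio.ByteBuffer

object ChangeStreamProducer {
  def main(args: Array[String]): Unit = {
    // Placeholder connection details and names.
    val mongo   = MongoClients.create("mongodb://replica-set-host:27017")
    val orders  = mongo.getDatabase("getir").getCollection("orders")
    val kinesis = AmazonKinesisClientBuilder.defaultClient()

    // watch() opens a change stream on the collection; each element is one change event.
    val cursor = orders.watch().iterator()
    while (cursor.hasNext) {
      val change = cursor.next()

      // Forward the event with its metadata; delete events carry no full document.
      val payload = new Document()
        .append("operationType", change.getOperationType.getValue)
        .append("documentKey", change.getDocumentKey.toJson)
        .append("fullDocument", change.getFullDocument)
        .toJson

      // One Kinesis record per change event, partitioned by document key so that
      // events for the same document stay ordered within a shard.
      kinesis.putRecord(new PutRecordRequest()
        .withStreamName("mongo-change-events")
        .withPartitionKey(change.getDocumentKey.toJson)
        .withData(ByteBuffer.wrap(payload.getBytes("UTF-8"))))
    }
  }
}
```

On the consuming side, the Kinesis-triggered Lambda then only has to write each batch of records to an S3 prefix, for example keyed by collection and time window.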

2) Data Transformation

  • Periodically, a streaming job runs and collects the data in S3 in mini-batches.
  • The data in S3 is in JSON format and needs to be converted into a Redshift-compatible SQL format.
  • Metadata such as schemas and table names was previously written by the ETL job. The streaming job uses this metadata to shape the JSON data into SQL (a sketch of this read follows this list).
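
The article does not include the job itself, but reading one mini-batch under the ETL-generated schema could look roughly like the sketch below; the S3 paths and the way the schema JSON is stored are assumptions on my part.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructType}

val spark = SparkSession.builder().appName("change-event-streaming").getOrCreate()

// Schema metadata written earlier by the ETL job; path and format are hypothetical.
val schemaJson = spark.read.textFile("s3://etl-metadata/orders/schema.json").collect().mkString
val schema     = DataType.fromJson(schemaJson).asInstanceOf[StructType]

// Read one mini-batch of change events, forcing the ETL schema so the JSON is
// shaped into the column layout the corresponding Redshift table expects.
val batch = spark.read.schema(schema).json("s3://change-events/orders/2019/05/13/12-00/")
batch.createOrReplaceTempView("orders_changes")   // now usable from Spark SQL
```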

3) Load

  • Since the mini-batches are very small, they can be written to Redshift in a short time, which is why the whole process can be considered close to real time (as sketched below).
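
The article does not say which write path is used; as one plausible sketch, a small transformed mini-batch could simply be appended over JDBC (the spark-redshift connector, which stages through S3 and issues a COPY, would be the heavier-duty alternative). The connection details are placeholders.

```scala
// Append a transformed mini-batch to Redshift over JDBC; because the batch is
// small, the write finishes quickly and keeps the pipeline close to real time.
transformed.write
  .format("jdbc")
  .option("url", "jdbc:redshift://cluster-endpoint:5439/warehouse")   // placeholder endpoint
  .option("dbtable", "public.orders")
  .option("user", sys.env("REDSHIFT_USER"))
  .option("password", sys.env("REDSHIFT_PASSWORD"))
  .mode("append")
  .save()
```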

In the pipeline

  • After the change events are written to S3, the streaming job picks them up and uses a forced-schema model to transform them into a Redshift-acceptable Spark SQL format. Then it updates Redshift with the data.
  • Even though Hadoop's big data support is not required in this step, Spark is needed, and Hadoop YARN's Web UI gives us a nice job-monitoring view. We can see which jobs have failed and why.

Transformation of the data

  • There is an advantage in running the streaming application in mini-batches. Say the streaming job runs every 3 minutes. A document in MongoDB can be updated many times in that period. Instead of updating the corresponding row in Redshift many times, I simply ignore the earlier events and use only the latest change event, which gives me the last updated version of the document (see the first sketch after this list).
  • My other approach is to put the change events of the 'insert', 'replace' and 'update' types in the same bucket, whereas the 'delete' events go in another bucket. That is because all 'insert', 'replace' and 'update' events give me the whole document and should be treated the same way for data ingestion.
  • Another notable point I discovered was the difference in BSON types. In ETL, the mongo-spark connector converted MongoDB's BSON-typed objects into their Spark SQL counterparts using a mapping in Scala. The streaming case differs quite a bit, because the change events arrive in JSON format and the BSON types are converted to a special JSON alternative. For example, 'Date' BSON-typed objects become a JSON object like {"$date": <timestamp>}. To overcome this, I had to write a recursive function that iterates through all data schemas, finds these BSON type conversions and replaces them with what we want (see the second sketch after this list).
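
A rough sketch of the first two ideas, assuming each change event in the mini-batch exposes operationType, documentKey and a sortable clusterTime column (the exact field names in Getir's pipeline may differ):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Keep only the latest change event per document within the mini-batch.
val latestPerDoc = Window.partitionBy(col("documentKey")).orderBy(col("clusterTime").desc)
val latest = batch
  .withColumn("rank", row_number().over(latestPerDoc))
  .filter(col("rank") === 1)
  .drop("rank")

// insert/replace/update all carry the full document and are applied as upserts;
// delete events only need the document key.
val upserts = latest.filter(col("operationType").isin("insert", "replace", "update"))
val deletes = latest.filter(col("operationType") === "delete").select(col("documentKey"))
```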
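And a condensed version of the recursive rewrite, assuming the extended-JSON shapes {"$date": <epoch millis>} and {"$oid": <hex string>}; other wrappers and array fields would be handled the same way and are omitted here:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types._

// Walk the schema recursively; whenever a struct looks like a BSON extended-JSON
// wrapper, replace it with a plain Spark SQL value of the intended type.
def unwrapBson(dt: DataType, c: Column): Column = dt match {
  case s: StructType if s.fieldNames.toSeq == Seq("$date") =>
    (c.getField("$date") / 1000).cast(TimestampType)          // assumes epoch milliseconds
  case s: StructType if s.fieldNames.toSeq == Seq("$oid") =>
    c.getField("$oid").cast(StringType)
  case s: StructType =>
    struct(s.fields.map(f => unwrapBson(f.dataType, c.getField(f.name)).alias(f.name)): _*)
  case _ => c
}

val cleaned = upserts.select(
  upserts.schema.fields.map(f => unwrapBson(f.dataType, col(f.name)).alias(f.name)): _*
)
```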

Streaming Benchmark

During streaming, we apply a somewhat different set of transformations than in ETL. That can naturally cause the data to differ.

Recovery System

During development of the streaming job, I discovered an annoying problem: the type of a field can change over time, and the streamed field then no longer follows the schema generated by ETL.
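
The article does not detail the recovery mechanism, so the following is only one way such drift could be contained, as an assumption on my part: coerce every mini-batch back onto the schema the ETL job stored before loading, and flag anything that cannot be cast for reprocessing.

```scala
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{DataType, StructType}

// Schema previously stored by the ETL job (hypothetical path, as before).
val schemaJson = spark.read.textFile("s3://etl-metadata/orders/schema.json").collect().mkString
val expected   = DataType.fromJson(schemaJson).asInstanceOf[StructType]

// Cast drifting fields back to the expected type; fields missing from the batch
// become nulls, and values that fail the cast also end up null and can be
// routed to a recovery path for reprocessing.
val coerced = batch.select(expected.fields.map { f =>
  if (batch.columns.contains(f.name)) col(f.name).cast(f.dataType).alias(f.name)
  else lit(null).cast(f.dataType).alias(f.name)
}: _*)
```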
