Continuous Data Processing with Star Schema Data Warehouse using Apache Spark

Sarath Chandran
Alef Education
Published in
5 min readJul 28, 2019

Understanding the Data

We at Alef Education strive to optimize the learning abilities of k-12 students with interactive course contents aligned with school curriculum. We collect learning data for education data mining and research to understand and adapt a personalized learning experience for each student. In this post, I am sharing our data processing strategies for continuous BI reporting.

Let’s take a short tour in understanding the kind of data and its sources.
The school information service collects data about our students, classes, subjects, class schedules etc. These data are mostly relational. The students learning activities are recorded as learning progress activity data on our platform in document-based data store. A graph structure keeps track of relationships between the content, curriculum and grade level and each micro-service communicates reliably with messaging queues.

All microservices publish data to RabbitMQ messaging queue. Each service can consume other service’s data through these queue. Likewise, we have an Akka Streams service which subscribes to all messages from RabbitMQ and publish to Kafka. Then we stream the data to Amazon S3 with Spark Structured Streaming.

Data collection: Schema on Read

All data are collected and stored in S3 without any major transformation through Spark Streaming. These data are immediately available for querying through Hive (or) Presto for ad-hoc analysis.

For our reporting to be dissected by varying filter combinations, we garnish the data with dimensional information. Schools, grades, classes, subjects and learning objective are different dimensions.

Data with Slowly Changing Dimension

Student’s move from one class to another; their grade, subject and sometimes even school, change over time. In order to provide a singular identity through the history of all transactions of one particular student, it is mandatory to capture these changes as slowly changing dimensions with respect to students.

Students belong to grade and class, hence, we defined tables like dim_student which has relations with dim_grade and dim_class. This makes a snowflake relation, however, we wanted to keep the relation as a star schema by referencing their IDs. Slowly changing dimensions can be handled by updating the grade_id and class_id with a new record and having a status column marking the previous record inactive. An active_untilcolumn helps retrieve the previous class’ history of a student.

Notice that in the sample table, Sam’s record has been repeated more than once and the prior row’s columns status and active_until are set with values for historical reporting.

Star Schema: Schema on Write

All data in S3 are read with Apache Spark and distributed a in Star Schemalike below. We collect students’ learning records as learning experiences, which are facts and can be grouped and filtered by all dimensions.

The Problem Statement: Time Spent Analysis

Lets say that the problem at hand is to find the average time a student spends on a lesson for a particular subject.

A lesson consists of several sub-parts, each of which attributes to different actions a student can do and learn in that lesson. A message needs to be generated at the start and end of every sub-part of the lesson. We named these events as LearningSession (the top level lesson) and ExperienceSession(the sub-parts of the lesson).

Event : LearningSession
Header : learningSessionStarted/learningSessionFinished{
"occurredOn" : <timestamp>,
"learningSessionId" : <uuid_of_learning_session>,
"schoolId" : <uuid_of_school>,
"gradeId" : <uuid_of_student's_grade>,
"classId" : <uuid_of_student's_class>,
"subjectId" : <uuid_of_subject>,
"objectiveId" : <uuid_of_objective_content>,
...
...
followed by columns measured like
"attempt" : <attempt>,
"stars" : <stars>,
"objective_type" : <objective_type>
...
}Event : ExperienceSession
Header : experienceSessionStarted/experienceSessionFinished{
"occurredOn" : <timestamp>,
"learningSessionId" : <uuid_of_learning_session>,
"experienceId" : <uuid_of_experience>,
"schoolId" : <uuid_of_school>,
"gradeId" : <uuid_of_student's_grade>,
"classId" : <uuid_of_student's_class>,
"subjectId" : <uuid_of_subject>,
...
...
followed by columns measured like
"score" : <score>,
"attempt" : <attempt>,
"lesson_type" : <lesson_type>
...
}

A single learning session’s time spent can be estimated by grouping several experiences’ time spent events. Note that there are edge cases like empty time window where no experience events are recorded or where the finished experience has not yet arrived.

Since data are collected across simultaneous learning transactions by all students, all event transactions have to be grouped by their respective learningSessionId to find the total time spent per lesson.

The total time spent can be calculated by finding the difference in time spent between each experiences in a grouped LearningSessionId.

To make the data processing continuous, we changed our S3 bucketing strategy for parking the data and by introducing a staging layer to convert UUID data into numeric id columns.

S3 Bucket Strategy

Since the requirement is to continuously process data (facts and dimension) and build a star schema to help BI reporting, we decided to speed up the Spark Batch transformation by bucketing data in incoming the first bucket to get data from Spark Streaming, then we take the data for batch transformation in another bucket processing which is mainly an intermediate staging for pushing data to our data warehouse. The final partition of data resides in the data bucket.

This helped us schedule a batch in the incoming bucket as quick as the data arrives; for sanity we wait for 1 hour worth of data to arrive and continuously process them.

Data Warehouse Staging Layer

All id columns from application are UUIDs, but in our Amazon Redshift data warehouse we used numeric id columns as seen in the above table. Hence, we maintained a staging layer where all application transformations are done and this staging layer will insert the records in the final table with auto-incrementing numeric id. The numeric id conversion is maintainable for incremental records and also helps faster query processing.

Conclusion

Slowly changing dimension and UUID made it more difficult to achieve continuous processing on data. However, we modified our approach with S3 bucketing and introduced temporary staging tables to avoid delaying our processing using Apache Spark.

--

--