Building a simple data pipeline in Mage

Manikandan Bellan
5 min read · Sep 19, 2023

Being a data guy who has tried modern data stack tools like dbt, Databricks, and Snowflake through online courses, I wanted to try a new tool on my own without going through formal training.

I stumbled upon Mage (https://www.mage.ai), which is garnering a lot of attention in the data pipeline/orchestration space and is also being viewed as a replacement for Apache Airflow (I cannot comment on that comparison, as I have not used Airflow much beyond building a couple of DAGs a few years back).

Their website has a 2-minute demo that is good enough for anyone to get started, so I went ahead and used it to build a simple data pipeline:

Read a public API → Load raw data to a Postgres table → Build aggregate tables for reporting

Getting Started

Installing and getting started with Mage is pretty simple. It requires just three steps:

  • Create and activate a virtual environment
  • Run command — pip install mage-ai
  • Run command — mage start [project name]

This opens the Mage UI in a new browser tab on localhost (http://localhost:6789/) with the whole project folder set up, as you see below.

Project folder created automatically on localhost

Mage comes with a notebook-style UI that helps you build your pipelines out of pipeline blocks (below), which are individual, reusable files. The same block can be reused by multiple pipelines within a given project.

Data Loader: Provides templates to connect to and fetch data from different sources (remote/local) using the language of your choice (Python, SQL)

Transformer: Helps transform (filtering, aggregating, etc.) and clean the data passed in from the data loader block

Data Exporter: Helps load data to different targets

Custom: Lets you write custom code (Python, SQL, etc.) to execute operations on the data

Sensor: A block that evaluates a condition continuously until it is met or until a timeout is reached

Scratchpad: Use this block to write throwaway code. Scratchpads are not executed when a pipeline runs
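
For instance, a minimal Transformer block follows the same decorator pattern you will see in the loader and exporter code later in this post (a sketch only; this pipeline does not use a transformer, and the dropna() cleanup is just an illustration):

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform(data, *args, **kwargs):
    # 'data' is the DataFrame passed in from the upstream block;
    # dropping rows with missing values is a simple cleanup example
    return data.dropna()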

The pipeline

Our pipeline is built using the blocks below (I am adding screenshots and code snippets for easy understanding):

  • Data loader block to extract data from a public API, parse the JSON, and create a dataframe. The "Load from API" template comes with pre-filled code where all we have to do is provide the URL of the API and add additional code to parse the output before building the dataframe, as seen below
Data loader to extract API
import io
import pandas as pd
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Template for loading data from API
    """
    url = 'https://www.healthit.gov/data/open-api?source=workforce-programs-trained.csv'
    response = requests.get(url)
    jsonResponse = response.json()

    # Create an empty data list
    data_list = []

    # Build the full data list by looping through the API response
    for key in jsonResponse:
        try:
            region = key['region']
            region_code = key['region_code']
            period = key['period']
            geo_area = key['geo_area']
            students_trained = key['students_trained']
            data_list.append([region, region_code, period, geo_area, students_trained])
        except KeyError:
            print(f'KeyError occurred for record: {key}')

    # Create the DataFrame
    columns = ['region', 'region_code', 'period', 'geo_area', 'students_trained']
    trainings_df = pd.DataFrame(data_list, columns=columns)

    return trainings_df


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
  • Data exporter block to load the raw dataframe from the data loader to a Postgres table. Database configurations are stored in io_config.yaml under the project directory (a sample is shown after the exporter code below)
Data Exporter
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a PostgreSQL database.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#postgresql
    """
    schema_name = 'public'  # Specify the name of the schema to export data to
    table_name = 'students_trained_raw'  # Specify the name of the table to export data to
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # Specifies whether to include index in exported table
            if_exists='replace',  # Specify resolution policy if table name already exists
        )
Raw table loader by the Data Exporter
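
For reference, the default Postgres profile in io_config.yaml looks roughly like this (a minimal sketch; the host, port, credentials, and database name below are placeholders for your own setup):

version: 0.1.1
default:
  POSTGRES_DBNAME: mage_demo    # placeholder database name
  POSTGRES_SCHEMA: public
  POSTGRES_USER: postgres       # placeholder credentials
  POSTGRES_PASSWORD: postgres
  POSTGRES_HOST: localhost
  POSTGRES_PORT: 5432
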
  • Custom block to build aggregated tables using SQL
Custom for data aggregation
-- Docs: https://docs.mage.ai/guides/sql-blocks

-- Create aggregate tables by region/region code and by geo area
CREATE TABLE IF NOT EXISTS training_by_region (
    region VARCHAR(20),
    region_code VARCHAR(2),
    students_trained DECIMAL
);

CREATE TABLE IF NOT EXISTS training_by_geo_area (
    geo_area VARCHAR(200),
    students_trained DECIMAL
);

-- Empty both tables so reruns do not duplicate rows
TRUNCATE TABLE public.training_by_region;
TRUNCATE TABLE public.training_by_geo_area;

INSERT INTO public.training_by_region
SELECT region, region_code, SUM(students_trained)
FROM public.students_trained_raw
GROUP BY region, region_code;

INSERT INTO public.training_by_geo_area
SELECT geo_area, SUM(students_trained)
FROM public.students_trained_raw
GROUP BY geo_area;
Aggregate by Region
Aggregate by geo area
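
To sanity-check the aggregates after a run, you can query the new tables directly (a throwaway query for verification, not part of the pipeline):

SELECT region, region_code, students_trained
FROM public.training_by_region
ORDER BY students_trained DESC;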

Orchestrating

As you create blocks and add them to a pipeline, Mage builds a DAG of the blocks. This DAG can be scheduled using Triggers, which support multiple configurations within Mage (for example, running on a schedule, on an event, or via an API call).

Trigger for scheduling

Conclusion

It was fun building this standard batch pipeline. I hope this blog gives a fair idea of how to use the tool.

I have just scratched the surface. Mage offers multiple capabilities, like data integrations and streaming pipelines, that remain to be explored.
