<?xml version="1.0" encoding="UTF-8"?><rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:cc="http://cyber.law.harvard.edu/rss/creativeCommonsRssModule.html">
    <channel>
        <title><![CDATA[Stories by jmregs on Medium]]></title>
        <description><![CDATA[Stories by jmregs on Medium]]></description>
        <link>https://medium.com/@josemarireguyal?source=rss-282fff91c231------2</link>
        <image>
            <url>https://cdn-images-1.medium.com/fit/c/150/150/1*gXf9z6NNCGG4Wxsb2YSngg.jpeg</url>
            <title>Stories by jmregs on Medium</title>
            <link>https://medium.com/@josemarireguyal?source=rss-282fff91c231------2</link>
        </image>
        <generator>Medium</generator>
        <lastBuildDate>Sun, 24 May 2026 03:44:14 GMT</lastBuildDate>
        <atom:link href="https://medium.com/@josemarireguyal/feed" rel="self" type="application/rss+xml"/>
        <webMaster><![CDATA[yourfriends@medium.com]]></webMaster>
        <atom:link href="http://medium.superfeedr.com" rel="hub"/>
        <item>
            <title><![CDATA[Using Snowflake and Data Build Tool (DBT)]]></title>
            <link>https://medium.com/@josemarireguyal/using-snowflake-and-data-build-tool-dbt-84269ba4ecb1?source=rss-282fff91c231------2</link>
            <guid isPermaLink="false">https://medium.com/p/84269ba4ecb1</guid>
            <category><![CDATA[dbt]]></category>
            <category><![CDATA[snowflake]]></category>
            <category><![CDATA[etl]]></category>
            <category><![CDATA[data-engineering]]></category>
            <category><![CDATA[data-analysis]]></category>
            <dc:creator><![CDATA[jmregs]]></dc:creator>
            <pubDate>Tue, 27 Aug 2024 01:14:06 GMT</pubDate>
            <atom:updated>2024-08-27T01:14:06.303Z</atom:updated>
            <content:encoded><![CDATA[<p>On my career journey as a data analyst, I have heard of and used many popular databases and data warehouses, but one popular tool I had not used yet is Snowflake. For those who are not familiar with it, Snowflake is a cloud-native data warehouse designed for analytics workloads (OLAP). For this article, I tested Snowflake by using FiveTran to load a sample online retail dataset from a Google Drive folder into Snowflake, and then using Data Build Tool (DBT) to run a simple transformation on the data. The pipeline is shown in the diagram below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*COEatoHDL3n2f9nq2n7HJg.png" /><figcaption>Data Pipeline Diagram</figcaption></figure><h3>What is Snowflake?</h3><p>Snowflake is a cloud-native, fully managed data warehouse platform designed for large-scale data and complex queries. Unlike most data warehouses, it was designed entirely for the cloud and has no on-premises version. Snowflake also separates compute from storage, so each can be scaled independently: storage grows automatically with the data, while compute (virtual warehouses) can be resized, suspended when idle, or, with multi-cluster warehouses, scaled out automatically as the load changes. Because compute is billed only while a warehouse is running, this separation can lead to significant cost savings. Lastly, since it is fully managed, Snowflake handles all infrastructure management, allowing users to focus their time on data analysis.</p><h3>Testing Out Snowflake</h3><p>The first step was to get sample data to load into Snowflake. I used a UK online retail dataset from Kaggle: <a href="https://www.kaggle.com/datasets/ulrikthygepedersen/online-retail-dataset">https://www.kaggle.com/datasets/ulrikthygepedersen/online-retail-dataset</a>. I stored the dataset in a folder on my Google Drive. 
Any cloud storage service that FiveTran supports, such as Amazon S3 or Azure Blob Storage, would also work; I used Google Drive only because it was quick to set up.</p><h4>Loading Data to Snowflake using FiveTran</h4><p>Once the data is in cloud storage, FiveTran needs two connections before it can load the data into Snowflake: one from Google Drive to FiveTran, and one from FiveTran to Snowflake. The Google Drive connection only needs the folder link and access to the folder. Once configured, it looks like the screenshot below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*qARkpIgBfiX11ubYvSGNHg.jpeg" /><figcaption>FiveTran Setup</figcaption></figure><p>The contents of the Google Drive folder can also be seen in the FiveTran user interface, as shown below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*Sor_yoVFVlewMMu8yeXh9g.png" /><figcaption>Google Drive Folder</figcaption></figure><p>For the FiveTran to Snowflake connection, we first need to set up a user, role, database, and warehouse in Snowflake, which we will then use to configure the connection. 
Conveniently, all of this setup can be done in Snowflake with SQL.</p><p>The script is shown below.</p><pre>begin;<br><br>   -- create variables for user / password / role / warehouse / database (needs to be uppercase for objects)<br>   set role_name = &#39;FIVETRAN_ROLE&#39;;<br>   set user_name = &#39;FIVETRAN_USER&#39;;<br>   set user_password = &#39;password12345&#39;;  -- replace with a strong password<br>   set warehouse_name = &#39;FIVETRAN_WAREHOUSE&#39;;<br>   set database_name = &#39;SAMPLE_DATA&#39;;<br><br>   -- change role to securityadmin for user / role steps<br>   use role securityadmin;<br><br>   -- create role for fivetran<br>   create role if not exists identifier($role_name);<br>   grant role identifier($role_name) to role SYSADMIN;<br><br>   -- create a user for fivetran<br>   create user if not exists identifier($user_name)<br>   password = $user_password<br>   default_role = $role_name<br>   default_warehouse = $warehouse_name;<br><br>   grant role identifier($role_name) to user identifier($user_name);<br><br>   -- set binary_input_format to BASE64<br>   ALTER USER identifier($user_name) SET BINARY_INPUT_FORMAT = &#39;BASE64&#39;;<br><br>   -- change role to sysadmin for warehouse / database steps<br>   use role sysadmin;<br><br>   -- create a warehouse for fivetran<br>   create warehouse if not exists identifier($warehouse_name)<br>   warehouse_size = xsmall<br>   warehouse_type = standard<br>   auto_suspend = 60<br>   auto_resume = true<br>   initially_suspended = true;<br><br>   -- create database for fivetran<br>   create database if not exists identifier($database_name);<br><br>   -- grant fivetran role access to warehouse<br>   grant USAGE<br>   on warehouse identifier($warehouse_name)<br>   to role identifier($role_name);<br><br>   -- grant fivetran access to database<br>   grant CREATE SCHEMA, MONITOR, USAGE<br>   on database identifier($database_name)<br>   to role identifier($role_name);<br><br>commit;</pre><p>The script will 
create a user and a role, grant the role to the user, and give the role access to the warehouse and to the database where the loaded data will be stored. In Snowflake, a warehouse is the compute resource used to run queries and other compute work; it does not store data itself. The user, role, warehouse, and database names can be changed to any names you like. The script can be run directly in Snowflake, as seen below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*U2VwLK5UW6uCdnXOsH0pLg.png" /><figcaption>SQL Script in Snowflake</figcaption></figure><p>Once everything is set up on the Snowflake side, we just need to enter the new credentials in FiveTran’s Snowflake destination setup. The finished setup looks like the screenshot below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*odSQ4kcAfVaM2l2soh0arg.png" /><figcaption>FiveTran to Snowflake Setup</figcaption></figure><p>With everything set up, we can use FiveTran to load the contents of the Google Drive folder into Snowflake. A manual sync can be started with the button shown below. FiveTran can also sync automatically on a schedule (for example, once a day).</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*OETHtk4nf6b39XxLLwXkUA.png" /><figcaption>FiveTran Manual Run</figcaption></figure><h4>Using DBT to Perform a Simple Transformation</h4><p>After FiveTran loads the data from Google Drive, it should be visible in Snowflake. To verify, we can run a simple query against the table, as seen below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*Ct6HUzrQJQl6aLV_AMVuwg.png" /><figcaption>Checking Table on Snowflake</figcaption></figure><p>The screenshot above confirms that the data was loaded into Snowflake.</p><p>Once the raw data is loaded, we can use DBT to perform a simple transformation on it. 
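</p><p>The transformation used here is deliberately simple: the DBT model in this section keeps the 500 most recent rows by invoice date and adds a timestamp column. As a rough illustration of that logic only, here is a plain-Python sketch, not DBT or Snowflake code. It assumes the rows are already in memory as dictionaries with an INVOICE_DATE key; the function name top_n_by_invoice_date and the sample rows are made up for the example.</p><pre>
```python
# Plain-Python sketch of the DBT model's logic (hypothetical helper, not DBT code):
# keep the n most recent rows by INVOICE_DATE and add an "inserttimestamp" column.
from datetime import datetime, timezone


def top_n_by_invoice_date(rows, n=500):
    """Return the n most recent rows by INVOICE_DATE, each stamped with a build time."""
    # One timestamp for the whole batch, taken when the "model" is built
    built_at = datetime.now(timezone.utc)
    # Equivalent of ORDER BY INVOICE_DATE DESC
    ordered = sorted(rows, key=lambda row: row["INVOICE_DATE"], reverse=True)
    # Equivalent of LIMIT n
    latest = ordered[:n]
    # Equivalent of SELECT *, CURRENT_TIMESTAMP() AS inserttimestamp
    return [{**row, "inserttimestamp": built_at} for row in latest]


if __name__ == "__main__":
    # Made-up rows for illustration only
    sample = [
        {"ID": 1, "INVOICE_DATE": datetime(2011, 1, 5)},
        {"ID": 2, "INVOICE_DATE": datetime(2011, 3, 9)},
        {"ID": 3, "INVOICE_DATE": datetime(2011, 2, 1)},
    ]
    for row in top_n_by_invoice_date(sample, n=2):
        print(row["ID"], row["INVOICE_DATE"])
```
</pre><p>In the real pipeline, Snowflake does the sorting and limiting inside the warehouse, so the data never has to leave the database; the sketch only shows what the query computes.</p><p>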
To connect DBT to Snowflake, we first need to set up a few more objects in Snowflake.</p><p>The setup script is shown below.</p><pre>CREATE DATABASE Analytics;<br><br>CREATE WAREHOUSE transforming with warehouse_size = &#39;MEDIUM&#39;;<br><br>CREATE ROLE transformer;<br><br>-- Read-only access to the raw data loaded by FiveTran<br>GRANT USAGE ON DATABASE SAMPLE_DATA to role transformer;<br>GRANT USAGE on SCHEMA SAMPLE_DATA.GOOGLE_DRIVE to role transformer;<br>GRANT select on all tables in schema SAMPLE_DATA.GOOGLE_DRIVE to role transformer;<br><br>-- Access to the analytics database, where DBT writes its output<br>grant usage on database analytics to role transformer;<br>--grant reference_usage on database analytics to role transformer;<br>grant modify on database analytics to role transformer;<br>grant monitor on database analytics to role transformer;<br>grant create schema on database analytics to role transformer;<br><br>-- Access to the transforming warehouse (compute)<br>grant operate on warehouse transforming to role transformer;<br>grant usage on warehouse transforming to role transformer;<br><br>-- Create user for development environment<br>create user dbt_dev<br>email = &#39;dbt_dev@gmail.com&#39;<br>password = &#39;Sample123&#39;<br>default_role = transformer<br>default_warehouse = transforming<br>must_change_password = true;<br><br>grant role transformer to user dbt_dev;<br><br>-- Create user for deployment environment<br>create user dbt_prod<br>email = &#39;dbt_prod@gmail.com&#39;<br>password = &#39;Sample123&#39;<br>default_role = transformer<br>default_warehouse = transforming<br>must_change_password = true;<br><br>grant role transformer to user dbt_prod;</pre><p>The script above creates the database, warehouse, and role that we will use when connecting DBT to Snowflake. The transformer role gives its users read access to the “SAMPLE_DATA” database, which contains the raw data, and permission to create and modify objects in the “Analytics” database, where DBT will write the transformed data. 
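</p><p>The access pattern above (read-only on the raw data, build rights on the analytics database) can also be captured in a small generator, which makes it easy to repeat for other roles. The Python sketch below is a hypothetical helper, not part of the original setup: the function name transformer_grants and its parameters are my own, and it emits only a subset of the statements in the script above (it leaves out MODIFY and MONITOR on the analytics database).</p><pre>
```python
# Hypothetical helper (not from the original setup) that builds Snowflake GRANT
# statements for a transformation role: read-only on the raw schema, rights to
# create schemas in the analytics database, and use of a warehouse for compute.


def transformer_grants(role, source_db, source_schema, target_db, warehouse):
    """Return a minimal list of Snowflake GRANT statements for a DBT role."""
    source = f"{source_db}.{source_schema}"
    return [
        # Read-only access to the raw data loaded by FiveTran
        f"GRANT USAGE ON DATABASE {source_db} TO ROLE {role};",
        f"GRANT USAGE ON SCHEMA {source} TO ROLE {role};",
        f"GRANT SELECT ON ALL TABLES IN SCHEMA {source} TO ROLE {role};",
        # Rights to create schemas (and, as their owner, tables) in the analytics database
        f"GRANT USAGE ON DATABASE {target_db} TO ROLE {role};",
        f"GRANT CREATE SCHEMA ON DATABASE {target_db} TO ROLE {role};",
        # Compute: the role needs a warehouse to run queries
        f"GRANT USAGE ON WAREHOUSE {warehouse} TO ROLE {role};",
        f"GRANT OPERATE ON WAREHOUSE {warehouse} TO ROLE {role};",
    ]


if __name__ == "__main__":
    for statement in transformer_grants(
        "transformer", "SAMPLE_DATA", "GOOGLE_DRIVE", "analytics", "transforming"
    ):
        print(statement)
```
</pre><p>Because the role only gets USAGE and SELECT on the source schema, a DBT model running as this role cannot modify the raw data that FiveTran loads; its writes are limited to the analytics database.</p><p>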
The script also creates two users, “dbt_dev” and “dbt_prod”, for our development and deployment environments, respectively. Both users are granted the transformer role.</p><p>Once everything is set up in Snowflake, we just need to connect DBT to it. The first step is to create a Snowflake connection in DBT, which our environments will use. The setup is shown below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*VjAYmSrYM7gLMcLxmNX6yA.png" /><figcaption>Snowflake Connection Setup in DBT</figcaption></figure><p>Once the connection is ready, we need to create two environments for the DBT project, as seen below. The “Development” environment is used when developing or testing the DBT models, while the “prod_environment” is used by jobs that run the finished models.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*zSu7RWoJeMwMk4Ghe1zSSA.png" /><figcaption>DBT Environments</figcaption></figure><p>The setup for the “Development” environment is shown below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/552/1*eozcFyTzzqFhCKCH9-JdPw.png" /><figcaption>Development Environment Setup</figcaption></figure><p>It uses the Snowflake connection created earlier, with the <strong>dbt_dev</strong> user we created as its development credentials. The <strong>dbt_dev</strong> schema is where tables are created when we run the models during development.</p><p>The setup for the production environment (“prod_environment”) is shown below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/892/1*CanjzE8Cs40f-fHtWx3wmg.png" /><figcaption>Production Environment Setup</figcaption></figure><p>The only differences from the “Development” environment are the user and the schema. 
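</p><p>To see why the user and schema are the only settings that need to change, here is a Python sketch of how DBT decides where a model’s table ends up. It mirrors DBT’s default schema-naming rule: a model is built in the environment’s target schema, or in the target schema plus an underscore and the custom schema name when the model sets one. The Environment class and the function names are hypothetical, and I assume both environments write to the “Analytics” database.</p><pre>
```python
# Sketch of where DBT puts a model's table, following DBT's default
# generate_schema_name rule. Environment and the function names are hypothetical;
# both environments are assumed to use the Analytics database.
from dataclasses import dataclass
from typing import Optional


@dataclass
class Environment:
    name: str
    database: str  # database DBT writes to
    schema: str    # target schema from the environment's credentials


def resolve_schema(target_schema: str, custom_schema: Optional[str] = None) -> str:
    """No custom schema: use the target schema. Otherwise: target_custom."""
    if custom_schema is None:
        return target_schema
    return f"{target_schema}_{custom_schema.strip()}"


def relation_name(env: Environment, alias: str,
                  custom_schema: Optional[str] = None) -> str:
    """Fully qualified table name; Snowflake upper-cases unquoted identifiers."""
    schema = resolve_schema(env.schema, custom_schema)
    return f"{env.database}.{schema}.{alias}".upper()


dev = Environment("Development", "analytics", "dbt_dev")
prod = Environment("prod_environment", "analytics", "dbt_prod")

for env in (dev, prod):
    print(env.name, "->", relation_name(env, "top_500_online_retail"))
```
</pre><p>Under these assumptions, a model that sets no custom schema is built as ANALYTICS.DBT_DEV.TOP_500_ONLINE_RETAIL during development and as ANALYTICS.DBT_PROD.TOP_500_ONLINE_RETAIL when a production job runs, with no change to the model code.</p><p>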
Everything else is the same as the “Development” environment.</p><p>With everything set up, we can now create the DBT models that transform our data in the DBT Cloud IDE. Since I only wanted to try out working with Snowflake, I created a single simple DBT model, shown below.</p><pre>/*<br>    CREATED BY: dbt_dev<br>    CREATED ON: August 15, 2024<br>    DESCRIPTION: Gets the top 500 rows based on the invoice date<br>*/<br><br>{{ <br>    config(<br>        materialized=&#39;table&#39;,<br>        alias=&#39;top_500_online_retail&#39;<br>        )<br>}}<br><br><br>SELECT *,<br>CURRENT_TIMESTAMP() AS inserttimestamp<br>FROM SAMPLE_DATA.google_drive.online_retail<br>ORDER BY INVOICE_DATE DESC<br>LIMIT 500</pre><p>The model selects the 500 most recent rows of the online retail data by invoice date and adds an “inserttimestamp” column recording when the model was built. The “config” block at the start tells DBT to materialize the model as a table named “top_500_online_retail”.</p><p>In the DBT Cloud IDE, the script looks like the screenshot below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*u_k53ADR7Fv225lak0iFig.png" /><figcaption>DBT Model Script</figcaption></figure><p>To run the model, run the command below in the DBT Cloud IDE. The model file is named “prod_dbt_model.sql”.</p><pre>dbt build --select prod_dbt_model.sql</pre><p>After the run, the transformed data is loaded into the new table, as seen in the Snowflake screenshot below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*n4avAPcIxECh9DkSrZuwAQ.png" /><figcaption>DBT Model Table</figcaption></figure><h4>Setting up a job in DBT to load data to Snowflake</h4><p>Now that the model works, we can set up a job that loads the transformed data into Snowflake on a regular schedule. 
For this, I created a job that uses the “prod_environment” we set up earlier. The job setup is shown below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*W1hNKdL4m5eyNmYz9uDsHg.png" /><figcaption>DBT Job Setup</figcaption></figure><p>The job runs the command below, so every run of the job builds this model. I scheduled the job to run every 12 hours, but it can be set to any other schedule.</p><pre>dbt build --select prod_dbt_model.sql</pre><p>Once the job is set up, it can be run manually to test it, as seen below. Just click the “Run now” button.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*I5XFH9JgjaMyUdi2km92Tg.png" /><figcaption>DBT Job Manual Run</figcaption></figure><p>After the job runs, DBT should load the transformed data into the schema specified in the “prod_environment”, which is <strong>DBT_PROD</strong>. We can check this by querying the table in Snowflake. The query shows that the table exists and contains the 500 most recent rows by invoice date, as seen below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*Zw7MQ3hIDbgdxLBVjk63Rw.png" /><figcaption>DBT Prod Table</figcaption></figure><h3>Conclusion</h3><p>Working with Snowflake has been a breeze. It connected to two other popular data tools, FiveTran and DBT, with no issues. The Snowflake UI itself is easy to navigate, and everything is straightforward: users can easily set up a database or warehouse and query whatever data they need. Although I only had a short time to work with Snowflake, I enjoyed using it for this simple project. Given that it is fully managed, I can see why it became so popular in such a short time. 
It lets users focus on the data rather than on the other work that comes with running a database or data warehouse, such as infrastructure management, maintenance, and versioning.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=84269ba4ecb1" width="1" height="1" alt="">]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Using Airflow with Docker]]></title>
            <link>https://medium.com/@josemarireguyal/using-airflow-with-docker-c7807a6528e1?source=rss-282fff91c231------2</link>
            <guid isPermaLink="false">https://medium.com/p/c7807a6528e1</guid>
            <category><![CDATA[etl]]></category>
            <category><![CDATA[data]]></category>
            <category><![CDATA[data-engineering]]></category>
            <category><![CDATA[airflow]]></category>
            <category><![CDATA[docker]]></category>
            <dc:creator><![CDATA[jmregs]]></dc:creator>
            <pubDate>Wed, 19 Jun 2024 05:13:49 GMT</pubDate>
            <atom:updated>2024-06-19T05:13:49.645Z</atom:updated>
            <content:encoded><![CDATA[<p>I was looking at a data pipeline I created in my previous article (<a href="https://medium.com/@josemarireguyal/creating-an-end-to-end-data-pipeline-with-dbt-data-build-tool-and-coinmarketcap-data-9d616b827597">https://medium.com/@josemarireguyal/creating-an-end-to-end-data-pipeline-with-dbt-data-build-tool-and-coinmarketcap-data-9d616b827597</a>), and I wanted to improve it by adding an orchestrator.</p><p>You might be wondering: what is an orchestrator? An orchestrator is essentially a task scheduler for your scripts. It manages the dependencies between tasks so that they run in the right order, and it automates their execution on predefined schedules (e.g. hourly, daily, or weekly). Orchestrators also let users view the log for each task to investigate any errors that occur during a run.</p><p>There are many orchestrators to choose from nowadays, but I decided to go with Airflow.</p><h3><strong>What is Airflow?</strong></h3><p>I chose Airflow because it is one of the most popular orchestrators available. Workflows are written in Python code, which allows for highly customized use cases. It is also an open-source project with a strong community, which makes documentation easy to find. Because of these two strengths, Airflow is used everywhere from large companies like Airbnb, Square, and Robinhood to personal projects.</p><h4>How does it actually work though?</h4><p>There are 3 main concepts in Airflow:</p><ol><li>Directed Acyclic Graphs (DAGs) = A DAG is a collection of tasks with defined dependencies and relationships. It ensures that tasks are executed in a specific order without any cycles. 
Each DAG is defined in a Python script.</li><li>Tasks = A task represents a unit of work in a DAG. Tasks are instances of operators.</li><li>Operators = These are the building blocks of tasks. An operator defines the action a task performs.</li></ol><p>I will go into these concepts in more depth later, when I show the actual code for our example.</p><p>Before we start using Airflow, there is one problem if you are using Windows. Airflow is designed to run on POSIX-compliant operating systems such as Linux and macOS, and many of the processes it relies on are not compatible with Windows. Since I am using Windows, I needed a workaround, and that workaround is Docker.</p><h3>What is Docker?</h3><p>Docker lets users package software into standard units called “containers” that contain everything the software needs to run, including libraries, system tools, code, and runtime.</p><p>Docker runs each container in isolation from the others. This sounds a lot like virtual machines (VMs), but Docker containers are much more lightweight than VMs.</p><p>The reason is that containers run on the Docker engine and share the host operating system’s kernel. Even though the containers are isolated from one another, they all use the same kernel as the host. By contrast, each VM runs its own full guest OS, and a hypervisor allocates the hardware resources for each VM. This gives VMs a great deal of customizability and flexibility, but many use cases do not need it.</p><p>This lightweight design is a big reason why Docker is so popular.</p><h4>Why does Airflow run on Docker on Windows?</h4><p>You may be wondering how Airflow can run in Docker on Windows if containers share the host’s OS kernel. 
The answer is that Docker Desktop on Windows can use the Windows Subsystem for Linux 2 (WSL 2), which runs a real Linux kernel, to provide the Linux environment that Linux-based containers need.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1022/1*tUM7HbaUvvry94DCu1NysQ.png" /><figcaption>Airflow Process</figcaption></figure><p>So even though the host OS is Windows, the Airflow containers run on the Linux kernel provided by WSL 2.</p><p>With Airflow and Docker explained, we can now look at an example of the two tools working together.</p><h3><strong>Code</strong></h3><p>There are multiple parts to getting Airflow running in Docker, so I will divide them into sections.</p><h4>Docker Compose File</h4><p>To start, we need to create a Docker Compose file, which is a YAML configuration file used to define and manage Docker applications made up of multiple containers. This file is usually named <strong>“docker-compose.yml”</strong>. 
We need a Compose file here because running Airflow in Docker requires several services.</p><p>For this demonstration, the Docker Compose file looks like this.</p><pre>version: &#39;3.8&#39;<br>services:<br>  postgres:<br>    image: postgres:13<br>    environment:<br>      POSTGRES_USER: airflow<br>      POSTGRES_PASSWORD: airflow<br>      POSTGRES_DB: airflow<br>    volumes:<br>      - postgres_data:/var/lib/postgresql/data<br><br>  init-db:<br>    image: apache/airflow:2.4.1<br>    depends_on:<br>      - postgres<br>    environment:<br>      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow<br>    command: airflow db init<br>    volumes:<br>      - ./dags:/opt/airflow/dags<br>      - ./logs:/opt/airflow/logs<br>      - ./plugins:/opt/airflow/plugins<br><br>  webserver:<br>    image: apache/airflow:2.4.1<br>    depends_on:<br>      - postgres<br>      - init-db<br>    environment:<br>      AIRFLOW__CORE__EXECUTOR: LocalExecutor<br>      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow<br>      AIRFLOW__WEBSERVER__SECRET_KEY: mysecretkey<br>      AIRFLOW__WEBSERVER__AUTHENTICATE: False  # Airflow 1.x option; has no effect in Airflow 2.x<br>      AIRFLOW__WEBSERVER__RBAC: False  # Airflow 1.x option; has no effect in Airflow 2.x<br>      FLASK_APP: &quot;airflow.www.app&quot;<br>    volumes:<br>      - ./dags:/opt/airflow/dags<br>      - ./logs:/opt/airflow/logs<br>      - ./plugins:/opt/airflow/plugins<br>    ports:<br>      - &quot;8080:8080&quot;<br>    command: &gt;<br>      bash -c &quot;export FLASK_APP=airflow.www.app &amp;&amp; <br>           airflow webserver &amp; <br>           sleep 10 &amp;&amp; <br>           airflow users create -r Admin -u admin -e admin@example.com -f Admin -l User -p mypassword &amp;&amp;<br>           tail -f /dev/null&quot;<br><br>  scheduler:<br>    image: apache/airflow:2.4.1<br>    depends_on:<br>      - postgres<br>      - init-db<br>    environment:<br>      AIRFLOW__CORE__EXECUTOR: LocalExecutor<br>      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow<br>    volumes:<br>      - ./dags:/opt/airflow/dags<br>      - ./logs:/opt/airflow/logs<br>      - ./plugins:/opt/airflow/plugins<br>    command: scheduler<br><br>volumes:<br>  postgres_data:</pre><p>I will first explain the general parts of the file, and then the role of each service:</p><ol><li>version = the version of the Compose file format. Recent releases of Docker Compose treat this field as obsolete and ignore it, so it can safely be left out.</li><li>services = each service is the blueprint for a container: the image it uses, how it is configured, and how it interacts with the other services. In our case, there are 4 services: <strong>postgres, init-db, webserver, and scheduler</strong>. I will explain each one after the general parts.</li><li>image = the pre-built Docker image used to create the service’s container. An image is a snapshot of an application and its dependencies, so it can easily be replicated and deployed across different environments.</li><li>depends_on = controls start-up order: Compose starts the listed services before this one. In the short form used here, it only waits for those containers to start, not for them to be ready or, in the case of init-db, to finish.</li><li>environment = sets environment variables for the service’s container.</li><li>volumes = keep data even if containers are stopped or removed. There are two types of volumes in a Docker Compose file: <strong>Named Volumes</strong> and <strong>Bind Mounts.</strong> Named Volumes, such as postgres_data here, are declared under the top-level volumes key and managed by Docker. They persist data and can be shared between services. 
The second one, Bind Mounts, map a directory on the host machine to a directory in the container (for example, ./dags to /opt/airflow/dags). This is useful because changes in the host directory are reflected in the container directory, and vice versa.</li><li>command = specifies the command the container runs when it starts.</li></ol><p>With the general parts covered, let us look at the role of each service:</p><ol><li><strong>postgres = </strong>This service runs the PostgreSQL database that stores Airflow’s metadata.</li><li><strong>init-db</strong> = This service runs airflow db init, which creates the tables and schema Airflow needs in the PostgreSQL database from the first service. Its environment variable, AIRFLOW__CORE__SQL_ALCHEMY_CONN, is the connection string used to connect to that database.</li><li><strong>webserver</strong> = This service provides the web-based user interface used to manage and monitor Airflow. The port mapping 8080:8080 publishes the container’s port 8080 on port 8080 of the host machine, so the UI can be opened in a browser on the host.</li><li><strong>scheduler</strong> = This service runs continuously in the background, monitoring the DAGs and triggering their tasks when their schedules and upstream dependencies are met.</li></ol><h4><strong>DAG File</strong></h4><p>Before starting the containers, we need to create the DAG Python script. This script defines one or more DAGs using the Airflow framework, which the scheduler uses to orchestrate the tasks. 
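</p><p>Before looking at the DAG file, it may help to see the ordering idea in isolation. The sketch below is not how Airflow’s scheduler is implemented; it uses only Python’s standard-library graphlib module (Python 3.9+) to order the two task IDs from the DAG file below, and to show that a cycle is rejected, which is exactly what “acyclic” rules out. The cycle tasks “a” and “b” are made up for the example.</p><pre>
```python
# Stdlib-only sketch of the ordering rule behind a DAG; this is not Airflow's
# scheduler. Task IDs match the DAG file. Requires Python 3.9+ (graphlib).
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# "fetch_data_task >> load_data_task" means the load task depends on the fetch task.
dependencies = {
    "fetch_data": set(),
    "load_data_postgres_docker": {"fetch_data"},
}

# static_order() yields every task only after all of its upstream tasks
order = list(TopologicalSorter(dependencies).static_order())
print(order)

# A cycle (a depends on b, b depends on a) has no valid order at all.
try:
    list(TopologicalSorter({"a": {"b"}, "b": {"a"}}).static_order())
except CycleError as err:
    print("cycle detected:", err.args[1])
```
</pre><p>In Airflow, the same dependency is declared with the bitshift operator (fetch_data_task &gt;&gt; load_data_task), and with the default trigger rule the scheduler only starts a task once all of its upstream tasks have succeeded.</p><p>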
Airflow stores metadata about the DAG and its runs in the PostgreSQL database container created earlier.</p><p>I will also explain each part of the code below.</p><pre># Libraries<br>from airflow import DAG<br>from airflow.operators.python import PythonOperator  # Airflow 2 import path<br>from datetime import datetime<br>from scripts.fetch_data import fetch_data<br>from scripts.load_data import load_data_postgres_docker<br><br>default_args = {<br>    &#39;owner&#39;: &#39;airflow&#39;,<br>    &#39;depends_on_past&#39;: False,<br>    &#39;start_date&#39;: datetime(2024, 5, 1),<br>}<br><br>with DAG(&#39;fetch_and_load_data_dag&#39;, default_args=default_args, schedule_interval=&#39;@daily&#39;) as dag:<br><br>    fetch_data_task = PythonOperator(<br>        task_id=&#39;fetch_data&#39;,<br>        python_callable=fetch_data<br>    )<br><br>    load_data_task = PythonOperator(<br>        task_id=&#39;load_data_postgres_docker&#39;,<br>        python_callable=load_data_postgres_docker,<br>    )<br><br>    # Run fetch_data first, then load_data_postgres_docker<br>    fetch_data_task &gt;&gt; load_data_task</pre><ol><li>Libraries = The imports the DAG file needs. In Airflow 2, PythonOperator is imported from airflow.operators.python; the older airflow.operators.python_operator path is deprecated. Note that <strong>fetch_data</strong> and <strong>load_data_postgres_docker</strong> are functions from Python scripts I wrote, which Airflow will orchestrate.</li><li>default_args = Default arguments applied to every task in the DAG.</li><li>fetch_data_task and load_data_task = The two tasks within the DAG.</li><li>PythonOperator = This operator runs a Python function as a task in a DAG.</li><li>task_id = The name shown for each task in the Airflow web UI.</li><li>python_callable = The functions from my two Python scripts, which get data from an API and load it into the PostgreSQL database created earlier.</li><li>fetch_data_task &gt;&gt; load_data_task = Sets up the dependency between the two tasks. 
It indicates that fetch_data_task should go first before load_data_task.</li></ol><p>To end this part, the DAG Python script file is usually stored in a “dags” folder in the Airflow project directory.</p><h4><strong>Python Scripts</strong></h4><p>These are the Python scripts used. These are just edited versions of the scripts I used in the DBT pipeline I created in my previous article.</p><p>The code for fetching data from CoinMarketCap can be seen below.</p><pre># Libraries<br>from requests import Request, Session<br>from requests.exceptions import ConnectionError, Timeout, TooManyRedirects<br>import json<br>import os<br><br>import pandas as pd<br><br>def fetch_data():<br>    # Get data from CoinMarketAPI<br>    # Storing JSON data from CoinMarketCap to variable. Gets top 100 cryptocurrencies based on market cap.<br>    url = &#39;https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest&#39;<br>    parameters = {<br>    &#39;start&#39;:&#39;1&#39;,<br>    &#39;limit&#39;:&#39;100&#39;,<br>    &#39;convert&#39;:&#39;USD&#39;<br>    }<br>    headers = {<br>    &#39;Accepts&#39;: &#39;application/json&#39;,<br>    &#39;X-CMC_PRO_API_KEY&#39;: &#39;[API KEY]&#39;,<br>    }<br><br>    session = Session()<br>    session.headers.update(headers)<br><br>    try:<br>        response = session.get(url, params=parameters)<br>        data = json.loads(response.text)<br>    except (ConnectionError, Timeout, TooManyRedirects) as e:<br>        print(e)<br>    <br>    # Normalizes JSON data to dataframe<br>    df = pd.json_normalize(data[&#39;data&#39;]) <br><br>    # Adds timestamp column to API data pull<br>    df[&#39;timestamp&#39;] = pd.to_datetime(&#39;now&#39;) <br><br>    ## Saving Dataframe as CSV File<br>    # Get the directory where the script is located<br>    script_directory = os.path.dirname(os.path.abspath(__file__))<br><br>    # Construct the file path for saving the CSV file in the same directory as the script<br>    file_path = 
os.path.join(script_directory, &#39;CoinMarketCapTop100Crypto.csv&#39;)<br><br>    # Save the DataFrame to a CSV file<br>    df.to_csv(file_path, index=False)<br><br>    print(f&#39;Data saved to {file_path}&#39;)<br><br>if __name__ == &quot;__main__&quot;:<br>    fetch_data()</pre><p>This is the code for loading the data to Postgres.</p><pre># Libraries<br>import pandas as pd<br><br>import psycopg2<br>from psycopg2 import sql<br><br>import csv<br>from io import StringIO<br>import os<br><br>import io<br><br>def load_data_postgres_docker():<br>    # Set the working directory to the directory of the script<br>    os.chdir(os.path.dirname(os.path.abspath(__file__)))<br><br>    # Database connection parameters for the PostgreSQL service within Docker<br>    dbname = &#39;airflow&#39;  # Database name<br>    user = &#39;airflow&#39;    # Username<br>    password = &#39;airflow&#39;  # Password<br>    host = &#39;airflow_project-postgres-1&#39;   # Docker service name<br>    port = &#39;5432&#39;       # PostgreSQL default port<br><br>    # Connect to the PostgreSQL database<br>    conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)<br><br>    # Create a cursor object<br>    cur = conn.cursor()<br><br>    # Define the schema and table name<br>    schema = &#39;staging&#39;<br>    table_name = &#39;coinmarketcap_top_100_crypto&#39;<br><br>    # Define the path to the CSV file<br>    csv_file = &#39;CoinMarketCapTop100Crypto.csv&#39;<br><br>    # Open the CSV file to read the header and determine the column names and data types<br>    with open(csv_file, &#39;r&#39;) as f:<br>        # Read the header<br>        header = next(csv.reader(f))<br><br>    # Create the staging schema if it does not exist<br>    create_schema_sql = f&quot;CREATE SCHEMA IF NOT EXISTS {schema};&quot;<br>    cur.execute(create_schema_sql)<br><br>    # Create the staging table with columns matching the CSV file schema<br>    create_table_sql = f&quot;CREATE 
TABLE IF NOT EXISTS {schema}.{table_name} (&quot;<br>    create_table_sql += &#39;, &#39;.join([f&#39;&quot;id&quot; INT&#39;])<br>    create_table_sql += &quot;);&quot;<br>    cur.execute(create_table_sql)<br><br>    # Truncate the table<br>    truncate_sql = f&quot;TRUNCATE TABLE {schema}.{table_name};&quot;<br>    cur.execute(truncate_sql)<br><br>    # Open the CSV file to read only the first column<br>    with open(csv_file, &#39;r&#39;, newline=&#39;&#39;) as f:<br>        reader = csv.reader(f)<br>        next(reader)  # Skip the header row<br>        first_column_data = [row[0] for row in reader]<br><br>    # Create a string with each value in the first column separated by newline<br>    first_column_data_str = &#39;\n&#39;.join(first_column_data)<br><br>    # Create a file-like object from the CSV data string using io.StringIO<br>    first_column_data_file = io.StringIO(first_column_data_str)<br><br>    # Use psycopg2&#39;s copy_expert method to copy the first-column values into the table.<br>    # No HEADER option: the header row was already skipped above, so HEADER would drop the first id.<br>    copy_sql = sql.SQL(&quot;&quot;&quot;<br>    COPY {}.{}<br>    FROM STDIN WITH CSV<br>    DELIMITER as &#39;,&#39;<br>    &quot;&quot;&quot;).format(sql.Identifier(schema), sql.Identifier(table_name))<br>    cur.copy_expert(sql=copy_sql, file=first_column_data_file)<br><br>    # Commit the transaction<br>    conn.commit()<br><br>    # Close the cursor and the connection<br>    cur.close()<br>    conn.close()<br><br>    print(&#39;Load successful&#39;)<br><br>if __name__ == &quot;__main__&quot;:<br>    load_data_postgres_docker()</pre><p>These are stored in a “script” folder inside the “dags” folder in the Airflow project directory.</p><h4><strong>Airflow Web Server</strong></h4><p>Once all the files are in the Airflow project directory, we can start the Docker containers. 
To do this, run the command below in a terminal in the Airflow project directory.</p><pre>docker-compose up -d</pre><p>This creates and starts containers for all the services defined in our Docker Compose file, running them in the background (the -d flag stands for detached mode). We can see the containers running in the Docker Desktop application, as shown below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*d6spc5h-FJkY3JL4Vg6FPw.png" /><figcaption>Docker Containers</figcaption></figure><p>Once the containers are running, we can open the Airflow webserver to test our DAG by going to <a href="http://localhost:8080/">http://localhost:8080/</a>. The Airflow webserver should look like this.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*9xXtfjXjjFXhN3T84sXvsA.png" /><figcaption>Airflow Webserver UI</figcaption></figure><p>Clicking the DAG opens the interface below. Click “Trigger DAG” to run the DAG once and check that it works.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*BTREfitjFtdnApwnyB5dNw.png" /><figcaption>fetch_and_load_data DAG</figcaption></figure><p>If the DAG run succeeds, the latest run in the grid below should be all green.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/937/1*AGZ9mSOLkjd0XMYuvRwlXQ.png" /><figcaption>DAG Run Status</figcaption></figure><p>The latest run shows no errors, which means all the tasks completed successfully.</p><h4><strong>Results</strong></h4><p>To check whether the data was loaded into the PostgreSQL database in the container we created, we can use psql, PostgreSQL’s command-line interface (CLI) tool.</p><p>To do this, we need to run psql inside the Postgres container. 
First, find the container ID of the Postgres container with the command below.</p><pre>docker ps</pre><p>This lists every running container along with its container ID.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*YoDNLOR9atmhw7eSd_E1xQ.png" /><figcaption>Container IDs of Docker Containers</figcaption></figure><p>With the container ID of the Postgres container, we can run psql inside it to check the table, using the command below (replace the ID with your own). The <strong>-U</strong> and <strong>-d</strong> flags set the username and database, respectively. Both are “airflow”, as set in our Docker Compose file.</p><pre>docker exec -it 0d1a63e5e816 psql -U airflow -d airflow</pre><p>This opens the psql prompt, as shown below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*5K3sKACptBz-KLo7WLesIw.png" /><figcaption>Postgres Container</figcaption></figure><p>Now we just need to query the table we loaded the data into. The load script creates the schema and table if they do not already exist. I set up the table with only one column, “id”, with the INT data type, since this is only for demonstration purposes. 
The schema and table name appear in the query below, which we use to check the table.</p><pre>SELECT * FROM staging.coinmarketcap_top_100_crypto;</pre><p>Running the query should return the “id” column populated with values from the CSV file fetched in the first task.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/725/1*SStQsS8fKM3POT2g9pzxpQ.png" /><figcaption>coinmarketcap_top_100_crypto Table</figcaption></figure><p>We can see in the image above that the “id” column has values, which means the pipeline orchestrated by Airflow worked.</p><h3><strong>Conclusion</strong></h3><p>Working with Airflow on Windows has its challenges compared to using a Unix-based system like macOS. However, the experience was a valuable opportunity to learn both Docker and Airflow.</p><p>Docker lets users isolate their applications from one another, similar to a virtual machine (VM), but containers are much more lightweight and simpler to use because they share the host’s operating system kernel instead of running a full guest operating system.</p><p>Airflow, on the other hand, remains an industry-standard orchestrator and is still widely adopted by companies running complex data pipelines. Despite newer and simpler orchestrators entering the scene, Airflow keeps strong support thanks to its reliability and versatility in production environments.</p><p>Learning Docker and Airflow gives hands-on experience with industry-standard tools. That knowledge builds proficiency in modern data engineering and prepares practitioners for an evolving technology landscape.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=c7807a6528e1" width="1" height="1" alt="">]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[What is MongoDB?]]></title>
            <link>https://blog.devgenius.io/what-is-mongodb-80c655eead01?source=rss-282fff91c231------2</link>
            <guid isPermaLink="false">https://medium.com/p/80c655eead01</guid>
            <category><![CDATA[data]]></category>
            <category><![CDATA[nosql]]></category>
            <category><![CDATA[mongodb]]></category>
            <category><![CDATA[database]]></category>
            <dc:creator><![CDATA[jmregs]]></dc:creator>
            <pubDate>Tue, 21 May 2024 05:35:51 GMT</pubDate>
            <atom:updated>2024-06-01T14:41:01.165Z</atom:updated>
            <content:encoded><![CDATA[<p>MongoDB is a NoSQL database designed to handle huge amounts of unstructured or semi-structured data. Like many NoSQL databases, it is schemaless, in contrast to the rigid schema design of traditional relational databases. MongoDB stores data in JSON-like documents. Each document can have different fields, and the data structure can vary from document to document.</p><p><strong>Why use a NoSQL Database?</strong></p><p>The main reason people use a NoSQL database is to take advantage of its schemaless nature. Since the database does not require a rigid schema to be set up before use, companies can start storing their unstructured data quickly. This makes development agile: if a new feature adds a new field to your data, you can start writing documents with that field immediately, without first migrating a schema. There is no downtime in your development.</p><p>This is very useful for companies whose data changes constantly, such as social media companies (Facebook, Twitter, etc.), since they frequently tweak and add new features to their applications.</p><p>The other big reason people use NoSQL databases is their scalability. NoSQL databases like MongoDB are designed to scale out through sharding (distributing data across multiple servers). This is crucial for companies that store huge amounts of new data every day, like Facebook, Instagram, Netflix, etc.</p><h3><strong>Using MongoDB</strong></h3><p>To show how MongoDB works, I’ll set up a MongoDB Atlas cluster and load it with sample data.</p><h4><strong>Setting up MongoDB Atlas</strong></h4><p>MongoDB Atlas is MongoDB’s fully managed cloud database service, which runs MongoDB deployments as clusters. 
It frees users from operational tasks such as scaling and patching.</p><p>To get started, create a free MongoDB Atlas account, then create an organization and a project for the database, as shown below. Organizations and projects are how MongoDB Atlas groups users and resources. Organizations sit at the top of the resource hierarchy: an organization can contain multiple projects, and you can choose which users have access to each project.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*VbNyrivWnyz5BjaBkVUPDA.png" /><figcaption>MongoDB Atlas Organization and Project</figcaption></figure><p>In our case, I created a project named “Sample Project” in the organization I made. Click on the project and go to the database section. You should see the screen below. Click the “Build a Cluster” button.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*FwRrhYuhgona8IqYi8j73A.png" /><figcaption>MongoDB Atlas Project UI</figcaption></figure><p>This takes you to the cluster configuration page, shown below. MongoDB Atlas offers a free cluster tier called M0, which we will use for our sample project. I named the cluster “SampleProjectCluster”. Then click the “Create Deployment” button to start creating the cluster. This usually takes a few minutes.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*v123TwWZda0985wavMZbOw.png" /><figcaption>Cluster Configuration Setup</figcaption></figure><p>Once the cluster is created, we can set up a connection to it and load our sample data.</p><h3><strong>Connecting to the MongoDB Atlas Cluster</strong></h3><p>The cluster we just created should have a “Connect” button. 
Click this button.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*EJqpj73O0H6548uAShs3qw.png" /><figcaption>MongoDB Atlas Cluster Interface</figcaption></figure><p>The window below should appear. Click “Drivers” next.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1014/1*98GTf2_0e7NPbzEKt9GPfw.png" /><figcaption>Ways of Connecting to MongoDB Atlas</figcaption></figure><p>We should now see the URI connection string, which we will use to connect to MongoDB Atlas in our code. Each cluster has its own connection string.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1017/1*yPrnskqY_6D-YzOwN0a9Bg.png" /><figcaption>MongoDB Atlas Cluster Connection String</figcaption></figure><p>Next, we need a database user so our code can access the cluster. To create one, click “Quickstart” in the Security section, as shown below. Create a username and password for the user, and click “Create User”. These are the credentials our code will use to access the MongoDB Atlas cluster.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*NlpBn66tbENboyau8nRoqw.png" /><figcaption>User Database Creation</figcaption></figure><p>Lastly, we need to allow the IP address of our computer to connect to the MongoDB Atlas cluster. Scroll down in the “Quickstart” section to the IP Access List section. 
Click the “Add My Current IP Address” button to allow your IP address to connect to the MongoDB Atlas cluster.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/937/1*cwcihWZDJ2XqYQQw3c4IPw.png" /><figcaption>IP Access List</figcaption></figure><p>Once that is done, we can load sample data into our MongoDB Atlas cluster.</p><h3><strong>Loading Data to MongoDB Atlas Cluster</strong></h3><p>There are different ways to load data into MongoDB Atlas. Here we’ll use PyMongo, the official MongoDB driver for Python.</p><p>We will load just a few sample documents to show how MongoDB works. The code for this can be seen below.</p><pre>from pymongo import MongoClient<br><br>def connect_to_mongodb():<br><br>    # MongoDB Atlas connection string<br>    uri = &quot;mongodb+srv://admin:&lt;password&gt;@sampleprojectcluster.ezawixf.mongodb.net/test?retryWrites=true&amp;w=majority&quot;<br>    <br>    # Connect to MongoDB Atlas<br>    client = MongoClient(uri)<br><br>    # Setup the database and collection<br>    db = client[&quot;sampleDB&quot;]<br>    collection = db[&quot;sampleCollection&quot;]<br><br>    # Create sample data<br>    sample_data = [<br>        {&quot;_id&quot;: 0, &quot;name&quot;: &quot;jim&quot;, &quot;score&quot;: 5},<br>        {&quot;_id&quot;: 1, &quot;name&quot;: &quot;zeke&quot;, &quot;score&quot;: 10},<br>        {&quot;_id&quot;: 2, &quot;name&quot;: &quot;david&quot;, &quot;score&quot;: 8}<br>    ]<br><br>    # Insert sample data <br>    # Note: the _id values are fixed, so running this script a second time fails with a duplicate key error<br>    collection.insert_many(sample_data)<br>    print(&quot;Data inserted successfully&quot;)<br><br>if __name__ == &quot;__main__&quot;:<br>    connect_to_mongodb()</pre><p>Here is what each part of the code does:</p><ul><li>MongoDB Atlas connection string = This is the connection string that we got from the MongoDB Atlas cluster earlier. 
“admin” is the username; replace the password placeholder with the password you set for that database user.</li><li>Connect to MongoDB Atlas = creates the client connection using the connection string.</li><li>Setup the database and collection = The database here is “sampleDB”, and the collection is “sampleCollection”. In MongoDB, a database is a container for collections, and a collection is a group of documents. A collection in MongoDB is similar to a table in a relational database.</li><li>Create sample data = a list of dictionaries. Each dictionary becomes a document when we insert it into the collection.</li><li>Insert sample data = the insert_many() method inserts multiple documents into the collection in a single call.</li></ul><p>After running the code, the data should appear in the database and collection used in the code, as shown below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*W6iowiHKkYliG7pr7gLPng.png" /></figure><p>Each dictionary in our sample data now appears as a document in the collection.</p><h3><strong>Loading Data with Different Fields</strong></h3><p>So far, all the documents in our collection have the same fields, but since MongoDB is schemaless, we can add a document with different fields without any prior changes to the collection.</p><p>To show this, I edited the earlier code to add another document with additional fields. The new document can be seen below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*aIyVeNwjwNnSMaoQBhIN0g.png" /><figcaption>Additional Document with Additional Fields</figcaption></figure><p>As you can see, MongoDB accepted the document with different fields without any issues. 
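</p><p>The edited script itself is not included in this article. As a minimal sketch, assuming the same sampleCollection and two hypothetical extra fields (age and hobbies, chosen here for illustration), the extra document could be inserted with PyMongo’s insert_one() method:</p><pre>
def insert_document_with_extra_fields(collection):
    """Insert one document whose fields differ from the earlier sample documents.

    collection is assumed to be a PyMongo Collection, such as
    client["sampleDB"]["sampleCollection"]. The "age" and "hobbies"
    fields are hypothetical examples, not the fields used in the article.
    """
    new_document = {
        "_id": 3,
        "name": "maria",
        "score": 9,
        # Extra fields the other documents do not have; the collection
        # needs no schema change to accept them.
        "age": 25,
        "hobbies": ["chess", "hiking"],
    }
    result = collection.insert_one(new_document)
    return result.inserted_id


if __name__ == "__main__":
    # Imported here so the helper above can be imported without pymongo installed.
    from pymongo import MongoClient

    # Placeholder: paste your own cluster's connection string and password.
    client = MongoClient("mongodb+srv://admin:YOUR_PASSWORD@sampleprojectcluster.ezawixf.mongodb.net/")
    collection = client["sampleDB"]["sampleCollection"]
    print("Inserted document with _id", insert_document_with_extra_fields(collection))
</pre><p>Because the _id is fixed at 3, running the sketch twice raises a DuplicateKeyError. Leaving out _id lets the driver generate an ObjectId automatically.</p><p>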
Being able to add documents with different fields is one of the key features developers like about NoSQL databases such as MongoDB, since they can store new kinds of data without first changing the database. This is especially useful for companies whose data changes constantly, because it lets them spend their time developing their product instead of managing a schema.</p><h3><strong>Conclusion</strong></h3><p>As you can see, a NoSQL database is very useful for people who need a flexible database that can handle different kinds of unstructured or semi-structured data. It has become a favorite of developers whose data changes constantly, as it lets them spend less time setting up their database and more time developing their applications.</p><p>It also scales well compared to traditional databases. Through sharding, it can distribute very large collections across many servers, making it well suited to applications that handle massive amounts of data daily, such as social media platforms.</p><p>However, NoSQL databases also have drawbacks. One significant issue is keeping data organized as the database grows. Since NoSQL databases do not enforce a rigid schema, documents with different fields commonly coexist, which makes the data harder to organize than in a traditional database.</p><p>In conclusion, a NoSQL database has its pros and cons. Whether someone needs a NoSQL database or a traditional relational database really depends on their use case. 
In most cases, a relational database is the obvious choice, but in special cases where there is a need to handle large amounts of unstructured data, a NoSQL database might be a better option.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=80c655eead01" width="1" height="1" alt=""><hr><p><a href="https://blog.devgenius.io/what-is-mongodb-80c655eead01">What is MongoDB?</a> was originally published in <a href="https://blog.devgenius.io">Dev Genius</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Creating an End-to-End Data Pipeline with DBT (Data Build Tool) and CoinMarketCap Data]]></title>
            <link>https://medium.com/@josemarireguyal/creating-an-end-to-end-data-pipeline-with-dbt-data-build-tool-and-coinmarketcap-data-9d616b827597?source=rss-282fff91c231------2</link>
            <guid isPermaLink="false">https://medium.com/p/9d616b827597</guid>
            <category><![CDATA[data-analysis]]></category>
            <category><![CDATA[data]]></category>
            <category><![CDATA[dbt]]></category>
            <category><![CDATA[data-visualization]]></category>
            <category><![CDATA[data-engineering]]></category>
            <dc:creator><![CDATA[jmregs]]></dc:creator>
            <pubDate>Wed, 01 May 2024 02:26:37 GMT</pubDate>
            <atom:updated>2024-05-01T02:26:37.085Z</atom:updated>
            <content:encoded><![CDATA[<p>If you are working in the data industry, chances are that you have heard of DBT (Data Build Tool). It is a tool that handles the transformation part of the ELT (Extract, Load, and Transform) process. But why exactly is DBT so popular with data analysts and engineers for building their pipelines?</p><h3><strong>What is DBT (Data Build Tool)?</strong></h3><p>DBT is an SQL-based transformation tool that enables data analysts and engineers to build, test, document, maintain, and monitor data transformation pipelines directly in their data warehouse. Being SQL-based is DBT’s main draw for most data analysts and engineers, since SQL usually has a lower barrier to entry than Python for transformations. This lets more people in the industry learn and use the tool quickly.</p><p>DBT also brings in best practices from software engineering. Some of these include:</p><ul><li>Version control</li><li>Testing and documentation</li><li>Modularity</li></ul><p>These features, combined with its SQL-based approach, quickly made DBT a hit in the data industry.</p><h3><strong>Creating a Pipeline using DBT</strong></h3><p>With that background, I will walk through a quick project that demonstrates how DBT works in a simple data pipeline. 
I will use raw data from CoinMarketCap’s API.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*FCMknYtoTGEfxYxlWHDGAQ.png" /><figcaption>Data Pipeline Diagram</figcaption></figure><p>The components of the pipeline are:</p><ol><li>Getting data from the CoinMarketCap API</li><li>Loading the data into PostgreSQL using Python</li><li>Using DBT to transform the data</li><li>Using PowerBI to visualize the clean data</li></ol><p>I will quickly go through the steps for each component.</p><h3><strong>Getting Data from the CoinMarketCap API</strong></h3><p>For this pipeline, I will use data from CoinMarketCap, a website that tracks cryptocurrency data. To access it, we first need to create a free account on their developer website (<a href="https://coinmarketcap.com/api/">https://coinmarketcap.com/api/</a>) to get an API key.</p><p>After creating an account and logging in, the home page should look like the one below. The free account allows 10,000 requests per month, which is more than enough for this simple project.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*iC8h1WSZFIfXFTZgp2-9Lw.png" /><figcaption>CoinMarketCap API Website Home Page</figcaption></figure><p>I copied the API key and used it in Python code that gets the top 100 cryptocurrencies by market cap and saves them locally as a CSV file. The code can be seen below. 
Replace the API key placeholder with your own API key.</p><pre># Libraries<br>from requests import Session<br>from requests.exceptions import ConnectionError, Timeout, TooManyRedirects<br>import json<br>import os<br><br>import pandas as pd<br><br># If you run this in Jupyter and hit the IOPub data rate limit when displaying output, start Jupyter with:<br># jupyter notebook --NotebookApp.iopub_data_rate_limit=1e10<br><br># Allows user to see all columns in dataframe<br>pd.set_option(&#39;display.max_columns&#39;, None)<br><br><br># Storing JSON data from CoinMarketCap to a variable. Gets top 100 cryptocurrencies based on market cap.<br>url = &#39;https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest&#39;<br>parameters = {<br>  &#39;start&#39;:&#39;1&#39;,<br>  &#39;limit&#39;:&#39;100&#39;,<br>  &#39;convert&#39;:&#39;USD&#39;<br>}<br>headers = {<br>  &#39;Accepts&#39;: &#39;application/json&#39;,<br>  &#39;X-CMC_PRO_API_KEY&#39;: &#39;(Your_API_Key)&#39;,<br>}<br><br>session = Session()<br>session.headers.update(headers)<br><br>try:<br>  response = session.get(url, params=parameters)<br>  data = json.loads(response.text)<br>except (ConnectionError, Timeout, TooManyRedirects) as e:<br>  print(e)<br>  raise  # Stop here instead of failing later with a NameError on data<br><br># Normalizes JSON data to dataframe<br>df = pd.json_normalize(data[&#39;data&#39;]) <br><br># Adds timestamp column to API data pull<br>df[&#39;timestamp&#39;] = pd.to_datetime(&#39;now&#39;) <br><br><br>## Saving Dataframe as CSV File<br># Get the current directory<br>current_directory = os.getcwd()<br><br># Construct the file path for saving the CSV file in the current directory<br>file_path = os.path.join(current_directory, &#39;CoinMarketCapTop100Crypto.csv&#39;)<br><br># Save the DataFrame to a CSV file<br>df.to_csv(file_path, index=False)<br><br>print(f&#39;Data saved to {file_path}&#39;)</pre><p>After running the code, the file should be saved in the current working directory (the folder you run the code from) as 
‘CoinMarketCapTop100Crypto.csv’.</p><p>Now that we have the file, the next step is to load the raw data into PostgreSQL.</p><h3><strong>Loading the Data to PostgreSQL</strong></h3><p>Before loading the data, I created a “staging” schema and a “prod” schema in PostgreSQL.</p><p>Once the schemas are ready, there are multiple ways of loading data into PostgreSQL. I used pandas together with the SQLAlchemy library.</p><p>The code for loading the data can be seen below. It first creates a connection to the PostgreSQL server using your credentials, then reads the CSV file into a dataframe and loads it into the specified schema.</p><pre># Libraries<br>import pandas as pd<br>from sqlalchemy import create_engine<br><br># Create a connection to the Postgres server (replace the credentials with your own)<br>engine = create_engine(&quot;postgresql://postgres:admin@127.0.0.1:5432/postgres&quot;, echo=False)<br><br># Read the CSV file into a dataframe<br>df = pd.read_csv(&quot;CoinMarketCapTop100Crypto.csv&quot;)<br><br># Load the dataframe into Postgres; if_exists=&quot;replace&quot; drops and recreates the table on each run<br>df.to_sql(&quot;coinmarketcap_top_100_crypto&quot;, con=engine, schema=&quot;staging&quot;, if_exists=&quot;replace&quot;, index=False)</pre><p>After loading, the raw data should be in a table in the “staging” schema, as seen below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/446/1*5SIBRDZi0E6i_IEggvANXg.png" /><figcaption>Raw Data in the “staging” schema</figcaption></figure><p>We are now ready to transform / clean the data using DBT.</p><h3><strong>Using DBT to Transform the Data</strong></h3><p>I will break this part down into steps to make it easier to follow.</p><h4>Installing and Initializing our DBT Project</h4><p>To start, I installed DBT using pip. 
DBT is installed as DBT Core plus an adapter for your database, which is dbt-postgres in our case. The command for this can be seen below.</p><pre>pip install dbt-core dbt-postgres</pre><p>Once DBT is installed on your computer, go to your project folder and initialize a DBT project with the command below.</p><pre>dbt init demo_dbt</pre><p>This creates a project folder with everything needed to build our models, including several subfolders, as seen below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/312/1*pltmmrOrXh_H2bEnkHWiFQ.png" /><figcaption>demo_dbt Folder</figcaption></figure><h4>Setting up profiles.yml File</h4><p>After initializing the project, the first thing to do is set up the profiles.yml file. This file contains the connection details for the project’s database, which is PostgreSQL in our case. This is what my profiles.yml looks like after setting it up.</p><pre>demo_dbt:<br>  target: prod<br>  outputs:<br>    dev:<br>      type: postgres<br>      host: 127.0.0.1<br>      port: 5432<br>      user: postgres<br>      pass: admin<br>      dbname: postgres<br>      schema: staging<br>    prod:<br>      type: postgres<br>      host: 127.0.0.1<br>      port: 5432<br>      user: postgres<br>      pass: admin<br>      dbname: postgres<br>      schema: prod</pre><p>Here is what each part of the profiles.yml file does:</p><ul><li>demo_dbt = the name of the profile, which must match the profile setting in the project’s dbt_project.yml. A single profiles.yml file can contain multiple profiles, each pointing to a different database. Since we’re only using PostgreSQL, we only have one profile.</li><li>target = the default target for dbt commands. It is set to “prod”, so dbt commands use the “prod” output unless you pick another one (for example, with --target dev).</li><li>dev, prod = the two output configurations. Each output holds the connection details for the database you are using. 
In our case, the connection details of the two outputs are the same except for the schema: the “dev” output uses the “staging” schema, while the “prod” output uses the “prod” schema.</li></ul><p>The profiles.yml file tells DBT which database to connect to and which schema to build our models in.</p><p>Right now, I have the profiles.yml file in the project folder, but DBT’s default location for it is the directory below.</p><pre>C:\Users\(Your username in the computer)\.dbt</pre><p>You can point DBT to another location with the --profiles-dir flag or the DBT_PROFILES_DIR environment variable, but in our case, I simply put a copy of the profiles.yml file in the default DBT directory.</p><h4>Creating the DBT Model</h4><p>Before creating the model, here is what a DBT model is. A DBT model is a SQL SELECT statement that defines transformations, aggregations, or calculations. Depending on its materialization, DBT builds the result in the warehouse as a table, a view, an incremental table, or an ephemeral model (inlined into other models as a CTE). DBT models are stored in the “models” directory of the project folder.</p><p>The SQL script for the model that cleans the raw data and loads it into our “prod” schema can be seen below. The config block at the start of the code tells DBT to materialize the output as a table. 
As for the rest of the SQL script, the transformations are:</p><ol><li>Selecting only certain columns</li><li>Limiting the data to the top 50 cryptocurrencies by market cap</li><li>Changing the data type of the date columns from “text” to “timestamp”</li><li>Renaming columns</li></ol><pre>{{ config(materialized=&#39;table&#39;) }}<br><br>SELECT <br> id<br> , name<br> , symbol<br> , num_market_pairs<br> , CAST(date_added AS timestamp) AS date_added<br> , max_supply<br> , circulating_supply<br> , total_supply<br> , infinite_supply<br> , CAST(last_updated AS timestamp) AS last_updated<br> , &quot;quote.USD.price&quot; AS USD_price<br> , &quot;quote.USD.volume_24h&quot; AS USD_volume_24h<br> , &quot;quote.USD.volume_change_24h&quot; AS USD_volume_change_24h<br> , &quot;quote.USD.percent_change_24h&quot; AS USD_percent_change_24h<br> , &quot;quote.USD.percent_change_7d&quot; AS USD_percent_change_7d<br> , &quot;quote.USD.percent_change_30d&quot; AS USD_percent_change_30d<br> , &quot;quote.USD.market_cap&quot; AS USD_market_cap<br> , &quot;quote.USD.market_cap_dominance&quot; AS USD_market_cap_dominance<br> , &quot;quote.USD.fully_diluted_market_cap&quot; AS USD_fully_diluted_market_cap<br> , &quot;platform.id&quot; AS platform_id<br> , &quot;platform.name&quot; AS platform_name<br> , &quot;platform.symbol&quot; AS platform_symbol<br> , &quot;platform.token_address&quot; AS platform_token_address<br> , CAST(&quot;timestamp&quot; AS timestamp) AS timestamp<br>FROM staging.coinmarketcap_top_100_crypto<br>ORDER BY &quot;quote.USD.market_cap&quot; DESC<br>LIMIT 50</pre><p>The SQL file should be stored in the “models” directory within our project folder, as seen below. 
In our case, I created a subfolder called “coinmarketcap_model” inside the “models” folder and stored the SQL file there.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/327/1*W68cC1_sxrD7E7-WcDJCvw.png" /></figure><h4>Testing the Model</h4><p>After creating the model, it is good practice to add a schema.yml file in the model folder. It documents the model and defines its tests. The schema.yml file for our model can be seen below.</p><pre>version: 2<br><br>models:<br>  - name: coinmarketcap_clean<br>    description: &quot;Cleans the data and transfers it to prod.&quot;<br>    columns:<br>      - name: id<br>        description: &quot;The primary key for this table&quot;<br>        tests:<br>          - unique<br>          - not_null<br>    meta:<br>      schema: prod  # Free-form metadata; it does not change the schema the model is built in</pre><p>I’ll explain each part:</p><ol><li>name = the model name, which matches the SQL file name without the .sql extension.</li><li>description = a description of the model.</li><li>columns = the model’s columns. You can add a description for each column so the file acts as a data dictionary. Here, I only listed the “id” column for our tests.</li><li>tests = data tests that DBT runs as queries against the model’s output in the database. I used two of DBT’s built-in tests, unique and not_null, on the “id” column. You can also write custom tests if needed.</li><li>meta = free-form metadata for documentation. It does not control where the model is built: the model lands in the “prod” schema because the default target in profiles.yml is “prod”.</li></ol><p>Since there are tests in the schema.yml file, you can run them with the command below. It runs every test in the project folder; since we only have one model, it only runs that model’s tests. Because the tests query the built table, the model must already exist in the database (from a previous dbt run) for them to pass. The dbt build command runs models and their tests together.</p><pre>dbt test</pre><p>The output shows whether each test passed or failed. 
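</p><p>Under the hood, each of DBT’s built-in generic tests compiles to a SQL query that selects failing rows, and the test passes when that query returns nothing. The sketch below approximates the unique and not_null checks with Python’s built-in sqlite3 module on a hypothetical in-memory table with made-up rows. The SQL that DBT actually generates for PostgreSQL differs in detail.</p><pre>
import sqlite3

# Hypothetical in-memory stand-in for the model's output table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE coinmarketcap_clean (id INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO coinmarketcap_clean VALUES (?, ?)",
    [(1, "coin_a"), (2, "coin_b"), (3, "coin_c")],  # made-up sample rows
)

# Approximation of the 'unique' test: any id that appears more than once fails.
UNIQUE_SQL = """
    SELECT id, COUNT(*) AS n_records
    FROM coinmarketcap_clean
    WHERE id IS NOT NULL
    GROUP BY id
    HAVING COUNT(*) > 1
"""

# Approximation of the 'not_null' test: any row with a missing id fails.
NOT_NULL_SQL = "SELECT * FROM coinmarketcap_clean WHERE id IS NULL"


def run_test(query):
    """Return the failing rows; an empty list means the test passes."""
    return conn.execute(query).fetchall()


for test_name, query in [("unique", UNIQUE_SQL), ("not_null", NOT_NULL_SQL)]:
    failures = run_test(query)
    print(test_name, "PASS" if not failures else "FAIL " + str(failures))
</pre><p>Adding a duplicate or NULL id to the sample rows makes the matching query return rows, which DBT would report as a test failure.</p><p>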
In our case, the tests show that all the values of the “id” column are unique and not null.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*b7GGtva0NfYSLsohm3DPng.png" /><figcaption>DBT Model Test Results</figcaption></figure><h4>Running the Model</h4><p>With the tests passing, we can now run the model itself. This transforms the raw data from our “staging” schema and loads the result into our “prod” schema.</p><p>To run the model, use the command below. It runs every model in the project folder; since we only have one model, it only runs that one.</p><pre>dbt run</pre><p>This is what the output should look like when the model runs without errors.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*dF3h4nD1Wavlc8RimI0tgA.png" /><figcaption>DBT Model Run</figcaption></figure><p>The transformed data should show up in the “prod” schema, as seen below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*m61OlwNyjOslDgfHHRhY7g.png" /><figcaption>Transformed Data in “prod” schema</figcaption></figure><p>Now that we have the clean data, the final step is to build a simple dashboard to visualize it.</p><h3>Using Power BI to Visualize the Clean Data</h3><p>For this step, I connected Power BI to the PostgreSQL database to access the clean data. After creating and designing the dashboard, the final output can be seen below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*2gzUAgUIE93bT7-bIolxfQ.png" /><figcaption>CoinMarketCap Power BI Dashboard</figcaption></figure><p>Any data visualization tool would work for this step; the choice comes down to preference.</p><h3><strong>Conclusion</strong></h3><p>In conclusion, DBT proved to be an efficient tool for the transformation step. The hardest part of the project was actually getting the raw data and loading it into the data warehouse. 
Once the raw data was in the data warehouse, DBT could transform it inside the warehouse itself. Besides letting users write the transformations directly in SQL, DBT keeps the project as plain SQL and YAML files that can be put under version control (for example, with Git), and it adds testing and modularity, which help users make sure their models work as expected. With the transformed data readily available, users have the flexibility to perform ad hoc queries, visualize insights, or integrate the cleaned data with other datasets as needed. Overall, DBT streamlines the transformation process and makes data pipelines more efficient to build.</p><p>Here is the GitHub repository link for the project if you want to check it out: <a href="https://github.com/jmreguyal/coinmarketcap_dbt_data_pipeline">https://github.com/jmreguyal/coinmarketcap_dbt_data_pipeline</a></p>]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Creating a Basic ETL Pipeline with AWS S3 and Glue]]></title>
            <link>https://medium.com/@josemarireguyal/creating-a-basic-etl-pipeline-with-aws-s3-and-glue-b35d3fb065fa?source=rss-282fff91c231------2</link>
            <guid isPermaLink="false">https://medium.com/p/b35d3fb065fa</guid>
            <category><![CDATA[aws]]></category>
            <category><![CDATA[aws-s3]]></category>
            <category><![CDATA[data-engineering]]></category>
            <category><![CDATA[etl]]></category>
            <category><![CDATA[aws-glue]]></category>
            <dc:creator><![CDATA[jmregs]]></dc:creator>
            <pubDate>Mon, 22 Apr 2024 02:30:46 GMT</pubDate>
            <atom:updated>2024-04-22T02:43:37.781Z</atom:updated>
            <content:encoded><![CDATA[<p>I have been studying Amazon Web Services (AWS) for a while now, but I had not yet used it in a personal project. To bridge this gap, I decided to build a simple ETL (Extract, Transform, and Load) pipeline on AWS.</p><p>The project uses two commonly used AWS tools:</p><ol><li>AWS S3 (Simple Storage Service)</li><li>AWS Glue</li></ol><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*HjRZZnzL2wwVYy4Cpzy0bg.png" /><figcaption>Project Diagram</figcaption></figure><p>AWS S3 stores the raw file that AWS Glue will transform; after the transformation, the clean data is loaded back into AWS S3.</p><p>I will walk through the steps I took in each tool to build the ETL pipeline.</p><h3><strong>Creating a User with Policies</strong></h3><p>Before getting started, I created an IAM user with the policies this project needs.</p><p>The policies used are:</p><ul><li>AmazonS3FullAccess</li><li>AWSGlueConsoleFullAccess</li></ul><figure><img alt="" src="https://cdn-images-1.medium.com/max/727/1*8TQJEkoqjSJovKSckZA8Kw.png" /><figcaption>Policies Needed for the Project</figcaption></figure><p>Once these policies are attached to the IAM user, we can start on the project.</p><h3><strong>AWS S3</strong></h3><p>I will be using a Netflix dataset from Kaggle. 
Here is the link if you want to see it or use it for your own project: <a href="https://www.kaggle.com/datasets/shivamb/netflix-shows?resource=download">Netflix Movies and TV Shows (kaggle.com)</a></p><p>After getting the dataset, I created a bucket named ‘bucket-data-pipeline’, as seen below.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*ZdN_rxGTd7XK3qCvrBTyDA.png" /><figcaption>S3 Bucket Used in the Project</figcaption></figure><p>After that, I created a folder named “data” inside the bucket. (S3 has no real folders; the console displays key prefixes such as “data/” as folders.)</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*ZknKAif1tKoi-2HRHvA1-g.png" /><figcaption>“data” Folder</figcaption></figure><p>Inside the “data” folder, I created two more folders named “clean” and “raw”. The raw CSV file goes into the “raw” folder first, and after the transformations, the clean version of the file is placed in the “clean” folder.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*1Z8Yt6UGRaf9_NRajiYlLg.png" /><figcaption>“clean” and “raw” Folders</figcaption></figure><p>Finally, I uploaded the ‘netflix_titles.csv’ file into the “raw” folder using the default upload settings.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*Iwny310F9NwgjnBUGEz5Nw.png" /><figcaption>CSV File</figcaption></figure><h3><strong>AWS Glue</strong></h3><p>Now that the file is in our S3 bucket, it’s time to use AWS Glue to transform the data and load the clean version back into S3.</p><p>The first step is to create a database in AWS Glue. It will hold the table containing the metadata of our Netflix CSV file.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*ohby8iU1SeLj5ROBgW4Lkg.png" /><figcaption>Netflix Database</figcaption></figure><p>Next, we need to create a crawler. 
The crawler scans the S3 location we specify, infers the schema of the files it finds, and stores that metadata in the Glue Data Catalog. We will use this metadata later during the transformation.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*_Ho064lEkgYg29WwXswB_w.png" /><figcaption>Netflix Data Crawler</figcaption></figure><p>Make sure to attach an IAM role that gives the crawler access to the S3 bucket, and to set the database we created earlier as the crawler’s output. The crawler will create a table in that database containing the metadata of our file.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*QRWC-COlX1CviliZszXyMw.png" /><figcaption>IAM Role Associated</figcaption></figure><p>After running the crawler, the database should contain a table with the metadata of our CSV file.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*OFExofWxyQZPAhTyxYKe2g.png" /><figcaption>Netflix Data Metadata</figcaption></figure><p>We can now create a Glue notebook that transforms the data and loads the clean file back into our S3 bucket. 
The code I used is shown below.</p><pre># Run this cell to set up and start your interactive session.<br>%idle_timeout 2880<br>%glue_version 4.0<br>%worker_type G.1X<br>%number_of_workers 5<br><br>import sys<br>from awsglue.transforms import *<br>from awsglue.utils import getResolvedOptions<br>from pyspark.context import SparkContext<br>from awsglue.context import GlueContext<br>from awsglue.job import Job<br>from pyspark.sql.functions import col, trim, to_date<br><br>sc = SparkContext.getOrCreate()<br>glueContext = GlueContext(sc)<br>spark = glueContext.spark_session<br>job = Job(glueContext)<br><br># Use the legacy (Spark 2.x) date parser for the pattern below.<br># This has to run after the Spark session above has been created.<br>spark.conf.set(&quot;spark.sql.legacy.timeParserPolicy&quot;, &quot;LEGACY&quot;)<br><br># Create a DynamicFrame from the Netflix table in the AWS Glue Data Catalog and display its schema<br>dyf = glueContext.create_dynamic_frame.from_catalog(database=&#39;netflix_data&#39;, table_name=&#39;netflix_titles_csv&#39;)<br>dyf.printSchema()<br><br># Convert the DynamicFrame to a Spark DataFrame and display a sample of the data<br>df = dyf.toDF()<br>df.show()<br><br>## Transformation<br># Trim stray whitespace, then parse strings such as &quot;September 25, 2021&quot; into dates<br>df = df.withColumn(&quot;date_added&quot;, to_date(trim(col(&quot;date_added&quot;)), &quot;MMMM dd, yyyy&quot;))<br><br># Save the Spark DataFrame as CSV in the S3 bucket<br># Spark writes one or more part-*.csv files under this prefix, not a single named file<br>s3_output_path = &quot;s3://bucket-data-pipeline/data/clean&quot;<br><br># Overwrite any previous output and include a header row<br>df.write.mode(&quot;overwrite&quot;).csv(s3_output_path, header=True)</pre><p>The data is read by referencing the database we created and the table the crawler added to the Glue Data Catalog.</p><p>For the transformation, I only converted the “date_added” column from a string to a date. 
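</p><p>The Spark pattern “MMMM dd, yyyy” expects a full month name, the day, a comma, and a four-digit year. To sanity-check sample values against that format outside of Glue, here is a small plain-Python sketch using the equivalent strptime directives (%B %d, %Y). The parse_date_added helper and the sample strings are hypothetical, and Python’s parser is not identical to Spark’s, so treat this as a quick check rather than a reproduction of the Glue job.</p><pre>
```python
from datetime import datetime


def parse_date_added(raw):
    """Convert a string such as 'September 25, 2021' to a date, or None if blank."""
    if raw is None or not raw.strip():
        return None  # treat missing or blank values as null
    # strip() drops stray surrounding spaces (the job uses trim() for this);
    # %B = full month name, %d = day of month, %Y = four-digit year.
    return datetime.strptime(raw.strip(), "%B %d, %Y").date()


for sample in ["September 25, 2021", " August 4, 2017", ""]:
    print(repr(sample), "->", parse_date_added(sample))
# 'September 25, 2021' -> 2021-09-25
# ' August 4, 2017' -> 2017-08-04
# '' -> None
```
</pre><p>Python’s %d accepts single-digit days such as “4”, so both sample dates parse once the surrounding spaces are removed.</p><p>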
After the transformation, the code writes the clean data to the “clean” folder in our S3 bucket.</p><p>Once the notebook is saved, it can be run as a Glue job that transforms the data and loads the clean file into the S3 bucket. I only ran the job once, but the job can also be scheduled to run at specific times.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*0QP3rT2qxFa8_LG2lw0koA.png" /><figcaption>Notebook Job</figcaption></figure><p>After the job runs, there should be a CSV file containing the transformed data in the “clean” folder of our S3 bucket, as seen below. The file has a generated name because Spark writes a DataFrame as one or more part files (named “part-” followed by an ID) inside the target folder, rather than as a single file with a name we choose. One way to get a single, readable file name would be to write the DataFrame with df.coalesce(1) and then copy the resulting part file to a new key in S3.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*Kldl14gEgoVwTcPnM0fa9Q.png" /><figcaption>Transformed Data in “clean” Folder</figcaption></figure><p>Once the clean file is in our S3 bucket, it can be used however you need, such as loading it into an RDBMS, querying it directly in S3 (for example, with Amazon Athena), or moving it to another storage service.</p><h3><strong>Conclusion</strong></h3><p>To summarize, here is what I did in the project:</p><ul><li>Get a dataset (CSV file) from Kaggle to use for the project.</li><li>Create an S3 bucket.</li><li>Store the raw CSV file in the S3 bucket.</li><li>Create a Glue database and a crawler to catalog the raw file’s metadata.</li><li>Create a Glue notebook to transform the raw CSV file.</li><li>Load the clean CSV file back into the S3 bucket.</li></ul><p>There is still plenty of room to improve the project. 
Here are some ideas for future improvements:</p><ul><li>Query the clean data using Amazon Redshift or another database.</li><li>Store multiple CSV files in the S3 bucket, and transform and combine them into one or more tables.</li><li>Use an orchestrator to schedule the file extracts and job runs automatically.</li></ul><p>As you can see, there are many ways to extend this basic ETL pipeline. Even so, it serves as a foundational reference for learning the fundamentals of two widely used AWS tools: AWS S3 and AWS Glue.</p>]]></content:encoded>
        </item>
    </channel>
</rss>