Creating a Serverless Data Pipeline in GCP

Daniel Zagales
Jun 18 · 6 min read

I started my career in a world where data moved via ETL, and getting started was not easy because the barrier to entry was so high: licensing costs, configuring an environment, and not a whole lot of accessible training.

Today, however, I lead the Data Services Practice at Pandera and am surrounded by a great number of resources and opportunities to keep learning how to work with data, both in the traditional ETL space and in cloud platforms that are redefining data movement.

In my last post I talked about data quality, and that topic came up because I took an opportunity to learn a little more about moving data through GCP. I had worked on a couple of other GCP data projects, but never as a developer, so naturally I jumped at the chance to get my hands dirty.

A lot of this journey has been not really knowing what the final solution was going to be, or how I was going to get there. I knew the general end goal was to pull data out of noCRM so we could have better discussions about forecasting and resource allocation. So I started at the beginning of that thought, “pull data out of noCRM”: if we did not have the data, nothing else could continue.

noCRM’s API documentation is good enough to get going pretty quickly; you can check it out here:

https://youdontneedacrm.com/api

So following the documentation, I opened Postman and validated my API key, my query structure, and that things generally worked. Sure enough, I was pulling data right away.

I knew this data was inevitably going to land somewhere in GCP, so I started to put thought into how I wanted to take that request data and land it, and came up with this as a general starting point:

As you can see above, I decided to go with a Cloud Function, triggered by an HTTP request from Cloud Scheduler, to run the extraction from noCRM and insert directly into BigQuery. I did this for a couple of reasons:

  1. It lets me use a language I am familiar with (Python)
  2. Cloud Functions are very simple to set up and schedule
  3. The cost is very low
  4. The whole setup is completely serverless

There are cases for using other configurations in the GCP suite, but for me this made the most sense.

As I mentioned, I am familiar enough with Python for it to be my go-to language when I code something up. I started developing this locally, and our standard at Pandera for local development is pipenv, so that is how I initially set up my local dev environment.

I originally broke it up into three functions: request, parsing, and insert, with the focus on just attacking the leads endpoint.

Here is my initial commit:

https://bitbucket.org/danielzagales/nocrm_pulls/src/27f78b52bf2c36fc5844f1ad62280ae922de7d56/

The request function is fairly simple, using the requests Python library.

def pull_leads():
    o_leads = requests.get(base_url, headers=headers, params=params).json()
    return o_leads
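
For context, pull_leads leans on a few module-level settings. A minimal sketch of what those might look like is below; the subdomain, header name, and query parameters are placeholders based on the noCRM documentation rather than my exact configuration:

import os

import requests

# Placeholder request settings for the noCRM leads endpoint;
# the API key is read from an environment variable instead of being hard-coded
base_url = "https://your-subdomain.nocrm.io/api/v2/leads"
headers = {"X-API-KEY": os.environ["NOCRM_API_KEY"]}
params = {"limit": 100}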

The parsing function is a little more complex. I wanted to do a couple of things with it:

  • Include a field that would later be used to partition the BigQuery table
  • Handle a data quality issue with noCRM where the TIME field comes in as HH:MM and does not meet the BigQuery format requirement of HH:MM:SS

In doing this I created another issue: the TIMESTAMP fields were no longer being treated as TIMESTAMP objects, so I changed those columns in the BigQuery table to STRING to keep moving.

def json_parse(jsonData):
    rows = []
    record_time = datetime.now().strftime("%Y-%m-%d")
    for line in jsonData:
        if line['remind_time'] is not None:
            remind_time = line['remind_time'] + ":00"
        else:
            remind_time = line['remind_time']
        row = {
            "id": line['id'],
            "title": line['title'],
            "pipeline": line['pipeline'],
            "step": line['step'],
            "step_id": line['step_id'],
            "status": line['status'],
            "amount": line['amount'],
            "probability": line['probability'],
            "currency": line['currency'],
            "starred": line['starred'],
            "remind_date": line['remind_date'],
            "remind_time": remind_time,
            "created_at": line['created_at'],
            "estimated_closing_date": line['estimated_closing_date'],
            "updated_at": line['updated_at'],
            "description": line['description'],
            "html_description": line['html_description'],
            "tags": line['tags'],
            "created_from": line['created_from'],
            "closed_at": line['closed_at'],
            "attachment_count": line['attachment_count'],
            "created_by_id": line['created_by_id'],
            "user_id": line['user_id'],
            "client_folder_id": line['client_folder_id'],
            "client_folder_name": line['client_folder_name'],
            "team_id": line['team_id'],
            "team_name": line['team_name'],
            "record_time": record_time,
        }
        rows.append(row)
    return rows

Lastly, the BigQuery insert. This one is pretty simple too; Google’s Python library for BigQuery is very well documented and easy to use.

You can find it here:

https://googleapis.github.io/google-cloud-python/latest/bigquery/index.html

def insert_bq(jsonData):
    client = bigquery.Client()
    table_ref = client.dataset(dataset_id).table(table_id)
    table = client.get_table(table_ref)
    errors = client.insert_rows_json(table, jsonData)
    print(errors)
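
As a side note, record_time is the field I mentioned wanting to partition the table on. Here is a rough sketch of how the destination table could be created with day partitioning on that field; the dataset and table names and the trimmed-down schema are placeholders rather than my exact setup:

from google.cloud import bigquery

client = bigquery.Client()
# Placeholder dataset and table names
table_ref = client.dataset("nocrm").table("leads")

# Only a few columns shown; the real schema mirrors the parsed rows above
schema = [
    bigquery.SchemaField("id", "INTEGER"),
    bigquery.SchemaField("title", "STRING"),
    bigquery.SchemaField("record_time", "DATE"),
]

table = bigquery.Table(table_ref, schema=schema)
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="record_time",
)
client.create_table(table)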

The three functions above are used directly in a Cloud Function that gets called once daily.
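
To give a sense of how they hang together, here is a minimal sketch of what the HTTP-triggered entry point might look like; the function name and return value are assumptions for illustration, not the exact code in the repo:

def main(request):
    # request is the Flask request object Cloud Functions passes to HTTP-triggered functions
    leads = pull_leads()
    rows = json_parse(leads)
    insert_bq(rows)
    return 'ok'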

My second reason for the configuration I chose was simplicity. Below is about the extent of setting up a Cloud Function: a single setup screen consisting of sizing, trigger, code, service account, and environment variables.

So my Cloud Function is now all set up, but to actually get it to run I’m going to trigger it via Cloud Scheduler, another easy-to-set-up, low-cost (free tier for the win) option in the GCP stack. If you are familiar with setting up a cron job, the syntax is the same for Cloud Scheduler (for example, 0 6 * * * runs once daily at 06:00), and if you’re not, here is a little helper: https://crontab.guru/. After the frequency is defined, all that’s left to do is give it the endpoint to hit and we’re off.

At this point I’ve got data flowing from noCRM and into BigQuery on a daily schedule.

I mentioned that one of the reasons I went with this setup is how cheap it is as an option. With the usage I require, I actually fall under the free tier for Cloud Functions and Cloud Scheduler, while incurring really low costs for BigQuery!

Cloud Function costs:

Cloud Scheduler costs:

Lastly, this whole setup is completely serverless, which for me means not having to worry about optimization, patching, or anything else that normally comes along with maintaining my own server.

That’s it for now. In my next post I’ll go into how I optimized the Python script, how we are using the data downstream from BigQuery, and some considerations I had to make in BigQuery to accommodate that.

You can check out the latest version of the code in my Bitbucket repository:

https://bitbucket.org/danielzagales/nocrm_pulls/src/master/