ETL Pipeline for Extracting Animal Welfare Organization Data — Part Three

Aaron Schlegel
6 min read · Aug 10, 2021

In the previous posts of the series, we installed PostgreSQL on our local machine and got set up with AWS S3 for storing raw results from the Petfinder.com API. We also became familiar with the Petfinder.com API by investigating the returned data from some sample queries and creating data tables that store the ingested data from the API. Now that we are set up with a database and the Petfinder.com API and AWS credentials, we can create our first data pipeline to extract the animal welfare organization data, transform it as needed, and load it into our data tables. In this post, that is precisely what we will do!

Getting the Animal Welfare Organization Data from the Petfinder.com API

As in the previous post, we will use the petpy library to work with the Petfinder.com API. To create an authenticated connection, we first import the Petfinder class and initialize it with the access and secret keys from the Petfinder.com API developers page.

from petpy import Petfinder

pf = Petfinder(key=petfinder_api_key,
               secret=petfinder_secret_api_key)

To programmatically save the raw JSON data from the API into S3, we must also create a client object using the boto3 library, which is used for interacting with AWS services. If boto3 is not already installed, it can be installed with pip.

pip install boto3

We can then create a session and a client by providing the AWS access key ID and secret access key obtained in the first post of the series.

import boto3

s3_session = boto3.Session(
    aws_access_key_id=aws_secret_id,
    aws_secret_access_key=aws_secret_access_key
)
s3_client = s3_session.client('s3')

Let's now get the U.S. animal welfare organization data from the Petfinder.com API! The API allows only 1,000 requests per day, so we set the number of pages to 1,000 and the results per page to 100 (the maximum per page) to get as many organizations as possible within the daily quota.

orgs = pf.organizations(country='US', results_per_page=100, pages=1000)
orgs = orgs['organizations']

That's all there is to it! With the s3_client object created earlier, we can dump the returned JSON from the API into a `.json` file and upload it to the AWS S3 bucket that stores the API data. Below, we dump the dictionary representation of the returned JSON into a string using the [`json`](https://docs.python.org/3/library/json.html) standard library and save it to the desired S3 bucket and key prefix (essentially a file path in S3). The current datetime is also appended to the filename to record when the results were pulled from the Petfinder.com API.

import datetime
import json

s3_client.put_object(
    Body=json.dumps(orgs),
    Bucket='pethub-data',
    Key='petfinder/organizations/organizations_{dt}.json'.format(
        dt=datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    )
)

With the animal welfare organization data extracted from the API and the results saved in S3, we can now iterate over the array of JSON objects and insert them into the database tables.

Inserting the API Data into Database Tables

In the previous post, we created several data tables to store the animal welfare organization data that we will ingest from the Petfinder.com API. The entity-relationship diagram of the organizations schema is shown below.

Therefore, our data pipeline script will need to get the relevant keys from the API data for each table and make any necessary transformations to the data to be inserted. To accomplish this, we can iterate through each organization in the data and create dumps of the relevant JSON like in the script below.

Some of the data for each table is gathered from the organization's JSON with a list comprehension; for the rest, the nested data is extracted with square-bracket notation. The JSON is then dumped to a formatted string using the json.dumps() function. In some of the dumped JSON strings, single quotes are replaced with doubled single quotes ('') so they are properly escaped when inserted into the PostgreSQL tables.
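A minimal sketch of that extraction step might look something like the following; the key names follow the Petfinder API organization object, and the fields kept for each table are simplified here.

import json

# A minimal sketch of the extraction step for a single pass over the
# organizations; the exact fields kept for each table are simplified.
for org in orgs:
    # Nested objects are pulled out with square-bracket notation, dumped to
    # strings, and have single quotes doubled so they are escaped for PostgreSQL.
    address = json.dumps(org['address']).replace("'", "''")
    hours = json.dumps(org['hours']).replace("'", "''")
    social_media = json.dumps(org['social_media']).replace("'", "''")
    # The photos array is gathered with a list comprehension, one entry per
    # photo with its available resolutions.
    photos = json.dumps([photo for photo in org['photos']]).replace("'", "''")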

Once the data has been extracted and transformed as needed, it can be inserted into the destination tables. We first start a transaction using psycopg2's connection context manager, so if there is an error for any reason, the current organization's inserts are rolled back and the tables are left in their previous state. The final manipulation of the JSON is to expand, or 'normalize', it into rows and columns, which can be handily done with the JSON support functions available in PostgreSQL. Each set of photos and their resolutions is inserted individually in an inner loop so the data is shaped according to our desired schema. Lastly, the connection opened at the beginning of the script is closed before the script completes.
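A simplified sketch of that insert step is below; the connection parameters, table names, and column definitions are assumptions for this example and differ from the full pipeline script, but the structure (a transaction per organization with an inner loop over the photos) is the same.

import json
import psycopg2

# A simplified sketch of the insert step; connection parameters, table names,
# and column lists are placeholders and differ from the full pipeline script.
conn = psycopg2.connect(host='localhost', dbname='pethub', user='postgres')

for org in orgs:
    address = json.dumps(org['address']).replace("'", "''")
    # The connection context manager wraps each organization in a transaction:
    # commit on success, rollback if any statement raises an error.
    with conn:
        with conn.cursor() as cur:
            # json_to_record() expands the nested JSON into a normalized row.
            cur.execute("""
                INSERT INTO organizations.organization_address
                SELECT '{id}' AS id, a.*
                FROM json_to_record('{address}'::json)
                    AS a(address1 text, address2 text, city text,
                         state text, postcode text, country text);
            """.format(id=org['id'], address=address))

            # Each photo is inserted in an inner loop so every set of
            # resolutions becomes its own row in the photos table.
            for photo in org['photos']:
                photo_json = json.dumps(photo).replace("'", "''")
                cur.execute("""
                    INSERT INTO organizations.organization_photos
                    SELECT '{id}' AS id, p.*
                    FROM json_to_record('{photo}'::json)
                        AS p(small text, medium text, large text, "full" text);
                """.format(id=org['id'], photo=photo_json))

conn.close()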

After the script completes, we can query the tables to investigate the data! A quick query for the number of unique organizations should return a count of around 10,000.

SELECT count(DISTINCT id)
FROM organizations.organization;
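
The same check can also be run from Python with psycopg2 (the connection parameters below are placeholders):

import psycopg2

# Placeholder connection parameters; adjust these to your local setup.
conn = psycopg2.connect(host='localhost', dbname='pethub', user='postgres')
with conn, conn.cursor() as cur:
    cur.execute('SELECT count(DISTINCT id) FROM organizations.organization;')
    print(cur.fetchone()[0])
conn.close()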

We have just successfully created our first data pipeline to get listed animal welfare organizations from Petfinder.com using their API! Since this pipeline is focused only on ingesting data from an API, we kept the transformations to a minimum. It is generally recommended to leave transformations that change the data significantly (aggregations, filters, etc.) to pipelines that update downstream tables meant for analysis and other purposes (dashboarding, machine learning, general analytics, etc.). This keeps the data consistent between the source and its internal point of origin and keeps pipeline operations more straightforward, since only the data points and tables that need to be updated are changed.

Aside: Upserting (Merging) Data in PostgreSQL

PostgreSQL introduced an 'UPSERT' syntax in the 9.5 release to update records that already exist in a table and add any new rows based on a particular constraint. Other database systems such as SQL Server generally refer to this as a 'merge'. By employing an upsert on the organization data, we can add any new organizations while updating the records of organizations that already exist, keeping the data current without creating duplicates. As an example, let's break down the upsert used in the `organization` table insert query:

ON CONFLICT ON CONSTRAINT organization_pk 
DO UPDATE SET
name = EXCLUDED.name,
email = EXCLUDED.email,
phone = EXCLUDED.phone,
url = EXCLUDED.url,
website = EXCLUDED.website,
mission_statement = EXCLUDED.mission_statement;

In the first two lines, we specify that when there is a conflict on the primary key constraint (a row with the same key already exists in the table), we want to update that row. Specifically, the first line:

ON CONFLICT ON CONSTRAINT organization_pk

says, "when a row being added to the table has the same constraint value (the key, named organization_pk) as an existing row, do something with that row instead of failing the insert". The next line specifies the conflict action, which here is an UPDATE:

DO UPDATE SET

We could use DO NOTHING instead if we wanted to skip the row and not make any updates. From there, we specify the columns we want to update. In the following block, the existing row's columns are set to the new values proposed for insertion. EXCLUDED is a special table that holds the values of the row that was proposed for insertion.

name = EXCLUDED.name, 
email = EXCLUDED.email,
phone = EXCLUDED.phone,
url = EXCLUDED.url,
website = EXCLUDED.website,
mission_statement = EXCLUDED.mission_statement;
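
Putting it together, the ON CONFLICT clause sits at the end of the full INSERT statement for the organization table. A sketch of how it might be executed with psycopg2 is below; the column list is taken from the snippet above, and the actual table likely contains additional columns.

import psycopg2

# A sketch of the full upsert for the organization table; the column list is
# assumed from the snippet above and the real table may have more columns.
upsert_query = """
    INSERT INTO organizations.organization
        (id, name, email, phone, url, website, mission_statement)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT ON CONSTRAINT organization_pk
    DO UPDATE SET
        name = EXCLUDED.name,
        email = EXCLUDED.email,
        phone = EXCLUDED.phone,
        url = EXCLUDED.url,
        website = EXCLUDED.website,
        mission_statement = EXCLUDED.mission_statement;
"""

conn = psycopg2.connect(host='localhost', dbname='pethub', user='postgres')
with conn, conn.cursor() as cur:
    for org in orgs:
        cur.execute(upsert_query, (org['id'], org['name'], org['email'],
                                   org['phone'], org['url'], org['website'],
                                   org['mission_statement']))
conn.close()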

Conclusion

In this post, we built our first data pipeline with Python and PostgreSQL to extract, transform and load animal welfare organization data from the Petfinder.com API into several database tables for later analysis. We also introduced how to programmatically save raw data from the API into AWS S3 and the benefits of using an upsert strategy to maintain data consistency.

Notes

A second pipeline that extracts the animals listed at each organization from the API is also available as a GitHub Gist. Because that script is quite similar to the pipeline developed in this post, it did not seem worthwhile to describe it in a separate post.

The code demonstrated here was taken from a project I am currently working on, pethub. The project’s goal is to build an analytics-ready database of adoptable animal data and other information obtained and merged from various sources.

References

https://www.2ndquadrant.com/documents/pg_doc/functions-json.html
https://www.postgresql.org/docs/9.1/sql-update.html
https://wiki.postgresql.org/wiki/UPSERT#UPSERT_as_implemented_in_practice
