Building a data pipeline from the Iowa Liquor Sales open database using Socrata Open Data API (SODA)

Harry McKaig
8 min read · Mar 2, 2023


Project Goals

The goal of this project is to build an automated data pipeline from the Iowa Liquor Sales open database using the Socrata Open Data API (SODA).

  • Database Design: develop a lightweight relational database with reference tables
  • Data Preprocessing: using Python and Pandas, create functions and other tools to drop duplicates, create subcategories, coerce data types, and convert spatial coordinates to numeric latitude and longitude for mapping
  • Map to Database: load a PostgreSQL back end using SQLAlchemy (an Object Relational Mapper, or ORM) so the tables can be queried
  • Automate the above API script in Python
  • Create a live connection (or export) to Tableau (or another visualization tool)
  • Generate dashboards for Exploratory Data Analysis (EDA) and insights around Iowa Liquor Sales. Link to Tableau Insights

Helpful Notes

  • If you have any SQL experience, you’re in luck: the Socrata Open Data API has many familiar filters, functions, and execution order, and you can design and write queries directly in the URL (see the example after this list). Documentation: https://dev.socrata.com/docs/queries/
  • I recommend signing up for an account and generating a personal App token. With an app token, your application is guaranteed access to its own pool of requests, with no limits and no throttling (within reason… be reasonable!).
  • In total, there are 24 columns in the dataset, with nearly 25MM rows dating back to 2012. Link to Iowa Liquor Sales dataset and dictionary.
  • Pricing Categories: used Nielsen’s category tiers, from Value through Prestige Plus
  • Channels: defined “Chains” as all stores with five or more locations, with Costco included as an exception. All else classified as “Independent”
  • Link to the customized script, available in the GitHub repository as a .py and an .ipynb file
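For example, a hypothetical query (not from the original post) that reads much like SQL, selecting columns, filtering, ordering, and limiting directly in the URL:

https://data.iowa.gov/resource/m3tr-qhgy.json?$select=date,store,sale_dollars&$where=sale_dollars > 500&$order=date DESC&$limit=10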

Links to Project Resources

Dataset: What are we capturing?

  • Off Premise Licenses ONLY
  • Liquor Sales (excludes beer, wine and non-alc)
  • Class “E” Retail Alcohol License (LE) — grocery, liquor and convenience stores: Allows for the sale of alcoholic liquor, wine, and beer for off-premises consumption. No sales by the drink. Also allows wholesale sales to on-premises class “B”, “C”, special “C”, “D”, and “F” retail alcohol licensees but must have a TTB Federal Wholesale Basic Permit.

Data Limitations

  • Dataset does not include On-Premise sales (bars, restaurants, or any sale by the drink “on” the premises)
  • Does not include beer and wine, so it can’t offer a broader pulse on the beverage alcohol market
  • Limited data definitions on Category / Sub-Category numbers
  • Limited Time Offers (products) and VAP (Value Added Packaging) did not have any unique characteristics and need to be further sorted for category and sub-category definitions
  • No aggregation by ultimate brand owner; for example, Crown Royal Apple must be rolled up manually into the “Crown Royal” brand ownership category

Data Process: Step-by-Step

1. Sign Up for a Socrata Account and Generate an API Token

2. Installs and Imports

pip install pandas numpy requests matplotlib sodapy sqlalchemy psycopg2-binary pandas-profiling

import csv
import os  #used later to strip file extensions when naming tables
import requests
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sodapy import Socrata
from datetime import datetime
from sqlalchemy import create_engine
import pgsettings as settings #create your own py file with auth settings
import psycopg2 as psy
from pandas_profiling import ProfileReport

3. Basic API Authentication and Setup

Note: the app token is used below (and can also be inserted directly in a query URL) to exceed the unauthenticated 5,000-request limit.

#PULL IN API KEY
API_TOKEN = "Enter your Personal API KEY"

#connect to Socrata
socrata_domain = 'data.iowa.gov'
socrata_dataset_endpoint = 'm3tr-qhgy'
client = Socrata(socrata_domain, app_token=API_TOKEN)

#pull data from Socrata API for Iowa Liquor, use limit to throttle
results = client.get(socrata_dataset_endpoint, limit=5000)

#Convert to pandas dataframe
df = pd.DataFrame(results)
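Note that the raw requests.get calls in later steps don’t carry the token; if you hit throttling there, Socrata accepts the token either as an X-App-Token header or as a $$app_token URL parameter. A minimal sketch (this exact call is not in the original script):

#Two equivalent ways to authenticate a raw HTTP request
url = 'https://data.iowa.gov/resource/m3tr-qhgy.json?$limit=5000'
response = requests.get(url, headers={'X-App-Token': API_TOKEN})  #header form
#response = requests.get(url + f'&$$app_token={API_TOKEN}')       #URL-parameter form
response.raise_for_status()
df = pd.DataFrame(response.json())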

4. Determine File Size

month_invoice_data_grab = []
url_input = 'https://data.iowa.gov/resource/m3tr-qhgy.json?$limit=500000&$offset=0&$select=date_extract_y(date) as Year,date_extract_m(date) as Month,count(invoice_line_no)&$group=Year,Month'

#QUERY API
response = requests.get(url_input)    #request the grouped year/month counts
data = response.json()                #parse the JSON response
month_invoice_data_grab.extend(data)  #accumulate the records

#CONVERT TO DATA FRAME
month_invoice_data = pd.DataFrame(month_invoice_data_grab)

#CREATE COLUMN TO CHANGE INVOICE COUNT TO NUMERIC
month_invoice_data['count_invoice'] = pd.to_numeric(month_invoice_data['count_invoice_line_no'])
month_invoice_data.info()

## will take 20-30 seconds to grab
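The same grouped count can also go through the sodapy client from step 3 (a sketch, not in the original script); sodapy URL-encodes the SoQL clauses for you:

#Same year/month invoice counts via sodapy keyword arguments
results = client.get(
    socrata_dataset_endpoint,
    select='date_extract_y(date) as Year, date_extract_m(date) as Month, count(invoice_line_no)',
    group='Year, Month',
    limit=500000,
)
month_invoice_data = pd.DataFrame(results)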

5. Adjust Data to Capture Complete Years

  • This was authored in February 2023
  • Dropped January 2023
#SUM OF ALL INVOICES - Horizontal Analysis (exclude partial-year 2023)
month_invoice_data = month_invoice_data[month_invoice_data['Year'] != '2023']
month_invoice_data.groupby('Year')['count_invoice'].sum().plot(kind='bar')

#Plot and save image
plt.ylabel('Millions of Rows')
plt.title('Total Size of dataset = 25MM Ave = 213k per month (exc: 2023)', fontsize=10)
plt.savefig('iowa_liquor_db_size_year.png')

6. Total Dataset Size

(Bar chart of total rows per year, created using matplotlib; saved above as iowa_liquor_db_size_year.png.)

7. Database Design:

  • In total, there are 24 columns in the dataset, with nearly 25MM rows dating back to 2012. Data Dictionary and Link to dataset: https://data.iowa.gov/Sales-Distribution/Iowa-Liquor-Sales/m3tr-qhgy
  • Break the table up into 5 separate relational tables. Use Orders as the primary table (largest and heaviest) and create reference tables for stores, vendors (suppliers), products, and pricing/sales; see the schema sketch below.
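One way to sketch the resulting schema (a reading of the queries below; column lists abbreviated and key relationships inferred, not stated in the original):

orders   (invoice_line_no PK, date, store → stores, vendor_no → vendors,
          itemno → products/pricing, state_bottle_retail, sale_bottles,
          sale_dollars, sale_liters, sale_gallons)
stores   (store PK, name, address, city, zipcode, longitude, latitude, county_number, county)
products (itemno PK, im_desc, category, category_name, pack)
vendors  (vendor_no, vendor_name, itemno)
pricing  (itemno PK, bottle_volume_ml, state_bottle_cost, state_bottle_retail)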

8. Table Creation: Preprocessing and Categorization
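Every table query in this step interpolates start_year, current_month, and current_year into its $where clause, but the post never shows where they are set. A minimal sketch with assumed values:

#Assumed definitions (not shown in the original); adjust the window as needed
from datetime import datetime

today = datetime.today()
current_year = today.year             #e.g. 2023
current_month = f'{today.month:02d}'  #zero-padded month, e.g. '02'
start_year = 2012                     #the dataset's history begins in 2012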

  • Creating Orders Table: used Nielsen’s category tiers, from Value through Prestige Plus
#ORDERS TABLE - using API
#orders_grab = ['invoice_line_no', 'date', 'store', 'vendor_no', 'itemno', 'state_bottle_retail', 'sale_bottles', 'sale_dollars', 'sale_liters', 'sale_gallons']
url_orders = f"https://data.iowa.gov/resource/m3tr-qhgy.json?$limit=100&$offset=0&$select=invoice_line_no,date,store,vendor_no,itemno,state_bottle_retail,sale_bottles,sale_dollars,sale_liters,sale_gallons&$where=date between '{start_year}-{current_month}-10T00:00:00' and '{current_year}-{current_month}-11T00:00:00'"
orders_response = requests.get(url_orders)
orders_response.raise_for_status()

# To save as Pandas DataFrame for EDA
orders = orders_response.json()
orders = pd.DataFrame(orders)

#Use Nielsen pricing categories
def conv_pricing_category(row):
    if float(row['state_bottle_retail']) < 9.99:
        return 'Value'
    elif float(row['state_bottle_retail']) < 22.49:
        return 'Standard'
    elif float(row['state_bottle_retail']) < 29.99:
        return 'Premium'
    elif float(row['state_bottle_retail']) < 44.49:
        return 'Super Premium'
    elif float(row['state_bottle_retail']) < 99.99:
        return 'Ultra Premium'
    elif float(row['state_bottle_retail']) < 199.99:
        return 'Prestige'
    else:
        return 'Prestige Plus'

#ADD PRICE CATEGORY COLUMN
orders['price_category'] = orders.apply(conv_pricing_category, axis=1)

#Convert to CSV file to save
orders.to_csv("orders.csv", index=False)
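As an aside (not in the original), the same tier boundaries can be expressed declaratively with pd.cut, which avoids the row-wise apply:

#Equivalent Nielsen tiers via pd.cut; right=False keeps the x < boundary semantics
bins = [0, 9.99, 22.49, 29.99, 44.49, 99.99, 199.99, float('inf')]
labels = ['Value', 'Standard', 'Premium', 'Super Premium',
          'Ultra Premium', 'Prestige', 'Prestige Plus']
orders['price_category'] = pd.cut(
    pd.to_numeric(orders['state_bottle_retail']),
    bins=bins, labels=labels, right=False)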
  • Creating Stores Table: defined “Chains” as all stores with five or more locations, with Costco included as an exception; all else classified as “Independent”
#STORES TABLE - using API
#stores_grab = ['store', 'name', 'address', 'city', 'zipcode', 'store_location', 'county_number', 'county']
url_stores = f"https://data.iowa.gov/resource/m3tr-qhgy.json?$limit=10000000&$offset=0&$select=store,name,address,city,zipcode,store_location,county_number,county&$where=date between '{start_year}-{current_month}-10T00:00:00' and '{current_year}-{current_month}-11T00:00:00'&$group=store,name,address,city,zipcode,store_location,county_number,county"
stores_response = requests.get(url_stores)
stores_response.raise_for_status()
stores = stores_response.json()

#convert to Pandas DataFrame
stores = pd.DataFrame(stores)

# DROP DUPLICATES from stores
stores = stores.drop_duplicates(subset='store', keep='first')

#CONVERT COORDINATES: use the index to grab the longitude (x) and latitude (y) coordinates
# Socrata geo coordinates are reversed: https://dev.socrata.com/docs/datatypes/point.html
def conv_long(row):
    if type(row['store_location']) == dict:
        return row['store_location']['coordinates'][0]
    else:
        return 0

def conv_lat(row):
    if type(row['store_location']) == dict:
        return row['store_location']['coordinates'][1]
    else:
        return 0

stores['longitude'] = stores.apply(conv_long, axis=1)
stores['latitude'] = stores.apply(conv_lat, axis=1)

#CREATE CHANNEL CATEGORY: 35 chains identified (stores with five or more
#locations, plus Costco as an exception) = Chain, else Independent
def conv_chain_category(row):
    name = row['name'].lower()
    if 'wal-mart' in name:
        return 'Walmart'
    elif 'big 10' in name:
        return 'Big 10'
    elif 'bp to go' in name:
        return 'BP to Go'
    elif 'brew oil' in name:
        return 'Brew Oil'
    elif "brother's market" in name:
        return 'Brothers Market'
    elif 'bucky' in name:
        return 'Buckeys Express'
    elif 'casey' in name:
        return 'Caseys'
    elif 'circle k' in name:
        return 'Circle K'
    elif 'costco' in name:
        return 'Costco'
    elif 'cvs' in name:
        return 'CVS'
    elif 'fareway' in name:  #the Iowa chain is spelled 'Fareway'
        return 'Fareway Stores'
    elif 'hartig' in name:
        return 'Hartig'
    elif 'hometown' in name:
        return 'Hometown Foods'
    elif 'hy-vee' in name:
        return 'Hy-Vee'
    elif 'jet stop' in name:
        return 'Jet Stop'
    elif 'kimmies' in name:
        return 'Kimmies'
    elif 'kum & go' in name:
        return 'Kum & Go'
    elif 'kwik' in name:
        return 'Kwik Star'
    elif 'liquor toba' in name or 'liquor, toba' in name:
        return 'Liquor Tobacco'
    elif 'main st' in name:
        return 'Main Street'
    elif 'moes' in name:
        return 'Moes Mart'
    elif 'new star' in name:
        return 'New Star'
    elif 'prime mart' in name:
        return 'Prime Mart'
    elif 'quick trips' in name:
        return 'Quick Trip'
    elif "sam's club" in name:
        return 'Sams Club'
    elif 'smokin' in name:
        return 'Smokin Joes'
    elif 'target' in name:  #also matches 'super target'
        return 'Target'
    elif 'the depot' in name:
        return 'The Depot'
    elif 'tobacco' in name:
        return 'Tobacco Outlet'
    elif 'walgreen' in name:
        return 'Walgreens'
    elif 'world liquor' in name:
        return 'World Liquor and Tobacco'
    elif 'yesway' in name:
        return 'Yesway'
    else:
        return 'Independent'

#ADDS CHANNEL COLUMN to dataframe
stores['Channel'] = stores.apply(conv_chain_category, axis=1)

#Adds Topline Channel Category to Column
def chain_indy(row):
    if row['Channel'] == 'Independent':
        return 'Independent'
    else:
        return 'Chain'

#ADD CHAIN/INDEPENDENT COLUMN
stores['Chain_Indy'] = stores.apply(chain_indy, axis=1)

#Convert to CSV file to save
stores.to_csv("stores.csv", index=False)
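Not in the original post, but the same classification can be driven by a keyword lookup table instead of a long elif chain, which is easier to audit and extend. Dicts preserve insertion order in Python 3.7+, so the first match wins exactly as above:

#Keyword → chain name, in the same match order as the elif chain above
CHAIN_KEYWORDS = {
    'wal-mart': 'Walmart', 'big 10': 'Big 10', 'bp to go': 'BP to Go',
    'brew oil': 'Brew Oil', "brother's market": 'Brothers Market',
    'bucky': 'Buckeys Express', 'casey': 'Caseys', 'circle k': 'Circle K',
    'costco': 'Costco', 'cvs': 'CVS', 'fareway': 'Fareway Stores',
    'hartig': 'Hartig', 'hometown': 'Hometown Foods', 'hy-vee': 'Hy-Vee',
    'jet stop': 'Jet Stop', 'kimmies': 'Kimmies', 'kum & go': 'Kum & Go',
    'kwik': 'Kwik Star', 'liquor toba': 'Liquor Tobacco',
    'liquor, toba': 'Liquor Tobacco', 'main st': 'Main Street',
    'moes': 'Moes Mart', 'new star': 'New Star', 'prime mart': 'Prime Mart',
    'quick trips': 'Quick Trip', "sam's club": 'Sams Club',
    'smokin': 'Smokin Joes', 'target': 'Target', 'the depot': 'The Depot',
    'tobacco': 'Tobacco Outlet', 'walgreen': 'Walgreens',
    'world liquor': 'World Liquor and Tobacco', 'yesway': 'Yesway',
}

def conv_chain_category_v2(row):
    name = str(row['name']).lower()
    for keyword, chain in CHAIN_KEYWORDS.items():
        if keyword in name:  #first matching keyword decides the chain
            return chain
    return 'Independent'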
  • Creating Products Table:
#PRODUCTS TABLE - using API
#products_grab = ['itemno', 'im_desc', 'category', 'category_name', 'pack']
url_products = f"https://data.iowa.gov/resource/m3tr-qhgy.json?$limit=10000000&$offset=0&$select=itemno,im_desc,category,category_name,pack&$where=date between '{start_year}-{current_month}-10T00:00:00' and '{current_year}-{current_month}-11T00:00:00'&$group=itemno,im_desc,category,category_name,pack"
products_response = requests.get(url_products)
products_response.raise_for_status()
products = products_response.json()

#convert to Pandas DataFrame
products = pd.DataFrame(products)

#DROP DUPLICATES from Products
products = products.drop_duplicates(keep='first')

#Convert to CSV file to save
products.to_csv("products.csv", index=False)
  • Creating Vendors Table: will require grouping in the visualization layer
#VENDORS TABLE - using API
#vendors_grab = ['vendor_no', 'vendor_name', 'itemno']
#group=vendor_name (there are duplicate vendor_no)
url_vendors = f"https://data.iowa.gov/resource/m3tr-qhgy.json?$limit=10000000&$offset=0&$select=vendor_no,vendor_name,itemno&$where=date between '{start_year}-{current_month}-10T00:00:00' and '{current_year}-{current_month}-11T00:00:00'&$group=vendor_name,vendor_no,itemno"
vendors_response = requests.get(url_vendors)
vendors_response.raise_for_status()
vendors = vendors_response.json()

#convert to Pandas DataFrame
vendors = pd.DataFrame(vendors)

#Drop Duplicates
vendors = vendors.drop_duplicates()

#Convert to CSV file to save
vendors.to_csv("vendors.csv", index=False)
  • Creating Pricing Table
# pricing_grab = ['itemno', 'bottle_volume_ml', 'state_bottle_cost', 'state_bottle_retail']
url_pricing = f"https://data.iowa.gov/resource/m3tr-qhgy.json?$limit=100000&$offset=0&$select=itemno,bottle_volume_ml,state_bottle_cost,state_bottle_retail&$where=date between '{start_year}-{current_month}-10T00:00:00' and '{current_year}-{current_month}-11T00:00:00'&$group=itemno,bottle_volume_ml,state_bottle_cost,state_bottle_retail"
pricing_response = requests.get(url_pricing)
pricing_response.raise_for_status()
pricing = pricing_response.json()

#convert to Pandas DataFrame
pricing = pd.DataFrame(pricing)

#DROP DUPLICATES from Pricing
pricing = pricing.drop_duplicates(subset='itemno', keep='first')

#Convert to CSV file to save
pricing.to_csv("pricing.csv", index=False)

9. Clean and Profile the Data using Pandas Profiling

  • Please see Git Repository for Profile Reports
  • Reminder: name your reports in the code
# use and edit the below to create a profile report for each table above
orders_profile = ProfileReport(orders, title='ADD TITLE FOR EACH REPORT')
orders_profile  #displays the report inline in a notebook
orders_profile.to_file('NAME REPORT.html')
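A small loop (a sketch, not in the original) profiles all five tables at once; the titles and file names here are hypothetical:

#Profile every table in one pass
tables = {'orders': orders, 'stores': stores, 'products': products,
          'vendors': vendors, 'pricing': pricing}
for name, frame in tables.items():
    report = ProfileReport(frame, title=f'Iowa Liquor - {name}')
    report.to_file(f'{name}_profile.html')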

10. Use SQLAlchemy ORM to generate PostgreSQL back end

#create engine function
def engine(eng, user, password, address, port, db):  #was createengine
    db_engine = create_engine(f'{eng}://{user}:{password}@{address}:{port}/{db}')
    return db_engine.connect()
# reference: f'postgresql://postgres:password@localhost:5434/iowa_liquor'

#load to SQL postgres function
def load_data(connection, table_name, df):
    df.to_sql(table_name, connection, if_exists='replace', chunksize=1000000)

#credentials are read from pgsettings.py inside load_to_sql (create your own secure settings file)
#manually list the CSV files written above, or populate the list programmatically
csv_files_to_load = ['orders.csv', 'stores.csv', 'products.csv', 'vendors.csv', 'pricing.csv']

def load_to_sql():
    for file in csv_files_to_load:  #takes each file name from csv_files_to_load
        df = pd.read_csv(file, index_col=None)
        eng = 'postgresql'
        usr = settings.pgadmin_settings['user']
        password = settings.pgadmin_settings['password']
        host = settings.pgadmin_settings['host']
        port = settings.pgadmin_settings['port']
        db = settings.pgadmin_settings['db']
        connection = engine(eng, usr, password, host, port, db)
        table_name = os.path.splitext(file)[0]  #strip .csv so the table is named e.g. 'orders'
        load_data(connection, table_name, df)
        connection.close()

## TO RUN ORM##
load_to_sql()
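As a quick sanity check (not in the original script), read a few rows back from one of the freshly loaded tables:

#Verify the load by querying one of the new tables
check_conn = engine('postgresql',
                    settings.pgadmin_settings['user'],
                    settings.pgadmin_settings['password'],
                    settings.pgadmin_settings['host'],
                    settings.pgadmin_settings['port'],
                    settings.pgadmin_settings['db'])
print(pd.read_sql('SELECT * FROM orders LIMIT 5', check_conn))
check_conn.close()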

The database is now created and ready to connect to a visualization, ETL, or other automation tool.

Other Helpful Resources
