JIRA API — Data Ingestion into Databricks with Python
Recently, I worked on building a data ingestion pipeline from JIRA to support reporting and data insights on high-impact project delivery for the executive and senior leadership teams.
I want to share the method I followed and some issues I ran into while implementing this API for data ingestion.
1. API Token — First, you will need an API token to authenticate (the code below uses basic authentication) and create a secure connection with JIRA. Depending on your organization's setup and your access level, you may be able to generate your own token, or you may have to request one from a JIRA admin in your organization. Instructions are provided in this link.
2. Platform — Since my org uses Azure Data Lakehouse and Databricks, the scripts below show Databricks-specific development. Snowflake data ingestion is quite similar.
# Import the required libraries
import requests
from requests.auth import HTTPBasicAuth
import json
import pandas as pd
# URL for the issue search endpoint.
url = "https://your-jira-instance.com/rest/api/2/search"
# Create a json file or use a secret-keeper/vault for accessing your token
path = '../jira_api_token.json'
def open_json(file):
    with open(file, 'r') as f:
        infile = json.load(f)
    return infile
secret = open_json(path)
# Create an authentication object using your registered email ID and the token received.
auth = HTTPBasicAuth("user.name@company.com", secret['token'])
# The headers parameter should specify the
# desired format of the response data.
headers = {
    "Accept": "application/json"
}
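Before wiring up pagination, it can help to sanity-check the credentials. The sketch below builds the request without sending it, so you can inspect the Authorization header that requests will attach; `/rest/api/2/myself` is a lightweight JIRA endpoint that returns the authenticated user, and the email and token values here are placeholders, not real credentials:

```python
import requests
from requests.auth import HTTPBasicAuth

# Placeholder credentials -- substitute your registered email and API token
auth = HTTPBasicAuth("user.name@company.com", "api-token-value")
headers = {"Accept": "application/json"}

# Prepare (but do not send) a GET to /rest/api/2/myself, which returns
# the authenticated user's profile when the credentials are valid.
req = requests.Request(
    "GET",
    "https://your-jira-instance.com/rest/api/2/myself",
    headers=headers,
    auth=auth,
)
prepared = req.prepare()
# Basic auth is sent as a base64-encoded "email:token" pair
print(prepared.headers["Authorization"].split()[0])
```

Sending `prepared` with `requests.Session().send(prepared)` should return HTTP 200 if the token works, and 401 otherwise.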
3. API Pagination — By default, the JIRA search API returns only 50 results per request. If your project has more than 50 issues, a boilerplate request will miss data. Below is a simple example of a default data pull.
query = {
    'jql': 'project = PROJECTNAME'
}
# Create a request object with above parameters.
response = requests.request(
    "GET",
    url,
    headers=headers,
    auth=auth,
    params=query
)
So we need to build a loop that advances the startAt parameter in the query on each request.
# Create API pagination
startAt = [*range(0, 1000, 50)]  # capped at 1000 issues (20 pages of 50)
list_issues_final = []
for pag_no in startAt:
    # JQL query. Here, all issues of the project are fetched as
    # no further criteria are mentioned.
    query = {
        'jql': 'project = PROJECTNAME',
        'maxResults': 50,
        'startAt': pag_no
    }
    # Create a request object with the above parameters.
    response = requests.request(
        "GET",
        url,
        headers=headers,
        auth=auth,
        params=query
    )
    # Parse the project issues from the JSON response.
    dictProjectIssues = json.loads(response.text)
    if dictProjectIssues['issues'] == []:  # If no issues, exit the loop
        break
    else:
        for issue in dictProjectIssues['issues']:
            list_issues_final.append(issue)
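The loop above can also be factored into a small helper whose stopping condition is easier to test in isolation. In this sketch, a stand-in `fetch_page` callable replaces the real HTTP call; the function names and the fake 120-issue backend are illustrative, not part of the JIRA API:

```python
def paginate_issues(fetch_page, page_size=50, max_issues=1000):
    """Collect issues page by page until an empty page is returned."""
    issues = []
    for start_at in range(0, max_issues, page_size):
        page = fetch_page(start_at, page_size)
        if not page:  # empty page means we have consumed everything
            break
        issues.extend(page)
    return issues

# Fake backend holding 120 issues, standing in for the JIRA search API
fake_data = [{"key": f"PROJ-{i}"} for i in range(120)]

def fake_fetch(start_at, page_size):
    return fake_data[start_at:start_at + page_size]

result = paginate_issues(fake_fetch)
print(len(result))  # 120
```

In the real pipeline, `fetch_page` would issue the GET request with `startAt` and `maxResults` set and return `response.json()['issues']`.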
An alternate approach I found online, shown below, is much less convoluted.
import requests
from requests.auth import HTTPBasicAuth
import pandas as pd

url = "https://your-jira-instance.com/rest/api/2/search"
auth = HTTPBasicAuth("user.name@company.com", secret['token'])
# startAt must be an int so it can be incremented below
querystring = {"jql": "project=PROJECT_NAME", "startAt": 0, "maxResults": 50}
headers = {
    "Accept": "application/json"
}
while True:
    response = requests.request("GET", url, headers=headers,
                                auth=auth, params=querystring)
    if not response.ok:
        break
    data = response.json()
    df = pd.json_normalize(data["issues"])
    # Convert the pandas DataFrame to Spark before writing to Delta
    spark.createDataFrame(df).write.format("delta") \
        .mode("append").save("/mnt/delta/data")
    if data["startAt"] + data["maxResults"] >= data["total"]:
        break
    querystring["startAt"] += data["maxResults"]
4. Data storage into the Data Lakehouse — There are many ways to do this in Python. I used a simple for loop to extract the details I needed. This depends on how your org structures its data, and it differs across projects even within one org.
# Column names dictionary to convert from JIRA to Table. This depends on
# your JIRA nomenclature and how your org uses JIRA
dict_col_names = {"key": "Issue key",
                  "summary": "Initiative",
                  "customfield_XXXXX.field-151": "Action items/plan",
                  "customfield_XXXXX.field-150": "Action Prime",
                  "customfield_XXXXX.field-165-name": "Action Status",
                  "customfield_XXXXX.field-165": "Action Completion Date",
                  "customfield_XXXXX.id": "ID"}
# Create an empty dataframe
df = pd.DataFrame(columns=dict_col_names.values(), index=[])
# For loop to iterate over the issues
for issue in list_issues_final:
    temp_list = dict.fromkeys(dict_col_names.values())
    for key, value in issue.items():
        if key == "key":
            temp_list["Issue key"] = value
        elif key == "fields":
            for k1, v1 in value.items():
                if k1 in dict_col_names:
                    temp_list[dict_col_names[k1]] = v1
    # dict.fromkeys fills missing values with None, so test truthiness
    # rather than comparing against the empty string
    if temp_list["Issue key"] and temp_list["Initiative"]:
        df_temp = pd.DataFrame(temp_list, index=[1])
        df = pd.concat([df, df_temp], ignore_index=True)
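As an aside, pandas can flatten the nested fields dict in one call with json_normalize, which may replace the nested loops entirely. The issue payloads and the custom field id below are made up to keep the sketch self-contained; substitute your own field names:

```python
import pandas as pd

# Toy issues mimicking the shape of the JIRA /search payload
# (customfield_10001 is a hypothetical field id)
list_issues_final = [
    {"key": "PROJ-1", "fields": {"summary": "Migrate pipeline",
                                 "customfield_10001": "Alice"}},
    {"key": "PROJ-2", "fields": {"summary": "Build dashboard",
                                 "customfield_10001": "Bob"}},
]

# json_normalize flattens the nested "fields" dict into dotted columns,
# e.g. "fields.summary", which we then rename to reporting labels
df = pd.json_normalize(list_issues_final)
df = df.rename(columns={
    "key": "Issue key",
    "fields.summary": "Initiative",
    "fields.customfield_10001": "Action Prime",
})
print(df.columns.tolist())
```

Rows missing a key or summary could then be dropped with `df.dropna(subset=["Issue key", "Initiative"])`.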
5. Save to the Data Lakehouse with PySpark —
# pandas 2.0 removed DataFrame.iteritems, which older Spark versions
# still call; alias it back to items as a compatibility shim.
df.iteritems = df.items
df_spark = spark.createDataFrame(df)
df_spark.write.mode('overwrite').saveAsTable('databricks_workspace.JIRA_PROJECTNAME_data')
And that is a wrap! Please note that I work in analytics and am not an expert in data engineering, so if you have better or more efficient methods, please mention them in the comments.