Data preparation with Azure Synapse Analytics

Jonathan Bogerd
Feb 22, 2022


In the previous article we set up a Synapse and Machine Learning workspace. In this article, we will go over how to get data into the Azure Synapse Analytics workspace and how to transform that data to make it ready for use. First, I will discuss the Integrate tab of Synapse. For those familiar with Azure Data Factory, this section will be very easy to follow. Next, SQL is used to create an external table from data present in the storage. After that, we will see how to open data from a linked storage location. Lastly, Microsoft-provided datasets will be introduced. For the remaining articles, we will use Microsoft data. Therefore, this is also the data we will transform in the last part of this article.


ADF Pipeline

If you click the Integrate tab, you are taken to the Azure Data Factory experience integrated within Synapse. Here you can create pipelines to ingest data ad hoc or on a schedule. Data can be retrieved from many sources, for instance by calling an API, copying data from a database or copying a file from an on-premises file store. For more information on the available options, see this website.

To create a pipeline, click the plus icon and select Pipeline. A canvas opens on which you can place several activities. Under Move & Transform, you can find the copy data activity. Here you can select a source and a sink and map the data according to your requirements. To build an end-to-end example, we will use this pipeline functionality to run Azure Synapse Analytics notebooks in the correct order, scheduled with a trigger. Under Synapse you can find the notebook activity. Dragging it onto the canvas gives you the option to run a notebook that you have created. To run the pipeline on a certain schedule, create a trigger by selecting Trigger and Create New. Do not forget to publish your finished pipeline.

If you click the plus icon, you can also find the Copy Data tool. This opens an interface that makes it easier to copy data using Data Factory, with several scheduling options available. For instance, if you want to copy data from the Gen2 storage linked from Azure Machine Learning, select Azure Data Lake Storage Gen2 and pick the correct linked service. You can then select the correct folder or file and even filter on modification date. For more information on the Copy Data tool, see here.

Create External Table

If your data is already stored in a linked storage account or the primary storage location, you can create an external table within Azure Synapse Analytics. To do this, select the Data tab and click through until you find the dataset you want to expose as an external table. For illustration purposes, I created a data container in the Synapse storage account and uploaded a CSV file containing data on the world's largest cities. To access the data, you might need to refresh the linked datasets. To create an external table, click Linked and select the data container. Then right-click the file, choose New SQL script and then create an external table. A popup opens where you can set several options. As the file I uploaded is a CSV file, I get the options for the CSV file type. If your dataset contains headers, select Infer column names. Select Continue, and create a new database and table name. You now have two options: either create the external table with the current settings by selecting Automatically, or generate an editable script by selecting Using SQL script. The latter lets you inspect or change parts of the script, for instance the length of an NVARCHAR column. To run this script, select the SQL pool (Built-in) and the database that you created, then click Run and the external table is created.

Note that using the built-in SQL pool does not always work straight away. The automatically created script specifies Hadoop as the Type for the data source, which is not supported by the built-in SQL pool. To run the script, remove the Type clause so that the built-in reader is used. For details on this, please use this link.

Open data from linked storage location

One of the advantages of Azure Synapse Analytics over, for instance, Azure Databricks is the ease of importing data from the linked storage accounts. To create the external table, we uploaded a CSV file to the storage. In this section we will open the same file as a Spark DataFrame. For this, we do not have to write our own code; we can use one of the options Azure Synapse Analytics provides. Again, select the file and click Load to DataFrame. This auto-generates code to read from the location. In our case, headers are included, so uncomment the header line. To run this script, select an appropriate Spark pool and select Run all. It will take a few minutes for the Spark session to start, after which the dataframe is displayed. You can find the notebook under the Develop tab, where you can also rename it. It is also possible to open data from other linked storage locations. For this article, we linked the storage from Azure Machine Learning as well. If you have data there, opening it in a dataframe works the exact same way.
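The generated cell looks roughly like the sketch below. The storage account, container and file names are placeholders I use for illustration, so the auto-generated code in your workspace will differ in those details.

# Sketch of an auto-generated "Load to DataFrame" cell (names are illustrative)
df = spark.read.load(
    'abfss://data@<storage-account>.dfs.core.windows.net/worldcities.csv',
    format='csv',
    # If the file contains headers, uncomment the line below:
    header=True
)
display(df.limit(10))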

Use Microsoft provided data

The data we will use for this article is provided as part of the Microsoft data samples. To access these samples, click the Data tab, then the plus icon, and select Browse Gallery. This opens a page with several sample datasets. For this article we will use the Bing Covid-19 Data and the COVID Tracking Project datasets. Click on each dataset and select Add dataset. This automatically publishes the changes and adds an Azure Blob Storage entry in the Linked tab, where the data resides. To open the datasets, right-click them and select New Notebook and Load to DataFrame. We open both in their own notebook, but then copy the second script into the next cell of the first notebook, so that the end result looks like this:

%%pyspark
blob_account_name = "pandemicdatalake"
blob_container_name = "public"
blob_relative_path = "curated/covid-19/bing_covid-19_data/latest/bing_covid-19_data.parquet"
blob_sas_token = r""

# Allow SPARK to read from Blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)

dfBing = spark.read.parquet(wasbs_path)
display(dfBing.limit(10))
blob_account_name = "pandemicdatalake"
blob_container_name = "public"
blob_relative_path = "curated/covid-19/covid_tracking/latest/covid_tracking.parquet"
blob_sas_token = r""

# Allow SPARK to read from Blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)

dfTracking = spark.read.parquet(wasbs_path)
display(dfTracking.limit(10))

I also renamed the notebook to Covid Data. Now that we have successfully loaded the data, we will transform it in the next section and write the results to a Spark table.

Transform data

The goal of the machine learning model that we will create is to predict the number of Covid cases in a specific state in the US. The COVID Tracking Project dataset is the basis for this model. It contains, for each day and each state, information on the number of Covid cases, hospitalized citizens, recoveries and so on. By itself this dataset can be used to build a predictive model for the number of Covid cases on the next day or in the next week, for instance. However, it seems likely that the number of Covid cases in a country is also somewhat linked to the number of cases in other countries. Therefore, the Bing Covid-19 Data dataset is added as well. The goal of the transform step is to create a dataframe that is cleaned of outliers and unnecessary or incomplete variables and that contains the number of cases in relevant countries.

The first step required to reach this goal is to perform exploratory data analysis (EDA) on the Bing Covid-19 Data dataset. First, we check which countries or regions have data available by using the distinct command, as in the short sketch below. From the list of all countries or regions, we select United States, Worldwide, Mexico, Canada and United Kingdom. For those countries or regions, only the number of cases and deaths per day will be used. To join this dataframe to the COVID Tracking dataset, a separate column is required for each country. In the larger code fragment after the sketch, these columns are created and the dataframe is grouped per date.
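A minimal sketch of that check, assuming dfBing was loaded as above; country_region is the column name used in the Bing dataset:

# List the available countries/regions in the Bing dataset
dfBing.select('country_region').distinct().show(250, truncate=False)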

from pyspark.sql.functions import col, when, mean as _mean, lit

countryList = ["United States", "Worldwide", "Mexico", "Canada", "United Kingdom"]
dfBing = dfBing.filter(col('country_region').isin(countryList))
dfBing = dfBing.distinct()

# One pair of columns per country/region: the daily change for that country, 0 otherwise
dfBing = dfBing.withColumn('Confirmed_Change_US', when(dfBing.country_region == 'United States', dfBing.confirmed_change).otherwise(0))
dfBing = dfBing.withColumn('Deaths_Change_US', when(dfBing.country_region == 'United States', dfBing.deaths_change).otherwise(0))
dfBing = dfBing.withColumn('Confirmed_Change_World', when(dfBing.country_region == 'Worldwide', dfBing.confirmed_change).otherwise(0))
dfBing = dfBing.withColumn('Deaths_Change_World', when(dfBing.country_region == 'Worldwide', dfBing.deaths_change).otherwise(0))
dfBing = dfBing.withColumn('Confirmed_Change_Mexico', when(dfBing.country_region == 'Mexico', dfBing.confirmed_change).otherwise(0))
dfBing = dfBing.withColumn('Deaths_Change_Mexico', when(dfBing.country_region == 'Mexico', dfBing.deaths_change).otherwise(0))
dfBing = dfBing.withColumn('Confirmed_Change_Canada', when(dfBing.country_region == 'Canada', dfBing.confirmed_change).otherwise(0))
dfBing = dfBing.withColumn('Deaths_Change_Canada', when(dfBing.country_region == 'Canada', dfBing.deaths_change).otherwise(0))
dfBing = dfBing.withColumn('Confirmed_Change_UK', when(dfBing.country_region == 'United Kingdom', dfBing.confirmed_change).otherwise(0))
dfBing = dfBing.withColumn('Deaths_Change_UK', when(dfBing.country_region == 'United Kingdom', dfBing.deaths_change).otherwise(0))

# Sum per date, then rename the resulting sum(...) columns back to their original names
sumColumns = ["Confirmed_Change_US", "Deaths_Change_US",
              "Confirmed_Change_World", "Deaths_Change_World",
              "Confirmed_Change_Mexico", "Deaths_Change_Mexico",
              "Confirmed_Change_Canada", "Deaths_Change_Canada",
              "Confirmed_Change_UK", "Deaths_Change_UK"]
dfBingGrouped = dfBing.groupBy("updated").sum(*sumColumns)
for column in sumColumns:
    dfBingGrouped = dfBingGrouped.withColumnRenamed("sum(%s)" % column, column)

For the COVID Tracking dataset, we first look at the quality of the data. Upon first inspection, you can see that for the state 'AS' the numbers are always 0, so this state is removed from the dataset. Next, the appropriate columns are selected and duplicate rows are removed. For many states, the number of patients in the ICU is not available. However, it seems likely that this number can be estimated from the total number of people in hospital. Therefore, the mean ratio of ICU to hospital patients is calculated and used to fill in the missing values. The two parts of the dataframe are then combined again, and the Bing dataset is joined on the date column.

dfTracking = dfTracking.filter(col('state') != 'AS')
dfTracking = dfTracking.select('date', 'state', 'hospitalized_currently', 'in_icu_currently',
                               'death_increase', 'hospitalized_increase',
                               'negative_increase', 'positive_increase')
dfTracking = dfTracking.distinct()

# Calculate ICU percentage on the rows where in_icu_currently is known
dfTrackingCalc = dfTracking.filter(col('in_icu_currently').cast('int').isNotNull())
dfTrackingCalc = dfTrackingCalc.withColumn("ICUHospitalRatio", dfTrackingCalc.in_icu_currently / dfTrackingCalc.hospitalized_currently)
df_stats = dfTrackingCalc.select(_mean(col('ICUHospitalRatio')).alias('mean')).collect()
meanICUHospitalRatio = df_stats[0]['mean']

# Fill in missing ICU numbers using the mean ICU-to-hospital ratio
dfTrackingMissingValues = dfTracking.filter(col('in_icu_currently').cast('int').isNull())
dfTrackingMissingValues = dfTrackingMissingValues.withColumn('in_icu_currently',
    when(dfTrackingMissingValues.hospitalized_currently > -1,
         dfTrackingMissingValues.hospitalized_currently * meanICUHospitalRatio)
    .otherwise(dfTrackingMissingValues.in_icu_currently))
dfTrackingMissingValues = dfTrackingMissingValues.withColumn('in_icu_currently', col('in_icu_currently').cast('int'))

# Recombine both parts and join the grouped Bing data on the date column
dfTrackingTotal = dfTrackingCalc.drop('ICUHospitalRatio').unionByName(dfTrackingMissingValues)
dfTotal = dfTrackingTotal.join(dfBingGrouped, dfTrackingTotal.date == dfBingGrouped.updated, 'left')
dfTotal = dfTotal.withColumn('positive_increase', col('positive_increase').cast('int'))

Lastly, a training set and a test set are created by splitting the data into two groups based on the date. The resulting dataframes are saved as Spark tables. This allows us to use Azure Machine Learning in the next article to develop a machine learning model to predict the number of Covid cases.

dfTrain = dfTotal.filter("date < date'2021-01-01'")
dfTest = dfTotal.filter("date >= date'2021-01-01'")

print("Train ", dfTrain.count())
print("Test ", dfTest.count())

dfTrain.write.mode("overwrite").saveAsTable("default.trainset")
dfTest.write.mode("overwrite").saveAsTable("default.testset")

Sources

https://docs.microsoft.com/en-us/azure/data-factory/connector-azure-sql-data-warehouse?tabs=data-factory
