An example of Pushdown using SingleStore and Spark — 1/2

Akmal Chaudhri
3 min read · Sep 6, 2021

Load the Weather data into SingleStore

Abstract

In this article series, we’ll look at an example of query Pushdown when using the SingleStore Spark Connector. This first article will load some weather data into SingleStore.

The notebook files used in this article series are available on GitHub.

Introduction

The SingleStore Spark Connector supports rewriting Spark query execution plans, for both SQL and DataFrame operations, into SingleStore queries. Computation is pushed down into SingleStore automatically. By default, query rewrites are enabled, but they can be disabled using the disablePushdown option, as follows:

# Pushdown enabled
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")

# Pushdown disabled
spark.conf.set("spark.datasource.singlestore.disablePushdown", "true")
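
To see what this option affects, here is a minimal hedged sketch: it assumes a Spark session already configured with SingleStore connection options (such as ddlEndpoint, user and password), and a table named weather.temperatures_all, which we create later in this article.

# A minimal sketch: read a SingleStore table through the connector and
# inspect the physical plan. With Pushdown enabled, the filter appears
# inside the single SQL query sent to SingleStore; with disablePushdown
# set to true, Spark evaluates the filter itself.
df = spark.read \
    .format("singlestore") \
    .load("weather.temperatures_all")

df.filter(df["City"] == "San Francisco").explain()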

SingleStore also supports partial Pushdown. This enables some parts of a query to be evaluated by SingleStore and other parts to be evaluated by Spark. We’ll look at an example of this in the next article. We’ll develop everything in the cloud using the SingleStore Managed Service for ease of use.

To begin with, we need to create a free Managed Service account on the SingleStore website.

This is a two-part article series and it is structured as follows:

  1. Load the Weather data into SingleStore.
  2. Demonstrate a Pushdown example.

This first article covers Part 1, Load the Weather data into SingleStore.

In our SingleStore Managed Service account, let’s use the SQL Editor to create a new database. Call this weather, as follows:

CREATE DATABASE IF NOT EXISTS weather;

We’ll run the command and check that it completed.
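
For example, listing the databases in the SQL Editor should now include weather:

SHOW DATABASES;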

We need some data for our use case. We can find actual weather data from the CORGIS Dataset Project on GitHub. The weather.csv file contains weather data for cities across the United States for 2016.

Fill out the Notebook

Let’s now create a new notebook. We’ll call it data_loader_for_pushdown.

In a new code cell, let’s add the following code:

column_mapping = {
    "Data.Precipitation" : "Precipitation",
    "Date.Full" : "Date",
    "Date.Month" : "Month",
    "Date.Week of" : "Week",
    "Date.Year" : "Year",
    "Station.City" : "City",
    "Station.Code" : "Code",
    "Station.Location" : "Location",
    "Station.State" : "State",
    "Data.Temperature.Avg Temp" : "Avg",
    "Data.Temperature.Max Temp" : "Max",
    "Data.Temperature.Min Temp" : "Min",
    "Data.Wind.Direction" : "Wind_Direction",
    "Data.Wind.Speed" : "Wind_Speed",
}

schema = {
    "Precipitation" : "float32",
    "Date" : "string",
    "Month" : "int32",
    "Week" : "int32",
    "Year" : "int32",
    "City" : "string",
    "Code" : "string",
    "Location" : "string",
    "State" : "string",
    "Avg" : "int32",
    "Max" : "int32",
    "Min" : "int32",
    "Wind_Direction" : "int32",
    "Wind_Speed" : "float32"
}

The column mapping shortens some of the original column names in the CSV file, and the schema assigns a data type to each column.

We’ll create a new DataFrame in the next code cell, as follows:

import pandas as pd

url = "https://raw.githubusercontent.com/corgis-edu/corgis/master/website/datasets/csv/weather/weather.csv"

df = pd.read_csv(url)

This reads the CSV file and creates a DataFrame called df. We’ll also rename the columns and apply the schema:

df = df.rename(columns = column_mapping).astype(schema)
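
To confirm that the renaming and the types were applied, we can check the dtypes in a new code cell:

df.dtypes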

In the next code cell, let’s get the number of rows:

df.count()

Executing this, we obtain the value 16743 for each column.

In the next code cell, we’ll take a look at the first few rows of the DataFrame:

df.head()

Next, let’s plot the maximum and minimum temperatures for San Francisco:

import plotly.express as px

df_sf = df[df["City"] == "San Francisco"].sort_values(by = "Date")

fig = px.line(
    df_sf,
    x = "Date",
    y = ["Max", "Min"],
    title = "Max and Min Temperatures in San Francisco (Fahrenheit)"
)

fig.show()

This gives us the following plot, shown in Figure 1.

Figure 1. Max and Min in Fahrenheit for San Francisco.

We are now ready to write the DataFrame to SingleStore. In the next code cell, we can add the following:

from sqlalchemy import create_engine

db_connection = create_engine(connection_url)
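
Here, connection_url is the connection string for our database. In the Managed Service notebook environment this may already be available; if not, a hypothetical sketch of building it, assuming the pymysql driver is installed (SingleStore speaks the MySQL wire protocol):

# Hypothetical values: replace <password> and <host> with the admin
# password and endpoint from your Managed Service cluster.
connection_url = "mysql+pymysql://admin:<password>@<host>:3306/weather"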

Finally, we are ready to write the DataFrame to SingleStore:

df.to_sql(
    "temperatures_all",
    con = db_connection,
    if_exists = "replace",
    index = False,
    chunksize = 1000
)

This will write the DataFrame to a table called temperatures_all in the weather database. We can check from SingleStore that the table was successfully created.
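
For example, a quick row count in the SQL Editor should match the DataFrame:

USE weather;

SELECT COUNT(*) FROM temperatures_all;

This should return 16743, matching the count we saw earlier.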

Summary

In this first article in the Pushdown series, we have successfully loaded our weather data into a DataFrame, checked the number of rows and then written the DataFrame to SingleStore. In the next article, we’ll see an example of Pushdown using our weather database.
