How to use SingleStore with Spark ML for Fraud Detection — 3/3

Akmal Chaudhri
4 min read · Sep 2, 2021

Create and evaluate a Logistic Regression model

Abstract

In this final part of our Fraud Detection series, we’ll use Spark to build a Logistic Regression model from data stored in SingleStore.

The notebook files used in this article series are available on GitHub in DBC, HTML and iPython formats.

Introduction

This is a multi-part article series, and it is structured as follows:

  1. Configure Databricks CE.
  2. Load the Credit Card data into SingleStore.
  3. Create and evaluate a Logistic Regression model.

This third article covers Part 3, Create and evaluate a Logistic Regression model. If you are following this series, please ensure that you have completed the setup and requirements described in the two previous articles.

According to Andrea Dal Pozzolo, who was involved in collecting the original dataset we are using for this example use case, fraud detection is a classification problem. Also, since investigators may only review a limited number of transactions, the probability of fraud is more important than the true classification. Therefore, Logistic Regression is a suitable algorithm for the initial analysis: the outcome has only two possible values, and the model produces a probability for each prediction.

Fill out the Notebook

Let’s now create a new notebook. We’ll call it Fraud Detection using Logistic Regression. We’ll attach our new notebook to our Spark cluster.

In the first code cell, let’s add the following:

%run ./Setup

This executes the Setup notebook that we created previously. We need to ensure that the server address and password of our SingleStore Managed Service cluster have been added to that notebook.
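
For reference, the Setup notebook created earlier in this series defines the connection details along these lines; this is only a sketch, and the values are placeholders to replace with your own cluster details:

# Sketch of the Setup notebook contents (placeholder values).
cluster = "<TO DO>"   # SingleStore Managed Service cluster endpoint
password = "<TO DO>"  # password for the admin user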

In the next code cell, we’ll set some parameters for the SingleStore Spark Connector, as follows:

spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")

These parameters specify the SingleStore cluster address, the username and password, and whether Pushdown is enabled or disabled. We'll discuss Pushdown in a separate article.

In the next code cell, let’s read the data from the SingleStore table into a Spark Dataframe, as follows:

%%time

df = (spark.read
      .format("singlestore")
      .load("fraud_detection.credit_card_tx"))

Using %%time in the code cell allows us to measure how long the read operation takes. It should take just milliseconds to complete.

In the next code cell, we’ll get the number of rows:

df.count()

This value should match the result we obtained in the previous article.

In the next code cell, we’ll drop any null values and then count the number of rows again, as follows:

df = df.dropna()
df.count()

The result should show that there are no null values.

As previously mentioned, the dataset is highly skewed. There are several solutions we can use to manage a skewed dataset. The initial approach we can take is to under-sample. We’ll keep all the 492 fraudulent transactions and reduce the number of non-fraudulent transactions. There are several ways we could perform this dataset reduction:

  • Randomly select majority class examples.
  • Select every nth row from the majority class examples (see the sketch below).
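
For reference, the second option could be implemented with something like the following sketch; the temporary row_idx column and the step of 100 (roughly equivalent to keeping 1% of the rows) are illustrative choices, and we won't use this approach in the rest of the article:

from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Sketch only: number the non-fraudulent rows by Time and keep every 100th one.
w = Window.orderBy("Time")
no_fraud_nth = (df.filter("Class == 0")
                .withColumn("row_idx", row_number().over(w))
                .filter("row_idx % 100 == 0")
                .drop("row_idx"))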

Let’s use the first approach and select 1% of the majority class examples for our initial analysis.

First, we’ll separate the two possible outcomes into two Dataframes in a code cell, as follows:

is_fraud = df.select("*").filter("Class == 1")
no_fraud = df.select("*").filter("Class == 0")

In the next code cell, we’ll randomly sample 1% of non-fraudulent transactions, without replacement, as follows:

no_fraud = no_fraud.sample(False, 0.01, seed = 123)

In the next code cell, we’ll concatenate the two Dataframes, sort on the Time column and print out the number of rows:

df_concat = no_fraud.union(is_fraud)
df = df_concat.sort("Time")
df.count()

In the next code cell, we’ll check the structure of the Dataframe:

display(df)

Next, in a code cell, we’ll create our train-test split:

train, test = df.randomSplit([0.7, 0.3], seed = 123)
print("train =", train.count(), " test =", test.count())

We’ll use 70% and 30% for train and test, respectively.

The code in the following sections was inspired by a Logistic Regression example available on GitHub.

In the next code cell, we’ll generate an is_fraud label column for the training data, using a UDF, as follows:

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

is_fraud = udf(lambda fraud: 1.0 if fraud > 0 else 0.0, DoubleType())
train = train.withColumn("is_fraud", is_fraud(train.Class))

We are now ready to create and fit a Spark Machine Learning model:

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Create the feature vectors.

assembler = VectorAssembler(
    inputCols = [x for x in train.columns if x not in ["Time", "Class", "is_fraud"]],
    outputCol = "features")

# Use Logistic Regression.

lr = LogisticRegression().setParams(
    maxIter = 100000,
    labelCol = "is_fraud",
    predictionCol = "prediction")

model = Pipeline(stages = [assembler, lr]).fit(train)

For the VectorAssembler, we want to use the columns V1 to V28 and the Amount of the transaction. Therefore, we ignore the Time, Class and is_fraud columns. Using Logistic Regression, we create our model.

Next, we’ll predict whether a transaction is fraudulent or not, using the test data, as follows:

predicted = model.transform(test)

And show the predictions as follows:

display(predicted)

Finally, we’ll check the performance of our model using a confusion matrix:

predicted = predicted.withColumn("is_fraud", is_fraud(predicted.Class))
predicted.crosstab("is_fraud", "prediction").show()
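To summarise the confusion matrix as single numbers, a quick sketch like the following (not part of the original notebook) derives precision and recall from the same is_fraud and prediction columns:

# Sketch only: compute precision and recall from the labelled predictions.
tp = predicted.filter("is_fraud == 1.0 AND prediction == 1.0").count()
fp = predicted.filter("is_fraud == 0.0 AND prediction == 1.0").count()
fn = predicted.filter("is_fraud == 1.0 AND prediction == 0.0").count()

precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
print("precision =", precision, " recall =", recall)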

Overall, the results should show that our initial model makes good predictions. Because Data Science and Machine Learning are iterative processes, we can look for ways to improve and tune our classifier. For example, normalising the data would be worth exploring in the next iteration.
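
As a rough sketch of that next iteration (not part of the original notebook), a StandardScaler stage could be inserted between the VectorAssembler and the Logistic Regression; the column name scaled_features is an illustrative choice:

from pyspark.ml.feature import StandardScaler

# Sketch only: scale the assembled feature vector to unit standard deviation.
scaler = StandardScaler(
    inputCol = "features",
    outputCol = "scaled_features")

lr_scaled = LogisticRegression().setParams(
    maxIter = 100000,
    featuresCol = "scaled_features",
    labelCol = "is_fraud",
    predictionCol = "prediction")

model_scaled = Pipeline(stages = [assembler, scaler, lr_scaled]).fit(train)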

Summary

In this article series, we have seen how easily SingleStore can be used with Spark. The key benefits of the SingleStore Spark Connector can be summarised as follows:

  • Implemented as a native Spark SQL plugin.
  • Accelerates ingest from Spark via compression.
  • Supports data loading and extraction from database tables and Spark Dataframes.
  • Integrates with the Catalyst query optimiser and supports robust SQL Pushdown.
  • Accelerates ML workloads.

In a future article, we’ll explore External Functions and discuss how they could be helpful for Machine Learning.

In the next article series, we’ll look at an example of Pushdown. Stay tuned!
