Bigdata Analytics untuk Deteksi Fraud — Part 2

aris budi santoso
7 min readApr 27, 2024

--

Tutorial Apache Spark untuk Fraud Detection pada Databrick Community Edition

Bagian pertama dari artikel ini telah membahas mengenai persiapan yang harus dilakukan untuk praktikum machine learning menggunakan Databricks. Apabila anda belum membaca bagian pertama, silahkan kunjungi link berikut.

Artikel bagian kedua ini akan membahas mengenai bagaimana menggunakan PySpark untuk mengolah data dan melatih model machine learning dalam konteks fraud detection. Code pada artikel ini mengambil dan memodifikasi code yang telah dibuat pada artikel berikut.

  1. Unduh data dari Kaggle ke Driver Node

Data yang kita gunakan adalah dataset yang dipublish pada platform Kaggle. Langkah pertama yang kita lakukan adalah mengunduh data tersebut ke node tempat kita akan menulis program/ code.

Sebagian perintah yang digunakan dalam proses ini adalah Linux shell script yang dijalankan pada notebook dengan perintah yang diawali % . Shell script tersebut dijalankan pada driver node.

a. Membuat direktori dan file Kaggle API key

Library Kaggle akan membaca environment variable KAGGLE_KONFIG_DIR untuk menemukan file yang berisi API Key untuk mengakses Kaggle API.

%sh mkdir kaggle_store
import json
dictionary = {"username":"your user name","key":"your key"}
json_object = json.dumps(dictionary, indent=4)
%sh cat /dev/null > kaggle_store/kaggle.json

with open("kaggle_store/kaggle.json", "w") as outfile:
outfile.write(json_object)
import os
os.environ['KAGGLE_CONFIG_DIR'] = "kaggle_store"

b. Install library Kaggle

%sh pip install kaggle

c. Mengunduh data

Pencarian data dapat dilakukan menggunakan library kaggle dengan perintah list dan sebuah keyword pencarian.

%sh kaggle datasets list -s "Synthetic Financial Datasets For Fraud Detection"

Output dari perintah tersebut adalah daftar dataset sebagaimana berikut.

Kita akan menggunakan dataset yang berjudul Synthetic Financial Datasets for Fraud Detection. Dataset tersebut memiliki pengenal pada kolom ref yaitu ealaxi/paysim1, pengenal tersebut digunakan untuk mengunduh data. Perintah untuk mengunduh dataset dengan library Kaggle adalah sebgai berikut.

%sh kaggle datasets download -d "ealaxi/paysim1"

Setelah proses unduh selesai, kita dapat memeriksa file yang telah diunduh dengan perintah berikut.

dbutils.fs.ls('file:/databricks/driver')

d. Copy data ke DBFS

Spark cluster akan membaca file yang tersimpan pada storage yang terdistribusi. Kita akan memindahkan file yang telah didownload ke DBFS. Unzip file yang telah diunduh.

%sh unzip paysim1.zip
dbutils.fs.mkdirs("dbfs:/FileStore/paysiml/")
dbutils.fs.cp("file:/databricks/driver/PS_20174392719_1491204439457_log.csv","dbfs:/FileStore/paysiml/")

2. Exploratory Data Analysis

Memahami data yang akan digunakan dalam analisis merupakan hal yang penting. Tahapan ini merupakan proses untuk memahami data dengan menerapkan analisis deskriptif. Langkah-langkah yang dilakukan adalah sebagai berikut.

Membaca file dan load ke dalam spark data frame

df = spark.read.csv('dbfs:/FileStore/paysiml/PS_20174392719_1491204439457_log.csv', header=True)

Menentukan format data dari setiap kolom

from pyspark.sql.functions import (col, lit, substring, to_date, when)
from pyspark.sql.types import IntegerType

df = df.withColumn("amount",col("amount").cast("integer"))
df = df.withColumn("oldbalanceOrg",col("oldbalanceOrg").cast("integer"))
df = df.withColumn("newbalanceOrig",col("newbalanceOrig").cast("integer"))
df = df.withColumn("oldbalanceDest",col("oldbalanceDest").cast("integer"))
df = df.withColumn("newbalanceDest",col("newbalanceDest").cast("integer"))
df = df.withColumn("isFraud",col("isFraud").cast("integer"))

Membuat view data dari dataframe

df.createOrReplaceTempView("paysim")

Melihat perbandingan jumlah data untuk kelas fraud dan bukan fraud

display(df.groupBy("isFraud").count())

Output dari perintah di atas berupa table dan dapat ditambahkan juga visualisasi sebagaimana pada gambar di bawah ini.

display(df.groupBy("type").count())

Eksplorasi menggunakan SQL

display(spark.sql('select type, count(1) from paysim where amount > 10000 group by type'))

3. Data Preprocessing

Tahapan ini merupakan proses penyiapan data untuk digunakan dalam analisis. Data proprocessing mencakup penanganan permasalahan kualitas data hingga pembentukan dan pembagian dataset untuk keperluan training dan testing.

Memeriksa missing value

from pyspark.sql.functions import isnan, count
display(df.select([(100 * count(when(isnan(c) | col(c).isNull(), c))/count(c)).alias(c) for c in df.columns]))

Mengisi missing value dengan nilai 0

df = df.na.fill(value=0,subset=["amount","oldbalanceOrg","newbalanceOrig","oldbalanceDest","newbalanceDest"])

Split dataset untuk keperluan training dan testing

(train, test) = df.randomSplit([0.8, 0.2], seed=12345)

4. Melatih Model Machine Learning

Dataset paysim merupakan dataset berlabel yaitu fraud dan non fraud. Label tersebut terdapat pada kolom isFraud yang memiliki nilai binary (0 atau 1). Kita dapat melatih model supervised machine learning dengan data tersebut dengan jenis pekerjaan Classification karena label data berupa categorical. Langkah-langkah yang dilakukan untuk melatih model machine learning adalah sebagai berikut.

Import modul machine learing pada library PySpark

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

Membuat pipeline machine learning yang mencakup indexer, vector assembler dan algoritma decission tree classifier.

# Encodes a string column of labels to a column of label indices
indexer = StringIndexer(inputCol = "type", outputCol = "typeIndexed")

# VectorAssembler is a transformer that combines a given list of columns into a single vector column
va = VectorAssembler(inputCols = ["typeIndexed", "amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest"], outputCol = "features")

# Using the DecisionTree classifier model
dt = DecisionTreeClassifier(labelCol = "isFraud", featuresCol = "features", seed = 54321, maxDepth = 5)

# Create our pipeline stages
pipeline = Pipeline(stages=[indexer, va, dt])

Menggunakan pipeline yang sudah dibuat untuk melatih model machine learning untuk klasifikasi.

# View the Decision Tree model (prior to CrossValidator)
dt_model = pipeline.fit(train)

5. Kinerja Model

a. Hyperparameter Tunning

Kinerja model classifier dapat dipengaruhi oleh nilai parameter dari algoritma yang digunakan, dan setiap algoritma memiliki parameter berbeda-beda dan dapat ditunning untuk mendapatkan kinerja model terbaik.

Model yang kita latih menggunakan algoritma decission tree, paramter yang akan kita tunning dari algoritma tersebut adalah maximum depth dan maximum bins. Proses hyperparameter tunning memakan resources dan waktu yang banyak, karena proses trainning dan testing dilakukan sebanyak jumlah kombinasi dari variablenya, dikalikan lagi dengan jumlah folds yang digunakan.

# Build the grid of different parameters
paramGrid = ParamGridBuilder() \
.addGrid(dt.maxDepth, [5, 10, 15]) \
.addGrid(dt.maxBins, [10, 20, 30]) \
.build()

# Build out the cross validation
crossval = CrossValidator(estimator = dt,
estimatorParamMaps = paramGrid,
evaluator = BinaryClassificationEvaluator().setLabelCol("isFraud"),
numFolds = 3)
# Build the CV pipeline
pipelineCV = Pipeline(stages=[indexer, va, crossval])
cvModel_u = pipelineCV.fit(train)

Evaluasi model menggunakan kurva ROC dan Precission Recall. Hasil dari melatih model akan terlihat dari luas area di bawah kurva/ Area Under The Curve (AUC). Pengukuran kinerja model dilakukan baik terhadap hasil prediksi menggunakan data training maupun dengan data testing.

evaluator = BinaryClassificationEvaluator().setLabelCol("isFraud")
# Build the best model (training and test datasets)
train_pred = cvModel_u.transform(train)
test_pred = cvModel_u.transform(test)

# Evaluate the model on training datasets
pr_train = evaluator.evaluate(train_pred, {evaluator.metricName: "areaUnderPR"})
auc_train = evaluator.evaluate(train_pred, {evaluator.metricName: "areaUnderROC"})

# Evaluate the model on test datasets
pr_test = evaluator.evaluate(test_pred, {evaluator.metricName: "areaUnderPR"})
auc_test = evaluator.evaluate(test_pred, {evaluator.metricName: "areaUnderROC"})

# Print out the PR and AUC values
print("PR train:", pr_train)
print("AUC train:", auc_train)
print("PR test:", pr_test)
print("AUC test:", auc_test)

b. Penanganan permasalahan class imbalance

Dataset paysim yang digunakan apabila dilihat dari proporsi antara jumlah data dengan kelas fraud dengan non-fraud, maka memiliki permasalahan class imbalance. Proporsi data dengan class fraud terlihat sangat kecil dibandingkan dengan proporsi kelas non-fraud.

Kondisi class imbalance tersebut dapat berdampak pada rendahnya precission dan recall model klasifikasi untuk target class fraud, karena sedikitnya data latih dengan label fraud yang dapat digunakan untuk melatih model klasifikasi.

Penanganan atas permasalahan class imbalance dapat dilakukan dengan menerapkan over sampling untuk kelas minoritas dan under sampling untuk kelas mayoritas.

# Reset the DataFrames for no fraud (`dfn`) and fraud (`dfy`)
dfn = train.filter(train.isFraud == 0)
dfy = train.filter(train.isFraud == 1)

# Calculate summary metrics
N = train.count()
y = dfy.count()
p = y/N

# Create a more balanced training dataset
train_b = dfn.sample(False, p, seed = 92285).union(dfy)

# Print out metrics
print("Total count: %s, Fraud cases count: %s, Proportion of fraud cases: %s" % (N, y, p))
print("Balanced training dataset count: %s" % train_b.count())

# Display our more balanced training dataset
display(train_b.groupBy("isFraud").count())
Perbandingan Jumlah Data Kelas Fraud dan Non-fraud

Selanjutnya kita akan melakkan kembali evaluasi kinerja model dan hyperparameter tunning dengan metode grid search cross validation sebagaimana yang telah dilakukan sebelumnya pada bagian a.

# Re-run the same ML pipeline (including parameters grid)
crossval_b = CrossValidator(estimator = dt,
estimatorParamMaps = paramGrid,
evaluator = evaluator,
numFolds = 3)
pipelineCV_b = Pipeline(stages=[indexer, va, crossval_b])

# Train the model using the pipeline, parameter grid, and BinaryClassificationEvaluator using the `train_b` dataset
cvModel_b = pipelineCV_b.fit(train_b)

# Build the best model (balanced training and full test datasets)
train_pred_b = cvModel_b.transform(train_b)
test_pred_b = cvModel_b.transform(test)

# Evaluate the model on training datasets
pr_train_b = evaluator.evaluate(train_pred_b, {evaluator.metricName: "areaUnderPR"})
auc_train_b = evaluator.evaluate(train_pred_b, {evaluator.metricName: "areaUnderROC"})

# Evaluate the model on test datasets
pr_test_b = evaluator.evaluate(test_pred_b, {evaluator.metricName: "areaUnderPR"})
auc_test_b = evaluator.evaluate(test_pred_b, {evaluator.metricName: "areaUnderROC"})

# Print out the PR and AUC values
print("PR train:", pr_train_b)
print("AUC train:", auc_train_b)
print("PR test:", pr_test_b)
print("AUC test:", auc_test_b)

c. Confusion Matrix

Confusion matrix adalah instrumen yang digunakan dalam evaluasi kinerja model klasifikasi. Ukuran kinerja model klasifikasi seperti accuracy, precission, recall, dan F1 Score dapat dihitung berdasarkan matrix ini.

from sklearn.metrics import confusion_matrix,f1_score,log_loss,roc_curve,recall_score,precision_recall_curve,precision_score,fbeta_score,auc, roc_auc_score, accuracy_score, classification_report, ConfusionMatrixDisplay, RocCurveDisplay
import numpy as np
nptest = np.array(test_pred_b.select('isFraud').collect())
nppred = np.array(test_pred_b.select('prediction').collect())
cm_test = confusion_matrix(nptest, nppred, labels=[0,1])
disp_test = ConfusionMatrixDisplay(confusion_matrix=cm_test, display_labels=[0,1])
disp_test.plot(cmap='Blues', values_format='').ax_.grid()

Terima kasih anda telah membaca dan menyimak, code lengkap dari tutorial ini dapat anda temukan di GitHub pada link berikut.

--

--