Big Analytics For Suicide Rate: Overview Rate

Agbeyeke Debs
Apr 19, 2024


This project demonstrates the use of Apache Spark for data processing and analysis, relying on Spark’s DataFrame API to load and manipulate tabular data.

Here is a breakdown of what the code does (a condensed sketch of these steps follows the list):

  1. Setup and Environment: The code begins by setting up the Spark environment. It installs the necessary dependencies, including Apache Spark itself and the required Java version.
  2. Data Loading: It loads a dataset containing information about suicide rates from a CSV file named master.csv.
  3. Data Exploration and Analysis: After loading the data, the code performs various operations to explore and analyze it. This includes displaying the schema of the DataFrame, printing summary statistics, and showing the first few rows of the dataset.
  4. Data Cleaning and Transformation: The code performs some basic data cleaning and transformation operations, such as removing unnecessary columns, renaming columns, and handling missing values.
  5. Statistical Analysis: It calculates summary statistics for numerical columns and performs basic statistical analysis on the dataset.
  6. Documentation and Comments: Throughout the code, there are comments and explanations to provide clarity on the operations being performed and the purpose of each section.
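
For orientation, here is a condensed sketch of steps 2–5 before the full notebook below. It is only a sketch: it assumes a running SparkSession named spark, that master.csv sits in the working directory, and that the column names match the Kaggle file (e.g. the country-year column used here).

# condensed sketch of steps 2–5 (assumes an existing SparkSession `spark`
# and master.csv in the working directory)
df = spark.read.csv('master.csv', inferSchema=True, header=True)   # 2. load
df.printSchema()                                                   # 3. explore
df.show(5)
df = df.drop('country-year')                                       # 4. drop a redundant column
df = df.fillna(0)                                                  #    fill missing numeric values
df.describe().show()                                               # 5. summary statistics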

To run this code:

  • Ensure you have Apache Spark installed and configured properly.
  • Make sure you have the required dataset (master.csv) available in the specified location or update the code to point to the correct file path.
  • Execute the code in a Spark environment, such as a Jupyter notebook or a Spark cluster (a minimal local-session sketch follows this list).
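
If you are running outside Colab, the Java and Spark download cells below can be skipped entirely; with PySpark installed from pip, a local session is enough. A minimal sketch, assuming pip install pyspark has already been run and master.csv is in the working directory:

from pyspark.sql import SparkSession

# start a local session that uses all available cores
spark = (SparkSession.builder
         .master("local[*]")
         .appName("SuicideRatesOverview")
         .getOrCreate())

data = spark.read.csv("master.csv", inferSchema=True, header=True)
data.show(5)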

The dataset used for this project was obtained from Kaggle: https://www.kaggle.com/datasets/russellyates88/suicide-rates-overview-1985-to-2016
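
If you prefer to fetch the file from inside a notebook, the Kaggle CLI can download it directly. A sketch, assuming the kaggle package is installed and an API token is configured at ~/.kaggle/kaggle.json:

!pip install -q kaggle
!kaggle datasets download -d russellyates88/suicide-rates-overview-1985-to-2016 --unzip
!ls *.csv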

LINK TO CODE ON GITHUB AND CODE BLOCK BELOW:

Big_Analytics_For_Suicide_Rate_Overview_Rate (1).ipynb
# install Java 8 (required by Spark) and check the JVM location
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!ls /usr/lib/jvm

# download and unpack Spark 3.0.3 (Hadoop 3.2 build)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz
!ls
!du -sh spark-3.0.3-bin-hadoop3.2.tgz
!tar -xvzf spark-3.0.3-bin-hadoop3.2.tgz

import os

# point the environment variables at the Java and Spark installs above
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop3.2"

# install a matching PySpark version and let findspark locate the Spark installation
!pip install pyspark==3.0.3
!pip install -q findspark
import findspark
findspark.init()
findspark.find()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *   # col, when, count, min, max, etc.
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.clustering import KMeans, BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# create (or reuse) the SparkSession used throughout the analysis
spark = SparkSession.builder.appName('SuicideRatesOverview').getOrCreate()
spark

# number of executors known to the driver (a quick check of available resources)
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
cores

**Reading the Dataset**

data = spark.read.csv('/content/master.csv', inferSchema=True, header=True)
data



data.show()

data.toPandas()

# data validation
data.columns

data.head()

data.printSchema()

print(data.columns)
print("")
data.describe().show()   # use .show() to render the summary statistics

from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DoubleType

# define an explicit schema for the dataset
data_schema = [StructField("country", StringType(), True),
               StructField("year", IntegerType(), True),
               StructField("sex", StringType(), True),
               StructField("age", StringType(), True),
               StructField("suicide_no", IntegerType(), True),
               StructField("population", IntegerType(), True),
               StructField("suicides/100k pop", DoubleType(), True),
               StructField("country-year", StringType(), True),
               StructField("HDI for year", DoubleType(), True),
               StructField("gdp_for_year ($)", StringType(), True),
               StructField("gdp_per_capita ($)", IntegerType(), True),
               StructField("generation", StringType(), True)]

final_struc = StructType(fields=data_schema)
final_struc

# re-read the CSV with the explicit schema (header=True so the header row is not treated as data)
data = spark.read.csv('/content/master.csv', schema=final_struc, header=True)
data

data.printSchema()

print('Number of rows:', data.count())
print('Number of columns:', len(data.columns))

data.columns

data.describe().show()

# Finding the missing values
# Check for null values in all columns
from pyspark.sql.functions import col, isnan, when, count

null_counts = data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns])

# Show the counts of null values in each column
null_counts.show()

data_fill = data.fillna(0)
data_fill.show()

data_fill.describe().show()

data_fill.groupBy("country").count().show()

data_fill.groupBy("country").mean("suicide_no").show()

data_fill.select("suicide_no", "suicides/100k pop", "HDI for year", "gdp_per_capita ($)").summary("count","min","25%","50%","75%","max").show()

# converting categorical variables into numerical variables using the String Indexer
# create a list of the categorical columns
cat_cols = ["country", "sex", "age","gdp_for_year ($)", "country-year", "generation"]

# instantiate a StringIndexer for each categorical variable
# (loop variable named c to avoid shadowing pyspark.sql.functions.col)
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index").fit(data_fill) for c in cat_cols]

# apply the transformations to the dataframe
indexed_data = data_fill
for indexer in indexers:
    indexed_data = indexer.transform(indexed_data)

indexed_data.show()


# keep only the numeric and indexed columns for clustering
data_ML = indexed_data.drop("country","sex","age","country-year","gdp_for_year ($)","generation")
data_ML.show()

**APPLYING PYSPARK MACHINE LEARNING CLUSTERING TECHNIQUE ON SUICIDE RATE OVERVIEW**

# creating a vector assembler for the dataset
input_columns = data_ML.columns

# create the vector
vecAssembler = VectorAssembler(inputCols=input_columns, outputCol="features")
data_ML_KMeans = vecAssembler.transform(data_ML)
data_ML_KMeans.show()


# set a max for the number of clusters needed
kmax = 50
# creating an array filled with zeros for the amount of K
kmcost = np.zeros(kmax)
for k in range(2, kmax):
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    # fit to dataset
    model = kmeans.fit(data_ML_KMeans)
    # compute the "cost" (sum of squared distances) between the input points and their corresponding cluster centers
    kmcost[k] = model.summary.trainingCost

print(kmcost[2:kmax])



#Plot the cost vs number of Clusters
fig, ax = plt.subplots(1,1, figsize =(10,8))
plt.plot(range(2,kmax), kmcost[2:kmax])
plt.xlabel("Number of Clusters (k)")
plt.ylabel("Cost")
plt.title("Elbow Method for Optimal k")
plt.show()

## Fit the final model
k = 8   # chosen from the elbow plot above
kmeans = KMeans().setK(k).setSeed(3).setFeaturesCol("features")
model = kmeans.fit(data_ML_KMeans)

predictions = model.transform(data_ML_KMeans)

# the silhouette score ranges from -1 to 1; values closer to 1 indicate better-separated clusters
evaluator = ClusteringEvaluator()

silhouette_score = evaluator.evaluate(predictions)
print("Silhouette Score = " + str(silhouette_score))

centers = model.clusterCenters()
for center in centers:
    print(center)

predictions.toPandas()


predictions.groupBy("prediction").agg(min(predictions.suicide_no), max(predictions.suicide_no)).show()

## BisectingKMeans (a top-down, hierarchical variant of k-means)
kmax = 50
bkmcost = np.zeros(kmax)
for k in range(2, kmax):
    bkmeans = BisectingKMeans().setK(k).setSeed(1).setFeaturesCol("features")
    model_bk = bkmeans.fit(data_ML_KMeans)
    bkmcost[k] = model_bk.summary.trainingCost

print(bkmcost[2:kmax])



fig, ax = plt.subplots(1,1, figsize =(10,8))
ax.plot(range(2,kmax),bkmcost[2:kmax])
plt.xlabel("Number of Clusters (k)")
plt.ylabel("Cost")
plt.title("Elbow Method for Optimal k")
plt.show()

# Fit the final model
k = 8
bkmeans = BisectingKMeans().setK(k).setSeed(1).setFeaturesCol("features")
model = bkmeans.fit(data_ML_KMeans)

predictions = model.transform(data_ML_KMeans)

evaluator = ClusteringEvaluator()

silhouette_bkmeans_score = evaluator.evaluate(predictions)
print("Silhouette_bkmeans_score = " + str(silhouette_bkmeans_score))

predictions.groupBy("prediction").agg(min(predictions.suicide_no), max(predictions.suicide_no)).show()
