Exploración y Preprocesamiento de datos usando PySpark — tarjeta de crédito
En este artículo hablaré un poco la exploración y preprocesamiento de datos usando Spark de un set de datos de kaggle de tarjetas de crédito (“credit card”).
¿Exploración de datos?
En primer lugar, debemos entender mucho mejor la exploración de datos como un proceso y, si es posible, crear modelos capaces de describirla en términos abstractos útiles.
En segundo lugar, necesitamos desarrollar más métodos, técnicas y herramientas para respaldar la exploración interactiva de datos. Apuesto a que hay muchas “necesidades latentes” que esperan ser descubiertas. Asi como resolver hipótesis que nos planteamos.
En tercer lugar, debemos encontrar formas de enseñar sistemáticamente análisis de datos exploratorios a otros de forma que el proceso sea lo más eficaz posible.
¿Preprocesamiento de los datos?
Existe un flujo que es común para la mayoría de los algoritmos de Machine Learning
- Importar los datos y resumen
- Establecer una estrategia para los datos que faltan (missing values) y las variables categóricas
- Dividir el conjunto en entrenamiento (Train) y validación (Test)
- Escalar (la mayoría de algoritmos de las librerías ya incluyen el escalado
- Construir y ajustar (fit) el modelo que se vaya a usar al conjunto de entrenamiento
- Utilizar el modelo con nueva información del conjunto de validación
- Validar el comportamiento (accuracy) que tiene el modelo -> RMSE + Gráfico
- Redefinir el modelo para conseguir mejores predicciones
- Guardar y desplegar el modelo en producción
Demostración con Spark en kaggle
ruta del desafió
Lo puede ver aquí
importar librerías
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import siximport seaborn as sns
sns.set(color_codes=True)
import numpy as np
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, SQLTransformer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col,sum
Spark Session
spark = SparkSession.builder.appName(‘APP_CREDIT_CARD’).getOrCreate()
Estructura del esquema
El esquema ayuda cuando crear un modelo y por alguna razón alguien te lo cambia, ya sabes la estructura inicial
schema = StructType([
StructField(“ID”, IntegerType()),
StructField(“LIMIT_BAL”, DoubleType()),
StructField(“SEX”, DoubleType()),
StructField(“EDUCATION”, DoubleType()),
StructField(“MARRIAGE”, DoubleType()),
StructField(“AGE”, DoubleType()),
StructField(“PAY_0”, DoubleType()),
StructField(“PAY_2”, DoubleType()),
StructField(“PAY_3”, DoubleType()),
StructField(“PAY_4”, DoubleType()),
StructField(“PAY_5”, DoubleType()),
StructField(“PAY_6”, DoubleType()),
StructField(“BILL_AMT1”, DoubleType()),
StructField(“BILL_AMT2”, DoubleType()),
StructField(“BILL_AMT3”, DoubleType()),
StructField(“BILL_AMT4”, DoubleType()),
StructField(“BILL_AMT5”, DoubleType()),
StructField(“BILL_AMT6”, DoubleType()),
StructField(“PAY_AMT1”, DoubleType()),
StructField(“PAY_AMT2”, DoubleType()),
StructField(“PAY_AMT3”, DoubleType()),
StructField(“PAY_AMT4”, DoubleType()),
StructField(“PAY_AMT5”, DoubleType()),
StructField(“PAY_AMT6”, DoubleType()),
StructField(“label”, DoubleType())
])
Leer archivo
df = spark.read.csv(“Credit_Card.csv”, schema=schema, header=True)
Tipo de datos
df.printSchema()
Cantidad de filas
df.count()
Valores nulos(missing values)
df_null = df.select(*(F.sum(F.col(c).isNull().cast(“Double”)).alias(c) for c in df.columns)).toPandas()
df_null
Dataset sin ID y sin label
df_select1 = df.columns[1:]
df_select2 = df.columns[1:-1]
Detección de valores unico
for col in df_select1:
col_count = df.select(col).distinct().count()
print(“variable {0} , count {1}”.format(col, col_count))
Primeros 10 valores del dataset
df_dataset = df.limit(10)
df_dataset.toPandas()
Data descriptiva
df_describe = df.select([c for c in df_select1]).describe().toPandas().transpose()
función de Anomalías
def plot_model(lables, alg_name, plot_index):
ax = fig.add_subplot(3,2,plot_index)
color_code = {‘anomaly’:’red’, ‘normal’:’green’}
colors = [color_code[x] for x in labels]
ax.scatter(X.iloc[:,0], X.iloc[:,1], color=colors, marker=’.’, label=’red = anomaly’)
ax.legend(loc=”lower right”)
leg = plt.gca().get_legend()
leg.legendHandles[0].set_color(‘red’)
ax.set_title(alg_name)
Selección de variables excepto ID y LABEL
X = df.select([c for c in df_select2]).toPandas()
fig = plt.figure(figsize=(15,15))
Visualizacion de Anomalías con dbscan
model = DBSCAN(eps=0.63).fit(X)
labels = model.labels_
labels = [(‘anomaly’ if x==-1 else ‘normal’) for x in labels]
plot_model(labels, ‘DBSCAN’, 1)
Visualización de Anomalías con Isolation Forest
model = IsolationForest().fit(X)
scores_pred = model.decision_function(X)
threshold = stats.scoreatpercentile(scores_pred, 100 * outliers_fraction)
labels = [(‘anomaly’ if x<threshold else ‘normal’) for x in scores_pred]
plot_model(labels, ‘Isolation Forest’, 2)
Visualización de Anomalías con LocalOutlierFactor
model = LocalOutlierFactor()
model.fit_predict(X)
scores_pred = model.negative_outlier_factor_
threshold = stats.scoreatpercentile(scores_pred, 100 * outliers_fraction)
labels = [(‘anomaly’ if x<threshold else ‘normal’) for x in scores_pred]
plot_model(labels, ‘LocalOutlierFactor’, 3)
Visualización de la correlación
plt.figure(figsize=(24,23))
sns.heatmap(df.select([c for c in df_select1]).toPandas().corr(),linewidths=0.1,vmax=1.0, square=True, linecolor=’white’, annot=True)
plt.show()
plt.gcf().clear()
Visualización de la distribución
numeric_features = [t[0] for t in df.select([c for c in df_select1]).dtypes if t[1] == ‘int’ or t[1] == ‘double’]
sampled_data = df.select(numeric_features).sample(False, 0.1).toPandas()
axs = pd.plotting.scatter_matrix(sampled_data, figsize=(24, 23))
n = len(sampled_data.columns)
for i in range(n):
v = axs[i, 0]
v.yaxis.label.set_rotation(0)
v.yaxis.label.set_ha(‘right’)
v.set_yticks(())
h = axs[n-1, i]
h.xaxis.label.set_rotation(90)
h.set_xticks(())
Visualización de valores vacios conocido como “missing values”
missing_df = df.select([c for c in df_select1]).toPandas().isnull().sum(axis=0).reset_index()
missing_df.columns = [‘column_name’, ‘missing_count’]
missing_df = missing_df.loc[missing_df[‘missing_count’]>0]
missing_df = missing_df.sort_values(by=’missing_count’)ind = np.arange(missing_df.shape[0])
width = 0.9
fig, ax = plt.subplots(figsize=(12,18))
rects = ax.barh(ind, missing_df.missing_count.values, color=’blue’)
ax.set_yticks(ind)
ax.set_yticklabels(missing_df.column_name.values, rotation=’horizontal’)
ax.set_xlabel(“Count of missing values”)
ax.set_title(“Number of missing values in each column”)
plt.show()En este caso no se detecta campo vacíos
Correlación entre las variables independientes y la variable label.
for i in df_select2:
if not( isinstance(df.select(i).take(1)[0][0], six.string_types)):
print( “Correlation to label for “, i, df.stat.corr(‘label’,i))
Visualización del balance del label
responses = df.groupBy(‘label’).count().collect()
categories = [i[0] for i in responses]
counts = [i[1] for i in responses]ind = np.array(range(len(categories)))
width = 0.5
plt.bar(ind, counts, width=width, color=’r’)
plt.ylabel(‘counts’)
plt.title(‘label distribution’)
plt.xticks(ind + width/2., categories)
plt.show()
Imputacion de datos
from pyspark.ml.feature import Imputer
lamentablemente no existe datos para realizar imputación =(
Transformación de los datos
from pyspark.ml.feature import StringIndexer, OneHotEncoder
lamentablemente no existe datos para transformar =(
Partición de los datos
(trainingData, testData) = df.select([c for c in df_select1]).randomSplit([0.8, 0.2])
Vectorización de los datos
feature_columns = trainingData.columns[:-1]
assembler = VectorAssembler(inputCols=feature_columns, outputCol=’features’)
Modelo selección RF
rf = RandomForestClassifier(labelCol=”label”, featuresCol=”features”, numTrees=50)
pipeline = Pipeline(stages=[assembler,rf])
model = pipeline.fit(trainingData)
prediction = model.transform(testData)
result = prediction.select(“prediction”, “probability”, “rawPrediction”, “features”)
Test Area Roc
b_evaluator = BinaryClassificationEvaluator(labelCol=’label’)
roc = b_evaluator.evaluate(prediction, {evaluator.metricName: “areaUnderROC”})
print(“Test Area Under ROC: {0}”.format(roc))
ParamGrid
paramGrid = ParamGridBuilder().build()
Validación cruzada(numFolds=5)
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=b_evaluator,
numFolds=5)
Selecciona el mejor Modelo de entrenamiento (K*M)
CV_model = crossval.fit(trainingData)
predictions_improved = CV_model.bestModel.transform(testData)
Evaluación del modelo
evaluator = MulticlassClassificationEvaluator(labelCol=”label”, predictionCol=”prediction”, metricName=”accuracy”)
accuracy = evaluator.evaluate(predictions_improved)
print(“Accuracy = %g” % accuracy)
evaluatorf1 = MulticlassClassificationEvaluator(labelCol=”label”, predictionCol=”prediction”, metricName=”f1")
f1 = evaluatorf1.evaluate(predictions_improved)
print(“f1 = %g” % f1)
evaluatorwp = MulticlassClassificationEvaluator(labelCol=”label”, predictionCol=”prediction”, metricName=”weightedPrecision”)
wp = evaluatorwp.evaluate(predictions_improved)
print(“weightedPrecision = %g” % wp)
evaluatorwr = MulticlassClassificationEvaluator(labelCol=”label”, predictionCol=”prediction”, metricName=”weightedRecall”)
wr = evaluatorwr.evaluate(predictions_improved)
print(“weightedRecall = %g” % wr)
Visualización de la probabilidad
all_probs = predictions_improved.select(“probability”).collect()
pos_probs = [i[0][0] for i in all_probs]
neg_probs = [i[0][1] for i in all_probs]
# positivos
plt.hist(pos_probs, 50, facecolor=’blue’, alpha=0.5)
plt.xlabel(‘valores predecidos’)
plt.ylabel(‘Counts’)
plt.title(‘Probabilidad de casos positivos’)
plt.grid(True)
plt.show()
# negativos
plt.hist(neg_probs, 50, facecolor=’blue’, alpha=0.5)
plt.xlabel(‘valores predecidos’)
plt.ylabel(‘Counts’)
plt.title(‘Probabilidad de casos negativoss’)
plt.grid(True)
plt.show()
Stochastic Outlier Selection on Apache Spark
ruta aquí
t-Distributed Stochastic Neighbor Embedding
ruta aquí
Espero le hayas gustado esta publicación sobre exploración y preprocesamiento usando Spark, ya comenzare con unas aplicaciones visión computaciónal y tensorflow
linkedin: