34 Things I Wished I Knew Before My Databricks ML Associate Exam

Daniel Ruiz
20 min read · Dec 1, 2023

Databricks is one of the most powerful data platforms out there. It combines a data platform and an AI platform into the Databricks Data Intelligence Platform, which enables organizations to capture the full potential of their data and their data teams.

To help users make the most of its countless features, Databricks offers free overview courses to partners and customers through the Databricks Academy. These courses are linked to paid certifications targeting different roles, including ML professionals. Together, the courses and certification paths are a great way of learning more about Databricks.

There are two relevant Databricks certifications for Machine Learning:

  • Machine Learning Associate
  • Machine Learning Professional

Often, these are pursued by Machine Learning Engineers (of all flavours, including MLOps) and Data Scientists who want to prove their Databricks expertise and acquire a more comprehensive view of what the platform offers.

This was the case for me. I am a Machine Learning Engineer, and while preparing for the Machine Learning Associate certification I noticed the lack of good public content to prepare candidates for it. I have passed the certification and written this article to help you conquer your badge too.

This article covers relevant topics for the Associate certification. This is not an introduction to Machine Learning or Databricks; rather, this is a topic review for the Databricks Machine Learning Associate certification. Be sharp on the following topics before taking the exam.

The Exam

To obtain the Databricks Machine Learning Associate certification, you need to pass an online-proctored exam. The certification is intended for professionals with at least six months of experience on the platform, and it is valid for two years.

In under 90 minutes, you will need to achieve a 70% mark on 45 questions. Some of these questions focus on code and the Databricks platform, others on modelling and theoretical machine learning concepts.

The exam is only available in English.

It is divided into four sections with the following weights and topics:

  1. Databricks Machine Learning (29%): Databricks ML, Databricks Runtime for Machine Learning, AutoML, Feature Store, Managed MLflow
  2. ML Workflows (29%): Exploratory Data Analysis, Feature Engineering, Training, Evaluation and Selection
  3. Spark ML (33%): Distributed ML Concepts, Spark ML Modelling APIs, Hyperopt, Pandas API on Spark, Pandas UDFs/Function APIs
  4. Scaling ML Models (9%): Model Distribution, Ensembling Distribution

More details are available in the official Exam Guide.

Databricks Machine Learning

1. How to integrate Git and Databricks to deliver CI/CD?

Databricks provides an overview of how to integrate Git repositories with Databricks Repos, which brings source control to project files and underpins CI/CD.
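In a typical CD step, a pipeline updates a workspace repo to the head of a branch before running a job. Below is a minimal sketch against the Repos REST API; the host, token, and repo ID are placeholders:

import requests

host = "https://<databricks-instance>"
token = "<personal-access-token>"
repo_id = "<repo-id>"

# Update the Databricks repo to the latest commit of the main branch
resp = requests.patch(
    f"{host}/api/2.0/repos/{repo_id}",
    headers={"Authorization": f"Bearer {token}"},
    json={"branch": "main"},
)
resp.raise_for_status()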

2. When is a single-node cluster preferred?

Databricks clusters may be multi-node or single-node.

Single Node clusters are intended for jobs that use small amounts of data or non-distributed workloads such as single-node machine learning libraries.

Multi Node clusters are for larger jobs with distributed workloads.

3. Which libraries are included in Databricks Runtime ML?

The Databricks Runtime is the set of core components that run on compute clusters, and it needs to be selected when creating the cluster. It comes in different versions.

There is a specialized runtime for Machine Learning. These runtimes have an ML suffix (e.g. Databricks Runtime 14.1 ML) and include a series of relevant libraries. Databricks designates a few of these libraries as top tier, and these are updated to the latest available package at every new runtime (if there are no dependency conflicts).

The release notes of each ML runtime (e.g. Databricks Runtime 14.1 ML) list its top-tier libraries.

4. How to install additional libraries to a cluster?

Databricks lists seven different ways of installing a library to a cluster.

  • Install a library for use with a specific cluster only.
  • Install a workspace library that has previously been defined in the workspace.
  • Install a library with the REST API.
  • Install a library with the Databricks CLI.
  • Install a library using Databricks Terraform.
  • Install a library by creating a cluster with a policy that defines library installations.
  • Install a library using an init script that runs at cluster creation time (not recommended).

The currently preferred solution is cluster policies. Installation via init scripts, though widely used in industry, is no longer recommended; Databricks advises replacing init-script installations with cluster policies.
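As an illustration of the REST API route, the sketch below installs a PyPI package on a running cluster through the Libraries API; the host, token, and cluster ID are placeholders:

import requests

host = "https://<databricks-instance>"
token = "<personal-access-token>"

# Install a PyPI package on a specific cluster via the Libraries API
resp = requests.post(
    f"{host}/api/2.0/libraries/install",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "cluster_id": "<cluster-id>",
        "libraries": [{"pypi": {"package": "scikit-learn"}}],
    },
)
resp.raise_for_status()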

5. What can Databricks AutoML do?

Databricks AutoML enables Data Scientists to quickly generate baseline models and notebooks with a low-code approach. Then, citizen data scientists may use their domain knowledge to customize and improve these baselines.

More pragmatically, it will perform the following machine learning tasks given an input dataset and target column:

1. AutoML prepares the dataset for model training.

2. AutoML then performs and records a set of trials that creates, tunes, and evaluates multiple models.

3. AutoML displays the results and provides a Python notebook with the source code for each trial run so you can review, reproduce, and modify the code.

4. AutoML also calculates summary statistics on your dataset and saves this information in a notebook that you can review later.

6. What evaluation metrics are optimized by Databricks AutoML?

Databricks AutoML accepts three use-cases: classification, regression, and forecasting. Each use-case accepts a set of evaluation metrics.

  • Regression: r2 (default), mae, rmse, or mse.
  • Classification: f1 (default), log_loss, precision, accuracy, or roc_auc.
  • Forecasting: smape (default), mse, rmse, mae, or mdape.

In the example below, AutoML is used programmatically to run a regression that minimizes the root mean squared error:

from databricks import automl

summary = automl.regress(train_df,
                         target_col="price",
                         primary_metric="rmse",
                         timeout_minutes=5,
                         max_trials=10)

print(summary.best_trial)

In this second example, AutoML maximizes the f1-score of a classification problem:

from databricks import automl

summary = automl.classify(train_df,
                          target_col="income",
                          primary_metric="f1",
                          timeout_minutes=30)

print(summary.best_trial)

7. How to orchestrate multi-task Databricks jobs?

Databricks Workflows can execute jobs and tasks either in parallel or sequentially by leveraging dependencies.

  • A Databricks job may be used to run data processing and analysis applications in a Databricks workspace.
  • Jobs are composed of one or more tasks (i.e. multi-task).
  • A task may do different things, such as running notebooks, JARs, Delta Live Tables pipelines, or Python, Scala, Spark submit, and Java applications.
  • Tasks may have dependencies, which determine the order in which they are executed. For example, a job comprising two tasks, each running a different Databricks notebook, may have those notebooks executed in parallel or sequentially, depending on how the dependencies are configured, as sketched below.
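For illustration, a two-task Jobs API 2.1 payload might look roughly like the sketch below (the notebook paths are hypothetical; cluster settings are omitted). Because train_model depends on prepare_data, the two notebooks run sequentially; removing depends_on would let them run in parallel.

job_config = {
    "name": "example-multi-task-job",
    "tasks": [
        {
            "task_key": "prepare_data",
            "notebook_task": {"notebook_path": "/Repos/project/prepare_data"},
        },
        {
            "task_key": "train_model",
            # this dependency forces sequential execution after prepare_data
            "depends_on": [{"task_key": "prepare_data"}],
            "notebook_task": {"notebook_path": "/Repos/project/train_model"},
        },
    ],
}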

8. What is a Feature Store?

According to Databricks:

A feature store is a centralized repository that enables data scientists to find and share features and also ensures that the same code used to compute the feature values is used for model training and inference.

Unity Catalog becomes your feature store if (i) Unity Catalog is enabled for the workspace and (ii) the cluster runs Databricks Runtime 13.2 or above. In summary:

For workspaces that are enabled for Unity Catalog, write the DataFrame as a feature table in Unity Catalog.

If your workspace is not enabled for Unity Catalog, write the DataFrame as a feature table in the Workspace Feature Store.

To create a feature table in a feature store:

from databricks.feature_store import feature_table

def compute_customer_features(data):
    '''Feature computation code returns a DataFrame with 'customer_id' as primary key'''
    pass

# create feature table keyed by customer_id
# take schema from DataFrame output by compute_customer_features
from databricks.feature_store import FeatureStoreClient

customer_features_df = compute_customer_features(df)

fs = FeatureStoreClient()

customer_feature_table = fs.create_table(
    name='recommender_system.customer_features',
    primary_keys='customer_id',
    schema=customer_features_df.schema,
    description='Customer features'
)

# An alternative is to use `create_table` and specify the `df` argument.
# This code automatically saves the features to the underlying Delta table.

# customer_feature_table = fs.create_table(
# ...
# df=customer_features_df,
# ...
# )

# To use a composite key, pass all keys in the create_table call

# customer_feature_table = fs.create_table(
# ...
# primary_keys=['customer_id', 'date'],
# ...
# )

To register an existing delta table as a feature table:

fs.register_table(
    delta_table='recommender.customer_features',
    primary_keys='customer_id',
    description='Customer features'
)

To write data to an existing table:

# Use write_table to write data to the feature table
# overwrite mode does a full refresh of the feature table
# merge mode updates only specific rows in a feature table
fs.write_table(
    name='recommender_system.customer_features',
    df=customer_features_df,
    mode='overwrite'
)

To read from a feature table:

from databricks.feature_store import FeatureStoreClient

fs = FeatureStoreClient()
customer_features_df = fs.read_table(name='recommender.customer_features')

Finally, to get its metadata:

# this example works with v0.3.6 and above
# for v0.3.5, use `get_feature_table`
from databricks.feature_store import FeatureStoreClient
fs = FeatureStoreClient()
fs.get_table("feature_store_example.user_feature_table")

9. How to retrieve features from the Feature Store?

Assuming the feature table has been created correctly, you need to create a Feature Store client and perform a feature lookup.

In a simple example:

import mlflow
from databricks.feature_store import FeatureLookup, FeatureStoreClient

feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['interests'],
        lookup_key='customer_id',
    ),
]

fs = FeatureStoreClient()
with mlflow.start_run():
    training_set = fs.create_training_set(
        df=df,
        feature_lookups=feature_lookups,
        label=None,
        exclude_columns=['customer_id']
    )

training_df = training_set.load_df()

In a more developed example:

from databricks.feature_store import FeatureLookup, FeatureStoreClient

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
        table_name='recommender_system.customer_features',
        feature_names=['total_purchases_30d', 'total_purchases_7d'],
        lookup_key='customer_id'
    ),
    FeatureLookup(
        table_name='recommender_system.product_features',
        feature_names=['category'],
        lookup_key='product_id'
    )
]

fs = FeatureStoreClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
    df=training_df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df()

If feature_names is not specified in the FeatureLookup, all features except the primary keys are included.

This notebook provides a good implementation of Feature Stores, and also shows how to do batch scoring.

10. How to log an artifact using MLflow?

As stated in MLflow documentation:

The MLflow Tracking component is an API and UI for logging parameters, code versions, metrics, and output files when running your machine learning code and for later visualizing the results.

The log_artifact function may be used to log artifacts, such as a DataFrame written to a file, under a run's artifact folder. The function has the following signature:

mlflow.log_artifact(local_path: str, artifact_path: Optional[str] = None) → None

Three important takeaways are:

  • There is only one required parameter (local_path), and it is a path that points to a file.
  • If instead a folder is to be logged, the function log_artifacts should be used.
  • The optional parameter artifact_path specifies the directory under artifact_uri to which the artifact is written.

The following snippet illustrates usage in a simple case:

import mlflow

# Create a features.txt artifact file
features = "rooms, zipcode, median_price, school_rating, transport"
with open("features.txt", "w") as f:
f.write(features)

# With artifact_path=None write features.txt under
# root artifact_uri/artifacts directory
with mlflow.start_run():
mlflow.log_artifact("features.txt")

11. How to get the best run of a model using MLflow?

It’s possible to use the MLflow Tracking API to search runs, and conveniently return results in the form of a DataFrame.

What does best mean? In this context, it is the run that optimizes the specified metric. For example: which run minimizes the mean absolute error (MAE)?

runs = mlflow.search_runs(experiment_ids=[experiment_id],
                          order_by=['metrics.mae'],
                          max_results=1)
runs.loc[0]

ML Workflows

12. How to filter a Spark DataFrame on a given column?

GeeksForGeeks lists different ways of filtering Spark DataFrames using the filter method.

Using filter() alone:

dataframe.filter(dataframe.college == 'DU')

Or, combining filter() with SQL col():

dataframe.filter(col('college') == 'DU')

The where method is also handy.

dataframe.where(dataframe.college == 'DU') 

Also with col():

dataframe.where(col('college') == 'DU')

A couple extra points:

  • Combined with inequality operators, these methods may be used to remove outliers from your dataset (see the sketch after this list).
  • Using col() is generally preferred over referencing the DataFrame attribute directly.
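A quick sketch of the outlier point, assuming a hypothetical numeric column price with a plausible valid range:

from pyspark.sql.functions import col

# Keep only rows whose (assumed) price column falls inside a valid range
clean_df = dataframe.filter((col("price") >= 0) & (col("price") <= 10000))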

13. How to use dbutils.data.summarize?

Microsoft explains that the summarize command calculates and displays summary statistics of an Apache Spark DataFrame or pandas DataFrame.

This command is available for Python, Scala and R.

This command accepts an optional precise parameter (the default is False).

  • When set to False, execution is faster, but that comes at the expense of a higher relative error.
  • When set to True, all statistics except for the histograms and percentiles for numeric columns are exact. The histograms and percentile estimates may have an error of up to 0.0001% relative to the total number of rows.

In Python:

df = (spark.read
      .format('csv')
      .load('/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv',
            header=True, inferSchema=True))

dbutils.data.summarize(df)

In Scala:

val df = spark.read
  .format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")

dbutils.data.summarize(df)

In R:

df <- read.df("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", source = "csv", header="true", inferSchema = "true")

dbutils.data.summarize(df)

14. How to use summary?

The summary command is also quite handy. It provides basic statistics (count, mean, stddev, min, percentiles, and max) for a Spark DataFrame, as shown below for a small two-row example.

df.summary().show()

+-------+------------------+-----+
|summary|               age| name|
+-------+------------------+-----+
|  count|                 2|    2|
|   mean|               3.5| null|
| stddev|2.1213203435596424| null|
|    min|                 2|Alice|
|    25%|                 2| null|
|    50%|                 2| null|
|    75%|                 5| null|
|    max|                 5|  Bob|
+-------+------------------+-----+

15. How to impute missing values?

The Imputer class may be used to impute missing values using either the mean, median, or mode of a variable. The Imputer does not support categorical features, only numeric ones. It is still possible to impute categorical variables, but they first need to be converted to numerical labels.

The mean and median are frequently used to impute missing numeric values, and the mode is used to impute categorical values that have been converted to numeric. By default, the mean is used as the imputation strategy.

For example:

from pyspark.ml.feature import Imputer

df = spark.createDataFrame([(1.0, float("nan")), (2.0, float("nan")), (float("nan"), 3.0),
(4.0, 4.0), (5.0, 5.0)], ["a", "b"])

imputer = Imputer(inputCols=["a", "b"],
outputCols=["out_a", "out_b"],
strategy="mean")

model = imputer.fit(df)
model.transform(df)

One disclaimer: before imputing missing values, it is important to evaluate whether this is a good idea from a modelling perspective. Sometimes the model should see the missing values and learn from them.

16. How to deal with missing data in tree algorithms?

As IBM explains, Decision Trees require little to no data preparation. They handle both discrete and continuous variables, and they may also handle variables with missing values.

Databricks generally favours leaving missing values as they are and letting tree algorithms work around them.

17. How to index and one hot encode categorical variables?

Usually, categorical variables come as strings. If that is the case, the first step is to use the StringIndexer to represent the feature numerically.

from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

Then, the second step is to apply the OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCols=["categoryIndex"], outputCols=["categoryVec"])

model = encoder.fit(indexed)
encoded = model.transform(indexed)
encoded.show()

18. How to use the VectorAssembler?

According to Apache Spark documentation:

[The] VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees.

This is illustrated in the example below:

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)

# id | hour | mobile | userFeatures     | clicked | features
# ---|------|--------|------------------|---------|----------------------------
#  0 |   18 |    1.0 | [0.0, 10.0, 0.5] |     1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]

19. How do you interpret level and logs in regressions?

A few seemingly innocent math operations can change the interpretation of the coefficients and errors of a regression. The logarithm is one such operation.

The logarithm may be applied to the independent and/or the dependent variables. That determines whether the regression is level-level, level-log, log-level, or log-log.

UCLA gives insight into what happens when the dependent variable is transformed:

when the outcome variable is log transformed, it is natural to interpret the exponentiated regression coefficients. These values correspond to changes in the ratio of the expected geometric means of the original outcome variable.

So, the interpretation of the dependent variable becomes akin to a percentage change. If the interpretation of the dependent variable changes, a direct comparison of different regressions is comparing apples to oranges.

In order to compare different regressions, first make sure that the predicted values (or any metric derived from them) compared reflect the same units. In other words, either bring the different regression outputs to level or to log before you calculate the metrics and make comparisons.
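As a quick numeric illustration of the log-level interpretation above, take a hypothetical coefficient of 0.05 on a predictor:

import numpy as np

beta = 0.05  # hypothetical coefficient from a log-level regression

# A one-unit increase in the predictor multiplies the expected outcome by exp(beta),
# i.e. roughly a beta * 100 % increase when beta is small.
print(np.exp(beta))           # ~1.0513 -> about a 5.1% increase
print(np.expm1(beta) * 100)   # ~5.13, the exact percentage change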

20. How to evaluate a regression model?

It’s possible to build a RegressionEvaluator object and apply it to a dataframe that contains the predictions and actual values. For example:

from pyspark.ml.evaluation import RegressionEvaluator

# dummy data
scoreAndLabels = [(-28.98343821, -27.0), (20.21491975, 21.5),
                  (-25.98418959, -22.0), (30.69731842, 33.0), (74.69283752, 71.0)]
dataset = spark.createDataFrame(scoreAndLabels, ["raw", "label"])

# instantiate
evaluator = RegressionEvaluator()
evaluator.setPredictionCol("raw")

# evaluate (the default metric is rmse)
evaluator.evaluate(dataset)
evaluator.evaluate(dataset, {evaluator.metricName: "r2"})
evaluator.evaluate(dataset, {evaluator.metricName: "mae"})

Optionally, a weight column may be passed as a parameter.
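For instance, if the DataFrame also contained a (hypothetical) weight column, it could be wired in before evaluating:

# assumes the DataFrame has an additional column named "weight"
evaluator.setWeightCol("weight")
evaluator.evaluate(dataset)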

21. How to perform cross-validation when fitting a model?

The CrossValidator object may be used to let the model be trained with cross-validation. It receives the estimator and other parameters, and then it is used to fit the training data. This is illustrated in the example below.

# import
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
import tempfile

# dummy data
dataset = spark.createDataFrame(
    [(Vectors.dense([0.0]), 0.0),
     (Vectors.dense([0.4]), 1.0),
     (Vectors.dense([0.5]), 0.0),
     (Vectors.dense([0.6]), 1.0),
     (Vectors.dense([1.0]), 1.0)] * 10,
    ["features", "label"])

# estimator, parameter grid, and evaluator
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()

# cross-validator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    parallelism=2)

# fit
cvModel = cv.fit(dataset)
cvModel.getNumFolds()
cvModel.avgMetrics[0]

# save
path = tempfile.mkdtemp()
model_path = path + "/model"
cvModel.write().save(model_path)

# read
cvModelRead = CrossValidatorModel.read().load(model_path)
cvModelRead.avgMetrics

# evaluate
evaluator.evaluate(cvModel.transform(dataset))
evaluator.evaluate(cvModelRead.transform(dataset))

22. How to evaluate a binary classifier?

Binary classifiers assign one of two labels (i.e. positive or negative, hence the name binary). The four most important classification metrics are:

  • Accuracy measures how many values, whether positive or negative, were correctly predicted, as a percentage of all predictions.
  • Precision measures how many of the values predicted as positive are actually positive. In a corner case, if a model predicts only one observation as positive and that prediction is correct, precision equals 1, even if there are many false negatives.
  • Recall measures how many of the actual positives were predicted as positive. In a corner case, if a model predicts that all observations are positive, recall equals 1, even if there are many false positives.
  • F1-Score is the harmonic mean of precision and recall. It balances the two, and a weighted variant allows adjusting that balance. A small numeric sketch follows this list.
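To make the definitions concrete, here is a minimal sketch with made-up confusion-matrix counts:

# made-up counts: true positives, false positives, true negatives, false negatives
tp, fp, tn, fn = 40, 10, 45, 5

accuracy = (tp + tn) / (tp + fp + tn + fn)          # 0.85
precision = tp / (tp + fp)                          # 0.80
recall = tp / (tp + fn)                             # ~0.89
f1 = 2 * precision * recall / (precision + recall)  # ~0.84

print(accuracy, precision, recall, f1)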

23. How to orchestrate runs with MLflow?

The basic syntax for starting a run is:

import mlflow

mlflow.start_run()
run = mlflow.active_run()
print(f"Active run_id: {run.info.run_id}")
mlflow.end_run()

Using with makes it easier to manage a run, and creating child runs is done by merely setting nested to True.

import mlflow

# Create nested runs
with mlflow.start_run():
    with mlflow.start_run(nested=True) as child_run:
        child_run_id = child_run.info.run_id

parent_run = mlflow.get_parent_run(child_run_id)
print(f"child_run_id: {child_run_id}")
print(f"parent_run_id: {parent_run.info.run_id}")

Spark ML

24. What is an estimator?

Scikit-Learn provides a fine definition of what an estimator is:

an object that fits a model based on some training data and is capable of inferring some properties on new data. It can be, for instance, a classifier or a regressor. All estimators implement the fit method.

Spark has a shorter description, albeit quite similar. Estimators fit models to data.

A general example:

estimator.fit(X, y)

A more concrete example, using scikit-learn:

from sklearn.svm import SVC

SVC(C=100).fit(X_train, y_train)

25. How may scikit-learn and Spark deliver different models if the same data and parameters are used?

The implementation of the underlying algorithms may differ, especially the methods used to reach convergence.

In Spark, optimization is accelerated using stochastic methods:

linear methods use convex optimization methods to optimize the objective functions. spark.mllib uses two methods, SGD and L-BFGS, described in the optimization section. Currently, most algorithm APIs support Stochastic Gradient Descent (SGD), and a few support L-BFGS.

In scikit-learn, different algorithms are used, generally relying on exact, non-stochastic solvers (e.g. least squares via matrix decomposition). In addition, parallelism does not come by default and must be configured with supporting libraries such as joblib.

These underlying differences may lead to different results if the objective has local optima.

26. What are the hardships of parallelization?

KDNuggets highlights that:

Parallelism is a good idea when the task can be divided into sub-tasks that can be executed independent of each other without communication or shared resources.

However,

[in] real-life, most of the programs have some sections that need to be executed in serialized fashion, and the parallelizable sub-tasks need some kind of synchronization or data transfer.

In other words, parallelism becomes less relevant if sub-tasks either (i) must run sequentially or (ii) require each other's outputs.

27. How to optimize with Hyperopt?

The function fmin is used to optimize a Hyperopt run.

Note that there is no fmax. If the goal is to maximize a metric, the objective function should be adapted so that the negative of that metric is minimized.

The function fmin requires two parameters: fn and space.

  • fn is the objective function being minimized.
  • space is the set of possible arguments that fn will take.

There are a few other good-to-know optional parameters:

  • algo defines the search algorithm. The most commonly used are hyperopt.rand.suggest for Random Search and hyperopt.tpe.suggest for TPE.
  • max_evals defines the maximum number of evaluations allowed.
  • trials defines the storage for completed, ongoing, and scheduled evaluation points. Use SparkTrials for single-machine libraries such as scikit-learn, and the default Trials for distributed libraries such as MLlib or Horovod.

As Databricks explains:

SparkTrials is designed to parallelize computations for single-machine ML models such as scikit-learn. For models created with distributed ML algorithms such as MLlib or Horovod, do not use SparkTrials. In this case the model building process is automatically parallelized on the cluster and you should use the default Hyperopt class Trials.

So, the usage of fmin can be illustrated by:

import pickle
import time
from hyperopt import fmin, tpe, hp, STATUS_OK

def objective(x):
    return {'loss': x ** 2, 'status': STATUS_OK}

best = fmin(objective,
            space=hp.uniform('x', -10, 10),
            algo=tpe.suggest,
            max_evals=100)

print(best)

Finally, an example that distributes the search with SparkTrials:

import hyperopt

best_hyperparameters = hyperopt.fmin(
    fn=training_function,
    space=search_space,
    algo=hyperopt.tpe.suggest,
    max_evals=64,
    trials=hyperopt.SparkTrials())

28. How to convert Pandas to Spark and back?

It is possible to convert a pandas DataFrame into a Spark DataFrame, and vice-versa. This is done with Apache Arrow, an in-memory columnar data format used by Apache Spark to efficiently transfer data between JVM and Python processes.

import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a pandas DataFrame using Arrow
sdf = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a pandas DataFrame using Arrow
result_pdf = sdf.select("*").toPandas()

29. What is the Pandas API on Spark?

The Pandas API on Spark is a means of using a Spark DataFrame as if it were a Pandas DataFrame. Databricks explains why and how:

Commonly used by data scientists, pandas is a Python package that provides easy-to-use data structures and data analysis tools for the Python programming language. However, pandas does not scale out to big data. Pandas API on Spark fills this gap by providing pandas equivalent APIs that work on Apache Spark. Pandas API on Spark is useful not only for pandas users but also PySpark users, because pandas API on Spark supports many tasks that are difficult to do with PySpark, for example plotting data directly from a PySpark DataFrame.

The following methods may be used to create a pandas-on-Spark DataFrame (psdf) from a pandas DataFrame (pdf):

import pyspark.pandas as ps

psdf = ps.DataFrame(pdf)

# alternatively:
psdf = ps.from_pandas(pdf)

30. What is the Pandas UDF?

Microsoft explains that:

A pandas user-defined function (UDF) — also known as vectorized UDF — is a user-defined function that uses Apache Arrow to transfer data and pandas to work with the data. pandas UDFs allow vectorized operations that can increase performance up to 100x compared to row-at-a-time Python UDFs.

There are four types of pandas UDFs:

  • Series to Series UDF
  • Iterator of Series to Iterator of Series UDF
  • Iterator of multiple Series to Iterator of Series UDF
  • Series to scalar UDF

Manning provides a good decision tree for deciding which UDF to use.
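For instance, a Series to scalar UDF behaves like an aggregate function. A minimal sketch, assuming a Spark DataFrame df with a numeric column v:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    # receives a column as a pandas Series and returns a single scalar
    return v.mean()

df.select(mean_udf(df["v"])).show()  # df and its column "v" are assumed to exist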

31. How to implement a Series to Series Pandas UDF?

The Series to Series pandas UDF vectorizes scalar operations and can be used with either select or withColumn. It is the most basic UDF, illustrated in the following snippet.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))

# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()

# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

32. When to use ApplyInPandas and MapInPandas?

Microsoft Documentation on Databricks details that there are three types of pandas function APIs:

  • Grouped map
  • Map
  • Cogrouped map

In grouped map, groupBy().applyInPandas() implements a split-apply-combine pattern that will:

(1) Split the data into groups by using DataFrame.groupBy.

(2) Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.

(3) Combine the results into a new DataFrame.

For example:

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

(df.groupby("id")
   .applyInPandas(subtract_mean, schema="id long, v double")
   .show())

# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

In map, DataFrame.mapInPandas() performs map operations over an iterator of pandas DataFrames and returns the result as a PySpark DataFrame.

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

In cogrouped map, DataFrame.groupby().cogroup().applyInPandas() is used to cogroup two PySpark DataFrames by a common key and then apply a Python function to each cogroup.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()

# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Scaling ML Models

33. What are ensemble methods?

The three most famous ensemble methods are bagging, boosting, and stacking. As the name ensemble suggests, they combine multiple models to obtain a combined model that performs better.

These techniques may be described as:

  • Bagging trains homogeneous weak learners in parallel, and then combines them with some deterministic averaging process. E.g., average out different forecasts.
  • Boosting trains homogeneous weak learners sequentially, and combines them following a deterministic strategy. E.g., train each learner on the residuals of the previous one.
  • Stacking trains heterogeneous weak learners in parallel, and then combines them by training a meta-model. E.g., train a model on top of the other models' outputs.

34. What is Gradient Boosting?

According to Machine Learning Mastery, Gradient Boosting is one of the most powerful techniques for building predictive models. It consists of three elements:

1. A loss function to be optimized.

2. A weak learner to make predictions.

3. An additive model to add weak learners to minimize the loss function.

Gradient Boosting can be improved by fitting each weak learner on a random subsample of the training data. This variant is called Stochastic Gradient Boosting.
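In Spark ML, this subsampling idea maps to the subsamplingRate parameter of the gradient-boosted tree estimators. A minimal sketch, assuming a training DataFrame train_df with features and label columns:

from pyspark.ml.classification import GBTClassifier

# subsamplingRate < 1.0 fits each tree on a random fraction of the data,
# i.e. stochastic gradient boosting
gbt = GBTClassifier(featuresCol="features", labelCol="label",
                    maxIter=50, subsamplingRate=0.8)

model = gbt.fit(train_df)  # train_df is an assumed DataFrame with the columns above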

Good luck!

Enroll now for the Databricks Machine Learning Associate certification, and go get your badge!

Good luck on your exam!

If you found this article useful, please like, comment, share, or buy me a coffee.
