Parallel Hyperparameter Tuning Using Snowpark

THIS STORY IS KIND OF USELESS NOW! Snowflake now has built-in Hyperparameter tuning with snowflake-ml-python! Check out how to master this here.

When training a model, data scientists often try many hyperparameter combinations. Although this can lead to a more accurate model, the downside to this method is longer run times. Rather than waiting for each combination to train one after another, there has to be a better way within Snowflake to speed up this process. Check out the latest on optimization in my updated article here.

Well, now, with Snowpark for Python and User Defined Table Functions (UDTFs), we can train each combination of hyperparameters in parallel, tremendously speeding up the process while also cutting costs. I am going to walk you through an example of how to do just that.

Using make_classification from scikit-learn we can generate a DataFrame with 2,000 records, six features, and a binary classification response variable.
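
Here is a minimal sketch of how that data might be generated and loaded into Snowflake. The table name TRAINING_DATA, the column names X1 through X6 and Y, and the make_classification settings beyond the record and feature counts are assumptions for this walkthrough, and session is an existing Snowpark session.

import pandas as pd
from sklearn.datasets import make_classification

# Generate 2,000 records with six features and a binary response
X, y = make_classification(
    n_samples=2000,
    n_features=6,
    n_informative=4,  # assumption: four informative features
    n_classes=2,
    random_state=0,
)

data_pd = pd.DataFrame(X, columns=["X1", "X2", "X3", "X4", "X5", "X6"])
data_pd["Y"] = y

# Load the pandas DataFrame into a Snowflake table via the Snowpark session
data_df = session.create_dataframe(data_pd)
data_df.write.save_as_table("TRAINING_DATA", mode="overwrite")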

Create a dataframe that has every possible combination of hyperparameters

In this case, we are going to use six parameters, and with the different options that gives 1 x 4 x 2 x 3 x 3 x 4 = 288 combinations. First, we will create the parameter dictionary.

param_grid = {
    'BOOTSTRAP': [True],
    'MAX_DEPTH': [80, 90, 100, 110],
    'MAX_FEATURES': [2, 3],
    'MIN_SAMPLES_LEAF': [3, 4, 5],
    'MIN_SAMPLES_SPLIT': [8, 10, 12],
    'N_ESTIMATORS': [100, 200, 300, 1000]
}

We need to create a DataFrame that has every possible combination as a single row, and assign a unique integer identifier to each parameter combination, which we will use to partition the data in our UDTF.

from functools import reduce

import pandas as pd
from snowflake.snowpark import Window
from snowflake.snowpark.functions import lit, row_number

dfs = []
for k, v in param_grid.items():
    df = pd.DataFrame(v, columns=[k])
    dfs.append(df)

# Cross-merge the single-column DataFrames so every combination becomes a row
df = reduce(lambda left, right: pd.merge(left, right, how="cross"), dfs)
params_df = session.create_dataframe(df)

# Assign a unique integer ID to each hyperparameter combination
params_df = params_df.select(
    "*", row_number().over(Window.order_by(lit(1))).as_("HP_ID")
)

We now have a Snowpark DataFrame with 288 rows, one for each combination of hyperparameters.

UDTFs take tables as input, and since we are partitioning on HP_ID, we need a copy of the training data for each combination of possible parameters. Hence, we will cross join the data with the parameters to create the final DataFrame to be used in the UDTF, as sketched below.
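
A small sketch of that cross join, assuming data_df holds the 2,000 training rows (X1 through X6 plus Y) and params_df is the 288-row hyperparameter DataFrame from the previous step:

# Fan the training data out: one copy per hyperparameter combination
df = params_df.cross_join(data_df)
df.count()  # 288 combinations * 2,000 records = 576,000 rows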

Creating the UDTF for model training

When training models in Snowflake, we often do this in pandas within a stored procedure. However, if you want to train multiple models in parallel and make use of all available nodes in a warehouse, then we can go one level further and push the model training into a UDTF.

When creating a UDTF, the first step is to define your output schema structure. Think about what we need; we want to know the optimal combination of hyperparameters and a metric to gauge this decision. In this situation, we will use accuracy, so the result of our UDTF is our six parameters in addition to the accuracy.

from snowflake.snowpark.types import (
    BooleanType,
    FloatType,
    IntegerType,
    StructField,
    StructType,
)

schema = StructType(
    [
        StructField("BOOTSTRAP1", BooleanType()),
        StructField("MAX_DEPTH1", IntegerType()),
        StructField("MAX_FEATURES1", IntegerType()),
        StructField("MIN_SAMPLES_SPLIT1", IntegerType()),
        StructField("MIN_SAMPLES_LEAF1", IntegerType()),
        StructField("N_ESTIMATORS1", IntegerType()),
        StructField("Accuracy", FloatType()),
    ]
)

Next, we will create our UDTF.

Assign output_schema to the schema we just created, followed by input_types, which will be the hyperparameter types followed by the data column types. Next, name the UDTF, make it permanent, give it a stage location to store it, define the packages used within it, and replace it if the UDTF already exists.

from snowflake.snowpark.functions import udtf

# Packages used inside the handler; made available server-side via the packages argument
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

@udtf(
    output_schema=schema,
    input_types=[
        BooleanType(),  # BOOTSTRAP
        IntegerType(),  # MAX_DEPTH
        IntegerType(),  # MAX_FEATURES
        IntegerType(),  # MIN_SAMPLES_SPLIT
        IntegerType(),  # MIN_SAMPLES_LEAF
        IntegerType(),  # N_ESTIMATORS
        FloatType(),    # X1
        FloatType(),    # X2
        FloatType(),    # X3
        FloatType(),    # X4
        FloatType(),    # X5
        FloatType(),    # X6
        IntegerType(),  # Y
    ],
    name="hyperparameter_tuning",
    is_permanent=True,
    stage_location="@pymodels",
    packages=["snowflake-snowpark-python", "pandas", "scikit-learn"],
    replace=True,
)

__init__ initializes state for the stateful processing of each input partition (see Initializing the Handler in the Snowflake UDTF documentation).

class forecast:
    def __init__(self):
        self.BOOTSTRAP = None
        self.MAX_DEPTH = None
        self.MAX_FEATURES = None
        self.MIN_SAMPLES_SPLIT = None
        self.MIN_SAMPLES_LEAF = None
        self.N_ESTIMATORS = None
        self.x1 = []
        self.x2 = []
        self.x3 = []
        self.x4 = []
        self.x5 = []
        self.x6 = []
        self.y = []
        self.processedFirstRow = False

process handles each input row. Snowflake invokes this method, passing input from the UDTF's arguments (see Defining a process Method in the Snowflake UDTF documentation).

    def process(
        self,
        BOOTSTRAP,
        MAX_DEPTH,
        MAX_FEATURES,
        MIN_SAMPLES_SPLIT,
        MIN_SAMPLES_LEAF,
        N_ESTIMATORS,
        X1,
        X2,
        X3,
        X4,
        X5,
        X6,
        Y,
    ):

The if not self.processedFirstRow check captures the hyperparameters only from the first row of each partition. Every row in a partition carries the same hyperparameter values, so the UDTF trains just 288 models, one per unique combination, rather than one per record in our data set.

        if not self.processedFirstRow:
            self.BOOTSTRAP = BOOTSTRAP
            self.MAX_DEPTH = MAX_DEPTH
            self.MAX_FEATURES = MAX_FEATURES
            self.MIN_SAMPLES_SPLIT = MIN_SAMPLES_SPLIT
            self.MIN_SAMPLES_LEAF = MIN_SAMPLES_LEAF
            self.N_ESTIMATORS = N_ESTIMATORS
            self.processedFirstRow = True

        self.x1.append(X1)
        self.x2.append(X2)
        self.x3.append(X3)
        self.x4.append(X4)
        self.x5.append(X5)
        self.x6.append(X6)
        self.y.append(Y)

end_partition finalizes processing of each input partition, returning a tabular value as tuples (see Finalizing Partition Processing in the Snowflake UDTF documentation).

Together, process adds the rows of a partition into the x1, x2, … lists, and end_partition is called once all rows have been processed; at that point we can train the model using the hyperparameters captured from the first row.

    def end_partition(self):
        # Rebuild a pandas DataFrame from the accumulated partition rows
        df = pd.DataFrame(
            zip(self.x1, self.x2, self.x3, self.x4, self.x5, self.x6, self.y),
            columns=["X1", "X2", "X3", "X4", "X5", "X6", "Y"],
        )

        dfx = df.loc[:, df.columns != "Y"]
        dfy = df.loc[:, df.columns == "Y"]

        X_train, X_test, y_train, y_test = train_test_split(
            dfx, dfy, test_size=0.2, random_state=0
        )

        clf = RandomForestClassifier(
            bootstrap=self.BOOTSTRAP,
            max_depth=self.MAX_DEPTH,
            max_features=self.MAX_FEATURES,
            min_samples_split=self.MIN_SAMPLES_SPLIT,
            min_samples_leaf=self.MIN_SAMPLES_LEAF,
            n_estimators=self.N_ESTIMATORS,
            n_jobs=1,
        )

        clf.fit(X_train, y_train.values.ravel())

        y_pred = clf.predict(X_test)

        # Return this partition's hyperparameters and the resulting accuracy
        yield (
            self.BOOTSTRAP,
            self.MAX_DEPTH,
            self.MAX_FEATURES,
            self.MIN_SAMPLES_SPLIT,
            self.MIN_SAMPLES_LEAF,
            self.N_ESTIMATORS,
            accuracy_score(y_test, y_pred),
        )

Once the UDTF is created, we can run it against the table with all our data and hyperparameters.

from snowflake.snowpark import functions as F
from snowflake.snowpark.functions import col

session.use_database('dev')
session.use_schema('model_dev')

# Call the registered UDTF, partitioning the cross-joined data by HP_ID
HP_TUNING = F.table_function("hyperparameter_tuning")

forecast = df.select(
    df["HP_ID"],
    (
        HP_TUNING(
            df["BOOTSTRAP"],
            df["MAX_DEPTH"],
            df["MAX_FEATURES"],
            df["MIN_SAMPLES_SPLIT"],
            df["MIN_SAMPLES_LEAF"],
            df["N_ESTIMATORS"],
            df["X1"],
            df["X2"],
            df["X3"],
            df["X4"],
            df["X5"],
            df["X6"],
            df["Y"],
        ).over(partition_by=df["HP_ID"])
    ),
).sort(col('Accuracy').desc())

forecast.show()

After running the UDTF, we can see our most optimal combination of hyperparameters, which yields an accuracy of 0.995. But having run 288 models, how long did this take, and how much is it going to cost? Since the UDTF trains our models in parallel, all 288 models finished in 1 minute and 16 seconds on a medium warehouse, costing only 0.021 credits. The medium warehouse used for compute was suspended and took only 9 seconds to auto-resume and 1 minute and 7 seconds to execute the UDTF, as shown in the Profile Overview. In the Statistics overview, we see 576,000 input rows made up of our data and hyperparameters (2,000 records x 288 hyperparameter combinations), yet we only trained 288 unique models, one per combination of hyperparameters.
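
Since the result is already sorted by accuracy in descending order, a quick way to pull the winning combination back to the client (a convenience snippet, not part of the original notebook) is:

# The first row of the sorted result holds the best hyperparameter combination
best_params = forecast.limit(1).to_pandas()
print(best_params)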

There is always a risk of data explosion when we have a lot of training data: with 2,000 rows and 288 parameter combinations, you saw we had 576,000 rows, but 200,000 rows would give you 57,600,000 rows, 2,000,000 rows would give you 576,000,000, and so on. In that case, we might need to sample the data first or use a Snowpark-optimized warehouse with more memory; a sampling sketch follows below.
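
For example, a minimal sketch of sampling before the cross join, assuming data_df is the training DataFrame and the 10% fraction is an arbitrary choice for illustration:

# Sample roughly 10% of the training rows before fanning out by hyperparameters
sampled_df = data_df.sample(frac=0.1)
df = params_df.cross_join(sampled_df)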

Conclusion

UDTFs are a fantastic way to parallelize your model training in Snowflake to cut down on time and cost. We can use Snowflake warehouses to scale up when we have a larger number of models that need to be trained, and take advantage of auto-resume and auto-suspend to remove the manual intervention required to manage compute clusters. To learn more about Snowpark visit https://www.snowflake.com/snowpark/.

You can check out the full notebook in Hex on how to re-create this scenario in your Snowflake account below!

https://app.hex.tech/snowflake/app/26af5cca-68c9-4d23-84d0-2e79af5159f6/latest
