Dataflow ML as a Sequential Model Handler for Word Clustering

Aniket Agrawal
Google Cloud - Community
12 min readApr 12, 2023

Dataflow ML Pipeline: Words -> (Spacy + Sklearn) -> Clusters

In my previous blog, we explored how sentences are clustered using SpaCy, Dataflow ML, and BigQuery ML. In this blog as well, the vectorization and model handling strategies premised on Spacy and Dataflow ML remain somewhat similar.

Why these two products again? Because, as we’ll see, they again turn out to be so powerful and accurate in what they are supposed to do that I can’t resist the urge to play with them :)

So, what’s the difference? This time, the kernel selected for the Vertex AI workbench notebook is Apache Beam 2.46.0 rather than Python 3 (ipykernel) itself. So, this will ease the programmatic upload of output files to a GCS (Google Cloud Storage) bucket and pipeline export as a Dataflow job. Also, we will see the words clustered rather than the sentences in the very same Dataflow ML pipeline, and that too by using a different clustering tool and algorithm.

Our new guest is Scikit-learn (Sklearn), the library that kicked off my ML and NLP journeys long ago. We’ll rely on Sklearn Birch Clustering instead of BigQuery ML k-Means Clustering, indicating a transition from a partitioning-based to a hierarchical method of clustering. This is supposed to simplify the demonstration of sequential RunInference as well.

Sequential Model Handling in a Dataflow ML Pipeline

There are diverse handling patterns for models, notably ‘Sequence’ and ‘Branching’. We’ll see two models being handled in the sequence pattern via the ‘Beam RunInference API’ for SpaCy as well as Scikit-Learn. As per the below self-explanatory architecture diagram, we will use the Spacy and Sklearn models sequentially.

So, in the beam pipeline, the captured CSV file words are vectorized using SpaCy. Then, these vectors are clustered using Sklearn Birch clustering, after which the final findings are reported and visualized. At last, we’ll launch a Dataflow job from this pipeline created in the notebook.

Importance of Word Clustering

NLP is becoming more significant in today’s world as more businesses and organizations attempt to derive insights from text data. As a result, a slew of arguments support the significance of word clustering. Here are a few instances:

  1. Improved search engine accuracy
  2. Simplify document text analytics
  3. Quicker retrieval of significant information

Clustering Battle: Birch v/s K-Means

We previously discussed how k-means differs from its younger cousin, k-means++. Let’s take a high-level look at the differences between BIRCH and k-means clustering.

BIRCH (Balanced Iterative Reducing and Clustering using Hierarchies) creates a cluster hierarchy, beginning with a single cluster containing all of the data points. Then, clusters are split repeatedly until each cluster contains just one data point. Through a summary, it quickly identifies which data points are likely to belong to the same cluster. It swiftly determines which data points are likely to belong to the same cluster using a summary.

K-means is one of the most celebrated ML algorithms, frequently referred to as the ‘king of clustering’. It is partitional and separates the data points into a predefined number of clusters. It operates by allocating data points to clusters repeatedly and then recalculating cluster centres until they do not change.

Wait! Enough with the jargon—why choose BIRCH?

This technique operated quickly and accurately in a memory-efficient way on the dataset under examination. So it is likely to scale better than its counterparts. There is no hard-and-fast rule stating that a certain clustering method will always prevail. It heavily depends on your use case and the kind of dataset you are working with.

Practical Implementation

Please refer to this blog for other theoretical details. Without further ado, we’ll get started directly with the practical implementation.

1. Prerequisites

First things first! Perform these steps first before proceeding further to build and execute the Dataflow ML pipeline:

1.1. Create the required notebook

First, we’ll create an Apache Beam Vertex AI workbench notebook. For that, visit the ‘Dataflow’ section in the GCP console. Click on ‘Workbench’ followed by ‘User-managed notebooks’. Now, to create a new notebook, click on the ‘+’ sign followed by ‘Apache Beam’ and ‘Without GPUs’. The self-explanatory procedure is described visually below as well.

After specifying name, region, and network, click on ‘CREATE’ as per the below image.

Wait for 2-3 minutes for the ‘OPEN JUPYTERLAB’ option to come up.

Click on it when enabled and select ‘Apache Beam 2.46.0 for Python 3’ to the right. You are all set to proceed further!

1.2. Install the required packages

In the notebook, let’s begin by installing the required packages. As the kernel selected is Apache Beam rather than Python, the installation instructions will differ slightly.

# This is an Apache Beam kernel notebook, not a Python kernel one!
# Apache Beam is pre-installed; Sklearn is not!

%pip install -U spacy
%pip install scikit-learn

# %pip install --quiet apache-beam[gcp,interactive,dataframe]

Optional: Verify whether the packages have been successfully installed, and if so, check their version.

! python -m spacy validate # 'Validate' is provided by SpaCy

1.3. Enable the relevant APIs

Enable all the relevant APIs, such as Cloud Storage, Dataflow and Notebook. It can be done programmatically or via the GCP console.

Programmatic: The gcloud CLI provides a terminal-based command-line interface for accessing the exact same Google Cloud services that can be managed using the Cloud console. Prior to using this to enable the APIs in code, please check the list of available services first.

!gcloud services list

Then, run the following commands to enable the required APIs.

!gcloud services enable dataflow
!gcloud services enable notebooks.googleapis.com
!gcloud services enable storage-component.googleapis.com

GCP Console: After browsing the relevant APIs in the GCP console, click on ‘Enable’ as shown below for the Notebooks API on the left.

Repeat the steps for Dataflow as well as the Cloud Storage APIs. You should get ‘API Enabled’ for all of these, as shown in the above figure to the right.

1.4. Upload the required CSV files

Now, all you need are training and testing datasets containing a lot of relevant words. The sample files are supplied at the end for your convenience in testing. In total, 307 words belonging to four categories are taken into account, of which 111 and 196 are used for training and testing, respectively.

import pandas as pd
train_file = 'Final_Test_Data.csv' # Place 'Final_Train_Data.csv' also.
pd.read_csv(train_file, header=None, names=['Word'])

This code snippet prints 196 words used for testing. Similarly, 111 words used for training can be displayed.

2. Let’s play with the models

In this section, we’ll begin working with the SpaCy and Sklearn models.

2.1. Load the pre-trained SpaCy model

The largest English model of SpaCy, having a large word vector table (≈ 500k entries), is used. This time, because of a different kernel, the model can be easily installed using the SpaCy command-line interface. To vectorize words, load the model ‘nlp’ as follows:

# Remember, the kernel is 'Apache Beam',
# So, install the large English model as follows:

import spacy.cli
spacy.cli.download("en_core_web_lg")
nlp = spacy.load("en_core_web_lg")

This large English model essentially passes text through several phases like tokenization, part of speech tagging, named entity recognition, etc. to produce a 300-dimensional word embedding.

2.2. Train and Evaluate the Sklearn Clustering Model

The following code snippet creates a list of 111 word vectors created using the large model loaded in the previous step.

import csv 

docs = []

with open('Final_Train_Data.csv', mode ='r') as file:
csvFile = csv.reader(file)
for word in csvFile:
docs.append(nlp(word[0]))
print(len(docs))

The Sklearn clustering model is trained w.r.t. these 111 vectors belonging to four categories: wildlife, food, medical, and technology.

import numpy as np

final_data = []

for i, d in enumerate(docs):
arr = np.array(list(d.vector))
clusterarray_vectorized = arr.reshape(1, len(arr))
clusterarray_vectorized_mean = list(np.mean(clusterarray_vectorized, axis=0))
temp_row = np.asarray(clusterarray_vectorized_mean)
final_data.append(temp_row)

For this, a vector created for each word is converted into a numpy array, and in this manner, a 2-D matrix ‘final_data’ of dimension 111 x 300 is created for clustering.

from sklearn.cluster import Birch

X = np.asarray(final_data)

brc = Birch(n_clusters=4)
brc.fit(X)

brc.predict(X)

The prediction results for the training dataset indicate that words such as ‘Animals’, ‘Habitat’, and ‘Biodiversity’ are allocated cluster 0 for Wildlife, followed by ‘Chocolate’, ‘Fruits’, ‘Vegetables’ assigned cluster 3 for Food, ‘Epidemic’, ‘Fever’, ‘Flu’ are allotted cluster 1 for Medical and finally, ‘Computer’, ‘Smartphone’, ‘Tablet’ are assigned cluster 2 for Technology.

3. Working with Beam’s RunInference API

Now, for sequential model handling, we need to define and create the handlers.

3.1. Implement SpaCy model handler

Let’s again define a model handler class called ‘SpacyInfer’ to leverage spaCy for inference. In the function ‘run_inference’, the vector is inferred for each word and stored in the ‘infers’ list.

import apache_beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.base import ModelHandler
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

class SpacyInfer(ModelHandler[str, spacy.Language, PredictionResult]):

def __init__(self, model_name):
self._model_name = model_name

def load_model(self):
return spacy.load(self._model_name)

def run_inference(self, sents, model, inference_args):
infers = []
for sent in sents:
doc = model(sent)
infers.append(doc.vector)
return infers

3.2. Implement the Scikit-learn model handler

Here, all the required libraries are imported for the Sklearn model to handle the Numpy arrays. The model object is saved (or ‘pickled’) by passing it into the dump() function of Pickle for serialization and conversion into a “byte stream”. The model.pkl file is further input to the model handler as the model URI.

import pickle
from apache_beam.ml.inference.sklearn_inference import ModelFileType
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

clustering_file = 'sklearn_birch_clustering_model_final.pkl'

with open(clustering_file, 'wb') as f:
pickle.dump(brc, f)

sklearn_model_handler = SklearnModelHandlerNumpy(model_uri=clustering_file)

4. Run the Apache Beam Pipeline

It's time to build and execute the entire word clustering pipeline. Yay!

4.1. Create the Pipeline

Here, we’ll rely on InteractiveRunner() for iterative development and inspection of pipelines.

pipeline = apache_beam.Pipeline(InteractiveRunner())

4.2. Run the Pipeline

In the pipeline, the first words are extracted from the CSV file and vectorized using the SpaCy NLP model handler. Then, these vectors are clustered using the Sklearn model handler and finally, visualized.

with pipeline as p:
spacy_df = p | "FetchWords" >> apache_beam.io.ReadFromText(train_file)
spacy_df1 = spacy_df | "RunInferenceSpacy" >> RunInference(SpacyInfer("en_core_web_lg"))
sklearn_df = spacy_df1 | "RunInferenceSklearn" >> RunInference(model_handler=sklearn_model_handler)
sklearn_df1 = sklearn_df | "PrintClusters" >> apache_beam.Map(print)

4.3. Word-to-Cluster Assignment

The resultant word-cluster mapping could be easily visualized by concatenating the Pcollection outputs of ‘spacy_df’ (Words) and ‘sklearn_df’ (Vectors and Clusters).

word_cluster_pd = pd.concat([ib.collect(spacy_df), ib.collect(sklearn_df)], axis=1)
word_cluster_pd.set_axis(['Word', 'Vector', 'Cluster', 'Model_ID'], axis='columns', copy=False)
word_cluster_pd.style
Word-Vector-Cluster Mapping

5. Visualize the PCollection

Now, to visualize the pipeline, click on the ‘Interactive Beam’ tab and select the kernel for your notebook. This is how the pipeline will look.

Similarly, the outputs of ‘spacy_df’ and ‘spacy_df1’ can be visualized in the form of words and vectors. All the statistical parameters for the vectors, such as mean, median, standard deviation, etc., are captured as well. Remember to check the box corresponding to ‘Visualize in Facets’.

We can see how many out of 196 words are assigned to cluster 0, 1, 2, or 3.

PCollection Visualization: Outputs of sklearn_df for Clustering

This visualization shows word-to-cluster mapping. For example, the highlighted word ‘Google’ belongs to cluster 2 for Technology.

Cluster to Word Assignment by Position: The word ‘Google’ belongs to cluster 2 for Tech.

Note: By using the ‘apache_beam.runners.interactive.interactive_beam’ module, the above results can be generated. Our focus is to generate all of these using the ‘Interactive Beam’ tab to demonstrate its ease over the programmatic approach.

6. Launch the Dataflow Job

Finally, we’ll launch a Dataflow ML job from the clustering pipeline we just built. As is customary, let’s start with all of the necessary imports.

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners import DataflowRunner
import google.auth

Here, we set up the required pipeline options and GCP project. Also, we specify the region, the GCS staging location, and a temporary location for Cloud Dataflow to run, stage its binaries, and store its files before the output generation.

options = pipeline_options.PipelineOptions(flags={})
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
options.view_as(GoogleCloudOptions).region = 'us-central1'
!gsutil mb gs://demo-bucket-aa-demo-dataflow
‘gsutil mb ‘ creates a multi-regional US bucket with standard class
gcs_path = 'gs://demo-bucket-aa-demo-dataflow/DataflowML'
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % gcs_path
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % gcs_path

Then, we can specify the directory to store the output files of the job.

output_path = '%s/output' % gcs_path
(sklearn_df | 'Writing clusters to Cloud Storage'
>> apache_beam.io.WriteToText(output_path + '/clusters.txt'))

By executing the below code snippet, we can see our Dataflow job up and running in the GCP console. Congrats! You did it.

from IPython.display import display, HTML

cluster_pipeline = DataflowRunner().run_pipeline(p, options=options)

url = ('https://console.cloud.google.com/dataflow/jobs/%s/%s?project=%s' %
(cluster_pipeline._job.location, cluster_pipeline._job.id, cluster_pipeline._job.projectId))
display(HTML('To visualize your Dataflow job, please click <a href="%s" target="_new">here</a>. Enjoy!' % url))

cluster_pipeline.wait_until_finish()

A Python-based ‘gsutil’ application provides command-line access to GCS. You can verify the directory and the file content by executing the following commands:

!gsutil ls {gcs_path}
!gsutil ls {output_path}
!gsutil cat {output_path}/clusters.txt-00000-of-00001

The following screenshots display the job views and metrics:

Word Clustering Dataflow Job: Graph View
Word Clustering Dataflow Job: Table View
Job Metrics and Evaluation Details
Stage Workflow

Enjoy exploring and playing around with other options that the console has inbuilt for you.

Observation and Results

Out of 196 words, we can clearly observe that 40, 57, 41, and 58 words fall into clusters 0 (‘Wildlife’), 1 (‘Medical’), 2 (‘Tech’), and 3 (‘Food’), respectively. For such a small dataset, there are neither false positives nor false negatives anywhere, thereby maintaining clustering metrics such as precision and recall at 100%. Of course, as larger datasets are taken into account, the tug of war between precision and recall begins, and 100% accuracy is no longer guaranteed.

Challenges

In this blog, the focus was again on hard clustering rather than its fuzzy counterpart. This means that each word must belong to exactly one cluster. There are a lot of words you can think of that, semantically, can belong to more than one cluster. So be creative, and feel free to mention a few in the comments below. A few examples are shown below:

  1. Virus, Python -> Wildlife, Technical
  2. Meat, Fish, Chicken -> Wildlife, Food
  3. X-Ray -> Technical, Medical
  4. Oven -> Food, Technical
  5. Tablet -> Medical, Technical (Food also, as at least, it’s an eatable :)

There are other challenges as well. For a word to be accurately placed in the correct cluster, it has to be both vectorized and clustered correctly. Any pre-trained model, even the largest English model of SpaCy, cannot be expected to derive the correct semantic vector for each and every word literally. On top of this, the Sklearn model will also produce false negatives and positives from its end as well. So, both of these powerful Python libraries will participate in maintaining the final clustering accuracy.

Summary

In this blog, once again, we got to realize the key features of Dataflow ML, which defines the next generation of its counterpart, Dataflow. We observed NLP and ML models handled sequentially, with SpaCy and Sklearn collaborating to produce word vector clustering. At last, we visualized the entire pipeline data in the form of a Dataflow job after specifying the GCS location.

Stay curious, adios, and see you soon with another blog!

Note: Should you have any queries about this post or my notebook implementation, please feel free to connect with me on LinkedIn! Thanks!

My GCT Youtube Video links:

Dataflow ML for Word Clustering (Parts 1 and 2)

--

--

Aniket Agrawal
Google Cloud - Community

AI/ML | Cloud Engineer at Google, GenAI | Cybersecurity | ML | NLP | Image Processing Research Enthusiast https://www.linkedin.com/in/aniket-agrawal-a18990266/