
Topic Modelling with PySpark and Spark NLP

Maria Obedkova
TrustYou Engineering
10 min read · May 29, 2020


In recent years, the constant growth of data and the increasing popularity of Natural Language Processing have given rise to many big data analysis and NLP tools, with libraries for various programming languages emerging to support both kinds of work.

One such big data tool is Spark, a framework for large-scale data processing that is available for many programming languages, including Python. PySpark is a well-maintained Python package for Spark that allows you to perform exploratory data analysis and build machine learning pipelines for big data.

Large amounts of data are also the norm in NLP, and the Spark NLP library brings the two worlds of big data and NLP together. Spark NLP offers extensive functionality for various NLP tasks and makes it possible to run them fast and efficiently with Spark. The library provides many pipelines and models for multiple languages and comes with an open-source implementation and thorough documentation.

In this blog post, I would like to touch upon the well-known NLP task of Topic Modelling as applied to big data. Topic Modelling is a statistical approach to data modelling that helps discover the underlying topics present in a collection of documents. Even though Spark NLP is a great library for various NLP tasks, it does not provide a topic modelling pipeline out of the box. Hence, I would like to go through how you can implement a topic modelling pipeline using PySpark and Spark NLP.

Let’s start

We need to have PySpark and Spark NLP already installed. If you don’t have them yet, please refer to the PySpark and Spark NLP documentation. If you would like to experiment in Google Colaboratory, you can check out this complementary repository.

First of all, we need to start the Spark session. We can do it directly with PySpark or through Spark NLP as in the following example:

import sparknlp

spark = sparknlp.start()
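For reference, here is a minimal sketch of starting the session directly with PySpark instead; the spark-nlp Maven coordinates, and especially the version, are an assumption here, so check the Spark NLP documentation for the ones matching your installation:

from pyspark.sql import SparkSession

# the spark-nlp version below is a placeholder; use the one you have installed
spark = SparkSession.builder \
    .appName('topic-modelling') \
    .master('local[*]') \
    .config('spark.jars.packages', 'com.johnsnowlabs.nlp:spark-nlp_2.12:4.4.0') \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .getOrCreate()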

1. Data

For our task, we will use the open-source data from Kaggle Amazon Musical Instruments Reviews. The data is not big but is sufficient for the tutorial’s sake. There is a lot of information in the data, but we are primarily interested in the text of the reviews for topic modelling. We will read the data with PySpark, select the column of interest and get rid of empty reviews.

from pyspark.sql import functions as F

path = 'Musical_instruments_reviews.csv'
data = spark.read.csv(path, header=True)
text_col = 'reviewText'
review_text = data.select(text_col).filter(F.col(text_col).isNotNull())

Your data at this point will look as follows:

+--------------------------------------------------+
|                                        reviewText|
+--------------------------------------------------+
|                                  that's just like|
|The product does exactly as it should and is qu...|
|The primary job of this device is to block the ...|
|Nice windscreen protects my MXL mic and prevent...|
|This pop filter is great. It looks and performs...|
+--------------------------------------------------+

2. NLP pipeline with Spark NLP

Once we have the data at hand, let’s move on to build an NLP pipeline. I already mentioned that Spark NLP has extensive NLP functionality, and now it is time to apply it to preprocess the text for topic modelling purposes.

Conceptually, Spark NLP is similar to many other machine learning libraries: it consists of two major components, Estimators (which estimate data and train models) and Transformers (which transform data and apply models), both defined as Annotators. They, in turn, make up your Pipelines.

Most Spark NLP annotators work only with a particular annotation format, namely, they take in and output data in the following form:

[annotatorType, begin, end, result, metadata, embeddings]

For example, for Tokenizer in Spark NLP, the output for the sentence “Hello, this is an example sentence” would look like:

[[token, 0, 4, Hello, [sentence -> 0], [], []], 
[token, 5, 5, ,, [sentence -> 0], [], []],
[token, 7, 10, this, [sentence -> 0], [], []],
[token, 12, 13, is, [sentence -> 0], [], []],
[token, 15, 16, an, [sentence -> 0], [], []],
[token, 18, 24, example, [sentence -> 0], [], []],
[token, 26, 33, sentence, [sentence -> 0], [], []]]

Thus, to use Spark NLP functionality such as Normalizer or Tokenizer, we have to transform our data into the annotation format that Spark NLP understands. DocumentAssembler takes care of this: it creates the annotation from raw text data, allowing other Spark NLP annotators to be used on this data further down the pipeline.

To use DocumentAssembler (and as you will see later any other Spark NLP annotator), you have to provide the input column for the transformation and the output column for the transformed data. These columns will be saved to a newly created PySpark data frame after you run your NLP pipeline.

from sparknlp.base import DocumentAssembler

documentAssembler = DocumentAssembler() \
    .setInputCol(text_col) \
    .setOutputCol('document')

As you might have noticed, the annotation format stores the annotator type, which differs from annotator to annotator. DocumentAssembler creates annotated data of a particular annotator type, DOCUMENT. The important thing is that annotators not only output data of a specific type but also have to take in data of allowed types. With DocumentAssembler, this is easy because the input is simply the raw data we have. However, with other annotators, it is not that trivial.

As the first step of our NLP pipeline, we would like to tokenize our data, i.e. split sentences into words. For this, we will use the Spark NLP Tokenizer, which uses open standards for tokenization. We will again set only the input and output columns for this annotator, but the Spark NLP documentation describes the other options available for each annotator.

from sparknlp.annotator import Tokenizer

tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('tokenized')

Here, you see that we take the data that comes from DocumentAssembler and output it to a new column. The Spark NLP documentation for Tokenizer shows that this annotator accepts only input data of annotator type DOCUMENT and outputs data of TOKEN type. Thus, you need to take care that the correct types come in and out of the annotators in your Spark NLP pipeline; the documentation is a good source of information about annotator types. This behaviour can sometimes restrict your freedom in choosing the next annotator, since the desired one may require a completely different annotator type than you have and force you to search for a workaround. Most of the time, though, the data flows through Spark NLP as you would expect. From here on, I will not spell out annotator type compatibility for the other annotators we use, but keep it in mind.
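If you prefer to check the annotator types at runtime rather than in the documentation, a small sketch like the following runs a toy sentence through the two stages defined so far and prints the type of each produced annotation:

from pyspark.ml import Pipeline

sample = spark.createDataFrame([('Hello, this is an example sentence',)], [text_col])
annotated = Pipeline(stages=[documentAssembler, tokenizer]).fit(sample).transform(sample)

# each annotation is a struct: annotatorType, begin, end, result, metadata, embeddings
annotated.selectExpr('explode(tokenized) as annotation') \
    .selectExpr('annotation.annotatorType', 'annotation.result') \
    .show(truncate=False)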

Next, we move to the normalization step, where we clean the data and perform lowercasing. This is done by Normalizer:

from sparknlp.annotator import Normalizer

normalizer = Normalizer() \
    .setInputCols(['tokenized']) \
    .setOutputCol('normalized') \
    .setLowercase(True)

Now we come to lemmatization, bringing all words in the data to their lemmas (base forms). The Lemmatizer annotator is responsible for lemmatization in the Spark NLP library. It allows you to use your own lemmatization dictionary; however, we will use a pre-trained lemmatization model provided by LemmatizerModel:

from sparknlp.annotator import LemmatizerModel

lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(['normalized']) \
    .setOutputCol('lemmatized')

We would also like to filter out stop words, since we are interested in meaningful words to describe our topics. That can be done by StopWordsCleaner, which removes a chosen set of words from the data. To supply the list of stop words for StopWordsCleaner, we import them from the nltk package:

from nltk.corpus import stopwords

# the stop word list requires the NLTK data package: nltk.download('stopwords')
eng_stopwords = stopwords.words('english')

Then, we provide this list to StopWordsCleaner:

from sparknlp.annotator import StopWordsCleaner

stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(['lemmatized']) \
    .setOutputCol('no_stop_lemmatized') \
    .setStopWords(eng_stopwords)

We are done with the basic preprocessing steps for topic modelling. However, I would like to introduce several additional steps that could benefit our task. In addition to single words (unigrams), I propose to explore n-grams as well. Having n-grams could help the topic model refine topics better, and the extracted topics become easier to interpret since n-grams give more context.

To incorporate n-grams into our NLP pipeline, we could use the Spark NLP NGramGenerator, which generates n-grams from tokens. However, we want to restrict n-grams to meaningful ones like “musical instrument” (Adj + Noun) and exclude ones like “for musical” (Prep + Adj). To avoid irrelevant combinations of part-of-speech (POS) tags in n-grams, we first tag the tokens in the data with POS tags, using one of the POS tagging models available in Spark NLP.

from sparknlp.annotator import PerceptronModel

pos_tagger = PerceptronModel.pretrained('pos_anc') \
    .setInputCols(['document', 'lemmatized']) \
    .setOutputCol('pos')

Filtering out non-meaningful n-grams can then be done with Chunker, which chunks the data based on the provided patterns of POS tags. We restrict possible n-grams to noun phrases but do not restrict n (except for n=1, since we already have unigrams).

from sparknlp.annotator import Chunker

allowed_tags = ['<JJ>+<NN>', '<NN>+<NN>']

chunker = Chunker() \
    .setInputCols(['document', 'pos']) \
    .setOutputCol('ngrams') \
    .setRegexParsers(allowed_tags)

Chunker has one drawback for our task: it outputs raw rather than lemmatized text. Inflections expand the vocabulary a lot and make computation longer. Moreover, they spread the weight of one n-gram across its different surface variants, making it less relevant to a topic during topic modelling. In this blog post, we will still use non-lemmatized n-grams, but if you are interested in how to proceed with lemmatized n-grams, you can refer to this Jupyter Notebook.
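As a rough alternative (not used in the rest of this post), you could build n-grams directly from the lemmatized, stop-word-free tokens with NGramGenerator. The sketch below assumes the column names defined earlier and, unlike Chunker, does not filter n-grams by POS pattern, so it would produce all adjacent pairs of lemmas:

from sparknlp.annotator import NGramGenerator

# bigrams over lemmas instead of POS-filtered chunks of raw text;
# n=2 is an arbitrary choice for illustration
lemma_ngrams = NGramGenerator() \
    .setInputCols(['no_stop_lemmatized']) \
    .setOutputCol('lemma_ngrams') \
    .setN(2)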

And here we are. This is our basic NLP pipeline that can be used for further topic modelling. We did all the essential steps that are beneficial for topic modelling. However, there are many additional steps that could be taken with Spark NLP to get cleaner, less ambiguous topics, for example spell checking, sentence splitting and dependency parsing, all available in Spark NLP.

To use the processed data for the topic modelling analysis, we need to transform it from the annotation format of Spark NLP to a “human-readable” format. For this, Spark NLP offers Finisher:

from sparknlp.base import Finisher

finisher = Finisher() \
    .setInputCols(['no_stop_lemmatized', 'ngrams'])

Up to this point, we were defining the components of our NLP pipeline. Now we are ready to create the actual pipeline. For this, we will use the in-built PySpark functionality:

from pyspark.ml import Pipeline

pipeline = Pipeline() \
    .setStages([documentAssembler,
                tokenizer,
                normalizer,
                lemmatizer,
                stopwords_cleaner,
                pos_tagger,
                chunker,
                finisher])

At this step, we should be careful about the order of the NLP components and make sure their required input annotation types match. Once the pipeline is defined, we can fit all the estimators and then transform the data with all the transformers and estimated models:

processed_review = pipeline.fit(review_text).transform(review_text)

In our pipeline, we were processing unigrams and n-grams separately, and now it is time to combine them into one list of words for each review.

from pyspark.sql.functions import concat

processed_review = processed_review.withColumn(
    'final',
    concat(F.col('finished_no_stop_lemmatized'),
           F.col('finished_ngrams')))

The processed data will look as follows:

+-----------------------------------------------------------------+
|                                                            final|
+-----------------------------------------------------------------+
|                                                           [like]|
|[product, exactly, quite, affordablei, realize, double, screen...|
|[primary, job, device, block, breath, would, otherwise, produc...|
|[nice, windscreen, protect, mxl, mic, prevent, pop, thing, goo...|
|[pop, filter, great, look, perform, like, studio, filter, your...|
+-----------------------------------------------------------------+

3. Vectorization with PySpark

Before rushing into topic modelling itself, we need to convert the textual data into a numeric representation. Spark NLP does not provide functionality for topic modelling or non-contextual vectorization; nevertheless, this functionality is implemented in PySpark.

For the topic modelling task, we will use TF-IDF to determine which words are important to which of our reviews. First, we calculate TF (the frequency of each term in a document) with CountVectorizer, using the combined column of unigrams and n-grams as input. We derive the vocabulary of our data at the fit step and get the term counts at the transform step.

from pyspark.ml.feature import CountVectorizer

tfizer = CountVectorizer(inputCol='final',
                         outputCol='tf_features')
tf_model = tfizer.fit(processed_review)
tf_result = tf_model.transform(processed_review)

Then, we proceed with IDF (the inverse frequency of documents in which a term occurs), which down-weights words that are highly frequent across all reviews so that they do not end up characterizing a topic in the topic modelling step. We calculate TF-IDF based on the TF results with the IDF estimator of PySpark:

from pyspark.ml.feature import IDF

idfizer = IDF(inputCol='tf_features',
              outputCol='tf_idf_features')
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

The result of the vectorization step will look the following way:

+-----------------------------------------------------------------+
|                                                  tf_idf_features|
+-----------------------------------------------------------------+
|                                  (72394,[9],[1.343442249258362])|
|(72394,[0,3,4,9,10,12,20,40,46,52,56,62,92,132,134,147,151,174...|
|(72394,[2,17,20,21,33,62,66,83,85,123,127,137,141,182,237,291,...|
|(72394,[29,42,44,59,262,336,364,445,449,567,573,693,949,1221,1...|
|(72394,[7,8,9,24,92,119,203,336,389,467,502,650,1093,1123,6770...|
+-----------------------------------------------------------------+

4. Topic Modelling with PySpark

So, here comes the topic modelling. We will use arguably the most popular algorithm for topic modelling: LDA (Latent Dirichlet Allocation). It is a generative model that assumes each document is represented by a distribution of topics and each topic, in turn, is represented by a distribution of words. Given the TF-IDF scores for the vocabulary, LDA can identify a predefined number of topics within the data.


PySpark has the LDA algorithm implemented. To train LDA, we need to define the number of topics and the number of algorithm iterations.

from pyspark.ml.clustering import LDA

num_topics = 6
max_iter = 10

lda = LDA(k=num_topics,
          maxIter=max_iter,
          featuresCol='tf_idf_features')
lda_model = lda.fit(tfidf_result)
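Besides the topic-word distributions, the trained model can also assign a topic mixture to each review. A minimal sketch for inspecting it (topicDistribution is PySpark's default output column for LDA):

# each review gets a vector of topic proportions of length num_topics
reviews_with_topics = lda_model.transform(tfidf_result)
reviews_with_topics.select('topicDistribution').show(5, truncate=100)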

After the topic model is trained, we would like to get the words that describe the derived topics. For this, we will write a UDF that converts word ids (the actual per-topic output of the model) into words:

from pyspark.sql import types as T

vocab = tf_model.vocabulary

def get_words(token_list):
    return [vocab[token_id] for token_id in token_list]

udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

Now, we can output the words for each modelled topic with the LDA model function describeTopics. We will show only the 7 most relevant words for each topic.

num_top_words = 7

topics = lda_model \
    .describeTopics(num_top_words) \
    .withColumn('topicWords', udf_to_words(F.col('termIndices')))
topics.select('topic', 'topicWords').show(truncate=100)

You might get the following results for the topics:

+-----+------------------------------------------------------------+
|topic|                                                  topicWords|
+-----+------------------------------------------------------------+
|    0|      [wax, rack, spray, attack, pop, shoulder rest, guitar]|
|    1|                [pick, guitar, string, one, use, tuner, pin]|
|    2|               [string, pedal, guitar, sound, use, one, get]|
|    3|          [string, sound, pick, mic, nice, danelectro, echo]|
|    4|           [tube, delay, boss, fuzz, overdrive, pedal, tone]|
|    5|            [capo, stand, uke, ukulele, attach, arm, hanger]|
+-----+------------------------------------------------------------+

As you can see, some topics are generic and share words with other topics, but some are quite targeted. It is always worth experimenting with the number of topics, since you never know what fits your data best. You may also notice that n-grams did not contribute much to the topics: there is just one n-gram (“shoulder rest”) among the top words across six topics. However, this is highly dependent on the data, and some domains benefit a lot from introducing n-grams into topic modelling.
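If you want something more systematic than eyeballing the topics, one option is to compare a few candidate numbers of topics with PySpark's built-in log-perplexity (lower is better). A minimal sketch, with arbitrary candidate values of k and evaluated on the training data for simplicity:

# rough model selection: retrain LDA for several k and compare perplexity
for k in [4, 6, 8, 10]:
    candidate = LDA(k=k, maxIter=max_iter, featuresCol='tf_idf_features') \
        .fit(tfidf_result)
    print(k, candidate.logPerplexity(tfidf_result))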

That is it for topic modelling with PySpark and Spark NLP. I hope it was helpful! For a more elaborate topic modelling pipeline with Spark in Python, check out the code in this repo.

Good luck with your experiments!
