Distributed Topic Modelling using Spark NLP and Spark MLlib (LDA)

Satish Silveri
Published in Analytics Vidhya · 5 min read · Jun 11, 2020
Image Source: Britannica

Topic modelling is one of the most common tasks in natural language processing. Extracting topic distributions from millions of documents is useful in many ways, e.g. identifying the reasons for complaints about one product or all products, or, the more classic example, identifying topics in news articles. We won’t delve into what topic modelling is or how it works. There are many good articles about it on the internet, and I find this article from Analytics Vidhya comprehensive. If you are not familiar with topic modelling or need to refresh your memory, go ahead and check it out.

The purpose of this blog is to get acquainted with a distributed approach to topic modelling. Imagine you have billions of documents sitting in your data lake (e.g. Hadoop) and you want a better understanding of the topics within them. Processing billions of documents in plain Python runs into several computational limitations and bottlenecks. Luckily, Spark MLlib offers an optimized version of LDA that is specifically designed to work in a distributed environment. We will build a simple topic modelling pipeline that uses Spark NLP to pre-process the data and Spark MLlib’s LDA to extract topics from it.

We will be using news article data. You can download the data set from this link. Let’s go ahead and start writing some code.

Initialize Spark Session

First, we will import all the required packages and initialize the Spark session. You can also pass these configurations on the command line when launching the application with spark-submit.

# Import Spark NLP
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
# Parentheses (instead of trailing backslashes) allow an inline comment inside the chain
spark = (
    SparkSession.builder
    .appName("Spark NLP")
    .config("spark.driver.memory", "8G")  # change accordingly
    .config("spark.driver.maxResultSize", "2G")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5")
    .config("spark.kryoserializer.buffer.max", "1000M")
    .getOrCreate()
)
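
As a rough sketch of the spark-submit alternative mentioned above, the same settings can be assembled into a command line. The script name `topic_model.py` is a hypothetical placeholder, and the values simply mirror the builder call; adjust them to your cluster. The snippet only builds and prints the command, it does not launch anything.

```python
import shlex

# Settings mirrored from the SparkSession builder above.
driver_memory = "8G"  # change accordingly
packages = "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5"
conf = {
    "spark.driver.maxResultSize": "2G",
    "spark.kryoserializer.buffer.max": "1000M",
}
script = "topic_model.py"  # hypothetical script name, not from the article

# Build the argument list: dedicated flags first, then one --conf per setting.
command = ["spark-submit", "--driver-memory", driver_memory, "--packages", packages]
for key, value in conf.items():
    command += ["--conf", "{}={}".format(key, value)]
command.append(script)

# Print a shell-safe version of the command for copy and paste.
print(" ".join(shlex.quote(part) for part in command))
```

Setting the driver memory on the command line is generally the safer choice in client mode, because the driver JVM has already started by the time the application code runs.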

Read the Data

We can read the data from three sources: local storage, HDFS, and S3. If your data is stored on S3, I recommend first copying it to HDFS with S3DistCp (a distributed copy tool) and then loading it from HDFS. This reduces the network I/O that is otherwise required to read all the data from S3, distribute it to the worker nodes, and load it into memory.

# Keep only ONE of the two file_location assignments below.
# if you are reading the file from local storage
file_location = r'path\to\abcnews_date_txt.csv'
# if you are reading the file from HDFS (URIs use forward slashes)
file_location = 'hdfs:///user/path/to/abcnews_date_txt.csv'
file_type = "csv"
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
df = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .load(file_location)
# Verify the count
df.count()

The data consists of 2 columns viz. publish_date and headline_text.

Pre-processing Pipeline using Spark NLP

# Spark NLP requires the input dataframe or column to be converted to a document.
document_assembler = DocumentAssembler() \
    .setInputCol("headline_text") \
    .setOutputCol("document") \
    .setCleanupMode("shrink")
# Split each sentence into tokens (array)
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")
# Clean unwanted characters and garbage
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")
# Remove stopwords
stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(["normalized"]) \
    .setOutputCol("cleanTokens") \
    .setCaseSensitive(False)
# Stem the words to bring them to their root form.
stemmer = Stemmer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("stem")
# Finisher is the most important annotator. Spark NLP adds its own structure when
# each row in the dataframe is converted to a document. Finisher brings back the
# expected structure, i.e. an array of tokens.
finisher = Finisher() \
    .setInputCols(["stem"]) \
    .setOutputCols(["tokens"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)
# We build an ML pipeline so that each stage is executed in sequence.
# The same pipeline can also be used to test the model.
nlp_pipeline = Pipeline(
    stages=[document_assembler,
            tokenizer,
            normalizer,
            stopwords_cleaner,
            stemmer,
            finisher])
# train the pipeline
nlp_model = nlp_pipeline.fit(df)
# apply the pipeline to transform dataframe.
processed_df = nlp_model.transform(df)
# The NLP pipeline creates intermediate columns that we don't need,
# so select only the columns that we need.
tokens_df = processed_df.select('publish_date', 'tokens').limit(10000)
tokens_df.show()

The output of the Spark NLP pipeline is a list of cleaned & stemmed tokens.
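
To make the role of each stage concrete, here is a toy, single-machine sketch in plain Python. It is not Spark NLP and will not reproduce its exact output: the stopword list, the regular expression, and the `naive_stem` suffix stripper are all illustrative assumptions standing in for StopWordsCleaner, Normalizer, and Stemmer.

```python
import re

# Tiny illustrative stopword list (an assumption, not Spark NLP's list).
STOPWORDS = {"the", "a", "an", "of", "in", "on", "for", "to", "and", "is", "are"}

def naive_stem(word):
    """Crude suffix stripping; only a stand-in for a real stemmer."""
    for suffix in ("ing", "ed", "es", "s"):
        if word.endswith(suffix) and len(word) - len(suffix) >= 3:
            return word[: -len(suffix)]
    return word

def preprocess(headline):
    tokens = headline.split()                                           # tokenizer
    normalized = [re.sub(r"[^A-Za-z]", "", t).lower() for t in tokens]  # normalizer
    normalized = [t for t in normalized if t]                           # drop empty tokens
    clean = [t for t in normalized if t not in STOPWORDS]               # stopword removal
    return [naive_stem(t) for t in clean]                               # stemming

tokens = preprocess("Farmers are protesting against the new water restrictions!")
print(tokens)
# ['farmer', 'protest', 'against', 'new', 'water', 'restriction']
```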

Feature Engineering

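We will use Spark MLlib’s CountVectorizer to generate features from the text. LDA works on term counts, so it needs a vocabulary built from the data itself; CountVectorizer learns that vocabulary and turns each list of tokens into a vector of counts. Here vocabSize=500 caps the size of the vocabulary, and minDF=3.0 drops terms that appear in fewer than three documents.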

from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=500, minDF=3.0)
# train the model
cv_model = cv.fit(tokens_df)
# transform the data. Output column name will be features.
vectorized_tokens = cv_model.transform(tokens_df)
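
The idea behind CountVectorizer can be sketched in a few lines of plain Python. This is a simplified illustration under stated assumptions, not Spark's implementation: the function names are made up, ties are broken alphabetically here, and the vectors are dense lists, whereas Spark stores sparse vectors.

```python
from collections import Counter

def fit_vocabulary(docs, vocab_size, min_df):
    """Keep terms found in at least min_df documents, most frequent first."""
    term_freq = Counter(t for doc in docs for t in doc)
    doc_freq = Counter(t for doc in docs for t in set(doc))
    kept = [t for t in term_freq if doc_freq[t] >= min_df]
    kept.sort(key=lambda t: (-term_freq[t], t))  # alphabetical tie-break (assumption)
    return kept[:vocab_size]

def to_counts(doc, vocab):
    """Turn one token list into a count vector aligned with vocab."""
    index = {term: i for i, term in enumerate(vocab)}
    vector = [0] * len(vocab)
    for token in doc:
        if token in index:
            vector[index[token]] += 1
    return vector

# Toy corpus, made up for illustration.
docs = [["rain", "flood", "rain"], ["flood", "warn"], ["rain", "warn", "council"]]
vocab = fit_vocabulary(docs, vocab_size=3, min_df=2)
vectors = [to_counts(doc, vocab) for doc in docs]
print(vocab)    # ['rain', 'flood', 'warn']
print(vectors)  # [[2, 1, 0], [0, 1, 1], [1, 0, 1]]
```

The term "council" is dropped because it appears in only one document, which is the effect minDF has on rare terms.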

Build the LDA Model

An LDA model needs at least two hyperparameters to be chosen: k (the number of topics) and maxIter (the number of iterations). Try different values of k and maxIter to see which combination best suits your data.

from pyspark.ml.clustering import LDA

num_topics = 3
lda = LDA(k=num_topics, maxIter=10)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))
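
One way to try several values of k and maxIter is a small grid search that keeps the combination with the lowest log-perplexity bound. The sketch below is only an outline: `lda_log_perplexity` and `grid_search` are hypothetical helper names, and the demo at the bottom uses a made-up scoring function so that it runs without a Spark cluster.

```python
def lda_log_perplexity(k, max_iter, train_df, eval_df):
    """Fit Spark MLlib's LDA on train_df and score it on eval_df."""
    from pyspark.ml.clustering import LDA  # imported lazily; needs a Spark environment
    model = LDA(k=k, maxIter=max_iter).fit(train_df)
    return model.logPerplexity(eval_df)

def grid_search(score_fn, ks, max_iters):
    """Score every (k, maxIter) pair and return the pair with the lowest score."""
    scores = {(k, m): score_fn(k, m) for k in ks for m in max_iters}
    best = min(scores, key=scores.get)
    return best, scores

# Stand-in scoring function with made-up numbers, used only to demo the search.
def fake_score(k, max_iter):
    return abs(k - 5) + 10.0 / max_iter

best, scores = grid_search(fake_score, ks=[3, 5, 10], max_iters=[10, 50])
print(best)  # (5, 50)
```

With real data, one option is to hold out part of the corpus, e.g. `train_df, test_df = vectorized_tokens.randomSplit([0.8, 0.2])`, and call `grid_search(lambda k, m: lda_log_perplexity(k, m, train_df, test_df), ...)`. Each combination fits a full model, so keep the grid small.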

Visualize the topics

After training completes, we can view the words that represent each topic using the following code:

# extract vocabulary from CountVectorizer
vocab = cv_model.vocabulary
topics = model.describeTopics()
topics_rdd = topics.rdd
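# map each topic's term indices back to the words in the vocabulary
topics_words = topics_rdd \
    .map(lambda row: row['termIndices']) \
    .map(lambda idx_list: [vocab[idx] for idx in idx_list]) \
    .collect()
# print the words of each topic, framed by rows of asterisks
for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*" * 25)
    for word in topic:
        print(word)
    print("*" * 25)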
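
The DataFrame returned by describeTopics() also has a termWeights column next to topic and termIndices, so each word can be paired with its weight. Below is a minimal sketch of that pairing; `label_topics` is a hypothetical helper, and the rows and vocabulary are toy values invented for the example, not output from the model above.

```python
def label_topics(topic_rows, vocab):
    """Map each topic id to a list of (word, weight) pairs."""
    labeled = {}
    for row in topic_rows:
        labeled[row["topic"]] = [
            (vocab[i], w) for i, w in zip(row["termIndices"], row["termWeights"])
        ]
    return labeled

# Toy stand-ins for cv_model.vocabulary and model.describeTopics().collect().
toy_vocab = ["rain", "flood", "warn", "council"]
toy_rows = [
    {"topic": 0, "termIndices": [0, 1], "termWeights": [0.6, 0.3]},
    {"topic": 1, "termIndices": [3, 2], "termWeights": [0.5, 0.4]},
]

labeled = label_topics(toy_rows, toy_vocab)
for topic_id, terms in labeled.items():
    print(topic_id, terms)
```

Because describeTopics() returns only one row per topic, collecting it to the driver is cheap, so with the real model the call would look like `label_topics(model.describeTopics().collect(), cv_model.vocabulary)`.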

The code prints the top words for each topic, with each topic framed by rows of asterisks.

Conclusion

Judged purely on machine learning capabilities, Spark MLlib is not as rich as the massive set of machine learning libraries available directly through Python. Where Spark MLlib adds value is in offering distributed computing for basic ML tasks over huge datasets. With significant improvements in every release, Apache Spark is slowly bridging the gap between Big Data and Machine Learning.

Thank you for reading! Like and leave a comment if you found it interesting and useful.
