Distributed Topic Modelling using Spark NLP and Spark MLlib (LDA)

Satish Silveri
Jun 11, 2020 · 5 min read

Topic modelling is one of the most common tasks in natural language processing. Extracting topic distributions from millions of documents can be useful in many ways, e.g. identifying the reasons behind complaints about a particular product (or all products), or the more classic example of identifying topics in news articles. We won’t delve into the details of what topic modelling is or how it works. There are many good articles about it on the internet, but I find this article from Analytics Vidhya comprehensive, so if you are not familiar with topic modelling or need to refresh your memory, go ahead and check it out.

The purpose of this blog is to get acquainted with a distributed approach to topic modelling. Imagine you have billions of documents sitting in your data lake (e.g. Hadoop) and you want a better understanding of the topics within them. Processing billions of documents in plain Python quickly runs into computational limitations and bottlenecks. Luckily, Spark MLlib offers an optimized, distributed implementation of LDA that is specifically designed for this setting. We will build a simple topic modelling pipeline that uses Spark NLP to pre-process the data and Spark MLlib’s LDA to extract topics from it.

We will be using news article data. You can download the data set from this link. Let’s go ahead and start writing some code.

Initialize Spark Session

# Import Spark NLP
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
# adjust spark.driver.memory according to the resources on your machine
spark = SparkSession.builder \
    .appName("Spark NLP") \
    .config("spark.driver.memory", "8G") \
    .config("spark.driver.maxResultSize", "2G") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5") \
    .config("spark.kryoserializer.buffer.max", "1000M") \
    .getOrCreate()
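If you do not need custom configuration, Spark NLP also ships a one-line starter that builds an equivalent session for you. A minimal sketch, assuming the sparknlp Python package version matches the spark-nlp jar used above:

# alternative: let Spark NLP create the session with sensible defaults
spark = sparknlp.start()
print(sparknlp.version())  # Spark NLP version in use
print(spark.version)       # underlying Spark version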

Read the Data

# if you are reading the file from local storage
file_location = r'path\to\abcnews_date_txt.csv'
# if you are reading the file from HDFS, use this instead
# file_location = 'hdfs:///user/path/to/abcnews_date_txt.csv'
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
df = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .load(file_location)
# Verify the count
df.count()

The data consists of 2 columns viz. publish_date and headline_text.
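As a quick sanity check, you can inspect the schema and a few rows before building the pipeline (a minimal sketch using the df loaded above):

# peek at the inferred schema and a few sample headlines
df.printSchema()
df.show(5, truncate=False)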

Pre-processing Pipeline using Spark NLP

# Spark NLP requires the input dataframe or column to be converted to document.
document_assembler = DocumentAssembler() \
    .setInputCol("headline_text") \
    .setOutputCol("document") \
    .setCleanupMode("shrink")

# split sentences into tokens (array)
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# clean unwanted characters and garbage
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")

# remove stopwords
stopwords_cleaner = StopWordsCleaner() \
    .setInputCols("normalized") \
    .setOutputCol("cleanTokens") \
    .setCaseSensitive(False)

# stem the words to bring them to their root form
stemmer = Stemmer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("stem")

# Finisher is the most important annotator. Spark NLP adds its own structure when we
# convert each row in the dataframe to a document. Finisher brings back the expected
# structure, viz. an array of tokens.
finisher = Finisher() \
    .setInputCols(["stem"]) \
    .setOutputCols(["tokens"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)
# We build an ML pipeline so that each phase is executed in sequence.
# The fitted pipeline can also be reused on new text.
nlp_pipeline = Pipeline(
    stages=[document_assembler,
            tokenizer,
            normalizer,
            stopwords_cleaner,
            stemmer,
            finisher])

# fit the pipeline
nlp_model = nlp_pipeline.fit(df)

# apply the pipeline to transform the dataframe
processed_df = nlp_model.transform(df)

# The NLP pipeline creates intermediate columns that we don't need,
# so select only the columns we do need.
tokens_df = processed_df.select('publish_date', 'tokens').limit(10000)
tokens_df.show()

The output of the Spark NLP pipeline is a list of cleaned & stemmed tokens.
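Because nlp_model is a fitted PipelineModel, the same pre-processing can be reused on unseen headlines. A minimal sketch (the two example headlines below are made up purely for illustration):

# reuse the fitted pre-processing pipeline on new, unseen text
new_df = spark.createDataFrame(
    [("ash wednesday bushfires destroy homes",),
     ("reserve bank holds interest rates steady",)],
    ["headline_text"])
nlp_model.transform(new_df).select("tokens").show(truncate=False)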

Feature Engineering

from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=500, minDF=3.0)
# train the CountVectorizer model
cv_model = cv.fit(tokens_df)
# transform the data. The output column name will be "features".
vectorized_tokens = cv_model.transform(tokens_df)
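Before handing the features to LDA, it is worth peeking at what the vectorizer learned (a minimal sketch):

# most frequent terms kept in the vocabulary (capped by vocabSize=500, minDF=3.0)
print(cv_model.vocabulary[:20])
# each row now carries a sparse term-count vector in the "features" column
vectorized_tokens.select("tokens", "features").show(5, truncate=False)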

Build the LDA Model

from pyspark.ml.clustering import LDA

num_topics = 3
lda = LDA(k=num_topics, maxIter=10)
model = lda.fit(vectorized_tokens)

ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

Visualize the topics

# extract vocabulary from the CountVectorizer
vocab = cv_model.vocabulary
topics = model.describeTopics()
topics_rdd = topics.rdd
topics_words = topics_rdd \
    .map(lambda row: row['termIndices']) \
    .map(lambda idx_list: [vocab[idx] for idx in idx_list]) \
    .collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*" * 25)
    for word in topic:
        print(word)
    print("*" * 25)

The output lists the most important words for each of the three topics, which you can then use to interpret and label the topics.
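If you prefer to stay in the DataFrame API rather than dropping to the RDD, the same mapping can be done with a small UDF. A sketch; idx_to_words is a hypothetical helper defined here, not part of Spark:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

# hypothetical helper: translate term indices back into vocabulary words
idx_to_words = F.udf(lambda indices: [vocab[i] for i in indices], ArrayType(StringType()))

model.describeTopics(5) \
    .withColumn("topicWords", idx_to_words(F.col("termIndices"))) \
    .select("topic", "topicWords") \
    .show(truncate=False)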

Conclusion

Thank you for reading! Like and leave a comment if you found it interesting and useful.
