Spark NLP Walkthrough, powered by TensorFlow

What is Spark NLP?

What are we going to do?

  • A large PubMed-based word embeddings corpus, tried at various dimension sizes, which provides good enough coverage for production use cases: 99.7% or more of the tokens have a specific vector equivalent (a metric based on our own experiments). Note: we are not allowed to re-distribute these PubMed word embeddings explicitly, but you may be able to find them on the internet.
  • A TensorFlow-based model that combines CNN and LSTM layers for sub-word analysis. This is part of the Spark NLP library, and it includes the graphs required to both train and predict.
  • Spark framework, including a cluster-based scenario, distributed work environment, and storage.
  • Spark NLP annotators dealing with several different stages of NLP.

But, what is Spark?

Knowledge prerequisites

  • Apache Spark 2.x
  • TensorFlow

Getting started, basic component introductions

Working together: Spark-NLP and TensorFlow

  1. Rule-based annotators, which apply standard strategies utilizing Regular Expressions and simple math and statistics (such as a Tokenizer, or a dictionary-based Sentiment Analysis detector).
  2. Machine learning models, which implement their own statistics and math, with a training stage (called an Annotator Approach) and a trained-model counterpart to Spark ML Transformers (called an Annotator Model in Spark-NLP).
  3. The third family is the one of interest here: the Deep Learning models. These include a pre-generated graph object and a full API in Spark-NLP that allows training them right away by just providing some corpora and parameters. Once trained, the annotator model automatically links the prediction stage to TensorFlow, feeding its session with the data its layers need. Such an annotator acts the same way as any other annotator in Spark-NLP. A small sketch contrasting these families follows below.
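
To make the contrast concrete, here is a minimal sketch of constructing one annotator from each family. Class names follow the Spark NLP Scala API, but exact setters vary across versions; the column names and the choice of PerceptronApproach as the classic-ML example are illustrative, not taken from the walkthrough itself.

import com.johnsnowlabs.nlp.annotator._

// Family 1: rule-based, usable without any training
val ruleBasedTokenizer = new Tokenizer()
  .setInputCols("sentences")
  .setOutputCol("token")

// Family 2: classic machine learning, an Approach that fit()s into a Model
val posTagger = new PerceptronApproach()
  .setInputCols("document", "token")
  .setOutputCol("pos")

// Family 3: deep learning, an Approach backed by a pre-generated TensorFlow graph
val nerApproach = new NerDLApproach()
  .setInputCols("sentence", "token")
  .setLabelColumn("label")
  .setOutputCol("ner")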

Word embeddings

Corpus and OCR processing

The scenario: Searching for healthcare problems in text and returning database code

Imports and core concepts

import com.johnsnowlabs.nlp.base._
import com.johnsnowlabs.nlp.annotator._
import com.johnsnowlabs.nlp.annotators.ner.NerConverter
import org.apache.spark.ml.Pipeline

import com.johnsnowlabs.nlp.util.io.OcrHelper

val clinicalRecords = OcrHelper.createDataset(spark, "/home/data/clinical_records/", "text", "text_metadata")
clinicalRecords.show()
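
OcrHelper reads the PDF clinical records into a DataFrame with a text column plus metadata. If your records were already plain text files, a roughly equivalent DataFrame could be built with plain Spark; this is a hedged sketch, and the clinical_records_txt path and column layout are assumptions, not part of the original walkthrough.

import org.apache.spark.sql.functions.input_file_name

// One row per file; keep the source path as lightweight metadata
val plainTextRecords = spark.read
  .option("wholetext", "true")
  .text("/home/data/clinical_records_txt/")
  .withColumnRenamed("value", "text")
  .withColumn("text_metadata", input_file_name())

plainTextRecords.show()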

The workhorses: Spark NLP annotators

val document = new DocumentAssembler()
  .setInputCol("text")
  .setOutputCol("document")

val sentenceDetector = new SentenceDetector()
  .setInputCols(Array("document"))
  .setOutputCol("sentences")

val token = new Tokenizer()
  .setInputCols(Array("sentences"))
  .setOutputCol("token")

val normalizer = new Normalizer()
  .setInputCols("token")
  .setOutputCol("normal")

val ner = NerDLModel.load("/home/models/i2b2_normalized_109")
  .setInputCols("normal", "document")
  .setOutputCol("ner")

val nerConverter = new NerConverter()
  .setInputCols("document", "normal", "ner")
  .setOutputCol("ner_converter")
  1. DocumentAssembler is the annotator that takes the target text column and makes it ready for further NLP annotations, in a column called document.
  2. SentenceDetector will identify sentence boundaries within our paragraphs. Since we are reading entire file contents on each row, we want to make sure our sentences are properly bounded. This annotator correctly handles page breaks, paragraph breaks, lists, enumerations, and other common formatting features that distort the regular flow of the text, which helps the accuracy of the rest of the pipeline. The output column here is sentences.
  3. Tokenizer will separate each meaningful word within each sentence. This is very important in NLP, since it identifies token boundaries.
  4. Normalizer will clean up each token, taking the token column from our Tokenizer as input and putting normalized tokens in the normal column. Cleaning up includes removing any non-character strings. Whether this helps a particular model is a decision for the model maker.
  5. NerDLModel is our pipeline hero. This annotator takes as inputs the normalized tokens plus the original document (every annotator declares the inputs it requires, so we provide both). In this scenario, we are loading a healthcare model from disk (which is licensed as part of John Snow Labs' Spark NLP for Healthcare). In the next section, we'll explain how it was generated.
  6. NerConverter is an auxiliary annotator made specifically for the NER models of Spark NLP. It takes the entity tags and builds up the chunk of text that represents the entity found, relative to the original sentence. A small sketch of what such a chunk annotation looks like follows this list.
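
To picture what NerConverter emits, here is a hedged illustration of a single chunk annotation built from consecutive B-/I- tags; the entity label, character offsets, and metadata are made-up values, not output from this article's pipeline.

import com.johnsnowlabs.nlp.Annotation

// Suppose the tokens "cardiovascular" and "disease" were tagged B-PROBLEM and
// I-PROBLEM by NerDLModel. NerConverter merges them into one chunk annotation,
// roughly: Annotation(annotatorType, begin, end, result, metadata)
val exampleChunk = Annotation("chunk", 101, 122, "cardiovascular disease", Map("entity" -> "PROBLEM"))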

Putting things together: Spark Pipelines

val pipeline = new Pipeline().setStages(Array(
  document,
  sentenceDetector,
  token,
  normalizer,
  ner,
  nerConverter
))

val model = pipeline.fit(Seq.empty[String].toDF("text"))
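
Since the NerDLModel is already trained, fitting on an empty DataFrame here is just to satisfy the Pipeline API. With the fitted PipelineModel in hand, we can run it over the OCR output and take a first look at the recognized entities. A minimal sketch, reusing the clinicalRecords dataset from the OCR step:

// Annotate the clinical records read by OcrHelper
val result = model.transform(clinicalRecords)

// Entity chunks found by NerConverter, next to the raw token-level NER tags
result.select("ner_converter.result", "ner.result").show(truncate = false)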

Sidebar: But, where did NerDLModel come from originally?

documentAssembler = DocumentAssembler()\
.setInputCol("text")\
.setOutputCol("document")

sentenceDetector = SentenceDetector()\
.setInputCols(["document"])\
.setOutputCol("sentence")

tokenizer = Tokenizer()\
.setInputCols(["document"])\
.setOutputCol("token")

nerTagger = NerDLApproach()\
.setInputCols(["sentence", "token"])\
.setLabelColumn("label")\
.setOutputCol("ner")\
.setMaxEpochs(109)\
.setExternalDataset("/home/data/clinical.train")\
.setValidationDataset("/home/clinical.validation")\
.setTestDataset("/home/clinical.testing")\
.setEmbeddingsSource("/clinical.embeddings.100d.txt", 100, 2)\
.setRandomSeed(0)\
.setIncludeEmbeddings(True)\
.setVerbose(2)

converter = NerConverter()\
.setInputCols(["document", "token", "ner"])\
.setOutputCol("ner_span")

finisher = Finisher() \
.setInputCols(["ner_span"])\
.setIncludeKeys(True)

pipeline = Pipeline(
stages = [
documentAssembler,
sentenceDetector,
tokenizer,
nerTagger,
converter,
finisher
]
)
from pyspark.sql.types import StructType, StructField, StringType

# training data comes from the external dataset, so we fit on an empty DataFrame
empty_data = spark.createDataFrame([], StructType([StructField("text", StringType())]))
model = pipeline.fit(empty_data)
model.stages[3].write().overwrite().save("/home/models/i2b2_normalized_109")

The TensorFlow Model

with tf.device('/gpu:0'):
    with tf.variable_scope("char_repr") as scope:
        # shape = (batch size, sentence, word)
        self.char_ids = tf.placeholder(tf.int32, shape=[None, None, None], name="char_ids")
        # shape = (batch_size, sentence)
        self.word_lengths = tf.placeholder(tf.int32, shape=[None, None], name="word_lengths")

    with tf.variable_scope("word_repr") as scope:
        # shape = (batch size)
        self.sentence_lengths = tf.placeholder(tf.int32, shape=[None], name="sentence_lengths")

    with tf.variable_scope("training", reuse=None) as scope:
        # shape = (batch, sentence)
        self.labels = tf.placeholder(tf.int32, shape=[None, None], name="labels")
        self.lr = tf.placeholder_with_default(0.005, shape=(), name="lr")
        self.dropout = tf.placeholder(tf.float32, shape=(), name="dropout")
# this will create a graph as per requirements of the model
ner = NerModel()

# now we're adding nodes to our TF graph
ner.add_cnn_char_repr(101, 25, 30)
ner.add_pretrained_word_embeddings(30)
ner.add_context_repr(10, 200)
ner.add_inference_layer(False)
ner.add_training_op(5.0)
ner.init_variables()
def save_models(self, folder):
    # create a Saver only if the graph does not already contain save nodes
    saveNodes = list([n.name for n in tf.get_default_graph().as_graph_def().node
                      if n.name.startswith('save/')])
    if len(saveNodes) == 0:
        saver = tf.train.Saver()
    # write the variables checkpoint via the saver's control_dependency node
    variables_file = os.path.join(folder, 'variables')
    self.ner.session.run('save/control_dependency', feed_dict={'save/Const:0': variables_file})
    # serialize the graph definition itself
    tf.train.write_graph(self.ner.session.graph, folder, 'saved_model.pb', False)
import com.johnsnowlabs.nlp.annotators.ner.dl.NerDLModelPythonReader
import com.johnsnowlabs.nlp.embeddings.WordEmbeddingsFormat

val model = NerDLModelPythonReader.read(
  "/home/model/tensorflow/python_tf_model",
  ResourceHelper.spark,
  normalize = true,
  WordEmbeddingsFormat.BINARY
)

model.write.overwrite().save("/home/models/i2b2_normalized_109")

Moving forward: Back to our Pipeline

val finisher = new Finisher()
  .setInputCols("normal", "ner")
  .setOutputCols("finished_normal", "finished_ner")

val finished_result = finisher.transform(result.select("normal", "ner"))
finished_result.take(1).foreach(println(_))

Running small predictions and samples with LightPipelines

val lightPipeline = new LightPipeline(model)

val annotations = lightPipeline.annotate("The findings of many authors show that reduced fetal growth is followed by increased mortality from cardiovascular disease in adult life. They are further evidence that cardiovascular disease originates, among other risk factors, through programming of the bodies structure and metabolism during fetal and early post-natal life. Wrong maternal nutrition may have an important influence on programming.")

annotations("normal").zip(annotations("ner")).foreach(println(_))
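
LightPipeline also accepts a batch of strings, which is handy for quick ad-hoc checks without building a DataFrame first. A small sketch; the sample sentences below are made up:

// Annotate several short texts at once; returns one result Map per input string
val batch = lightPipeline.annotate(Array(
  "Patient denies chest pain or shortness of breath.",
  "History of type 2 diabetes mellitus and hypertension."
))

batch.foreach(m => println(m("ner")))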

Conclusion


Saif Addin Ellafi is a programmer, analyst, data scientist, and forever a student while being an extreme sports and gaming enthusiast. Developer at JohnSnowLabs
