Calling Google Cloud Machine Learning APIs from Batch and Stream ETL pipelines

Apache Beam 2.20 includes some handy PTransforms

Google Cloud AI has some very handy “building block” APIs for Natural Language Processing, Vision (e.g. OCR, image classification, logo identification, etc.), and Video Intelligence.

You will often need to call these APIs on a bunch of documents, images, or videos. Sometimes this is on already collected data (“batch processing”) and sometimes it is on streaming data. Invoking online APIs one-at-a-time from batch and stream pipelines requires quite a bit of care so that you don’t hit networking, throughput, or throttling limits.

Using Apache Beam to call the Cloud Natural Language API

Fortunately, Apache Beam 2.20 now provides a handy PTransform that does all the heavy lifting for you. To use it, first install Apache Beam:

pip install --upgrade --quiet apache-beam[gcp]

Here’s a complete Apache Beam program that annotates three sentences locally:

import apache_beam as beam
from apache_beam.ml.gcp import naturallanguageml as nlp

def parse_nlp_result(response):
    return [
        response.sentences[0].text.content,  # first sentence
        [entity.name for entity in response.entities],  # all entities
        [entity.metadata['wikipedia_url'] for entity in response.entities],  # urls
        response.language,
        response.document_sentiment.score
    ]

features = nlp.types.AnnotateTextRequest.Features(
    extract_entities=True,
    extract_document_sentiment=True,
    extract_syntax=False
)

p = beam.Pipeline()
(p
 | beam.Create(['Has President Obama been to Paris?',
                'Sophie loves walking along the Seine.',
                "C'est terrible"])
 | beam.Map(lambda x: nlp.Document(x, type='PLAIN_TEXT'))
 | nlp.AnnotateText(features)
 | beam.Map(parse_nlp_result)
 | beam.io.WriteToText('output.txt')
)
result = p.run()
result.wait_until_finish()

These are the steps in the above pipeline:

  1. Create the in-memory collection
  2. Convert each sentence to an NLP Document object
  3. Invoke the NLP API to annotate the Document. Here (look at the features being passed in), we are extracting entities and the document sentiment.
  4. Parse the output of the NLP API (see below)
  5. Write the output to a text file

The output of the NLP API looks like this:

sentences {
  text {
    content: "I love walking along the Seine."
  }
  sentiment {
    magnitude: 0.699999988079071
    score: 0.699999988079071
  }
}
entities {
  name: "Seine"
  type: LOCATION
  metadata {
    key: "mid"
    value: "/m/0f3vz"
  }
  metadata {
    key: "wikipedia_url"
    value: "https://en.wikipedia.org/wiki/Seine"
  }
  salience: 1.0
  mentions {
    text {
      content: "Seine"
      begin_offset: 25
    }
    type: PROPER
  }
}
document_sentiment {
  magnitude: 0.699999988079071
  score: 0.699999988079071
}
language: "en"

That’s why I’m able to extract the pieces I want as response.sentences[0].text.content and response.document_sentiment.score. Here’s what the output of the pipeline looks like:

['Has President Obama been to Paris?', ['Obama', 'Paris'], ['https://en.wikipedia.org/wiki/Barack_Obama', 'https://en.wikipedia.org/wiki/Paris'], 'en', 0.0]
["C'est terrible", [], [], 'fr', -0.8999999761581421]
['Sophie loves walking along the Seine.', ['Sophie', 'Seine'], ['', 'https://en.wikipedia.org/wiki/Seine'], 'en', 0.800000011920929]
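If you want something more structured than a plain list (for example, to write each record as JSON, or to load the results into a table later), the parse function could just as easily return a dictionary. Here’s a minimal sketch; the field names are my own choice, not anything the API mandates:

import json

def parse_nlp_result_as_dict(response):
    # Same fields as before, just keyed so that each record is self-describing.
    return {
        'first_sentence': response.sentences[0].text.content,
        'entities': [entity.name for entity in response.entities],
        # metadata is a protobuf map, so a missing key comes back as an empty string
        'wikipedia_urls': [entity.metadata['wikipedia_url'] for entity in response.entities],
        'language': response.language,
        'sentiment_score': response.document_sentiment.score
    }

To use it, swap the parser into the pipeline and serialize each dictionary as a JSON line:

    | beam.Map(parse_nlp_result_as_dict)
    | beam.Map(json.dumps)
    | beam.io.WriteToText('output.json')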

Changing input to BigQuery and running on the Cloud

In the above snippet, I ran the pipeline on an in-memory set of sentences and used the DirectRunner, which runs locally. Let’s change the input to BigQuery and run it in Cloud Dataflow:

https://gist.github.com/lakshmanok/a07d488a0b8006c26bdee0a7effd6245
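The gist has the full code. In essence, the change is to swap beam.Create for a BigQuery read and to pass Dataflow options to the pipeline. Here is a rough sketch of that idea, assuming Beam 2.20’s BigQuerySource; the project, region, bucket, and row limit below are placeholders of my own, so check the gist for the exact code:

import apache_beam as beam
from apache_beam.ml.gcp import naturallanguageml as nlp
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder Dataflow options: substitute your own project, region, and bucket.
options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',
    region='us-central1',
    temp_location='gs://my-bucket/tmp',
    job_name='nlp-hackernews'
)

query = """
SELECT text FROM `bigquery-public-data.hacker_news.comments`
WHERE text IS NOT NULL
LIMIT 1000
"""

features = nlp.types.AnnotateTextRequest.Features(
    extract_entities=True,
    extract_document_sentiment=True,
    extract_syntax=False
)

p = beam.Pipeline(options=options)
(p
 | beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
 | beam.Map(lambda row: nlp.Document(row['text'], type='PLAIN_TEXT'))
 | nlp.AnnotateText(features)
 | beam.Map(parse_nlp_result)  # same parse function as in the local example
 | beam.io.WriteToText('gs://my-bucket/nlp/output')
)
p.run()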

The pipeline now runs on Dataflow, reading from the Hacker News comments table in BigQuery.


How easy was that, to get the sentiment of a boatload of comments?

Enjoy!

