Calling Google Cloud Machine Learning APIs from Batch and Stream ETL pipelines
Google Cloud AI has some very handy “building block” APIs for Natural Language Processing, Vision (e.g. OCR, image classification, logo identification, etc.), and Video Intelligence.
You will often to need call these APIs on a bunch of documents, images, or videos. Sometimes, this is on already collected data (“batch processing”) and sometimes, it is on streaming data. Invoking online APIs one-at-a-time from batch and stream pipelines requires quite a bit of care so that you don’t hit networking, throughput, or throttling limits.
Using Apache Beam to call the Cloud Natural Language API
Fortunately, Apache Beam 2.20 now provides a handy PTransform that does all the heavy lifting for you. To use it, first install Apache Beam:
pip install --upgrade --quiet apache-beam[gcp]
Here’s a complete program that will run Apache Beam on three sentences locally:
import apache_beam as beam
from apache_beam.ml.gcp import naturallanguageml as nlpdef parse_nlp_result(response):
response.sentences.text.content, # first sentence
[entity.name for entity in response.entities], # all entities
[entity.metadata['wikipedia_url'] for entity in response.entities], # urls
]features = nlp.types.AnnotateTextRequest.Features(
)p = beam.Pipeline()
| beam.Create(['Has President Obama been to Paris?', 'Sophie loves walking along the Seine.', "C'est terrible"])
| beam.Map(lambda x : nlp.Document(x, type='PLAIN_TEXT'))
result = p.run()
These are the steps in the above pipeline:
- Create the in-memory collection
- Change each sentence to a NLP Document object
- Invoke the NLP API to annotate the Document. Here (look at the features being passed in), we are extracting entities and the document sentiment.
- Parse the output of the NLP API (see below)
- Write the output to a text file
The output of the NLP API looks like this:
content: "I love walking along the Seine."
That’s why I’m able to extract the pieces I want as response.sentences.text.content and response.document_sentiment.score. Here’s what the output of the pipeline looks like:
['Has President Obama been to Paris?', ['Obama', 'Paris'], ['https://en.wikipedia.org/wiki/Barack_Obama', 'https://en.wikipedia.org/wiki/Paris'], 'en', 0.0]
["C'est terrible", , , 'fr', -0.8999999761581421]
['Sophie loves walking along the Seine.', ['Sophie', 'Seine'], ['', 'https://en.wikipedia.org/wiki/Seine'], 'en', 0.800000011920929]
Changing input to BigQuery and running on the Cloud
In the above snippet, I ran the pipeline on an in-memory set of sentences and used the DirectRunner, which runs locally. Let’s change the input to BigQuery and run it in Cloud Dataflow:
The pipeline now runs on Dataflow off the hackernews comments table in BigQuery:
The result looks like this:
How easy was that, to get the sentiment of a boatload of comments?