Calling Google Cloud Machine Learning APIs from Batch and Stream ETL pipelines

Apache Beam 2.20 includes some handy PTransforms

Lak Lakshmanan
Google Cloud - Community
3 min read · Apr 28, 2020


Google Cloud AI has some very handy “building block” APIs for Natural Language Processing, Vision (e.g. OCR, image classification, logo identification, etc.), and Video Intelligence.

You will often need to call these APIs on a bunch of documents, images, or videos. Sometimes, this is on already collected data (“batch processing”) and sometimes, it is on streaming data. Invoking online APIs one-at-a-time from batch and stream pipelines requires quite a bit of care so that you don’t hit networking, throughput, or throttling limits.
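To make that "care" a little more concrete, here is a minimal sketch of just one piece of it: retrying a throttled call with exponential backoff and jitter. The names RateLimitError and call_with_backoff are hypothetical and not part of any Google Cloud or Beam library, and this is an illustration of the hand-rolled approach rather than a description of what Beam does internally.

```python
import random
import time


class RateLimitError(Exception):
    """Hypothetical error standing in for a throttling / quota-exceeded response."""


def call_with_backoff(call, max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Run call(); on RateLimitError, wait and retry with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return call()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # out of retries: surface the error to the caller
            # The delay doubles on every attempt; random jitter keeps many
            # workers from retrying at exactly the same moment.
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, delay / 2))
```

You would wrap each API request in call_with_backoff, and you would still need to handle batching, connection reuse, and per-worker rate limits on top of that, which is the kind of bookkeeping a ready-made transform saves you from writing.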

Using Apache Beam to call the Cloud Natural Language API

Fortunately, Apache Beam 2.20 now provides a handy PTransform that does all the heavy lifting for you. To use it, first install Apache Beam:

pip install --upgrade --quiet apache-beam[gcp]

Here’s a complete program that will run Apache Beam on three sentences locally:

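import apache_beam as beam
from apache_beam.ml.gcp import naturallanguageml as nlp

def parse_nlp_result(response):
    return [
        response.sentences[0].text.content,  # first sentence
        [entity.name for entity in response.entities],  # all entities
        [entity.metadata['wikipedia_url'] for entity in response.entities],  # urls
        response.language,  # detected language
        response.document_sentiment.score,  # overall sentiment
    ]

# Ask the API for entities and document-level sentiment only
features = nlp.types.AnnotateTextRequest.Features(
    extract_entities=True,
    extract_document_sentiment=True,
)
p = beam.Pipeline()
(p
 | beam.Create(['Has President Obama been to Paris?', 'Sophie loves walking along the Seine.', "C'est terrible"])
 | beam.Map(lambda x : nlp.Document(x, type='PLAIN_TEXT'))
 | nlp.AnnotateText(features)
 | beam.Map(parse_nlp_result)
 | beam.io.WriteToText('output.txt')  # file name is an assumption
)
result = p.run()
result.wait_until_finish()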

These are the steps in the above pipeline:

  1. Create the in-memory collection
  2. Change each sentence to an NLP Document object
  3. Invoke the NLP API to annotate the Document. Here (look at the features being passed in), we are extracting entities and the document sentiment.
  4. Parse the output of the NLP API (see below)
  5. Write the output to a text file

The output of the NLP API looks like this:

sentences {
  text {
    content: "I love walking along the Seine."
  }
  sentiment {
    magnitude: 0.699999988079071
    score: 0.699999988079071
  }
}
entities {
  name: "Seine"
  metadata {
    key: "mid"
    value: "/m/0f3vz"
  }
  metadata {
    key: "wikipedia_url"
    value: ""
  }
  salience: 1.0
  mentions {
    text {
      content: "Seine"
      begin_offset: 25
    }
    type: PROPER
  }
}
document_sentiment {
  magnitude: 0.699999988079071
  score: 0.699999988079071
}
language: "en"

That’s why I’m able to extract the pieces I want as response.sentences[0].text.content and response.document_sentiment.score. Here’s what the output of the pipeline looks like:

['Has President Obama been to Paris?', ['Obama', 'Paris'], ['', ''], 'en', 0.0]
["C'est terrible", [], [], 'fr', -0.8999999761581421]
['Sophie loves walking along the Seine.', ['Sophie', 'Seine'], ['', ''], 'en', 0.800000011920929]
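To see how those attribute paths line up with the response structure without calling the API, here is a small sketch that runs the same kind of parsing on a hand-built stand-in object. The stand-in is an assumption made for illustration: it uses SimpleNamespace and a plain dict rather than the real protobuf response, and it copies only the fields shown in the sample output above (with sentiment values rounded to 0.7).

```python
from types import SimpleNamespace as NS


def parse_nlp_result(response):
    """Pull out the five fields that appear in each pipeline output row."""
    return [
        response.sentences[0].text.content,                 # first sentence
        [entity.name for entity in response.entities],      # all entities
        [entity.metadata["wikipedia_url"] for entity in response.entities],
        response.language,                                   # detected language
        response.document_sentiment.score,                   # overall sentiment
    ]


# Hand-built stand-in for an API response (not the real protobuf type).
fake_response = NS(
    sentences=[NS(text=NS(content="I love walking along the Seine."))],
    entities=[NS(name="Seine", metadata={"mid": "/m/0f3vz", "wikipedia_url": ""})],
    document_sentiment=NS(magnitude=0.7, score=0.7),
    language="en",
)

row = parse_nlp_result(fake_response)
print(row)  # ['I love walking along the Seine.', ['Seine'], [''], 'en', 0.7]
```

A stand-in like this is also a cheap way to unit test your parsing function before spending API quota on real requests.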

Changing input to BigQuery and running on the Cloud

In the above snippet, I ran the pipeline on an in-memory set of sentences and used the DirectRunner, which runs locally. Let’s change the input to BigQuery and run it in Cloud Dataflow:
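A minimal sketch of that change might look like the following. Treat the specifics as assumptions: the helper names dataflow_args and run are hypothetical, the project, region, and bucket values are placeholders you supply, the query (table name, text column, and LIMIT) is a guess at a reasonable Hacker News comments query, and beam.io.ReadFromBigQuery assumes a reasonably recent Beam release. The features object mirrors the one in the local program above.

```python
def dataflow_args(project, region, bucket, job_name="nlp-sentiment"):
    """Build Beam pipeline flags for a Dataflow run (all values are placeholders)."""
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        f"--job_name={job_name}",
        f"--temp_location=gs://{bucket}/tmp",
        f"--staging_location=gs://{bucket}/staging",
    ]


# Assumed query; adjust the table, column, and filter to your data.
QUERY = """
SELECT text
FROM `bigquery-public-data.hacker_news.comments`
WHERE LENGTH(text) > 10
LIMIT 1000
"""


def run(argv, output_prefix):
    """Read comments from BigQuery, annotate them, and write language + sentiment."""
    # Imported here so the helpers above can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.ml.gcp import naturallanguageml as nlp
    from apache_beam.options.pipeline_options import PipelineOptions

    features = nlp.types.AnnotateTextRequest.Features(
        extract_entities=True,
        extract_document_sentiment=True,
    )
    with beam.Pipeline(options=PipelineOptions(argv)) as p:
        (p
         | "read" >> beam.io.ReadFromBigQuery(query=QUERY, use_standard_sql=True)
         | "text" >> beam.Map(lambda row: row["text"])
         | "doc" >> beam.Map(lambda text: nlp.Document(text, type="PLAIN_TEXT"))
         | "annotate" >> nlp.AnnotateText(features)
         | "score" >> beam.Map(lambda r: [r.language, r.document_sentiment.score])
         | "write" >> beam.io.WriteToText(output_prefix))
```

To launch it, you would call something like run(dataflow_args("my-project", "us-central1", "my-bucket"), "gs://my-bucket/output/sentiment"), substituting your own project, region, and bucket. The only structural differences from the local version are the source (BigQuery rows instead of an in-memory list) and the runner options.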

The pipeline now runs on Dataflow, reading from the Hacker News comments table in BigQuery:

The result looks like this:

How easy was that, to get the sentiment of a boatload of comments?

