Large data processing with Cloud Dataflow and Cloud Datastore

Learn how to process large CSV files in parallel.

Sascha Heyer
Mar 4
[Image: Dataflow in action]

Large-scale text annotation can be hard. That's why I am currently building a Text Annotator that can be used in machine learning projects. This article is based on what I learned during the implementation process. Follow me on Twitter @HeyerSascha if you don't want to miss it.

Prerequisites

Install the Apache Beam SDK together with its [gcp] extra, which adds the Dataflow runner and the Google Cloud I/O connectors such as Cloud Storage and Cloud Datastore:

pip install apache-beam
pip install apache-beam[gcp]

How to start the Dataflow pipeline

Cloud Dataflow templates

Dataflow templates let you stage a pipeline on Cloud Storage once and launch it again later with different parameters. Because those parameter values are only known at launch time, they are declared with add_value_provider_argument instead of add_argument:

class ProcessOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            dest='input',
            type=str,
            required=False,
            help='Local file or a file in a Google Storage Bucket.')
        parser.add_argument(...)

Build the pipeline

The pipeline itself is short: read the input file, parse every CSV line, build Datastore entities, and write them out.

process_options = PipelineOptions().view_as(ProcessOptions)
p = beam.Pipeline(options=process_options)

(p
 | 'Read from text' >> beam.io.ReadFromText(process_options.input, skip_header_lines=0)
 | 'Process CSV' >> beam.ParDo(ProcessCSV(), ['text', 'label'])
 | 'Build entities' >> beam.ParDo(BuildEntities(), process_options.entity)
 | 'Write entities into Datastore' >> WriteToDatastore('annotator'))

p.run().wait_until_finish()
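ProcessCSV and BuildEntities are custom DoFns whose full code is not shown here. As a rough orientation, a minimal sketch could look like the following; the parsing details and the use of the googledatastore helper are my assumptions, not necessarily the original implementation:

import csv
import uuid

import apache_beam as beam
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper


class ProcessCSV(beam.DoFn):
    # Parses one raw CSV line into a dict keyed by the given column names.
    def process(self, element, columns):
        for row in csv.reader([element]):
            yield dict(zip(columns, row))


class BuildEntities(beam.DoFn):
    # Turns a parsed row into a Cloud Datastore entity of the given kind.
    def process(self, element, kind):
        # When launched from a template, kind arrives as a ValueProvider
        # and is only resolved here at run time.
        kind = kind.get() if hasattr(kind, 'get') else kind
        entity = entity_pb2.Entity()
        datastore_helper.add_key_path(entity.key, kind, uuid.uuid4().hex)
        # Excluding properties from the indexes lifts the 1500-byte limit
        # on indexed strings (see the limitations section below).
        datastore_helper.add_properties(entity, element, exclude_from_indexes=True)
        yield entity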

Create the Dataflow template

Running the pipeline module with --template_location does not execute it; instead, the Dataflow runner stages the serialized pipeline as a template at that path:

python2 -m process \
  --runner DataflowRunner \
  --project io-annotator \
  --staging_location gs://io-dataflow/staging \
  --temp_location gs://io-dataflow/temp \
  --template_location gs://io-dataflow/templates/process-csv \
  --save_main_session True

If everything worked, the log confirms it:

INFO:root:A template was just created at location gs://io-dataflow/templates/process-csv

Metadata

Every template can ship a metadata file that documents and validates its runtime parameters:

{
  "name": "Transform CSV",
  "description": "Transforms a csv file and saves it into Cloud Datastore",
  "parameters": [{
    "name": "input",
    "label": "Input Cloud Storage file",
    "help_text": "The path to the csv file in Google Cloud Storage Bucket",
    "regexes": ["^gs:\/\/[^\n\r]+$"],
    "is_optional": false
  },
  {
    "name": "entity",
    "label": "Cloud Datastore entity name",
    "help_text": "The Google Cloud Datastore entity name",
    "is_optional": false
  }]
}
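For Dataflow to pick the metadata up, the file must sit next to the template and follow the <template-name>_metadata naming convention, so in this example it ends up at gs://io-dataflow/templates/process-csv_metadata. The local file name below is just my choice; only the destination matters:

gsutil cp process-csv_metadata gs://io-dataflow/templates/process-csv_metadata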

Launch the Dataflow template
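A staged template can be started from the Cloud Console, the REST API, or the gcloud CLI. A gcloud invocation could look like this; the job name and input path are placeholders, while the parameter names match the metadata file above:

gcloud dataflow jobs run process-csv-example \
  --gcs-location gs://io-dataflow/templates/process-csv \
  --parameters input=gs://io-dataflow/input.csv,entity=annotator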

Dependencies

Dataflow serializes your pipeline code and ships it to the workers, but by default the global context of the main module is not included. If a DoFn uses a module that was imported at the top of the file, such as csv, the job fails at run time with:

NameError: global name 'csv' is not defined

The fix is to pass --save_main_session True, which pickles the state of the main session and restores it on each worker:

--save_main_session True
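An alternative that avoids pickling the whole main session is to import the module inside the DoFn itself, for example:

class ProcessCSV(beam.DoFn):
    def process(self, element, columns):
        # Imported here so the module is resolved on the worker,
        # independently of --save_main_session.
        import csv
        for row in csv.reader([element]):
            yield dict(zip(columns, row))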

Cloud Datastore limitations

Cloud Datastore limits indexed string properties to 1500 bytes, which real-world text exceeds quickly. Properties excluded from the indexes may hold up to about 1 MB instead, so long text fields have to be written with:

exclude_from_indexes=True
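The same flag exists on the plain Datastore client outside of Beam. A minimal sketch, with kind and property names chosen for illustration:

from google.cloud import datastore

client = datastore.Client()
long_text = 'x' * 4000  # longer than the 1500-byte index limit

# Keep the oversized 'text' property out of the indexes.
entity = datastore.Entity(key=client.key('annotator'),
                          exclude_from_indexes=('text',))
entity['text'] = long_text
entity['label'] = 'positive'
client.put(entity)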

Wrapping up



Thanks to manya g and Ray Tsang.

