How to kick off a Dataflow pipeline via Cloud Functions for Firebase

Ertan Dogrultan
Google Cloud - Community
2 min read · Sep 24, 2018

Our data pipelines at Soru depend on Google Dataflow. I chose it over Dataproc because the convenience of a serverless architecture matters a lot to a small team like ours. Apache Beam’s similarity to Flume, an internal tool I used heavily at Google, also played a role in this choice.

Our Dataflow pipelines are written in Python. I usually kick off one-shot jobs from my local machine until a job is ready to be integrated into our production pipeline. I will be writing about how to structure a Dataflow pipeline for various use cases in a different blog post.
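To give a sense of what such a one-shot job looks like, here is a minimal sketch; the bucket paths and transforms are placeholders rather than our actual pipeline, and the same script runs locally or on Dataflow depending on the runner flag you pass.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    # Pass --runner DirectRunner for a quick local run, or --runner DataflowRunner
    # (plus --project and --temp_location) to submit the same job to Dataflow.
    options = PipelineOptions(argv)
    with beam.Pipeline(options=options) as p:
        (p
         | 'Read' >> beam.io.ReadFromText('gs://my-bucket/input/*.txt')
         | 'CountLines' >> beam.combiners.Count.Globally()
         | 'Format' >> beam.Map(str)
         | 'Write' >> beam.io.WriteToText('gs://my-bucket/output/line_count'))


if __name__ == '__main__':
    run()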

The easiest way to invoke your Dataflow pipeline from another system programmatically is to:

  • Create your custom Dataflow template. Here are the instructions.

Make sure to read the documentation carefully, since there may be restrictions in the SDK you use.

For example, in the Python SDK, only file-based IOs had ValueProvider support at the time. When I templatized my pipeline, I had to change the input from BigQuery to TextIO. See https://issues.apache.org/jira/browse/BEAM-1440 and the sketch below.
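To make that restriction concrete, here is a minimal sketch of a templatable parameter in the Python SDK; the option names and paths are made up for illustration. The inputs are declared with add_value_provider_argument so they can be filled in at launch time, and they are consumed by text IO, which supported ValueProviders.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class TemplateOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Declared as ValueProviders so they can be supplied when the
        # template is launched rather than when it is built.
        parser.add_value_provider_argument('--input', type=str)
        parser.add_value_provider_argument('--output', type=str)


def run(argv=None):
    options = PipelineOptions(argv)
    template_options = options.view_as(TemplateOptions)
    with beam.Pipeline(options=options) as p:
        (p
         # Text IO accepts ValueProviders; the BigQuery source did not,
         # which is why the input had to become a text file.
         | 'Read' >> beam.io.ReadFromText(template_options.input)
         | 'Write' >> beam.io.WriteToText(template_options.output))


if __name__ == '__main__':
    run()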

  • Deploy it to a GCS bucket. I wrote a simple script for deployment since we have test, staging, and prod projects and need to deploy our system to each of them.
import os


def deploy(project, bucket, template_name):
    # Build the template and stage it under gs://<bucket>/templates/.
    command = '''
python -m {template_name} \
    --runner DataflowRunner \
    --project {project} \
    --requirements_file requirements.txt \
    --staging_location gs://{bucket}/staging \
    --temp_location gs://{bucket}/temp \
    --template_location gs://{bucket}/templates/{template_name}
'''.format(project=project, bucket=bucket, template_name=template_name)
    os.system(command)
    # Copy the template metadata file next to the staged template.
    os.system('''
gsutil cp {template_name}_metadata gs://{bucket}/templates/
'''.format(bucket=bucket, template_name=template_name))

This script assumes you have <template>.py, <template>_metadata, and requirements.txt in the same directory.
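For reference, a deployment run then looks roughly like this; the project and bucket names are placeholders, not our real ones.

# Deploy the same template to each of our environments.
ENVIRONMENTS = [
    ('my-project-test', 'my-dataflow-bucket-test'),
    ('my-project-staging', 'my-dataflow-bucket-staging'),
    ('my-project-prod', 'my-dataflow-bucket-prod'),
]

for project, bucket in ENVIRONMENTS:
    deploy(project, bucket, 'my_template')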

  • Invoke the pipeline via the REST API.

We have been relying more and more on serverless architectures recently. We are using Cloud Functions for Firebase with the Node.js 6 runtime, so we needed to kick off our Dataflow templates from there. Unfortunately, the client library support is a bit finicky. I must say it was not at all fun to make this work, so I would like to spare you the hassle. By the way, this code most likely works on Cloud Functions for GCP as well, although I haven’t tested it.

Here is roughly what our code looks like. We depend on “googleapis”: “33.0.0”.

const { google } = require('googleapis');
const dataflow = google.dataflow('v1b3');

const TEMPLATE_BUCKET = `your template bucket on GCS`;

const kickOffDataflow = (input, output) => {
  const jobName = `your job name`;
  const tmpLocation = `gs://${TEMPLATE_BUCKET}/tmp`;
  const templatePath = `gs://${TEMPLATE_BUCKET}/templates/` +
      `your_template_name`;
  const request = {
    projectId: process.env.GCLOUD_PROJECT,
    requestBody: {
      jobName: jobName,
      // Runtime parameters for the templated pipeline.
      parameters: {
        input: input,
        output: output
      },
      environment: {
        tempLocation: tmpLocation
      }
    },
    gcsPath: templatePath
  };
  // Fetch application default credentials, then launch the template.
  return google.auth.getClient({
    scopes: ['https://www.googleapis.com/auth/cloud-platform']
  })
  .then(auth => {
    request.auth = auth;
    return dataflow.projects.templates.launch(request);
  })
  .catch(error => {
    console.error(error);
    throw error;
  });
};

Let me know if this helps and feel free to suggest improvements. Happy coding!
