How I Should Have Orchestrated my ETL Pipeline Better with Cloud Dataflow Template

I recently had a discussion with a Google Developer Expert (GDE) during my GDE interview about how I usually orchestrate my Dataflow ETL Pipeline in either my personal projects or in my office project.

Me: “We have a Jenkins for running the test and create the executable fat jar of the dataflow pipeline, store it in a Google Cloud Storage (GCS) bucket and then sync it with a GCS bucket used by Cloud Composer (Google’s managed Apache Airflow)”
Me: “Then, the jar eventually will be executed by using an airflow bash operator. All of its argument are passed using airflow variable and value template like {{ ds }}.”
Him: “What will trigger the job?”
Me: “The airflow itself. We set up a crontab to start the DAG and all tasks within.”
Him: “Have you ever used Dataflow Template?”
Me: “No
Him: “Do you know what is it?”
Me: “No

So, after that question I told my self that I need to know about Dataflow Template before I have the next round of the GDE interview. But, only knowing it should be easy. So, in this post, I will also try to do small retrospective about what we did wrong and what we could do better in the coming future (see the last section if you want to directly jump).


Cloud Dataflow Template

If you want to know more about Cloud Dataflow Template, you can check this link.

The main idea of Dataflow Template is to separate between development and the execution workflow.

In Traditional workflow pipeline development and job execution all happen within a development environment. Typically developer develop their pipeline locally and the execute the pipeline from the development environment. Under the hood, Apache Beam SDK stages the files in GCS, create a job request and submit it to Cloud Dataflow service.

In the other hands, in Templated Dataflow jobs, staging and execution are separate steps. Developer typically develop the pipeline in the development environment and execute the pipeline and create a template. Once the template is created, non-developer users can easily execute the jobs by using one of these interfaces including GCP console, gcloud cli, or REST API.

What you need during development?

To create a template for your pipeline, you must modify your pipeline code to support runtime parameters:

Use ValueProvider for all pipeline options that you want to set or use at runtime.

Normally, if we want to have parameterise value for our pipeline, we immediately go for pipeline options like this:

public interface WordCountOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
String getInputFile();
void setInputFile(String value);
}

This technique will not work if we want to use Dataflow Template. So, what we need to do is using ValueProvider<String> instead of String as shown below:

public interface WordCountOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
ValueProvider<String> getInputFile();
void setInputFile(ValueProvider<String> value);
}

There are multiple type of ValueProvider as discussed in here.

Call I/O methods that accept runtime parameters wherever you want to parameterize your pipeline.

Unfortunately, not all beam I/O operations support for dataflow ValueProvider. At least, these would partially work:

  • File-based IOs: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (requires SDK 2.3.0 or later)
  • PubSubIO
  • SpannerIO

Use DoFn objects that accept runtime parameters.

There is also slight modification to the DoFn. So, instead of accepting String passed by the options.getInputFile , what we need to do is passing the ValueProvider as the parameter for the DoFn constructor as shown below:

class MySumFn extends DoFn<Integer, Integer> {
ValueProvider<Integer> mySumInteger;

MySumFn(ValueProvider<Integer> sumInt) {
// Store the value provider
this.mySumInteger = sumInt;
}

@ProcessElement
public void processElement(ProcessContext c) {
// Get the value of the value provider and add it to
// the element's value.
c.output(c.element() + mySumInteger.get());
}
}

What you need during template creation?

Metadata

Every dataflow template must has its own metadata stored in GCS so that custom parameters are validated when the template executes. To create template metadata,

  1. Create a JSON file named <template-name>_metadata like below:
{
"name": "WordCount",
"description": "An example pipeline that counts words in the input file.",
"parameters": [{
"name": "inputFile",
"label": "Input Cloud Storage File(s)",
"help_text": "Path of the file pattern glob to read from.",
"regexes": ["^gs:\/\/[^\n\r]+$"],
"is_optional": true
},
{
"name": "output",
"label": "Output Cloud Storage File Prefix",
"help_text": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
"regexes": ["^gs:\/\/[^\n\r]+$"]
}]
}

2. Store it in GCS in the same folder as your template <template-name> folder.

Creating the Template

Creating the template should be very straightforward. We simply run this command to compile and stage the pipeline so that it becomes accessible from several execution interfaces.

Once the template is created, it then can be executed from any enviroments and with any parameterized value. More on that is discusseed in here.

Retrospectives

Packaging the job as jar is not a bad idea at all.

At first, the reason why we self package the job as the jar is because the available option at that time was DataflowJavaOperator. From its documentation it says that it accepts fat jar containing the pipeline code. So, we thought “why just dont go with this?” However, after we tried and got problem, we finally decided to go with `BashOperator` which are more prone to error because you really need to write the bash command or script on the DAG. Clearly, this operator will not be our choice anymore in the future to run the dataflow app.

Having a CI/CD for the ETL code repository is a good thing.

We already did it. But, instead of creating a fat jar in the end of CI pipeline, we can simply deploy the template-name and its template directly to GCS. If we care about the versioning of the pipeline, we can also name a folder with a it’s current versions.

For instance, the --templateLocation can be gs://dtflow-template/templates/<template-name>/v1.0.0 to denote that the current release version is v1.0.0

No need to synchronise Composer GCS and Jar GCS anymore.

By using the dataflow template, synchronising the fat jar between Composer’s GCS and CI/CD GCS becomes not necessary. Once we have the template, Composer/Airflow only need to know what the template name & location, the template version (if necessary) and the target environment is.

No More Bash Operator

The simplest form of job execution from Airflow that I can think of by now is by using SimpleHttpOperator. However, we need to aware that by doing so, there will be risk that we might expose the API Key to the log while executing the operator because we need to specify the API Key in every HTTP Call.

However, there is a better way by using DataflowTemplateOperator. This operators use an airflow connection which can be configured from the airflow admin panel. Thus, the risk of exposing the API Key is greatly reduced. For more about DataflowTemplateOperator , check this one out.

Airflow is good, but Composer is not really

We have been using Cloud Composer since the first time it was released. However, there are more problems that we need to face than what it can solve. Especially about it’s versioning and debugging technique. Other than that, also from the discussion we talked about Composer whose weird architecture under the hood (It uses two different GCP projects to deploy all infrastructures required for running the Kubernetes and App Engine). But, I’m not gonna talk about it in here. Might worth another separate discussion.

That’s all. I hope you also learned something as I did.

References

[1] Cloud Dataflow Documentation

[2] Airflow Integration. Dataflow Template Operator