Composer, Dataflow and Private IP addresses
A public IP address is one that is addressable from the Internet. Within a GCP environment, we can declare that Compute Engine instances should never be assigned public IP addresses. This can be defined at the Organization level as an organization policy. Such a policy is enforced across all the Projects belonging to that Organization, including Compute Engine instances created by other GCP products as part of their own execution. The reason we may wish to disable public IP addresses is to reduce the attack surface. If a Compute Engine instance has no need to be reached from the Internet, it would be normal to simply not request a public IP address for it. However, if an administrative error were made, the instance might still be given a public IP address by default. The Organization level policy ensures that no Compute Engine instance may have a public IP address … whether by default, by accident or by explicitly asking for one.
Let us now consider the Dataflow product. Dataflow executes data transformation pipelines distributed across multiple workers. By default, these workers communicate with each other using public IP addresses. When we submit a job to Dataflow, we can pass a parameter declaring that only private IP addresses are to be used. We can see this parameter in the gcloud command used to submit Dataflow jobs:
gcloud dataflow jobs run … --disable-public-ips …
The --disable-public-ips flag instructs Dataflow to use private IP addresses only.
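As an aside, when a pipeline is submitted directly from the Apache Beam Python SDK rather than through gcloud, the same choice can be expressed with the use_public_ips pipeline option. The following is only a minimal sketch; the project, region, bucket and subnetwork names are placeholders rather than values from this article's environment.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Pipeline options equivalent to passing --disable-public-ips via gcloud.
# All resource names below are placeholders.
options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project",
    region="us-central1",
    temp_location="gs://my-bucket/tmp",
    subnetwork="regions/us-central1/subnetworks/my-subnet",
    use_public_ips=False,  # workers get private IP addresses only
)

with beam.Pipeline(options=options) as pipeline:
    (pipeline
     | "Create" >> beam.Create(["hello", "world"])
     | "Print" >> beam.Map(print))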
While looking at the gcloud command is interesting, most invocations of Dataflow happen through API calls; the gcloud command is simply a command line tool that is pre-built for invoking those APIs. If we look in depth at the APIs used to run work in Dataflow, we arrive at the REST launch request, which has a parameter called environment that defines the environment in which the Dataflow job should run. Inside the environment parameter there is a field called ipConfiguration, which can take a value of either WORKER_IP_PUBLIC or WORKER_IP_PRIVATE.
To explain it in simpler terms … if we are running Dataflow in an environment that prevents public IP addresses then requests to run jobs in Dataflow must pass a parameter of:
ipConfiguration=WORKER_IP_PRIVATE
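To make this concrete, here is a minimal sketch of that REST launch request issued from Python via the google-api-python-client library; the project, region, bucket and template path are placeholders, and application default credentials are assumed.

from googleapiclient.discovery import build

# Dataflow v1b3 REST client (uses application default credentials).
dataflow = build("dataflow", "v1b3")

# RuntimeEnvironment asking for private worker IP addresses.
body = {
    "jobName": "my-private-ip-job",
    "parameters": {},
    "environment": {
        "tempLocation": "gs://my-bucket/tmp",
        "subnetwork": "regions/us-central1/subnetworks/my-subnet",
        "ipConfiguration": "WORKER_IP_PRIVATE",
    },
}

response = dataflow.projects().locations().templates().launch(
    projectId="my-project",
    location="us-central1",
    gcsPath="gs://dataflow-templates/latest/Word_Count",  # example template path
    body=body,
).execute()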
So far, we have told a generic story about invoking Dataflow; now we turn our attention to Composer (Airflow). Within Composer, we can invoke Dataflow jobs from a DAG. When a step in the DAG invokes Dataflow, the DAG author can supply parameters that are passed through to Dataflow for interpretation. This implies that we should be able to set and pass through the ipConfiguration property that was just described. Ideally, we would want something that looks as follows:
default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your region
        "region": gce_region,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "temp_location": bucket_path + "/tmp/",
        # Use private IP addresses
        "ipConfiguration": "WORKER_IP_PRIVATE"
    }
}
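For context, here is a sketch of how such a default_args dictionary would typically be wired into a DAG using the contrib DataflowTemplateOperator that ships with Airflow 1.x (and hence Composer); the DAG id, template path and parameters below are illustrative placeholders.

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

with models.DAG(
    "dataflow_private_ip_example",   # placeholder DAG id
    default_args=default_args,       # the dictionary shown above
    schedule_interval=None,
) as dag:

    launch_template = DataflowTemplateOperator(
        task_id="launch_dataflow_template",
        # Path to a staged Dataflow template; placeholder value.
        template=bucket_path + "/templates/my-template",
        parameters={},               # template-specific parameters would go here
    )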
Unfortunately, we have a problem. While Dataflow documents all the parameters that can be set, the Airflow (and hence Composer) product has a defect: only a subset of the parameters honored by Dataflow are actually treated as valid in an Airflow DAG configuration, and ipConfiguration is not one of them. Even though we can set the property in the DAG, it is silently ignored at runtime.
This problem was tracked as a GitHub issue against the Airflow project:
#8300: No way to create a Dataflow job with Private IP configuration
Fortunately, the issue was closed as resolved. However, as you may have guessed, we still have a problem. The resolution and resulting code fix were made against the master branch of the Airflow project, meaning they will be present in Airflow 2.x (at the time of writing (2021–01) we are at 1.x). Composer uses Airflow 1.x, and hence the fix found in the Airflow code base only applies to some future release.
Fortunately, there is a workaround available that we have tested and that appears to work. To understand the workaround, you should have a basic appreciation of a concept called Monkey Patching.
Imagine you are using a Python package called SomePackage. Now imagine you are invoking a function called someFunction in that package. You might code:
SomePackage.someFunction()
Now imagine that when you call someFunction, it fails or doesn't do exactly what you want. If you have access to the source code of the Python package, you may look inside it and find that there is some internal function called _someInternalFunction that contains a bug preventing you from achieving your goal. You want to patch _someInternalFunction, but you are just a consumer of the Python package and you don't know how to (nor want to) rebuild the whole package with your changes (even if that were somehow possible).
Instead, what you can do is perform a Monkey Patch where you override the function you want to change. Your new code now becomes:
def _myNewInternalFunction():
    # New code here
    ...

SomePackage._someInternalFunction = _myNewInternalFunction
SomePackage.someFunction()
The effect is that when you now invoke someFunction(), the internals of that function will invoke _someInternalFunction(), but instead of calling the original package-supplied code, the new implementation of _someInternalFunction() that you just supplied will be called. You have effectively patched the package. This is an example of the technique called Monkey Patching.
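As a minimal, self-contained illustration of the idea (entirely separate from Airflow), the sketch below patches Python's standard json module, which plays the role of SomePackage.

import json

# Keep a reference to the original implementation.
_original_dumps = json.dumps

def _patched_dumps(obj, **kwargs):
    # New behaviour: always pretty-print, then delegate to the original code.
    kwargs.setdefault("indent", 2)
    return _original_dumps(obj, **kwargs)

# The monkey patch: replace the module's function with our own.
json.dumps = _patched_dumps

# Callers of json.dumps now get the patched behaviour.
print(json.dumps({"a": 1, "b": 2}))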
With this background in mind, there is a Monkey Patch that can be used with Airflow (Composer) that results in the ipConfiguration parameter being honored.
In the Python DAG, include the following code early on:
# Required for the monkey patch
from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook, _DataflowJob

# We redefine the function that handles the environment keys
# that are used to build the RuntimeEnvironment, to include 'ipConfiguration'
def _start_template_dataflow(self, name, variables, parameters,
                             dataflow_template):
    # Builds RuntimeEnvironment from variables dictionary
    # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
    environment = {}
    for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail',
                'tempLocation', 'bypassTempDirValidation', 'machineType',
                'additionalExperiments', 'network', 'subnetwork', 'additionalUserLabels',
                'ipConfiguration']:
        if key in variables:
            environment.update({key: variables[key]})
    body = {"jobName": name,
            "parameters": parameters,
            "environment": environment}
    service = self.get_conn()
    request = service.projects().locations().templates().launch(
        projectId=variables['project'],
        location=variables['region'],
        gcsPath=dataflow_template,
        body=body
    )
    response = request.execute(num_retries=self.num_retries)
    variables = self._set_variables(variables)
    _DataflowJob(self.get_conn(), variables['project'], name, variables['region'],
                 self.poll_sleep, num_retries=self.num_retries).wait_for_done()
    return response

# Monkey patching
DataFlowHook._start_template_dataflow = _start_template_dataflow
While it is possible to read this code and comprehend what it does, our recommendation is to treat it as a black box: paste it into your DAG source file and otherwise ignore it. Ideally, at some future date, the code changes already made in the Airflow master branch on GitHub will be available in Composer and the Monkey Patch can be removed.
A video illustrating this story and walking through the tests is available.
Notes:
- This article did not describe how to define Organization level policies, nor did it highlight the specific policy used to disable public IP addresses. It is assumed that the GCP administrator can consult the documentation on VPC network configuration and GCP Organization policies. It should also be noted that the VPC networks that Composer and Dataflow are configured to use must have the Private Google Access parameter enabled; the default is disabled.