Migrating to Airflow 2.0 and building an integration to MLflow in Google Cloud Platform (GCP)

Yixie Shao
Published in gft-engineering
Apr 27, 2021 · 10 min read

Moving to Airflow 2.0

The GFT Cambridge AI team needs to build data pipelines in order to create the environments that we use to deliver and run models. For production projects, GCP offers Cloud Composer, a managed implementation of Airflow, but when working on ‘side projects’ and demos we need a lighter-weight alternative; we therefore built our development system for experimental projects using Airflow 1.x.

Apache Airflow 2.0 was released in December 2020. At GFT, we strive to reduce any technical debt, and in our Cambridge AI team we have a no legacy code policy in our projects. So, when Airflow 2.0 came out, the team decided to move to the latest version.

When the team reviewed the proposal to move to Airflow 2.0, we realised that there was another long-standing challenge that we could get to at the same time. MLflow is an open-source platform to manage the machine learning lifecycle, including experimentation, reproducibility, deployment and a central model registry, allowing us to create meaningful identities for models that can then be used in testing and deployment descriptors.

MLflow is increasingly being used by the team as a metadata store linking all of the lineage information about our models, and the more we have done this, the more the team has invested in it. Up to now, the links between our data pipelines and MLflow have been relatively ad hoc and undeveloped. We had planned to build or adopt feature stores in our workflows once the technology matured, but the centrality that data pipelines have assumed in most of our project work means that they are de facto becoming the definition of our features, as well as the descriptors of how data is ferried about in a project. Given that we had to do a general upgrade and migration anyway, why not build a better interface between the thing that manages the production of the models' training and test data and the thing that records the properties of the models?

This blog post outlines the process that we used to set up the new Airflow 2.0 system and integrate it with MLflow.

Our Set-up

1. There are quite a lot of new features released in Airflow 2.0. According to the release notes on the official website, these are the key updates:

· A new way of writing DAGs: the TaskFlow API (AIP-31). Dependencies are handled more clearly and XCom is nicer to use (a short sketch follows this list).

· A fully specified REST API (AIP-32), provided as a fully supported, production-ready API.

· Massive scheduler performance improvements: the scheduler is now HA compatible (AIP-15). This is super useful both for resiliency (in case a scheduler goes down) and for scheduling performance.

· Task Groups replace SubDAGs, enabling the system to execute multiple tasks in parallel.

· Refreshed UI (looks pretty).

· Security: all operations now require authorisation, which is obviously really important for a security-by-design mindset. Even if the system is locked away in a secured zone, it is really important that getting access to it does not allow an unauthorised actor to manipulate sensitive resources.

· Smart Sensors for reduced load from task sensors (AIP-17).

· Airflow core and providers: splitting Airflow into 60+ packages, providing a much shallower and more contained learning curve for users.

2. PostgreSQL is installed on the same machine as the backend database, which supports running multiple DAGs at the same time. It also allows the use of more than a single scheduler instance, which is a new feature in Airflow 2.0.

3. MLflow is installed and integrated with Airflow. We use MySQL in the form of Cloud SQL as the backend database for MLflow experiment tracking, and GCS buckets to store models and artifacts. The machine is connected to Cloud SQL through a systemd service unit running the Cloud SQL Proxy. With this set-up we can simply connect to the MLflow server by providing the tracking URI and artifact location.
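
To give a flavour of the TaskFlow API mentioned in the list above, here is a minimal sketch of a DAG written with it. The DAG and task names and the values are illustrative choices for this post, not taken from one of our real pipelines.

# Minimal TaskFlow API sketch (Airflow 2.0); names and values are illustrative.
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2021, 4, 1), catchup=False)
def taskflow_example():

    @task()
    def extract():
        # The return value is passed to the next task via XCom automatically.
        return {"rows": 100}

    @task()
    def report(stats: dict):
        print(f"Extracted {stats['rows']} rows")

    # Calling the tasks like ordinary functions defines the dependency extract -> report.
    report(extract())


example_dag = taskflow_example()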

The migration

We use an Ubuntu 18.04 image for this example.

The first step is to give the VM permission to access BigQuery, Cloud SQL and Cloud Storage.
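
One way to do this (a sketch; the dedicated service-account name and the specific roles are assumptions, so adjust them to your own project) is to grant the VM's service account the relevant IAM roles:

# Placeholders: <project> is your GCP project ID, SA is the VM's service account.
SA="airflow-vm@<project>.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding <project> \
  --member="serviceAccount:${SA}" --role="roles/bigquery.dataEditor"
gcloud projects add-iam-policy-binding <project> \
  --member="serviceAccount:${SA}" --role="roles/cloudsql.client"
gcloud projects add-iam-policy-binding <project> \
  --member="serviceAccount:${SA}" --role="roles/storage.objectAdmin"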

Step 1: create a sudo user called ‘airflow’. In the terminal:
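
A minimal reconstruction of this step:

# Create the 'airflow' user, give it sudo rights, then switch to it.
sudo adduser airflow
sudo usermod -aG sudo airflow
su - airflow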

Step 2: install the dependencies, Apache Airflow 2.0 and MLflow.
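
For example (a sketch; the exact versions and extras are assumptions, and the constraints URL follows the pattern recommended in the official Airflow installation docs):

# Build tools, Python tooling and the PostgreSQL client libraries.
sudo apt-get update
sudo apt-get install -y python3-pip python3-dev libpq-dev

# Airflow 2.0 with the Google and Postgres extras, pinned by the official
# constraints file (Python 3.6 is the default on Ubuntu 18.04), plus MLflow
# and the MySQL driver used for its backend store.
pip3 install "apache-airflow[google,postgres]==2.0.1" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.0.1/constraints-3.6.txt"
pip3 install mlflow pymysql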

After this step, Airflow 2.0 and MLflow should be installed. We can test the installation by checking the versions of the packages:
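
For example:

airflow version    # should print a 2.0.x version
mlflow --version   # should print the installed MLflow version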

Step 3: install & configure PostgreSQL.
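
A sketch of this step (the database name, user and password are placeholders to replace with your own values):

# Install PostgreSQL and create a database and user for the Airflow metadata store.
sudo apt-get install -y postgresql postgresql-contrib
sudo -u postgres psql <<'SQL'
CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD '<password>';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
SQL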

This should now have created and configured the database for Airflow to use.

Step 4: configure and initialise Airflow, as the ‘airflow’ user.
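
In outline (a sketch, assuming the PostgreSQL database from step 3; adjust the names and password to your own):

# Point Airflow at the PostgreSQL metadata database, initialise it,
# and create an admin user for the web UI.
export AIRFLOW_HOME=/home/airflow/airflow
export AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow_user:<password>@localhost/airflow_db"
export AIRFLOW__CORE__EXECUTOR=LocalExecutor

airflow db init
airflow users create \
  --username admin --password <password> \
  --firstname Admin --lastname User \
  --role Admin --email admin@example.com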

Now that Airflow is configured and initialised, we can start the Airflow scheduler and webserver to access the Airflow UI.
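
For an initial test in the terminal (before moving them to systemd in step 7):

airflow webserver --port 8080
# and, in a second terminal
airflow scheduler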

Figure 1: Starting the Airflow webserver and scheduler in the console

Figure 1 shows screenshots of the terminal once we start the Airflow webserver and scheduler. Now we can access the Airflow web UI.

We use systemd units for easy management and so that we have a relatively robust system.

The Airflow scheduler and webserver both run as system services. Once the services are in place, they need no configuration from users after the machine is restarted; everything comes back automatically, which is handy because we use Cloud Scheduler to shut the service down every night at 9pm to keep costs down.

How the systemd unit files are written and used to configure the running services is described in steps 7 and 8.

Step 5: configure the environment variables for Airflow. Write a file named ‘airflow’ and copy it into /etc/. For instance, it could contain the following information:
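
A sketch of what this file could contain (the exact variables are our reconstruction; the units in steps 7 and 8 can load it with EnvironmentFile=/etc/airflow):

# /etc/airflow — environment shared by the Airflow systemd units.
AIRFLOW_HOME=/home/airflow/airflow
AIRFLOW_CONFIG=/home/airflow/airflow/airflow.cfg
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow_user:<password>@localhost/airflow_db
AIRFLOW__CORE__EXECUTOR=LocalExecutor
GOOGLE_CLOUD_PROJECT=<project>
GOOGLE_APPLICATION_CREDENTIALS=/home/airflow/keys/<project>-sa-key.json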

Just replace <project> with your GCP project ID.

Step 6: install gcsfuse to mount a GCS bucket as a file system.

Now we need to provide somewhere for Airflow to keep the actual artefacts, not just the metadata it uses to run them.

GCS buckets are GCP's standard offering for unstructured object storage. In our set-up, GCS is used to store Airflow DAGs and logs, and once we get MLflow implemented, this is where it will store artefacts too.
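
The installation follows the standard gcsfuse apt repository route (a sketch for Ubuntu 18.04):

# Add the gcsfuse apt repository, then install gcsfuse.
export GCSFUSE_REPO=gcsfuse-$(lsb_release -c -s)
echo "deb http://packages.cloud.google.com/apt $GCSFUSE_REPO main" | \
  sudo tee /etc/apt/sources.list.d/gcsfuse.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
sudo apt-get update && sudo apt-get install -y gcsfuse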

Step 7: Write systemd unit files.

As mentioned above, in our set-up the Airflow scheduler and webserver run as systemd units. In addition, we would like a service unit that mounts a GCS bucket as a file system automatically.

We need a .service file to launch each systemd service.

Below is an example ‘gcs-airflow.service’ file for mounting a GCS bucket as a file system on the VM. Note: change <GCS-bucket-name> to your bucket name and <local_folder_path> to your local path on the VM.
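
A sketch of such a unit file (the exact options are our reconstruction of this set-up):

[Unit]
Description=Mount the Airflow GCS bucket with gcsfuse
After=network-online.target
Wants=network-online.target

[Service]
Type=forking
User=airflow
Group=airflow
ExecStart=/usr/bin/gcsfuse --implicit-dirs <GCS-bucket-name> <local_folder_path>
ExecStop=/bin/fusermount -u <local_folder_path>
Restart=on-failure

[Install]
WantedBy=multi-user.target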

Step 8: configure the systemd services. Once a .service file is written, we need to put it into the /usr/lib/systemd/system/ folder, then load and start the service.
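
For example, for the gcs-airflow service (the same pattern applies to the Airflow webserver and scheduler units):

sudo cp gcs-airflow.service /usr/lib/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable gcs-airflow.service
sudo systemctl start gcs-airflow.service
sudo systemctl status gcs-airflow.service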

Figure 2: Status of gcs-airflow service

Figure 2 shows the status of the gcs-airflow service. We can see that the service is running successfully.

Security

We use MySQL in Cloud SQL as the backend database for MLflow, so the VM needs to connect to the Cloud SQL instance. Although we are using this set-up for our internal experimentation and side projects, it is really important to us and to GFT that we secure our services properly. We access the Cloud SQL instance using the Cloud SQL Proxy, which offers the following advantages:

  • Secure connections: The proxy automatically encrypts traffic to and from the database using TLS 1.2 with a 128-bit AES cipher; SSL certificates are used to verify client and server identities.
  • Easier connection management: The proxy handles authentication with Cloud SQL, removing the need to provide static IP addresses.

Step 9: Install Cloud SQL Proxy.
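
Following the standard installation route (a sketch):

# Download the proxy binary, make it executable and put it on the PATH.
wget https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64 -O cloud_sql_proxy
chmod +x cloud_sql_proxy
sudo mv cloud_sql_proxy /usr/local/bin/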

After cloud_sql_proxy is installed on the VM, we want to connect the VM to Cloud SQL by running cloud_sql_proxy as a systemd service. We write a systemd unit file in the same way as the GCS mount unit above, and repeat step 8 to configure the service.
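
A sketch of such a unit file, exposing the instance on local port 3309 (the instance connection name is a placeholder for your own <project>:<region>:<instance>):

[Unit]
Description=Cloud SQL Proxy for the MLflow backend database
After=network-online.target
Wants=network-online.target

[Service]
User=airflow
ExecStart=/usr/local/bin/cloud_sql_proxy -instances=<project>:<region>:<instance>=tcp:3309
Restart=on-failure

[Install]
WantedBy=multi-user.target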

Once the service is running, we can check the connection with:

mysql -u <USER> --host=127.0.0.1 --port=<PORT> -p

Figure 3: Checking the connection to Cloud SQL

Figure 3 shows that we are connected to Cloud SQL through the Cloud SQL Proxy, so we can reach the database via the host 127.0.0.1.

Note: <USER> is the user of the MySQL database created in Cloud SQL, and <PORT> is the connection port defined in the .service unit file; in this example it is 3309.

In some cases, we expect that the machine will sit behind a firewall within a secured zone and be accessible only from a whitelisted VM via a virtual desktop, or from a whitelisted laptop. In this case we are not working on sensitive or personal data, so that is not required.

Step 10: use MLflow in Airflow DAGs and run the MLflow webserver as a systemd unit. Here is a test DAG which I used to check the connection to the MLflow database.
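
The following is a minimal sketch in the spirit of that test DAG (the DAG id, experiment name and credentials are placeholders, and the tracking URI assumes the MySQL database exposed on 127.0.0.1:3309 by the Cloud SQL Proxy):

# Test DAG: log a parameter and a metric to the MLflow tracking backend.
from datetime import datetime

import mlflow
from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2021, 4, 1), catchup=False, tags=["mlflow-test"])
def mlflow_connection_test():

    @task()
    def log_to_mlflow():
        # MLflow accepts a SQLAlchemy-style database URI as the tracking URI:
        # mysql+pymysql://<USER>:<PASSWORD>@127.0.0.1:3309/<DATABASE>
        mlflow.set_tracking_uri("mysql+pymysql://<USER>:<PASSWORD>@127.0.0.1:3309/<DATABASE>")
        mlflow.set_experiment("airflow-connection-test")
        with mlflow.start_run():
            mlflow.log_param("source", "airflow")
            mlflow.log_metric("row_count", 42)

    log_to_mlflow()


mlflow_test_dag = mlflow_connection_test()

For the MLflow webserver unit itself, the ExecStart line can point at an mlflow server command along the lines of mlflow server --backend-store-uri <db-uri> --default-artifact-root gs://<bucket>/mlruns --host 0.0.0.0, following the same systemd pattern as in steps 7 and 8.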

Troubleshooting:

During our set-up, when we tested the MLflow integration in Airflow, we encountered a permission problem when creating the ‘mlruns’ folder. To solve this, we create the folder in advance and change its owner and group to ‘airflow’:

sudo mkdir /mlruns/

sudo chown -R airflow:airflow /mlruns/

Access to the infrastructure is provided securely through the Airflow and MLflow web UIs, using local port forwarding over SSH into the VM. After configuring the service account and credentials for the GCP project, we can simply use the gcloud command-line tool:

gcloud compute ssh --project=PROJECT_ID --zone=ZONE \
  VM_NAME -- -L <local-port>:localhost:<remote-port>

  • PROJECT_ID: the ID of the project that contains the instance
  • VM_NAME: the name of the instance
  • ZONE: the name of the zone in which the instance is located
  • <local-port>: the local port you’re listening on
  • <remote-port>: the remote port you’re connecting to

Once the local machine is connected to the VM, we can simply open a browser and go to http://localhost:<local-port> to access the web UI. So, in the whole process, no external IP is needed.

Figure 4: Accessing the Airflow web UI through localhost in the browser

Figure 4 shows that we can access the Airflow web UI through localhost after we SSH into the VM using local port forwarding.

Infrastructure Automation

We often need to start new projects on separate infrastructure, but we want to have pretty well the same set-up each time, with some configuration, and we don't want the possibility of inconsistency or errors in the builds. So, in order to rebuild the system quickly and without hassle, we have what we call our own infrastructure “activator”. An activator contains reusable, custom Terraform modules for different GCP components, which we use to build our infrastructure within a secured GCP landing zone provided by the GFT-sponsored “Tranquility Base” open-source project. In this case, the activator we add the Airflow and MLflow automation to is our Machine Learning Playground system. With this set-up, we can automate the whole process of provisioning an Airflow VM with the features described in the previous section, use the scripts and different config files (systemd unit files) to choose the features we want in the set-up, and embed it as one component in the on-demand infrastructure for a particular project.

Usage example — insurance demo

We built an insurance demo based on this set-up. During the build-out and development work for the project, Airflow 2.0 was used to manage the whole ML workflow and MLflow to track pipeline metadata, model training parameters and metrics, and to register models.

Figure 5: Screenshot of the Airflow webserver

The above screenshot shows an ETL pipeline for our insurance demo in Airflow 2.0. In this pipeline, we do the data enrichment and transformation, and MLflow is used to track the pipeline metadata: the task metadata is sent to the MLflow server.

Once a task has run, the parameters and metrics are stored in the MLflow server, and the artefacts in the linked GCS bucket.

Figure 6: MLflow Web UI

We can now check them through the MLflow web UI, as shown in Figure 6. We can see that the source/user is ‘airflow’ and the status is ‘FINISHED’, i.e. the experiment was run by the Airflow pipeline and completed. In this experiment, we record the Airflow task name, operator, execution time, task status, related project and source/destination tables as parameters; table metadata such as size, creation time, modification time and row count as metrics; and the workflow inside the task in the MLflow artefact folder, which is a GCS bucket. We use tags to describe the functionality of different pipelines, enabling us to search for models created in particular pipelines and to determine the pipelines associated with any particular model.
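
As a rough illustration of that logging scheme (a sketch only; the run, parameter, metric and tag names are simplified stand-ins, not our exact schema):

# Illustrative logging of task and table metadata to MLflow.
import mlflow

with mlflow.start_run(run_name="enrich_policies"):
    mlflow.set_tags({"pipeline": "insurance-etl", "stage": "enrichment"})
    mlflow.log_params({
        "task_name": "enrich_policies",
        "operator": "BigQueryInsertJobOperator",
        "source_table": "<project>.raw.policies",
        "destination_table": "<project>.curated.policies",
    })
    mlflow.log_metrics({"table_size_mb": 48.2, "row_count": 125000})
    # workflow.json describes the work done inside the task; it ends up in the
    # artefact location, which in our set-up is a GCS bucket.
    mlflow.log_artifact("workflow.json")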

Conclusion

In this post we discussed the set-up of Airflow 2.0 and its integration with MLflow to provide stronger end-to-end tracking and governance in complex machine learning applications. There’s a long way to go in both our use of this technology and its extension and integration with other components of our process and practice — but we have found lots of positives from this set-up so far and we see very positive prospects for further development in the future.


Yixie Shao
gft-engineering

Data Engineer in the Cambridge Lab, GFT. She has worked in different industries, including aerospace, satellite systems, geographic information systems and IoT.