Integrating Keycloak with Apache Airflow

A Step-by-Step Guide to Implementing Authentication and Authorization in Apache Airflow Using Keycloak

Athibet Prawane
odds.team
10 min read · Sep 5, 2024


☑ Tested with Keycloak version 25.0 and Airflow version 2.10.

As more organizations use Apache Airflow to manage complex workflows, it’s crucial to secure access to this powerful tool. One effective way to do this is by integrating Airflow with Keycloak, an open-source identity and access management solution.

In this blog post, I’ll show you how to set up authentication in Airflow using Keycloak. We’ll walk through the setup process, starting with Keycloak and ending with configuring Airflow to use it for user authentication.

Why Use Keycloak with Airflow?

Keycloak offers advanced features such as Single Sign-On (SSO), user federation, and fine-grained access control. By using Keycloak with Airflow, you can…

  • Centralize user authentication across multiple applications.
  • Implement SSO so users can access Airflow with their existing credentials.
  • Use stronger authentication methods like two-factor authentication.
  • Manage user roles and permissions more efficiently.

Prerequisites

Before you begin, ensure you have Docker installed on your machine.

Setting Up Keycloak

1. Install Keycloak

Run the following command to start Keycloak using Docker.

docker run -d --name keycloak \
-p 9090:8080 \
-e KEYCLOAK_ADMIN=keycloak \
-e KEYCLOAK_ADMIN_PASSWORD=keycloak \
quay.io/keycloak/keycloak:25.0.4 \
start-dev

This command starts Keycloak in development mode on port 9090, with keycloak as both the admin username and password.

2. Create a Realm

A realm is where you manage users, roles, and clients.

  • Go to the Keycloak admin console at http://localhost:9090.
  • Log in with your admin credentials.
  • Click “Keycloak” realm, then click “Create Realm” ➊.
  • Enter a name for your realm, such as airflow-realm ➋, and click “Create” ➌.

The master realm, shown as Keycloak in the admin console, should only be used for managing other realms.

3. Create a Client

A client in Keycloak represents an application like Airflow.

  • In the airflow-realm, go to the “Clients” section and click “Create client”.
  • Choose OpenID Connect as the Client type.
  • Set the Client ID to airflow-client ➋ and click “Next” ➌.
  • Toggle on Client authentication ➍ and click “Next” ➎.
  • Set the Root URL to your Airflow base URL, http://localhost:8080 ➏.
  • Set Valid redirect URIs to /oauth-authorized/keycloak ➐ and click “Save” ➑.
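Keycloak resolves a relative redirect URI against the client's Root URL, and Flask AppBuilder serves its OAuth callback at /oauth-authorized/ followed by the provider name. The sketch below only illustrates how the two settings combine; the callback_url helper is hypothetical and not part of either tool.

```python
from urllib.parse import urljoin

ROOT_URL = "http://localhost:8080"  # Root URL set on the Keycloak client
PROVIDER_NAME = "keycloak"          # OAuth provider name configured in Airflow


def callback_url(root_url: str, provider: str) -> str:
    """Return the absolute URL Keycloak redirects to after login."""
    return urljoin(root_url, f"/oauth-authorized/{provider}")


print(callback_url(ROOT_URL, PROVIDER_NAME))
```

If you later change the provider name in Airflow, the redirect URI registered in Keycloak has to change with it.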

4. Copy the Client Secret

  • In the airflow-client settings, go to the “Credentials” ➊ tab.
  • Copy ➋ the Client Secret value (you’ll need it for the Airflow configuration).

5. Define Roles

Define roles within airflow-client by creating sample roles like Admin and Viewer.

  • Go to the “Roles” tab in the airflow-client settings.
  • Click on “Create role” and proceed to create the Admin and Viewer roles ➊.

Using client-specific roles instead of realm roles allows for more granular control, ensuring that roles are scoped to this client only. However, this approach may vary depending on whether roles need to be shared across multiple clients or isolated within a single client.
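To make the difference concrete: in a Keycloak access token, realm roles appear under realm_access, while client roles appear under resource_access keyed by client ID. The following is a minimal sketch with made-up claims; the extract_roles helper and the sample data are assumptions for illustration only.

```python
def extract_roles(claims: dict, client_id: str) -> dict:
    """Split a decoded token's roles into realm-level and client-level lists."""
    realm_roles = claims.get("realm_access", {}).get("roles", [])
    client_roles = (
        claims.get("resource_access", {}).get(client_id, {}).get("roles", [])
    )
    return {"realm": list(realm_roles), "client": list(client_roles)}


# Hypothetical decoded claims for a user holding the Admin client role.
sample_claims = {
    "realm_access": {"roles": ["offline_access"]},
    "resource_access": {
        "airflow-client": {"roles": ["Admin"]},
        "account": {"roles": ["view-profile"]},
    },
}

roles = extract_roles(sample_claims, "airflow-client")
print(roles)
```

Only the roles scoped to airflow-client are relevant to Airflow here; roles granted on other clients, such as account, are ignored.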

6. Set Audience

To ensure the Keycloak access token is correctly targeted for Airflow, set the audience (aud) field to airflow-client.

  • Go to the “Client scopes” ➊ tab in the airflow-client settings.
  • Select “airflow-client-dedicated” ➋, which contains the specific mappers and scope for this client.
  • Click “Configure a new mapper” ➌.
  • Choose “Audience” ➍ as the Mapper type.
  • Name the mapper (e.g., airflow-audience ➎) and select airflow-client ➏ as the Included Client Audience.
  • Click “Save” ➐ to apply the mapper.
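Once the mapper is saved, you can check that a token really carries the expected audience. The sketch below decodes a token payload with the standard library only and does not verify the signature, so treat it as a debugging aid rather than validation. The helper names are hypothetical, and the token is built locally so the example runs without Keycloak.

```python
import base64
import json


def peek_claims(token: str) -> dict:
    """Decode a JWT payload WITHOUT verifying its signature (debugging only)."""
    payload_b64 = token.split(".")[1]
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)  # restore stripped padding
    return json.loads(base64.urlsafe_b64decode(padded))


def has_audience(claims: dict, expected: str) -> bool:
    """Return True if `expected` is listed in the aud claim (string or list)."""
    aud = claims.get("aud", [])
    if isinstance(aud, str):
        aud = [aud]
    return expected in aud


def fake_token(claims: dict) -> str:
    """Build an unsigned, made-up token so the example runs offline."""
    def enc(obj: dict) -> str:
        raw = json.dumps(obj).encode()
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

    return f"{enc({'alg': 'none'})}.{enc(claims)}.signature"


token = fake_token({"aud": ["airflow-client", "account"], "azp": "airflow-client"})
print(has_audience(peek_claims(token), "airflow-client"))
```

With a real token, pass the access_token string returned by Keycloak to peek_claims instead of the fake one.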

7. Create Users

Add users to the airflow-realm and assign them the appropriate roles.

  • Go to the “Users” ➊ section of the realm and click “Add User”.
  • Fill in the user’s General ➋ information, including Username, Email, First Name, and Last Name, then click “Create” ➌.
  • In the “Credentials” ➍ tab, click “Set password” ➎, enter the password, toggle off the “Temporary” ➏ option, and click “Save” ➐.
  • Go to the “Role Mapping” ➑ tab, click “Assign role” ➒, and assign the appropriate roles to the user.

Repeat this process for any additional users, ensuring both Admin and Viewer roles are represented in the realm ➓.

Configuring Airflow

With Keycloak set up, the next step is to configure Airflow to authenticate users through Keycloak. By default, Airflow uses a database for user authentication, requiring users to enter a password, but you can customize it to integrate with external authentication providers such as Keycloak. Starting with Airflow 2.0, the UI is based on Flask AppBuilder, which supports various authentication methods including OAuth and OpenID.

1. Create a Custom Webserver Config

In order to enable Keycloak as the authentication provider for Airflow, you’ll need to create a custom webserver_config.py file. This configuration file will allow Airflow to integrate with Keycloak for OAuth-based authentication.

# This configuration file is based on the official Apache Airflow webserver configuration.
# Original file: https://github.com/apache/airflow/blob/main/airflow/config_templates/default_webserver_config.py
#
# Customizations have been made to integrate Keycloak for authentication.
# If Keycloak is disabled, the configuration falls back to database authentication.
#
# For more information on the default configuration, refer to the official Airflow documentation.

import logging
import os
from base64 import b64decode

import jwt
import requests
from airflow.www.security import AirflowSecurityManager
from cryptography.hazmat.primitives import serialization
from flask_appbuilder import expose
from flask_appbuilder.security.manager import AUTH_DB, AUTH_OAUTH
from flask_appbuilder.security.views import AuthOAuthView

basedir = os.path.abspath(os.path.dirname(__file__))

# Flask-WTF flag for CSRF
WTF_CSRF_ENABLED = True
WTF_CSRF_TIME_LIMIT = None

# Initialize logging
log = logging.getLogger(__name__)

# Check if Keycloak is enabled
KEYCLOAK_ENABLED = os.getenv("KEYCLOAK_ENABLED", "False").lower() in [
    "true",
    "1",
    "yes",
]


if KEYCLOAK_ENABLED:
    # Keycloak Authentication Configuration
    AUTH_TYPE = AUTH_OAUTH
    AUTH_ROLES_SYNC_AT_LOGIN = True  # Synchronize roles at each login
    AUTH_USER_REGISTRATION = True  # Allow automatic user registration
    AUTH_DEFAULT_ROLE = os.getenv("AUTH_DEFAULT_ROLE", "Public")
    AUTH_USER_REGISTRATION_ROLE = AUTH_DEFAULT_ROLE  # Default role for new users

    # Keycloak Settings
    KEYCLOAK_ISSUER = os.environ["KEYCLOAK_ISSUER"].rstrip("/")
    KEYCLOAK_CLIENT_ID = os.getenv("KEYCLOAK_CLIENT_ID", "airflow")
    KEYCLOAK_CLIENT_SECRET = os.environ["KEYCLOAK_CLIENT_SECRET"]

    # Keycloak URLs
    KEYCLOAK_SERVER_METADATA_URL = f"{KEYCLOAK_ISSUER}/.well-known/openid-configuration"
    KEYCLOAK_API_BASE_URL = f"{KEYCLOAK_ISSUER}/protocol/openid-connect"
    KEYCLOAK_ACCESS_TOKEN_URL = f"{KEYCLOAK_API_BASE_URL}/token"
    KEYCLOAK_AUTHORIZE_URL = f"{KEYCLOAK_API_BASE_URL}/auth"

    # Role Mapping Configuration
    AUTH_ROLES_MAPPING = {
        os.getenv("KEYCLOAK_ADMIN_ROLE", "Admin"): ["Admin"],
        os.getenv("KEYCLOAK_OP_ROLE", "Op"): ["Op"],
        os.getenv("KEYCLOAK_USER_ROLE", "User"): ["User"],
        os.getenv("KEYCLOAK_VIEWER_ROLE", "Viewer"): ["Viewer"],
        os.getenv("KEYCLOAK_PUBLIC_ROLE", "Public"): ["Public"],
    }

    # OAuth Providers Configuration
    OAUTH_PROVIDERS = [
        {
            "name": "keycloak",
            "icon": "fa-key",
            "token_key": "access_token",
            "remote_app": {
                "client_id": KEYCLOAK_CLIENT_ID,
                "client_secret": KEYCLOAK_CLIENT_SECRET,
                "server_metadata_url": KEYCLOAK_SERVER_METADATA_URL,
                "api_base_url": KEYCLOAK_API_BASE_URL,
                "client_kwargs": {"scope": "email profile"},
                "access_token_url": KEYCLOAK_ACCESS_TOKEN_URL,
                "authorize_url": KEYCLOAK_AUTHORIZE_URL,
                "request_token_url": None,
            },
        },
    ]

    # Fetch Keycloak Public Key
    try:
        response = requests.get(KEYCLOAK_ISSUER)
        response.raise_for_status()
        key_der_base64 = response.json().get("public_key")
        key_der = b64decode(key_der_base64.encode())
        public_key = serialization.load_der_public_key(key_der)
    except requests.RequestException as e:
        log.error(f"Failed to fetch Keycloak public key: {e}")
        raise

    # Custom Authentication View to Handle Keycloak Logout
    class CustomAuthRemoteUserView(AuthOAuthView):
        @expose("/logout/", methods=["GET", "POST"])
        def logout(self):
            return super().logout()

    # Custom Security Manager for Keycloak Authentication
    class CustomSecurityManager(AirflowSecurityManager):
        authoauthview = CustomAuthRemoteUserView

        def oauth_user_info(self, provider, response):
            if provider == "keycloak":
                token = response.get("access_token")
                if not token:
                    log.warning("No access token found in the response.")
                    return {}

                try:
                    user_info = jwt.decode(
                        token,
                        public_key,
                        algorithms=["RS256"],
                        audience=KEYCLOAK_CLIENT_ID,
                    )
                except jwt.ExpiredSignatureError:
                    log.error("Token has expired.")
                    return {}
                except jwt.InvalidTokenError as e:
                    log.error(f"Invalid token: {e}")
                    return {}

                roles = (
                    user_info.get("resource_access", {})
                    .get(KEYCLOAK_CLIENT_ID, {})
                    .get("roles", [AUTH_DEFAULT_ROLE])
                )

                return {
                    "username": user_info.get("preferred_username"),
                    "email": user_info.get("email"),
                    "first_name": user_info.get("given_name"),
                    "last_name": user_info.get("family_name"),
                    "role_keys": roles,
                }

            return {}

    # Assign Custom Security Manager to Airflow
    SECURITY_MANAGER_CLASS = CustomSecurityManager

else:
    # Fallback to Database Authentication if Keycloak is not enabled
    AUTH_TYPE = AUTH_DB
    log.info("Keycloak is disabled. Using database authentication (AUTH_DB).")
  • For scenarios like local development, you can disable Keycloak authentication by setting the KEYCLOAK_ENABLED environment variable to false. Airflow then falls back to its default database authentication.
  • To map differently named Keycloak roles to Airflow roles, set the KEYCLOAK_XXX_ROLE environment variables (for example, KEYCLOAK_ADMIN_ROLE) to the corresponding Keycloak role names.
  • Airflow's default roles are Admin, Op, User, Viewer, and Public.
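The role mapping can be pictured as a simple lookup from Keycloak role names to Airflow roles. The sketch below approximates the idea and is not Flask AppBuilder's actual implementation; the map_roles helper and the airflow-admins role name are made up for the example.

```python
# Keycloak role name -> Airflow roles, as if KEYCLOAK_ADMIN_ROLE=airflow-admins were set.
AUTH_ROLES_MAPPING = {
    "airflow-admins": ["Admin"],
    "Viewer": ["Viewer"],
}


def map_roles(role_keys, mapping, default_role="Public"):
    """Translate Keycloak role names into Airflow roles, ignoring unknown names."""
    mapped = set()
    for key in role_keys:
        mapped.update(mapping.get(key, []))
    # Fall back to the default role when nothing matched.
    return sorted(mapped) or [default_role]


print(map_roles(["airflow-admins", "uma_authorization"], AUTH_ROLES_MAPPING))
print(map_roles([], AUTH_ROLES_MAPPING))
```

Role names that have no entry in the mapping are dropped, which is why a typo in a Keycloak role name usually shows up as a user landing in the default role.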

2. Start Airflow with Keycloak Configuration

Run Airflow with Docker, passing in the custom webserver configuration file and necessary environment variables for Keycloak.

docker run -d --name airflow-webserver \
--expose 8080 \
-e KEYCLOAK_ENABLED=true \
-e KEYCLOAK_ISSUER=http://localhost:9090/realms/airflow-realm \
-e KEYCLOAK_CLIENT_ID=airflow-client \
-e KEYCLOAK_CLIENT_SECRET=your-keycloak-client-secret \
-v ./webserver_config.py:/opt/airflow/webserver_config.py \
--network=host \
apache/airflow:2.10.0 \
bash -c "airflow db init && airflow webserver"
  • This command runs just the Airflow webserver for testing purposes.
  • The --network=host option makes the container share the host's network stack, so Airflow can reach services running on localhost, such as Keycloak, and there is no need to map ports with -p. Exposing the port with --expose 8080 is enough.
  • Environment variables KEYCLOAK_ENABLED, KEYCLOAK_ISSUER, KEYCLOAK_CLIENT_ID, and KEYCLOAK_CLIENT_SECRET provide Keycloak configuration for Airflow. Be sure to replace your-keycloak-client-secret with the actual client secret from your Keycloak setup.
  • The -v option mounts the custom webserver_config.py file into the container.
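Every Keycloak endpoint used by the webserver configuration is derived from KEYCLOAK_ISSUER, so a wrong issuer value breaks all of them at once. As a rough sketch (the keycloak_endpoints helper is hypothetical), the derivation looks like this:

```python
def keycloak_endpoints(issuer: str) -> dict:
    """Derive the OpenID Connect endpoints from a realm issuer URL."""
    base = issuer.rstrip("/")  # tolerate a trailing slash in the env variable
    oidc = f"{base}/protocol/openid-connect"
    return {
        "metadata": f"{base}/.well-known/openid-configuration",
        "token": f"{oidc}/token",
        "authorize": f"{oidc}/auth",
    }


endpoints = keycloak_endpoints("http://localhost:9090/realms/airflow-realm/")
for name, url in endpoints.items():
    print(f"{name}: {url}")
```

Opening the metadata URL in a browser is a quick way to confirm that the issuer value points at the right realm.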

To monitor the Airflow process and troubleshoot any errors, use this command.

docker logs -f airflow-webserver

Testing the Integration

1. Access Airflow

Open a browser and navigate to http://localhost:8080. You should be redirected to a login page with an option to sign in using Keycloak.

2. Log In with Keycloak

Use the credentials of a Keycloak user, such as an Admin or Viewer.

3. Verify User Profile

After logging in, review the user profile ➊ in Airflow. Admin users should see the Admin role, and Viewer users should see the Viewer role.

Conclusion

Integrating Keycloak with Airflow provides centralized user management and enhanced security. This setup simplifies role management and streamlines user access across applications.

Extra

Accessing the API Using Keycloak Access Token

API authentication is configured separately from web-based authentication. By default, Airflow's REST API only accepts requests that carry an authenticated web session. To access the API using a Keycloak access token, a custom user authentication backend must be implemented.
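One small piece of such a backend is pulling the token out of the Authorization header. Here is a minimal sketch of a tolerant parser; the parse_bearer_token name is an assumption for illustration, and it returns None for a missing header or a non-Bearer scheme.

```python
from __future__ import annotations


def parse_bearer_token(header_value: str | None) -> str | None:
    """Return the token from a 'Bearer <token>' header value, or None."""
    if not header_value:
        return None
    scheme, _, token = header_value.strip().partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


print(parse_bearer_token("Bearer abc.def.ghi"))
print(parse_bearer_token("Basic dXNlcjpwYXNz"))
```

Returning None rather than raising lets the caller answer with a 401 response when the header is absent or malformed.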

1. Create a file named user_auth.py that contains the custom authentication logic for Keycloak token validation.

# This configuration file is based on the official Apache Airflow basic authentication backend.
# Original file: https://github.com/apache/airflow/blob/main/airflow/providers/fab/auth_manager/api/auth/backend/basic_auth.py

from __future__ import annotations

import logging
import os
from base64 import b64decode
from collections.abc import Callable
from functools import wraps
from typing import TYPE_CHECKING, Any, TypeVar, cast

import jwt
import requests
from airflow.providers.fab.auth_manager.security_manager.override import (
    FabAirflowSecurityManagerOverride,
)
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.www.extensions.init_auth_manager import get_auth_manager
from cryptography.hazmat.primitives import serialization
from flask import Response, current_app, request
from flask_appbuilder.const import AUTH_LDAP, AUTH_OAUTH
from flask_login import login_user

if TYPE_CHECKING:
    from airflow.providers.fab.auth_manager.models import User

CLIENT_AUTH: tuple[str, str] | Any | None = None

T = TypeVar("T", bound=Callable)

log = logging.getLogger(__name__)


def init_app(_):
    """Initialize authentication backend."""


def auth_current_user() -> User | None:
    """Authenticate and set current user if Authorization header exists."""

    ab_security_manager = get_airflow_app().appbuilder.sm
    user = None
    if ab_security_manager.auth_type == AUTH_OAUTH:
        # Use .get() so a missing header yields None instead of raising.
        auth_header = request.headers.get("Authorization")
        if not auth_header:
            return None

        # Keycloak Settings
        AUTH_DEFAULT_ROLE = os.getenv("AUTH_DEFAULT_ROLE", "Public")
        KEYCLOAK_CLIENT_ID = os.getenv("KEYCLOAK_CLIENT_ID", "airflow")
        KEYCLOAK_ISSUER = os.environ["KEYCLOAK_ISSUER"].rstrip("/")

        # Fetch Keycloak Public Key
        try:
            response = requests.get(KEYCLOAK_ISSUER)
            response.raise_for_status()
            key_der_base64 = response.json().get("public_key")
            key_der = b64decode(key_der_base64.encode())
            public_key = serialization.load_der_public_key(key_der)
        except requests.RequestException as e:
            log.error(f"Failed to fetch Keycloak public key: {e}")
            raise

        token = auth_header.replace("Bearer ", "")
        try:
            user_info = jwt.decode(
                token,
                public_key,
                algorithms=["RS256"],
                audience=KEYCLOAK_CLIENT_ID,
            )
        except jwt.ExpiredSignatureError:
            log.error("Token has expired.")
            # Return None (not {}) so requires_authentication rejects the request.
            return None
        except jwt.InvalidTokenError as e:
            log.error(f"Invalid token: {e}")
            return None

        roles = (
            user_info.get("resource_access", {})
            .get(KEYCLOAK_CLIENT_ID, {})
            .get("roles", [AUTH_DEFAULT_ROLE])
        )

        userinfo = {
            "username": user_info.get("preferred_username"),
            "email": user_info.get("email"),
            "first_name": user_info.get("given_name"),
            "last_name": user_info.get("family_name"),
            "role_keys": roles,
        }
        user = ab_security_manager.auth_user_oauth(userinfo)
    else:
        # Fall back to basic authentication against LDAP or the database.
        auth = request.authorization
        if auth is None or not auth.username or not auth.password:
            return None

        security_manager = cast(
            FabAirflowSecurityManagerOverride, get_auth_manager().security_manager
        )
        if security_manager.auth_type == AUTH_LDAP:
            user = security_manager.auth_user_ldap(auth.username, auth.password)
        if user is None:
            user = security_manager.auth_user_db(auth.username, auth.password)

    if user is not None:
        login_user(user, remember=False)

    return user


def requires_authentication(function: T):
    """Decorate functions that require authentication."""

    @wraps(function)
    def decorated(*args, **kwargs):
        if auth_current_user() is not None or current_app.config.get(
            "AUTH_ROLE_PUBLIC", None
        ):
            return function(*args, **kwargs)
        else:
            return Response("Unauthorized", 401, {"WWW-Authenticate": "Basic"})

    return cast(T, decorated)

2. Mount the file to the container and set the AIRFLOW__API__AUTH_BACKENDS environment variable to use the custom authentication backend.

docker run -d --name airflow-webserver \
--expose 8080 \
-e AIRFLOW__API__AUTH_BACKENDS=user_auth \
-e KEYCLOAK_ENABLED=true \
-e KEYCLOAK_ISSUER=http://localhost:9090/realms/airflow-realm \
-e KEYCLOAK_CLIENT_ID=airflow-client \
-e KEYCLOAK_CLIENT_SECRET=your-keycloak-client-secret \
-v ./webserver_config.py:/opt/airflow/webserver_config.py \
-v ./user_auth.py:/opt/airflow/user_auth.py \
--network=host \
apache/airflow:2.10.0 \
bash -c "airflow db init && airflow webserver"

3. Test the configuration by generating an access token using Keycloak and retrieving the list of DAGs to confirm that the custom authentication is functioning correctly.

export TOKEN=$(curl "http://localhost:9090/realms/airflow-realm/protocol/openid-connect/token" \
-d "client_id=airflow-client" \
-d "client_secret=your-keycloak-client-secret" \
-d "username=admin" \
-d "password=admin" \
-d "grant_type=password" \
| jq -r ".access_token")
curl http://localhost:8080/api/v1/dags -H "Authorization: Bearer $TOKEN"

Remember to replace your-keycloak-client-secret with the actual client secret from your Keycloak configuration, and the username and password with the credentials of a user you created in airflow-realm.
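If you prefer to script this check in Python, the same token request can be built with the standard library. This is a sketch under the same assumptions as the curl command above (local URLs, placeholder secret, an admin/admin user); the function names are hypothetical, and the network call is left commented out so the snippet runs without Keycloak.

```python
import json
import urllib.parse
import urllib.request


def build_token_request(issuer, client_id, client_secret, username, password):
    """Build a password-grant token request for the realm's token endpoint."""
    form = urllib.parse.urlencode(
        {
            "client_id": client_id,
            "client_secret": client_secret,
            "username": username,
            "password": password,
            "grant_type": "password",
        }
    ).encode()
    url = f"{issuer.rstrip('/')}/protocol/openid-connect/token"
    return urllib.request.Request(url, data=form, method="POST")


def fetch_access_token(req):
    """Send the request and return the access_token field of the JSON reply."""
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)["access_token"]


req = build_token_request(
    "http://localhost:9090/realms/airflow-realm",
    "airflow-client",
    "your-keycloak-client-secret",  # placeholder, replace with the real secret
    "admin",
    "admin",
)
print(req.full_url)
# token = fetch_access_token(req)  # requires a running Keycloak
```

The returned token can then be sent to http://localhost:8080/api/v1/dags in an Authorization: Bearer header, exactly as in the curl example.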

Customizing the Airflow Image

To incorporate both the custom webserver_config.py and, if needed, the custom user_auth.py for API authentication into your Airflow image, use the following Dockerfile as a guide.

FROM apache/airflow:2.10.0

COPY webserver_config.py /opt/airflow/webserver_config.py
# Uncomment the next line if you're using custom API authentication.
# COPY user_auth.py /opt/airflow/user_auth.py
