Integrating Keycloak with Apache Airflow
A Step-by-Step Guide to Implementing Authentication and Authorization in Apache Airflow Using Keycloak
☑ Tested with Keycloak version 25.0 and Airflow version 2.10.
As more organizations use Apache Airflow to manage complex workflows, it’s crucial to secure access to this powerful tool. One effective way to do this is by integrating Airflow with Keycloak, an open-source identity and access management solution.
In this blog post, I’ll show you how to set up authentication in Airflow using Keycloak. We’ll walk through the setup process, starting with Keycloak and ending with configuring Airflow to use it for user authentication.
Why Use Keycloak with Airflow?
Keycloak offers advanced features such as Single Sign-On (SSO), user federation, and fine-grained access control. By using Keycloak with Airflow, you can:
- Centralize user authentication across multiple applications.
- Implement SSO so users can access Airflow with their existing credentials.
- Use stronger authentication methods like two-factor authentication.
- Manage user roles and permissions more efficiently.
Prerequisites
Before you begin, ensure you have Docker installed on your machine.
Setting Up Keycloak
1. Install Keycloak
Run the following command to start Keycloak using Docker.
docker run -d --name keycloak \
-p 9090:8080 \
-e KEYCLOAK_ADMIN=keycloak \
-e KEYCLOAK_ADMIN_PASSWORD=keycloak \
quay.io/keycloak/keycloak:25.0.4 \
start-dev
This command starts Keycloak on port 9090 with keycloak as both the admin username and password.
2. Create a Realm
A realm is where you manage users, roles, and clients.
- Go to the Keycloak admin console at http://localhost:9090.
- Log in with your admin credentials.
- Click “Keycloak” realm, then click “Create Realm” ➊.
- Enter a name for your realm, such as airflow-realm ➋, and click “Create” ➌.
The master realm named Keycloak should only be used for managing other realms.
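Every OpenID Connect endpoint used later in this guide hangs off the realm's issuer URL. Here is a minimal Python sketch of how those URLs are derived, assuming the host port and realm name used above. The helper name realm_urls is made up for illustration.

```python
def realm_urls(base_url: str, realm: str) -> dict[str, str]:
    """Build the OIDC URLs for a Keycloak realm (hypothetical helper)."""
    issuer = f"{base_url.rstrip('/')}/realms/{realm}"
    oidc = f"{issuer}/protocol/openid-connect"
    return {
        "issuer": issuer,
        "metadata": f"{issuer}/.well-known/openid-configuration",
        "authorize": f"{oidc}/auth",
        "token": f"{oidc}/token",
    }


# Values assumed from this guide: Keycloak on port 9090, realm "airflow-realm".
urls = realm_urls("http://localhost:9090", "airflow-realm")
print(urls["issuer"])
```

Opening the metadata URL in a browser is a quick way to confirm that the realm exists and is reachable.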
3. Create a Client
A client in Keycloak represents an application like Airflow.
- In the airflow-realm, go to the “Clients” section and click “Create client”.
- Choose OpenID Connect ➊ as the Client type.
- Set the Client ID to airflow-client ➋ and click “Next” ➌.
- Toggle on Client authentication ➍ and click “Next” ➎.
- Set the Root URL to your Airflow base URL, http://localhost:8080 ➏.
- Set Valid redirect URIs to /oauth-authorized/keycloak ➐ and click “Save” ➑.
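The redirect URI is relative, so it is resolved against the Root URL. The path comes from Flask AppBuilder, which receives the OAuth callback at /oauth-authorized/ followed by the provider name, so the last segment has to match the provider name configured in Airflow ("keycloak" in this guide). A small sketch of how the full callback URL comes together; callback_url is a hypothetical helper, not part of Airflow or Keycloak.

```python
from urllib.parse import urljoin


def callback_url(airflow_base_url: str, provider: str) -> str:
    """Return the absolute OAuth callback URL for a given provider name."""
    base = airflow_base_url.rstrip("/") + "/"
    return urljoin(base, f"oauth-authorized/{provider}")


# Base URL and provider name assumed from this guide.
print(callback_url("http://localhost:8080", "keycloak"))
```

If Airflow is later served from a different host or path, both the Root URL and this callback change with it.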
4. Copy the Client Secret
- In the airflow-client settings, go to the “Credentials” ➊ tab.
- Copy ➋ the Client Secret value (you’ll need this for the Airflow configuration).
5. Define Roles
Define roles within airflow-client by creating sample roles like Admin and Viewer.
- Go to the “Roles” tab in the airflow-client settings.
- Click on “Create role” and proceed to create the Admin and Viewer roles ➊.
Using client-specific roles instead of realm roles allows for more granular control, ensuring that roles are scoped to this client only. However, this approach may vary depending on whether roles need to be shared across multiple clients or isolated within a single client.
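Client roles end up in the access token under resource_access, keyed by client ID, while realm roles live under realm_access. The sketch below shows how the roles for airflow-client can be read from decoded token claims. The sample claims are invented for illustration, and the lookup mirrors the one used in the Airflow configuration later in this guide.

```python
def client_roles(claims: dict, client_id: str, default_role: str = "Public") -> list[str]:
    """Return the roles granted for one client, or a default if none are present."""
    return (
        claims.get("resource_access", {})
        .get(client_id, {})
        .get("roles", [default_role])
    )


# Made-up claims shaped like a decoded Keycloak access token.
sample_claims = {
    "preferred_username": "alice",
    "realm_access": {"roles": ["offline_access"]},
    "resource_access": {"airflow-client": {"roles": ["Admin"]}},
}

print(client_roles(sample_claims, "airflow-client"))
```

A user with no role on the client falls back to the default role, which is why the default should be a low-privilege one.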
6. Set Audience
To ensure the Keycloak access token is correctly targeted for Airflow, set the audience (aud) field to airflow-client.
- Go to the “Client scopes” ➊ tab in the airflow-client settings.
- Select “airflow-client-dedicated” ➋, which contains the specific mappers and scope for this client.
- Click “Configure a new mapper” ➌.
- Choose “Audience” ➍ as the Mapper type.
- Name the mapper (e.g., airflow-audience ➎) and select airflow-client ➏ as the Included Client Audience.
- Click “Save” ➐ to apply the mapper.
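To check that the mapper works, you can inspect the aud claim of an access token. The sketch below decodes the token payload without verifying the signature, which is acceptable for debugging but never for authentication. The function names and the locally built sample token are assumptions for illustration only.

```python
import base64
import json


def unverified_claims(token: str) -> dict:
    """Decode a JWT payload WITHOUT signature verification (debugging only)."""
    payload_b64 = token.split(".")[1]
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)  # restore base64 padding
    return json.loads(base64.urlsafe_b64decode(padded))


def has_audience(claims: dict, client_id: str) -> bool:
    """The aud claim may be a single string or a list of strings."""
    aud = claims.get("aud", [])
    if isinstance(aud, str):
        aud = [aud]
    return client_id in aud


def _b64(obj: dict) -> str:
    """Encode a dict as an unpadded base64url JSON segment."""
    return base64.urlsafe_b64encode(json.dumps(obj).encode()).rstrip(b"=").decode()


# Fake, unsigned token built locally; a real one comes from Keycloak.
sample_token = ".".join([_b64({"alg": "none"}), _b64({"aud": ["airflow-client", "account"]}), ""])
claims = unverified_claims(sample_token)
print(has_audience(claims, "airflow-client"))
```

If airflow-client is missing from aud, token validation with an audience check will reject the token.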
7. Create Users
Add users to the airflow-realm and assign them the appropriate roles.
- Go to the “Users” ➊ section of the realm and click “Add User”.
- Fill in the user’s General ➋ information, including Username, Email, First Name, and Last Name, then click “Create” ➌.
- In the “Credentials” ➍ tab, click “Set password” ➎, enter the password, toggle off the “Temporary” ➏ option, and click “Save” ➐.
- Go to the “Role Mapping” ➑ tab, click “Assign role” ➒, and assign the appropriate roles to the user.
Repeat this process for any additional users, ensuring both Admin and Viewer roles are represented in the realm ➓.
Configuring Airflow
With Keycloak set up, the next step is to configure Airflow to authenticate users through Keycloak. By default, Airflow uses a database for user authentication, requiring users to enter a password, but you can customize it to integrate with external authentication providers such as Keycloak. Starting with Airflow 2.0, the UI is based on Flask AppBuilder, which supports various authentication methods including OAuth and OpenID.
1. Create a Custom Webserver Config
To enable Keycloak as the authentication provider for Airflow, create a custom webserver_config.py file. This configuration file lets Airflow integrate with Keycloak for OAuth-based authentication.
# This configuration file is based on the official Apache Airflow webserver configuration.
# Original file: https://github.com/apache/airflow/blob/main/airflow/config_templates/default_webserver_config.py
#
# Customizations have been made to integrate Keycloak for authentication.
# If Keycloak is disabled, the configuration falls back to database authentication.
#
# For more information on the default configuration, refer to the official Airflow documentation.
import logging
import os
from base64 import b64decode

import jwt
import requests
from airflow.www.security import AirflowSecurityManager
from cryptography.hazmat.primitives import serialization
from flask_appbuilder import expose
from flask_appbuilder.security.manager import AUTH_DB, AUTH_OAUTH
from flask_appbuilder.security.views import AuthOAuthView

basedir = os.path.abspath(os.path.dirname(__file__))

# Flask-WTF flag for CSRF
WTF_CSRF_ENABLED = True
WTF_CSRF_TIME_LIMIT = None

# Initialize logging
log = logging.getLogger(__name__)

# Check if Keycloak is enabled
KEYCLOAK_ENABLED = os.getenv("KEYCLOAK_ENABLED", "False").lower() in [
    "true",
    "1",
    "yes",
]

if KEYCLOAK_ENABLED:
    # Keycloak Authentication Configuration
    AUTH_TYPE = AUTH_OAUTH
    AUTH_ROLES_SYNC_AT_LOGIN = True  # Synchronize roles at each login
    AUTH_USER_REGISTRATION = True  # Allow automatic user registration
    AUTH_DEFAULT_ROLE = os.getenv("AUTH_DEFAULT_ROLE", "Public")
    AUTH_USER_REGISTRATION_ROLE = AUTH_DEFAULT_ROLE  # Default role for new users

    # Keycloak Settings
    KEYCLOAK_ISSUER = os.environ["KEYCLOAK_ISSUER"].rstrip("/")
    KEYCLOAK_CLIENT_ID = os.getenv("KEYCLOAK_CLIENT_ID", "airflow")
    KEYCLOAK_CLIENT_SECRET = os.environ["KEYCLOAK_CLIENT_SECRET"]

    # Keycloak URLs
    KEYCLOAK_SERVER_METADATA_URL = f"{KEYCLOAK_ISSUER}/.well-known/openid-configuration"
    KEYCLOAK_API_BASE_URL = f"{KEYCLOAK_ISSUER}/protocol/openid-connect"
    KEYCLOAK_ACCESS_TOKEN_URL = f"{KEYCLOAK_API_BASE_URL}/token"
    KEYCLOAK_AUTHORIZE_URL = f"{KEYCLOAK_API_BASE_URL}/auth"

    # Role Mapping Configuration (Keycloak role name -> Airflow roles)
    AUTH_ROLES_MAPPING = {
        os.getenv("KEYCLOAK_ADMIN_ROLE", "Admin"): ["Admin"],
        os.getenv("KEYCLOAK_OP_ROLE", "Op"): ["Op"],
        os.getenv("KEYCLOAK_USER_ROLE", "User"): ["User"],
        os.getenv("KEYCLOAK_VIEWER_ROLE", "Viewer"): ["Viewer"],
        os.getenv("KEYCLOAK_PUBLIC_ROLE", "Public"): ["Public"],
    }

    # OAuth Providers Configuration
    OAUTH_PROVIDERS = [
        {
            "name": "keycloak",
            "icon": "fa-key",
            "token_key": "access_token",
            "remote_app": {
                "client_id": KEYCLOAK_CLIENT_ID,
                "client_secret": KEYCLOAK_CLIENT_SECRET,
                "server_metadata_url": KEYCLOAK_SERVER_METADATA_URL,
                "api_base_url": KEYCLOAK_API_BASE_URL,
                "client_kwargs": {"scope": "email profile"},
                "access_token_url": KEYCLOAK_ACCESS_TOKEN_URL,
                "authorize_url": KEYCLOAK_AUTHORIZE_URL,
                "request_token_url": None,
            },
        },
    ]

    # Fetch Keycloak Public Key (the realm endpoint returns it as base64-encoded DER)
    try:
        response = requests.get(KEYCLOAK_ISSUER)
        response.raise_for_status()
        key_der_base64 = response.json().get("public_key")
        key_der = b64decode(key_der_base64.encode())
        public_key = serialization.load_der_public_key(key_der)
    except requests.RequestException as e:
        log.error(f"Failed to fetch Keycloak public key: {e}")
        raise

    # Custom Authentication View to Handle Keycloak Logout
    class CustomAuthRemoteUserView(AuthOAuthView):
        @expose("/logout/", methods=["GET", "POST"])
        def logout(self):
            return super().logout()

    # Custom Security Manager for Keycloak Authentication
    class CustomSecurityManager(AirflowSecurityManager):
        authoauthview = CustomAuthRemoteUserView

        def oauth_user_info(self, provider, response):
            if provider == "keycloak":
                token = response.get("access_token")
                if not token:
                    log.warning("No access token found in the response.")
                    return {}

                # Verify signature, expiry, and audience before trusting any claim
                try:
                    user_info = jwt.decode(
                        token,
                        public_key,
                        algorithms=["RS256"],
                        audience=KEYCLOAK_CLIENT_ID,
                    )
                except jwt.ExpiredSignatureError:
                    log.error("Token has expired.")
                    return {}
                except jwt.InvalidTokenError as e:
                    log.error(f"Invalid token: {e}")
                    return {}

                # Client roles live under resource_access.<client id>.roles
                roles = (
                    user_info.get("resource_access", {})
                    .get(KEYCLOAK_CLIENT_ID, {})
                    .get("roles", [AUTH_DEFAULT_ROLE])
                )

                return {
                    "username": user_info.get("preferred_username"),
                    "email": user_info.get("email"),
                    "first_name": user_info.get("given_name"),
                    "last_name": user_info.get("family_name"),
                    "role_keys": roles,
                }

            return {}

    # Assign Custom Security Manager to Airflow
    SECURITY_MANAGER_CLASS = CustomSecurityManager
else:
    # Fallback to Database Authentication if Keycloak is not enabled
    AUTH_TYPE = AUTH_DB
    log.info("Keycloak is disabled. Using database authentication (AUTH_DB).")

- For scenarios like local development, you can disable Keycloak authentication by setting the KEYCLOAK_ENABLED environment variable to false. This will revert the authentication method to the default database-based authentication.
- If specific roles need to be mapped from Keycloak to Airflow, you can modify the role mappings by adjusting the KEYCLOAK_XXX_ROLE environment variables to suit your needs.
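To make the role mapping concrete, here is a simplified sketch of how the KEYCLOAK_XXX_ROLE variables shape the mapping and how a user's Keycloak roles translate into Airflow roles. It illustrates the lookup only and is not Flask AppBuilder's actual implementation. The helper names and the airflow-admins role name are made up.

```python
AIRFLOW_ROLES = ["Admin", "Op", "User", "Viewer", "Public"]


def build_role_mapping(env: dict[str, str]) -> dict[str, list[str]]:
    """Map Keycloak role names (overridable via env vars) to Airflow roles."""
    return {
        env.get(f"KEYCLOAK_{role.upper()}_ROLE", role): [role]
        for role in AIRFLOW_ROLES
    }


def resolve_airflow_roles(role_keys: list[str], mapping: dict[str, list[str]]) -> list[str]:
    """Collect the Airflow roles for each known Keycloak role, skipping unknown ones."""
    resolved: list[str] = []
    for key in role_keys:
        for role in mapping.get(key, []):
            if role not in resolved:
                resolved.append(role)
    return resolved


# Hypothetical override: the Keycloak role "airflow-admins" becomes Airflow "Admin".
mapping = build_role_mapping({"KEYCLOAK_ADMIN_ROLE": "airflow-admins"})
print(resolve_airflow_roles(["airflow-admins", "unknown-role"], mapping))
```

Keycloak roles that have no entry in the mapping are ignored, so a typo in a role name silently yields a user with fewer permissions than intended.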
2. Start Airflow with Keycloak Configuration
Run Airflow with Docker, passing in the custom webserver configuration file and necessary environment variables for Keycloak.
docker run -d --name airflow-webserver \
--expose 8080 \
-e KEYCLOAK_ENABLED=true \
-e KEYCLOAK_ISSUER=http://localhost:9090/realms/airflow-realm \
-e KEYCLOAK_CLIENT_ID=airflow-client \
-e KEYCLOAK_CLIENT_SECRET=your-keycloak-client-secret \
-v ./webserver_config.py:/opt/airflow/webserver_config.py \
--network=host \
apache/airflow:2.10.0 \
bash -c "airflow db init && airflow webserver"
- This command runs just the Airflow webserver for testing purposes.
- Using the --network=host option makes the container share the host's network stack, so Airflow can reach services running on localhost, such as Keycloak, and there is no need to map ports with -p. The --expose 8080 flag only documents the port Airflow listens on. Host networking is native to Linux; on Docker Desktop it may need to be enabled or may behave differently.
- The environment variables KEYCLOAK_ENABLED, KEYCLOAK_ISSUER, KEYCLOAK_CLIENT_ID, and KEYCLOAK_CLIENT_SECRET provide the Keycloak configuration for Airflow. Be sure to replace your-keycloak-client-secret with the actual client secret from your Keycloak setup.
- The -v option mounts the custom webserver_config.py file into the container.
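Because webserver_config.py reads KEYCLOAK_ISSUER and KEYCLOAK_CLIENT_SECRET with os.environ[...], a missing variable stops the webserver with a KeyError at startup. A small pre-flight check like the following sketch can make that failure easier to read. The function names are hypothetical and the check is not part of Airflow.

```python
REQUIRED_VARS = ("KEYCLOAK_ISSUER", "KEYCLOAK_CLIENT_SECRET")


def keycloak_enabled(env: dict[str, str]) -> bool:
    """Same truthiness rule as the KEYCLOAK_ENABLED check in webserver_config.py."""
    return env.get("KEYCLOAK_ENABLED", "False").lower() in ("true", "1", "yes")


def missing_keycloak_vars(env: dict[str, str]) -> list[str]:
    """List required variables that are unset or empty while Keycloak is enabled."""
    if not keycloak_enabled(env):
        return []
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Example environment with the client secret forgotten.
example_env = {
    "KEYCLOAK_ENABLED": "true",
    "KEYCLOAK_ISSUER": "http://localhost:9090/realms/airflow-realm",
}
print(missing_keycloak_vars(example_env))
```

Passing dict(os.environ) instead of the example dictionary checks the real environment.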
To monitor the Airflow process and troubleshoot any errors, use this command.
docker logs -f airflow-webserver
Testing the Integration
1. Access Airflow
Open a browser and navigate to http://localhost:8080. You should be redirected to a login page with an option to sign in using Keycloak.
2. Log In with Keycloak
Use the credentials of a Keycloak user, such as an Admin or Viewer.
3. Verify User Profile
After logging in, review the user profile ➊ in Airflow. Admin users should see the Admin role, and Viewer users should see the Viewer role.
Conclusion
Integrating Keycloak with Airflow provides centralized user management and enhanced security. This setup simplifies role management and streamlines user access across applications.
Extra
Accessing the API Using Keycloak Access Token
API authentication is managed separately from web-based authentication. By default, the system checks the user session for API access. To access the API using a Keycloak access token, a custom user authentication backend must be implemented.
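The first job of such a backend is pulling the token out of the Authorization header without failing on missing or malformed values. Here is a minimal sketch of that step in isolation, with a hypothetical helper name.

```python
from typing import Optional


def bearer_token(authorization_header: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header, else None."""
    if not authorization_header:
        return None
    scheme, _, value = authorization_header.partition(" ")
    if scheme.lower() != "bearer" or not value.strip():
        return None
    return value.strip()


print(bearer_token("Bearer abc.def.ghi"))
```

Returning None for anything that is not a bearer token lets the caller treat the request as unauthenticated instead of raising an error.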
1. Create a file named user_auth.py that contains the custom authentication logic for Keycloak token validation.
# This configuration file is based on the official Apache Airflow basic authentication backend.
# Original file: https://github.com/apache/airflow/blob/main/airflow/providers/fab/auth_manager/api/auth/backend/basic_auth.py
from __future__ import annotations

import logging
import os
from base64 import b64decode
from collections.abc import Callable
from functools import wraps
from typing import TYPE_CHECKING, Any, TypeVar, cast

import jwt
import requests
from airflow.providers.fab.auth_manager.security_manager.override import (
    FabAirflowSecurityManagerOverride,
)
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.www.extensions.init_auth_manager import get_auth_manager
from cryptography.hazmat.primitives import serialization
from flask import Response, current_app, request
from flask_appbuilder.const import AUTH_LDAP, AUTH_OAUTH
from flask_login import login_user

if TYPE_CHECKING:
    from airflow.providers.fab.auth_manager.models import User

CLIENT_AUTH: tuple[str, str] | Any | None = None

T = TypeVar("T", bound=Callable)

log = logging.getLogger(__name__)


def init_app(_):
    """Initialize authentication backend."""


def auth_current_user() -> User | None:
    """Authenticate and set current user if Authorization header exists."""
    ab_security_manager = get_airflow_app().appbuilder.sm
    user = None

    if ab_security_manager.auth_type == AUTH_OAUTH:
        # Use .get() so a missing header means "unauthenticated" rather than an error
        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            return None
        token = auth_header[len("Bearer "):]

        # Keycloak Settings
        AUTH_DEFAULT_ROLE = os.getenv("AUTH_DEFAULT_ROLE", "Public")
        KEYCLOAK_CLIENT_ID = os.getenv("KEYCLOAK_CLIENT_ID", "airflow")
        KEYCLOAK_ISSUER = os.environ["KEYCLOAK_ISSUER"].rstrip("/")

        # Fetch Keycloak Public Key (note: this runs on every API request)
        try:
            response = requests.get(KEYCLOAK_ISSUER)
            response.raise_for_status()
            key_der_base64 = response.json().get("public_key")
            key_der = b64decode(key_der_base64.encode())
            public_key = serialization.load_der_public_key(key_der)
        except requests.RequestException as e:
            log.error(f"Failed to fetch Keycloak public key: {e}")
            raise

        # Return None on failure: any non-None value is treated as authenticated
        try:
            user_info = jwt.decode(
                token,
                public_key,
                algorithms=["RS256"],
                audience=KEYCLOAK_CLIENT_ID,
            )
        except jwt.ExpiredSignatureError:
            log.error("Token has expired.")
            return None
        except jwt.InvalidTokenError as e:
            log.error(f"Invalid token: {e}")
            return None

        roles = (
            user_info.get("resource_access", {})
            .get(KEYCLOAK_CLIENT_ID, {})
            .get("roles", [AUTH_DEFAULT_ROLE])
        )
        userinfo = {
            "username": user_info.get("preferred_username"),
            "email": user_info.get("email"),
            "first_name": user_info.get("given_name"),
            "last_name": user_info.get("family_name"),
            "role_keys": roles,
        }
        user = ab_security_manager.auth_user_oauth(userinfo)
    else:
        # Basic authentication against LDAP or the Airflow database
        auth = request.authorization
        if auth is None or not auth.username or not auth.password:
            return None

        security_manager = cast(
            FabAirflowSecurityManagerOverride, get_auth_manager().security_manager
        )
        user = None
        if security_manager.auth_type == AUTH_LDAP:
            user = security_manager.auth_user_ldap(auth.username, auth.password)
        if user is None:
            user = security_manager.auth_user_db(auth.username, auth.password)

    if user is not None:
        login_user(user, remember=False)
    return user


def requires_authentication(function: T):
    """Decorate functions that require authentication."""

    @wraps(function)
    def decorated(*args, **kwargs):
        if auth_current_user() is not None or current_app.config.get(
            "AUTH_ROLE_PUBLIC", None
        ):
            return function(*args, **kwargs)
        else:
            return Response("Unauthorized", 401, {"WWW-Authenticate": "Basic"})

    return cast(T, decorated)

2. Mount the file to the container and set the AIRFLOW__API__AUTH_BACKENDS environment variable to use the custom authentication backend.
docker run -d --name airflow-webserver \
--expose 8080 \
-e AIRFLOW__API__AUTH_BACKENDS=user_auth \
-e KEYCLOAK_ENABLED=true \
-e KEYCLOAK_ISSUER=http://localhost:9090/realms/airflow-realm \
-e KEYCLOAK_CLIENT_ID=airflow-client \
-e KEYCLOAK_CLIENT_SECRET=your-keycloak-client-secret \
-v ./webserver_config.py:/opt/airflow/webserver_config.py \
-v ./user_auth.py:/opt/airflow/user_auth.py \
--network=host \
apache/airflow:2.10.0 \
bash -c "airflow db init && airflow webserver"
3. Test the configuration by generating an access token using Keycloak and retrieving the list of DAGs to confirm that the custom authentication is functioning correctly.
export TOKEN=$(curl "http://localhost:9090/realms/airflow-realm/protocol/openid-connect/token" \
-d "client_id=airflow-client" \
-d "client_secret=your-keycloak-client-secret" \
-d "username=admin" \
-d "password=admin" \
-d "grant_type=password" \
| jq -r ".access_token")
curl http://localhost:8080/api/v1/dags -H "Authorization: Bearer $TOKEN"
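Remember to replace your-keycloak-client-secret with the actual client secret from your Keycloak configuration, and the admin username and password with the credentials of a user you created in the realm. The password grant used here also requires Direct access grants to be enabled on the client.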
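The same test can be scripted in Python. The sketch below uses only the standard library and mirrors the two curl calls; the function names are made up, and the credentials are the placeholders from the command above. Nothing is sent over the network until fetch_access_token or list_dags is called.

```python
import json
import urllib.parse
import urllib.request


def token_form(client_id: str, client_secret: str, username: str, password: str) -> bytes:
    """Build the form body for a password-grant token request."""
    return urllib.parse.urlencode(
        {
            "grant_type": "password",
            "client_id": client_id,
            "client_secret": client_secret,
            "username": username,
            "password": password,
        }
    ).encode()


def fetch_access_token(token_url: str, form: bytes) -> str:
    """POST the form to the Keycloak token endpoint and return the access token."""
    req = urllib.request.Request(token_url, data=form, method="POST")
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)["access_token"]


def list_dags(airflow_base_url: str, access_token: str) -> dict:
    """Call the Airflow REST API with the token as a bearer credential."""
    req = urllib.request.Request(
        f"{airflow_base_url.rstrip('/')}/api/v1/dags",
        headers={"Authorization": f"Bearer {access_token}"},
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)


# Placeholder values; replace them before running against a real setup.
form = token_form("airflow-client", "your-keycloak-client-secret", "admin", "admin")
```

With both services running, pass the realm's token URL and this form to fetch_access_token, then hand the result to list_dags together with http://localhost:8080.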
Customizing the Airflow Image
To incorporate both the custom webserver_config.py and, if needed, the custom user_auth.py for API authentication into your Airflow image, use the following Dockerfile as a guide.
FROM apache/airflow:2.10.0
COPY webserver_config.py /opt/airflow/webserver_config.py
# COPY user_auth.py /opt/airflow/user_auth.py # Uncomment this line if you're using custom API authentication.
Read more…
- https://airflow.apache.org/docs/apache-airflow-providers-fab/stable/auth-manager/webserver-authentication.html
- https://github.com/apache/airflow/blob/351961caf46e32f10e61691ca6d88d55db1e4d70/docs/apache-airflow-providers-fab/auth-manager/webserver-authentication.rst#example-using-team-based-authorization-with-keycloak
- https://github.com/apache/airflow/blob/351961caf46e32f10e61691ca6d88d55db1e4d70/airflow/providers/fab/auth_manager/security_manager/override.py#L2231
- https://blog.devgenius.io/airflow-authentication-with-rbac-and-keycloak-2c34d2012059
- https://discuss.dataengineercafe.io/t/airflow-authentication-with-keycloak/240