Kafka SASL/OAUTHBEARER 인증 구성

Victor Park
SPITHA Blog
Published in
20 min readAug 9, 2023

by JW.Song

Apache Kafka 2.0 버전에서 SASL/OAUTHBEARER 인증이 도입되었으나 거의 reference 구현에 불과하고 외부 OAuth 인증서버와 연동하도록 구성할 수 없었다.

최근 Kafka 3.1.0 릴리즈에 KIP-768에 대한 구현이 추가되면서 외부 OIDC(OpenID Connect; OAuth 2.0) 인증서버와 연동이 가능해졌다. 그동안에도 LoginCallbackHandler, ValidatorCallbackHandler 등을 별도로 구현하면 연동이 가능했었지만 reference가 부족했으며 Kafka 배포판을 커스터마이즈해야 하는 부담이 있었다.

KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

설정 과정

다음의 순서로 SASL/OAUTHBEARER 인증을 구성하는 과정을 설명한다.

  • OpenID 인증 서버 구성; Keyclock
  • Client 계정 생성 및 설정
  • SASL/OAUTHBEARER 인증 구성; Apacke Kafka 3.1.0

OpenID 인증 서버 구성

KIP-768 구현 내용은 OIDC를 지원하기 위한 SASL/OAUTHBEARER 인증 확장이므로 OIDC를 지원하는 인증 서버가 필요하다. 본 매뉴얼에서는 오픈소스 소프트웨어 Keycloak도커 이미지를 사용하여 인증서버를 구성한다.

다음의 docker-compose 파일을 참고하여 keyclock을 구성한다. 예제 구성에서는 keycloak 관리자 계정 및 패스워드를 지정했다.

version: "3"

services:
mariadb:
image: mariadb:10
restart: always
volumes:
- $PWD/data:/var/lib/myql
environment:
MARIADB_ROOT_PASSWORD: rootpassword
MARIADB_DATABASE: keycloak
MARIADB_USER: keycloak
MARIADB_PASSWORD: dbpassword

keycloak:
image: jboss/keycloak:16.1.1
depends_on:
- mariadb
restart: always
ports:
- 80:8080
environment:
DB_ADDR: mariadb
#DB_PORT: 3306
#DB_DATABASE: keycloak
DB_USER: keycloak
DB_PASSWORD: dbpassword
KEYCLOAK_USER: admin
KEYCLOAK_PASSWORD: adminpassword

다음 예와 같이 관리자 계정을 별도로 생성할 수 있다. 컨테이너 재시작이 필요할 수 있다.

docker-compose exec keycloak /opt/jboss/keycloak/bin/add-user-keycloak.sh -u <USERNAME> -p <PASSWORD>

Client 계정 생성

SASL/OAUTHBEAER 인증에 사용될 계정을 생성해야 한다. keycloak에서 client 계정은 객체의 일종으로 realm으로 구분되는 공간에 속한다. realm은 mysql의 database, 혹은 oracle의 tablespace와 유사한 개념이다.

realm 생성

keycloak을 설정하면 기본적으로 master realm이 생성되어 있다. 본 예제에서는 spark-kafka-cluster realm을 생성하여 설명한다. 다음 그림과 같이 생성한다.

[step1 — Add realm]
[step2 — realm name]

endpoint 확인

realm을 생성하면 아래 그림과 같이 General 탭에서 붉은색 박스 부분을 클릭하면 API 엔드포인트를 확인할 수 있다.

다음 예제는 realm endpoint 구성의 일부분이다.

{
"issuer":"http://172.17.20.211/auth/realms/spark-kafka-cluster",
"authorization_endpoint":"http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/auth",
"token_endpoint":"http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/token",
"introspection_endpoint":"http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/token/introspect",
"userinfo_endpoint":"http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/userinfo",
"end_session_endpoint":"http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/logout",
"frontchannel_logout_session_supported":true,
"frontchannel_logout_supported":true,
"jwks_uri":"http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/certs",
"check_session_iframe":"http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/login-status-iframe.html",
"grant_types_supported":[
"authorization_code",
"implicit",
"refresh_token",
"password",
"client_credentials",
"urn:ietf:params:oauth:grant-type:device_code",
"urn:openid:params:grant-type:ciba"
],
...
}

다음의 항목을 확인해야 한다.

  • token_endpoint
    client access token을 획득하기 위한 엔드포인트
  • jwks_uri
    Kafka broker가 JWKS를 획득하기 위한 엔드포인트
  • grant_type_supported
    client_credentials이 지원되는지 확인이 필요함

client 생성

현재 카프카 3.1.0에서는 client_credentials 인증만 가능하므로 적절한 형태로 client 계정을 생성해야 한다.

client 계정 추가

왼쪽 Clients 메뉴를 선택하고 테이블 우측 상단의 Create 버튼으로 새로운 client를 생성한다.

client 설정

client_credentials 인증을 위해 다음 그림과 같이 설정한다. Access Typeconfidential, Service Accounts EnabledON으로 설정하고 저장한다.

Valid Redirect URIs 설정은 사용되지 않지만, 빈칸으로 두면 오류가 발생하니 그림과 같이 입력한다.

client secret 확인

설정을 저장하면 상단 Credential 탭에서 인증에 필요한 secret을 확인할 수 있다. 필요에 따라 secret을 재생성할 수 있다.

audience 생성(optional)

카프카 broker가 토큰을 획득하고 audience 항목이 올바르게 구성되어 있는지 확인한다. keycloak에서 생성된 client에 기본으로 구성된 audience를 확인하거나 필요에 따라 audience를 특별하게 구성할 수 있다. 예제에서 사용된 keycloak 16.1의 기본 audience는 account이다.

scope 생성

먼저 왼쪽 Client Scopes 메뉴를 선택하고 테이블 우측 상단의 Create 버튼으로 새로운 scope를 생성한다.

mapper 추가

생성된 scope의 Mappers 탭에서 Create 버튼으로 새로운 Mapper를 생성한다. 다음 그림의 예제와 같이 Mapper TypeAudience, Included Client Audiencekafka-client로 설정한다.

client 설정에 scope 추가

이렇게 생성된 scope를 cline 설정의 상단 Client Scopes 탭 화면에서 추가해야 audience 설정이 올바르게 마무리 된다.

테스트

다음의 명령을 이용하여 구성을 테스트하고 다섯 단계를 모두 PASS하면 keycloak이 올바르게 구성된 것이다.

./bin/kafka-run-class.sh org.apache.kafka.tools.OAuthCompatibilityTool \
--clientId "kafka-client" \
--clientSecret "Ub6ixQwJxJ8lj9tvhoaxROPOUvGHK9fS" \
--sasl.oauthbearer.expected.audience "account" \
--sasl.oauthbearer.token.endpoint.url http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/token \
--sasl.oauthbearer.jwks.endpoint.url http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/certs

kafka broker 설정

kafka_jaas.conf 구성 (optional)

kafka 인증을 사용하기 위해 jaas 설정을 해야 한다. SASL/OAUTHBEARER 인증을 사용하기 위해 다음의 예와 같이 config/kafka_jaas.conf 파일을 작성한다. kafka 리스너 sasl.jaas.config 설정을 한다면 jaas 파일을 작성할 필요 없다.

KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
};

server.properties 구성

카프카 브로커 구동에 사용되는 server.properties 파일은 다음과 같이 구성한다.

listener 설정

listeners=SASL_PLAINTEXT://:9093
  • listeners
    리스너 이름 SASL_PLAINTEXT으로 9093번 포트에 등록

sasl mechanism 설정

inter.broker.listener.name=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER
  • inter.broker.listener.name
    브로커 간 접속을 위한 리스너 이름
  • sasl.mechanism.inter.broker.protocol
    브로커 간 인증 메커니즘 설정
  • sasl.enabled.mechanisms
    활성화된 SASL 인증 메커니즘

server sasl 설정

브로커가 접속해오는 클라이언트를 인증하기 위해 다음과 같은 설정이 필요하다.

listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="kafka-client" clientSecret="Ub6ixQwJxJ8lj9tvhoaxROPOUvGHK9fS" ;
sasl.oauthbearer.jwks.endpoint.url=http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/certs
sasl.oauthbearer.expected.audience=account
  • listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class
    SASL_PLAINTEXT 리스너의 OAUTHBEARER 메커니즘 인증에서 사용될 서버 콜백 핸들러 클래스 (로그인 정보 확인)
  • listener.name.sasl_plaintext.oauthbearer.sasl.jaas.config
    SASL_PLAINTEXT 리스너의 OAUTHBEARER 메커니즘 인증에서 사용될 jaas 설정
    - clientID
    client_credentials 인증을 위한 클라이언트 아이디
    - clientSecret
    client_credentials 인증을 위한 클라이언트 시크릿
  • sasl.oauthbearer.jwks.endpoint.url
    JWKS 획득을 위한 엔드포인트
  • sasl.oauthbearer.expected.audience
    토큰에 포함되어야 하는 audience 값

client sasl 설정

브로커가 다른 브로커에 접속할 때 인증을 사용해야 하므로 클라이언트 관련 설정도 필요하다.

listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/token
  • listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class
    SASL_PLAINTEXT 리스너의 OAUTHBEARER 메커니즘 인증에서 사용될 로그인 콜백 핸들러 클래스 (토큰 획득)
  • sasl.oauthbearer.token.endpoint.url
    토큰 획득 엔드포인트

broker 기동

카프카가 설치된 디렉터리에서 다음의 예와 같이 구동할 수 있다.

KAFKA_OPTS="-Djava.security.auth.login.config=config/kafka_jaas.conf" \
bin/kafka-server-start.sh -daemon config/server.properties

테스트

콘솔 도구를 이용하여 인증이 올바르게 구성되었는지 테스트할 수 있다. 테스트를 위해 keycloak에서 새로운 클라이언트 console-client를 생성했다.

client.properties 생성

다음의 예와 같이 config/client.properties 파일을 생성한다. client credential은 새롭게 생성된 정보를 사용하였다.

sasl.mechanism=OAUTHBEARER
security.protocol=SASL_PLAINTEXT

sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/token

sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="console-client" clientSecret="uA93jnkv1lTAIMTmBoN1EnUlZzmw4gfO" ;

토픽 목록 확인

다음의 예제와 같이 OAUTHBEARE 인증을 이용하여 토픽의 목록을 확인할 수 있다.

bin/kafka-topics.sh --command-config config/client.properties --bootstrap-server localhost:9093 --list

명령이 실행되고 인증이 올바르게 수행되면 다음의 예와 같이 출력된다.

[2022-04-06 14:12:35,292] WARN [Principal=:f41f11f6-dc51-4a58-8dfc-4819dbdac9e3]: Expiring credential expires at Wed Apr 06 14:17:32 KST 2022, so buffer times of 60 and 300 seconds at the front and back, respectively, cannot be accommodated.  We will refresh at Wed Apr 06 14:16:35 KST 2022. (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)
test
test2

ACL 고려 사항

ACL을 설정하게 되면 super.users에 OAuth 계정을 등록할 필요가 있을 수 있다. 이때 clientId를 등록하면 안되고 인증후 반환되는 sub 정보가 principal이 되며, 이 principal을 등록해 주어야 한다.
keyclock 화면에서 sub 값을 확인하는 방법은 찾지 못했다. 따라서 다른 방법을 통해 sub 값을 확인 해야 한다.

principal 추출

다음의 예와 같이 curl을 이용하여 OIDC 인증을 시도하면 JWT Access Token에서 sub 항목을 추출할 수 있다. 실행 결과로 f9a0bfa7-bfa4-4fc0-96c0-ab90ca4fcbc1와 같은 UUID 형태의 sub 값을 얻을 수 있다.

curl \
-d "client_id=kafka-client" \
-d "client_secret=Ub6ixQwJxJ8lj9tvhoaxROPOUvGHK9fS" \
-d "grant_type=client_credentials" \
"http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/token" \
| jq -r .access_token \
| tr '.' ' ' | awk '{print $2}' \
| base64 -d \
| jq -r .sub
  • -d “client_id=kafka-client”
    OAuth 인증 받을 계정의 client_id
  • -d “client_secret=Ub6ixQwJxJ8lj9tvhoaxROPOUvGHK9fS”
    OAuth 인증 받을 계정의 secret
  • -d “grant_type=client_credentials”
    인증 타입은 client credentials 제출
  • “http://172.17.20.211/auth/realms/spark-kafka-cluster/protocol/openid-connect/token"
    OIDC 토근 발급 주소
  • | jq -r .access_token
    응답에서 access_token만 분리
  • | tr ‘.’ ‘ ‘ | awk ‘{print $2}’
    JWT 토큰에서 데이터 부분 분리
  • | base64 -d
    base64 디코드
  • | jq -r .sub
    sub 항목 분리

kafka ACL 설정

다음의 예와 같이 authorizer.class.name을 설정하고, 이전 단계에서 얻어진 principal 값을 super.users에 설정할 수 있다.

authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:f9a0bfa7-bfa4-4fc0-96c0-ab90ca4fcbc1;User:admin;User:...

--

--