Let’s Publish Keycloak Events to Kafka using SPI (plugins)

Youssef Bentaleb
Nemo Technology
Published in
5 min readJan 9, 2022
Integrating keycloak with kafka

In this post, we will develop a plugin to stream Keycloak events to a Kafa topic using the Keycloak SPI’s. We will discuss the keycloak SPI implementation and integration with Kafka, also the configuration required.

What’s Keycloak SPI?

Keycloak provides an extendable API out of the box (using JBOSS submodules) intended to cover most use-cases. But, at the same time, it offers large possibilities of customization. Implementing your provider, using Service Provider Interfaces (SPI), is a major one.

SPI Implementation

Implementing SPI requires its Provider Factory, Provider Interfaces, and a service configuration file.

First, let’s create our java project and add a few dependencies.

Below the gradle.build:

plugins {
id 'java'
}
group 'com.yousseefben'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8
ext {
keycloakVersion = '15.0.2'
}
repositories {
mavenCentral()
}
dependencies {
implementation group: 'org.keycloak', name: 'keycloak-server-spi', version: "${keycloakVersion}"
implementation group: 'org.keycloak', name: 'keycloak-server-spi-private', version: "${keycloakVersion}"
implementation group: 'org.keycloak', name: 'keycloak-services', version: "${keycloakVersion}"
}

In this case, we’ll implement KeycloakCustomEventListenerFactory which involves EventListenerProviderFactory and KeycloakCustomEventListener

KeycloakCustomEventListenerFactory:

public class KeycloakCustomEventListenerFactory implements EventListenerProviderFactory {
private KeycloakCustomEventListener keycloakCustomEventListener;
private static final Loggerlog= Logger.getLogger(KeycloakCustomEventListenerFactory.class); @Override
public EventListenerProvider create(KeycloakSession keycloakSession) {
} @Override
public void init(Config.Scope scope) {
}
@Override
public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
} @Override
public void close() {
} @Override
public String getId() {
return "kafka-event";
}

KeycloakCustomEventListener:

public class KeycloakCustomEventListener implements EventListenerProvider {    private static final Logger log= Logger.getLogger(KeycloakCustomEventListener.class);    public KeycloakCustomEventListener(String topicKafka, Properties props) {    }    @Override
public void onEvent(Event event) {
} @Override
public void onEvent(AdminEvent adminEvent, boolean b) {
} @Override
public void close() {
}
}

Add Service Configuration file:

With these specific resources: META-INF/services/org.keycloak.events.EventListenerProviderFactory

com.yousseefben.KeycloakCustomEventListenerFactory

Add Kafka to your project:

  • Modify build.graddle file and add kafka-clients packages
...
dependencies {
...
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.0.0'
}
  • Edit KeycloakCustomEventListenerFactory :
public class KeycloakCustomEventListenerFactory implements EventListenerProviderFactory {
private KeycloakCustomEventListener keycloakCustomEventListener;
private static final Loggerlog= Logger.getLogger(KeycloakCustomEventListenerFactory.class); private String topicKafka;
private String bootstrapServers;
private boolean sslEnabled;
private String keystoreLocation;
private String keystorePassword;
private String trustSoreLocation;
private String trustSorePassword;
private boolean jaasEnabled;
private String jaasConfig;
private String saslMechanism;
private String securityProtocol;
private Properties properties;
@Override
public EventListenerProvider create(KeycloakSession keycloakSession) {
if (keycloakCustomEventListener == null) {
keycloakCustomEventListener = new KeycloakCustomEventListener(topicKafka, properties);
}
return keycloakCustomEventListener;
}
@Override
public void init(Config.Scope scope) {
log.info("Init kafka");
bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
keystoreLocation = System.getenv("KAFKA_SSL_KEYSTORE_LOCATION");
keystorePassword = System.getenv("KAFKA_SSL_KEYSTORE_PASSWORD");
trustSoreLocation = System.getenv("KAFKA_SSL_TRUSTSTORE_LOCATION");
trustSorePassword = System.getenv("KAFKA_SSL_TRUSTSTORE_PASSWORD");
jaasConfig = System.getenv("KAFKA_SASL_JAAS_CONFIG");
saslMechanism = System.getenv("KAFKA_DEFAULT_SASL_MECHANISM");
securityProtocol = System.getenv("KAFKA_SECURITY_PROTOCOL");
sslEnabled = "true".equalsIgnoreCase(System.getenv("KAFKA_SSL_ENABLED");
jaasEnabled = "true".equalsIgnoreCase(System.getenv("KAFKA_JAAS_ENABLED"));
topicKafka = System.getenv("KAFKA_TOPIC"); log.info("Kafka ssl enabled: " + sslEnabled);if (topicKafka == null || topicKafka.isEmpty()) {
throw new NullPointerException("topic is required.");
}
if (bootstrapServers == null || bootstrapServers.isEmpty()) {
throw new NullPointerException("bootstrapServers are required");
}
if (sslEnabled && (keystoreLocation == null || keystorePassword == null || trustSoreLocation == null || trustSorePassword == null)) {
throw new NullPointerException("ssl params required");
} properties = getProperties();
} @Override
public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
} @Override
public void close() {
} @Override
public String getId() {
return "kafka-event";
}
private Properties getProperties() {
Properties propsKafka = new Properties();
propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
if (sslEnabled) {
propsKafka.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation);
propsKafka.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
propsKafka.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustSoreLocation);
propsKafka.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustSorePassword);
}
if (jaasEnabled) {
propsKafka.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
propsKafka.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
propsKafka.put("security.protocol", securityProtocol);
}
return propsKafka;
}
}

Now, let’s check the required properties for the init. And handle specifics like enabling ssl and jaas.

If you need further details about kafka-client configuration, please refer to the official documentation.

  • Edit KeycloakCustomEventListener :
public class KeycloakCustomEventListener implements EventListenerProvider {    private static final Loggerlog= Logger.getLogger(KeycloakCustomEventListener.class);    private final CustomKafkaProducer customKafkaProducer;
private ObjectMapper mapper;
public KeycloakCustomEventListener(String topicKafka, Properties props) {
log.info("init custom event listener");
mapper = new ObjectMapper();
customKafkaProducer = new CustomKafkaProducer(topicKafka, props);
}
@Override
public void onEvent(Event event) {
log.info("Event: " + event.getType() + "userId: {}" + event.getUserId());
try {
customKafkaProducer.publishEvent(mapper.writeValueAsString(event));
} catch (JsonProcessingException e) {
log.error("error: " + e.getMessage());
}
} @Override
public void onEvent(AdminEvent adminEvent, boolean b) {
log.info("Admin Event: " + adminEvent.getResourceType().name());
try {
customKafkaProducer.publishEvent(mapper.writeValueAsString(adminEvent));
} catch (JsonProcessingException e) {
log.error("error: " + e.getMessage());
}
}
@Override
public void close() {
}
}

Now, we instantiate our custom kafka producer CustomKafkaProducer and publish a message publishEvent.

public class CustomKafkaProducer {    private static final Logger log= Logger.getLogger(CustomKafkaProducer.class);    private final String topic;
private final KafkaProducer<String, String> producer;
public CustomKafkaProducer(String topic, Properties props) {
log.info("init producer");
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
this.topic = topic;
producer = new KafkaProducer<>(props);
}
public void publishEvent(String value) {
log.info("publish event");
ProducerRecord<String, String> eventRecord =
new ProducerRecord<>(topic, value);
producer.send(eventRecord);
}
}

SPI Deployment

💡 WARNING: Deploying an SPI jar with some issue/exception can cause failure with your Keycloak Server run. To avoid issues with your Keycloak instance, is recommended to work on any experimental server instead of registrating the SPI under events using keycloak UI.

To register provider implementations we can simply use the Keycloak deployer approach to handle several dependencies automatically for you. The hot deployment and the re-deployment are also supported.

Server

If you copy your provider jar to the Keycloak standalone/deployments/ directory, your provider will automatically be deployed. Hot deployment works too.

Docker

The simplest way to deploy the spi in a docker container is to create a custom docker image from the keycloak base image. The keycloak-kafka.jar must be added to the /standalone/deployments folder.

example to DockerFile

FROM quay.io/keycloak/keycloak:15.0.2
ADD ./keycloak-spi-1.0-SNAPSHOT.jar /opt/jboss/keycloak/standalone/deployments/
EXPOSE 8080
EXPOSE 8443
ENTRYPOINT [ "/opt/jboss/tools/docker-entrypoint.sh" ]CMD ["-b", "0.0.0.0"]

Build the image:

docker build -t keycloak-kafka .

Example to run the image:

docker run -p 8080:8080 -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=admin -e KAFKA_TOPIC=MY_TOPIC keycloak-kafka

Enable the configuration into keylcoak

keycloak configuration to add the kafka event SPI

Conclusion

The entire code is available here:

keycloak-kafka-spi

This is a basic example to capture events with Keycloak SPI. We can achieve event-driven architecture by publishing events to Kafka. We can do lots more. There are several SPI plugins that we can explore in keycloak.

If you like this blog, please like & share 🙂

Bye!

--

--

Youssef Bentaleb
Nemo Technology

Lead Full Stack Developer | Software Craftsmanship (TDD, Clean Code …)