Azure Resources Operator with Quarkus

Geoffrey
13 min read · Nov 15, 2021


Introduction

If you are using Kubernetes you have probably heard about Operators. An operator is an extension of Kubernetes.

Operators are software extensions to Kubernetes that make use of custom resources to manage applications and their components. Operators follow Kubernetes principles, notably the control loop.

A simple example

Imagine you are using Apache Kafka and you want to create your topics on a Kafka cluster when you deploy your application and remove them when you un-deploy it. You can develop an operator which creates the topic on the Kafka cluster when it detects a Topic resource and deletes it when the resource is deleted.

You can define 2 resources:

  • A cluster resource, which contains the definition of the Kafka cluster to target
apiVersion: org.example/v1alpha1
kind: KafkaCluster
metadata:
  name: my-super-cluster
  namespace: my-namespace
spec:
  bootstrapUrl: host1.domain.com,host2.domain.com,host3.domain.com
  • A topic resource, which contains the topic definition and references the cluster by name
apiVersion: org.example/v1alpha1
kind: KafkaTopic
metadata:
  name: my-super-topic
  namespace: my-namespace
spec:
  cluster: my-super-cluster
  partitions: 3
  replicas: 3
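The control loop behind such an operator can be sketched in a few lines of plain Java. This is only an illustration of the idea, not the code of a real Kafka operator: `TopicReconciler`, `reconcile` and the in-memory set standing in for the Kafka cluster are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of one reconcile step: make the actual state match the desired state. */
final class TopicReconciler {

    /** Stand-in for the topics that really exist on the Kafka cluster. */
    private final Set<String> actualTopics = new TreeSet<>();

    /**
     * One pass of the control loop.
     *
     * @param desiredTopics topic names declared through KafkaTopic resources
     * @return the number of changes (creations + deletions) applied
     */
    int reconcile(Set<String> desiredTopics) {
        int created = 0;
        // Create what is declared but missing.
        for (String topic : desiredTopics) {
            if (actualTopics.add(topic)) {
                created++;
            }
        }
        // Delete what exists but is no longer declared.
        Set<String> toDelete = new HashSet<>(actualTopics);
        toDelete.removeAll(desiredTopics);
        actualTopics.removeAll(toDelete);
        return created + toDelete.size();
    }

    /** Read-only view of the simulated cluster state. */
    Set<String> actualTopics() {
        return Set.copyOf(actualTopics);
    }
}
```

Running the same pass twice with the same desired set applies no change the second time, which is the property we want from an operator.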

Define our Operator

In this article we will write a Kubernetes Operator to provision Azure resources using the Azure REST API. I will try to show you that Golang is not the only choice for writing Operators. Quarkus is actually pretty powerful for the job.

Here is the global picture:

Here is the flow:

Prerequisite

To start you will need to create an Azure Service Principal.

To create it with a single command line you can type:

$ az ad sp create-for-rbac --name MyFirstServicePrincipal
{
  "appId": "<app_id>",
  "displayName": "<display_name>",
  "name": "<name>",
  "password": "<password>",
  "tenant": "<tenant>"
}

Our operator will then use the following parameters:

  • Client ID: The appId of your SP
  • Client Secret: The password of your SP
  • Tenant ID: The Tenant of your SP
  • Subscription ID: The subscription ID on which you want to deploy your resources
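As a rough idea of how these four values can be carried around, here is a minimal sketch of a configuration holder. The class name `AzureCredentials` and the environment variable names (`AZURE_CLIENT_ID` and so on) are assumptions made for this example; the operator in this article uses its own `OperatorConfiguration` class, which is not shown.

```java
import java.util.Map;

/** Hypothetical holder for the four parameters listed above. */
final class AzureCredentials {
    final String clientId;
    final String clientSecret;
    final String tenantId;
    final String subscriptionId;

    private AzureCredentials(String clientId, String clientSecret, String tenantId, String subscriptionId) {
        this.clientId = clientId;
        this.clientSecret = clientSecret;
        this.tenantId = tenantId;
        this.subscriptionId = subscriptionId;
    }

    /** Builds the holder from a key/value source, for example System.getenv(). */
    static AzureCredentials from(Map<String, String> env) {
        return new AzureCredentials(
                required(env, "AZURE_CLIENT_ID"),        // assumed variable name
                required(env, "AZURE_CLIENT_SECRET"),    // assumed variable name
                required(env, "AZURE_TENANT_ID"),        // assumed variable name
                required(env, "AZURE_SUBSCRIPTION_ID")); // assumed variable name
    }

    /** Fails fast when a value is missing, so the operator does not start half-configured. */
    private static String required(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing configuration value: " + key);
        }
        return value;
    }
}
```

A typical call would be `AzureCredentials.from(System.getenv())` at startup.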

Build the operator

Dependencies

To start, you first need to create a Quarkus project. You can go to https://code.quarkus.io/. We will then add the following dependencies:

To work with Kubernetes:

<!-- The Quarkus Operator SDK -->
<dependency>
  <groupId>io.quarkiverse.operatorsdk</groupId>
  <artifactId>quarkus-operator-sdk</artifactId>
</dependency>
<!-- The Java Kubernetes Client -->
<dependency>
  <groupId>io.kubernetes</groupId>
  <artifactId>client-java</artifactId>
</dependency>

To call the Azure API we won’t use the Azure SDK, as I could not build it in native mode with GraalVM. We will use the MicroProfile Rest Client instead.

<!-- Uses the MicroProfile Rest Client -->
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-rest-client</artifactId>
</dependency>
<!-- We will use Jackson for JSON -->
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-rest-client-jackson</artifactId>
</dependency>
<!-- Provides features such as Retry -->
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-fault-tolerance</artifactId>
</dependency>

Finally we will add a few additional dependencies that we will use later:

<!-- Used to schedule periodic tasks -->
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-scheduler</artifactId>
</dependency>
<!-- Used to build the container image with Jib -->
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-container-image-jib</artifactId>
</dependency>
<!-- Easy way to expose configurable Readiness/Liveness probes -->
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-health</artifactId>
</dependency>
<!-- Cleaner code: generates getters, setters, constructors... -->
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <scope>provided</scope>
</dependency>

HTTP Error handling

When you call a REST API you may encounter errors. For example:

  • Status 4xx (client errors): for example 401 (Unauthorized)
  • Status 5xx (server errors): for example 500 (Internal Server Error)

If we look at the Azure specs we can see the following format:

{
  "error": {
    "code": "ErrorCode",
    "message": "Explanation message"
  }
}

Here is an example:

{
  "error": {
    "code": "ParentResourceNotFound",
    "message": "Can not perform requested operation on nested resource. Parent resource 'my-first-namespaces' not found."
  }
}

Let’s first build our ApiException. Our exception will contain the status code, the correlation ID for troubleshooting (x-ms-correlation-request-id header), and the error code and error message from Azure.

import java.util.Optional;

import javax.ws.rs.core.Response;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Getter
public class ApiException extends Exception {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final long serialVersionUID = 1L;

    /**
     * This header can be sent in input to correlate the request with the response
     */
    private static final String CORRELATION_ID_HEADER = "x-ms-correlation-request-id";

    /**
     * The HTTP status code
     */
    private int status;

    /**
     * The correlation ID coming from header x-ms-correlation-request-id
     */
    private String correlationId;

    /**
     * The error code returned by Azure
     */
    private String errorCode;

    /**
     * The error message returned by Azure
     */
    private String errorMessage;

    public ApiException(Response response) {
        this.status = response.getStatus();
        this.correlationId = response.getHeaderString(CORRELATION_ID_HEADER);
        try {
            // The body is expected to follow the {"error": {"code": ..., "message": ...}} format
            JsonNode tree = OBJECT_MAPPER.readTree(response.readEntity(String.class));
            this.errorMessage = getValue(tree, "/error/message").orElse(null);
            this.errorCode = getValue(tree, "/error/code").orElse(null);
        } catch (Exception e) {
            // Keep the status and correlation ID even if the body cannot be parsed
            log.error("Cannot process body", e);
        }
    }

    /** Reads a value with a JSON pointer, returning empty when the node is missing */
    private Optional<String> getValue(JsonNode tree, String path) {
        JsonNode value = tree.at(path);
        if (value.isMissingNode()) {
            return Optional.empty();
        }
        return Optional.of(value.asText());
    }

    @Override
    public String getMessage() {
        return String.format("ApiException(CorrelationId: %s / Status Code: %d / Error Code: %s / Error Message: %s)",
                correlationId, status, errorCode, errorMessage);
    }
}

Then let’s build our exception mapper. The exception mapper is in charge of interpreting the response and translating it into an exception. For example, here we say that if the status is >= 400 (and not 429: see below) we throw the ApiException.

@Provider
public class ApiExceptionMapper
        implements ResponseExceptionMapper<ApiException> {

    @Override
    public boolean handles(int status, MultivaluedMap<String, Object> headers) {
        return status >= 400 && status != 429;
    }

    @Override
    public ApiException toThrowable(Response response) {
        return new ApiException(response);
    }
}

The @Provider annotation indicates that this implementation must be discovered at runtime.

The error 429 case:

The 429 (Too Many Requests) status is a bit special, as it means that you are sending too many requests in a short period of time. Retrying with a delay or a back-off should solve the issue. We will create a dedicated exception and use a retry mechanism.

public class TooManyRequestExeption extends RuntimeException {

    private static final long serialVersionUID = 1L;

    private Response response;

    public TooManyRequestExeption() {
        super();
    }

    public TooManyRequestExeption(Response response) {
        super("TooManyRequestExeption to " + response.getLocation());
        this.response = response;
    }

    public Response getResponse() {
        return this.response;
    }
}

The exception mapper:

@Provider
public class TooManyRequestExeptionMapper
        implements ResponseExceptionMapper<TooManyRequestExeption> {

    @Override
    public boolean handles(int status, MultivaluedMap<String, Object> headers) {
        return status == 429;
    }

    @Override
    public TooManyRequestExeption toThrowable(Response response) {
        return new TooManyRequestExeption(response);
    }
}
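Taken together, the two mappers split the HTTP status space into three zones. The sketch below shows that split in plain Java, without any Rest Client dependency; `StatusRouting`, `Outcome` and `route` are names invented for this example.

```java
/** Hypothetical sketch: how the two exception mappers partition the HTTP status codes. */
final class StatusRouting {

    enum Outcome { SUCCESS, TOO_MANY_REQUESTS, API_ERROR }

    static Outcome route(int status) {
        if (status == 429) {
            // Handled by TooManyRequestExeptionMapper: worth retrying later
            return Outcome.TOO_MANY_REQUESTS;
        }
        if (status >= 400) {
            // Handled by ApiExceptionMapper: client or server error
            return Outcome.API_ERROR;
        }
        // No mapper applies: the response is returned to the caller
        return Outcome.SUCCESS;
    }
}
```

Because the `handles` conditions are mutually exclusive, the order in which the mappers are registered does not matter here.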

And this is where the magic happens. You can decorate your rest client with the @Retry annotation from MicroProfile Fault Tolerance to indicate that you want to retry on a given exception, with a delay and a maximum number of retries (or a maximum duration):

@Retry(retryOn = TooManyRequestExeption.class, delay = 1, delayUnit = ChronoUnit.SECONDS, maxRetries = 5)

Here we say that we want to retry every second, with a maximum of 5 retries after the initial call.
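To make the semantics concrete, here is a hand-written loop that behaves roughly like this annotation. It is only a sketch: `RetrySketch`, `callWithRetry` and the nested `TooManyRequests` exception are names made up for the example, and it ignores the jitter and maximum duration that the real fault tolerance implementation also applies.

```java
import java.util.function.Supplier;

/** Hypothetical, simplified equivalent of @Retry(retryOn = ..., delay = ..., maxRetries = ...). */
final class RetrySketch {

    /** Stand-in for the exception raised on HTTP 429. */
    static final class TooManyRequests extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    static <T> T callWithRetry(Supplier<T> call, int maxRetries, long delayMillis) {
        int retries = 0;
        while (true) {
            try {
                return call.get();
            } catch (TooManyRequests e) {
                if (retries >= maxRetries) {
                    throw e; // retries exhausted: give up and propagate
                }
                retries++;
                sleep(delayMillis);
            }
            // Any other exception is not caught here and propagates immediately
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

With `maxRetries = 5` a call that always fails is attempted six times in total: the initial call plus five retries.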

Authentication

To make authenticated calls to Azure APIs you need to use an access token. This access token must be passed in each call using the Authorization header:

GET /subscriptions/<subscription_id>/resourceGroups/speefl-operator-starter?api-version=2021-04-01 HTTP/1.1
Host: management.azure.com
Authorization: Bearer <access_token>

According to the Azure documentation, to authenticate you need to send the following POST request:

POST /<tenant_id>/oauth2/token HTTP/1.1
Host: login.microsoftonline.com
Content-Type: application/x-www-form-urlencoded

grant_type=client_credentials&client_id=<client_id>&client_secret=<client_secret>&resource=https://management.azure.com/
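The body is a regular URL-encoded form, so each value has to be percent-encoded before being sent (the Rest Client does this for us later). As an illustration only, here is how the same body could be assembled by hand with the JDK; `TokenRequestBody` and `build` are hypothetical names.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that builds the x-www-form-urlencoded body of the token request. */
final class TokenRequestBody {

    static String build(String clientId, String clientSecret, String resource) {
        // LinkedHashMap keeps the parameters in insertion order
        Map<String, String> form = new LinkedHashMap<>();
        form.put("grant_type", "client_credentials");
        form.put("client_id", clientId);
        form.put("client_secret", clientSecret);
        form.put("resource", resource);
        return form.entrySet().stream()
                .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
                .collect(Collectors.joining("&"));
    }

    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }
}
```

Note that a secret containing characters such as `&` or `=` would break the request if it were concatenated without encoding.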

The response looks like:

{
  "token_type": "Bearer",
  "expires_in": "3599",
  "ext_expires_in": "3599",
  "expires_on": "1636706340",
  "not_before": "1636702440",
  "resource": "https://management.azure.com/",
  "access_token": "jwt_token"
}

Here is the Oauth2Response.java:

@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
public class Oauth2Response {

    @JsonProperty("token_type")
    private String tokenType;

    /**
     * Expires in seconds (usually 3599)
     */
    @JsonProperty("expires_in")
    private String expiresIn;

    @JsonProperty("ext_expires_in")
    private String extExpiresIn;

    @JsonProperty("not_before")
    private String notBefore;

    @JsonProperty("resource")
    private String resource;

    @JsonProperty("access_token")
    private String accessToken;
}

With the MicroProfile Rest Client you just declare the interface and register it as a rest client. Quarkus will then create the implementation for you:

@Retry(retryOn = TooManyRequestExeption.class, delay = 1, delayUnit = ChronoUnit.SECONDS, maxRetries = 5) // <1>
@RegisterRestClient(configKey = "oauth2-api") // <2>
@RegisterProviders({
    @RegisterProvider(TooManyRequestExeptionMapper.class), // <3>
    @RegisterProvider(ApiExceptionMapper.class) // <4>
})
public interface Oauth2AuthenticationApi {

    // On a rest client interface, @Consumes is the Content-Type of the request
    // and @Produces is the media type expected in the response (Accept header)
    @POST
    @Path("/{tenantId}/oauth2/token")
    @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
    @Produces(MediaType.APPLICATION_JSON)
    Oauth2Response authenticate(@PathParam("tenantId") String tenantId,
            @FormParam("grant_type") String grantType,
            @FormParam("client_id") String clientId,
            @FormParam("client_secret") String clientSecret,
            @FormParam("resource") String resource); // <5>
}
  1. The Retry mechanism: max 5 retries with a delay of 1 second, only on the TooManyRequestExeption
  2. We register the RestClient and provide the configKey we want to use.
  3. Register the TooManyRequestExeptionMapper
  4. Register the ApiExceptionMapper
  5. The API call: a POST to /{tenantId}/oauth2/token with the form params.

The configKey allows us to associate a configuration to this rest client (such as the url targeted or the CDI scope). This configuration is done in the application.properties:

# Authentication
oauth2-api/mp-rest/url=https://login.microsoftonline.com
oauth2-api/mp-rest/scope=javax.inject.Singleton

Now that the authentication is configured, we will create our AuthHeaderFactory to add the Authorization header to each call.

To provide the auth data to the AuthHeaderFactory we will fire an event each time the authentication data changes:

@Getter
@RequiredArgsConstructor
public final class AuthDataEvent {

    private final String accessToken;
    private final LocalDateTime tokenExpiration;
}

The AuthHeaderFactory:

@ApplicationScoped
public class AuthHeaderFactory implements ClientHeadersFactory { // <1>

    private static final String AUTH_HEADER_KEY = "Authorization";
    private static final String AUTH_HEADER_VALUE_TEMPLATE = "Bearer %s";

    private String token;

    /**
     * Listen to the authDataEvent to update the token
     *
     * @param authDataEvent the event carrying the refreshed access token
     */
    void updateToken(@Observes AuthDataEvent authDataEvent) { // <2>
        this.token = authDataEvent.getAccessToken();
    }

    @Override
    public MultivaluedMap<String, String> update(MultivaluedMap<String, String> incomingHeaders,
            MultivaluedMap<String, String> clientOutgoingHeaders) {
        MultivaluedMap<String, String> result = new MultivaluedHashMap<>();
        result.add(AUTH_HEADER_KEY, String.format(AUTH_HEADER_VALUE_TEMPLATE, token));
        return result; // <3>
    }
}
  1. Implements the ClientHeadersFactory
  2. It observes the AuthDataEvent emitted when the auth data is updated
  3. Adds the Authorization header to the request
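One detail worth keeping in mind: until the first token arrives, formatting the header with a null token produces the literal value "Bearer null". A possible way to guard against that, sketched here outside of any framework, is to keep the token in a small holder that only yields a header once a token is known. `BearerTokenHolder` is a hypothetical class, not part of the operator.

```java
import java.util.Optional;

/** Hypothetical holder: stores the latest token and builds the Authorization header value. */
final class BearerTokenHolder {

    // volatile because the token is written by one thread (the refresher)
    // and read by others (the threads sending requests)
    private volatile String token;

    void update(String newToken) {
        this.token = newToken;
    }

    /** Empty until a token has been received, so callers never send "Bearer null". */
    Optional<String> authorizationHeader() {
        String current = token;
        return current == null ? Optional.empty() : Optional.of("Bearer " + current);
    }
}
```

In this operator the readiness check plays a similar protective role, since the pod is not marked ready before a token has been obtained.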

We can now create our AuthenticationService, which will authenticate and store the access token:

@ApplicationScoped
@Slf4j
@Readiness // <1>
public class AuthenticationService implements HealthCheck {

    private static final String CLIENT_CREDENTIALS = "client_credentials";

    @Inject
    @RestClient // <2>
    Oauth2AuthenticationApi oauth2AuthenticationApi;

    @Inject
    OperatorConfiguration operatorConfiguration;

    @Inject // <3>
    Event<AuthDataEvent> onAuthDataRefreshed;

    private String accessToken;
    private LocalDateTime tokenExpiration;

    @Scheduled(every = "10s") // <4>
    void updateAuthData() {

        // Nothing to do while the token is still valid for more than 10 minutes
        if (accessToken != null && tokenExpiration.isAfter(LocalDateTime.now().plusMinutes(10))) { // <5>
            return;
        }

        Oauth2Response response = null;
        try {
            response = oauth2AuthenticationApi.authenticate(operatorConfiguration.getTenantId(),
                    CLIENT_CREDENTIALS,
                    operatorConfiguration.getClientId(), operatorConfiguration.getClientSecret(),
                    operatorConfiguration.getAzureApirtUrl());
            accessToken = response.getAccessToken();
            tokenExpiration = LocalDateTime.now().plusSeconds(Long.parseLong(response.getExpiresIn()));
            onAuthDataRefreshed.fire(new AuthDataEvent(accessToken, tokenExpiration));
        } catch (Exception e) {
            log.error("Unable to refresh token. Response is {}", response, e);
        }
    }

    @Override
    public HealthCheckResponse call() { // <6>
        // UP only when a token is present and has not expired yet
        return HealthCheckResponse.builder()
                .name("auth-ready")
                .status(accessToken != null && tokenExpiration.isAfter(LocalDateTime.now()))
                .build();
    }
}
  1. AuthenticationService is a Readiness check. The application will be considered ready only when the authentication data is available
  2. We inject the OAuth rest client
  3. We inject the event used to publish the AuthDataEvent
  4. Every 10s we check whether the auth data is about to expire. The scheduled method also runs once when the application has started
  5. 10 minutes before expiration we renew the token (in case there is an issue such as a 503, it gives a bit of time to retry)
  6. If the token is not present or if it has expired we consider the operator down
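The two time-based rules (renew 10 minutes before expiration, not ready once expired) are easy to get backwards, so here they are isolated in a small sketch that can be tested without a scheduler. `TokenLifetime` and its methods are hypothetical names, and the sketch uses `Instant` rather than `LocalDateTime` so that the current time can be passed in explicitly.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper isolating the token refresh and readiness rules. */
final class TokenLifetime {

    /** Renew this long before the token actually expires. */
    private static final Duration RENEWAL_MARGIN = Duration.ofMinutes(10);

    /** expires_in is returned by Azure as a string holding a number of seconds. */
    static Instant expiration(Instant receivedAt, String expiresInSeconds) {
        return receivedAt.plusSeconds(Long.parseLong(expiresInSeconds));
    }

    /** True when there is no token yet or when it expires within the renewal margin. */
    static boolean needsRefresh(Instant expiration, Instant now) {
        return expiration == null || !expiration.isAfter(now.plus(RENEWAL_MARGIN));
    }

    /** True when a token exists and is still valid at the given time. */
    static boolean isReady(Instant expiration, Instant now) {
        return expiration != null && expiration.isAfter(now);
    }
}
```

With a token valid for 3599 seconds, the operator stays ready for the whole hour and starts trying to renew after roughly 50 minutes.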

Resources

We are now ready to start the resources implementation.

For each resource we will:

  • Create a Rest client to call Azure
  • Create the Kubernetes Custom Resource
  • Create the Custom Resource Controller which will listen on the objects updates (create, update, delete)

As I am a lazy guy, I decided to use the OpenAPI generator to generate the MicroProfile model and APIs from the Azure REST API specs. I customized the template a bit, but that is out of the scope of this article.

Queues

Here I will take the example of a Queue, but you can take a look at the GitHub repository (linked in the Test section below) to see the other resources.

You can take a look at the Queues REST API documentation.

The OpenAPI generator generated the following for us:

  • SBQueue: the request / response payload
  • The MicroProfile rest API

Here is what the SBQueue looks like:

@Getter
@Setter
@EqualsAndHashCode
@ToString
@JsonInclude(Include.NON_NULL)
public class SBQueue {

    @JsonProperty("properties")
    private SBQueueProperties properties;

    /**
     * Resource Id
     **/
    @JsonProperty("id")
    private String id;

    /**
     * Resource name
     **/
    @JsonProperty("name")
    private String name;

    /**
     * Resource type
     **/
    @JsonProperty("type")
    private String type;
}

And the API:

@Retry(retryOn = TooManyRequestExeption.class, delay = 1, delayUnit = ChronoUnit.SECONDS, maxRetries = 5) // <1>
@RegisterRestClient(configKey = "azure-api") // <2>
@RegisterClientHeaders(AuthHeaderFactory.class) // <3>
@RegisterProviders({
    @RegisterProvider(APIVersionProvider.class), // <4>
    @RegisterProvider(ApiExceptionMapper.class), // <5>
    @RegisterProvider(TooManyRequestExeptionMapper.class), // <6>
})
@Path("/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.ServiceBus/namespaces/{namespaceName}/queues")
public interface QueuesApi {

    @PUT
    @Path("/{queueName}")
    @Consumes({ "application/json" })
    @Produces({ "application/json" })
    SBQueue queuesCreateOrUpdate(@PathParam("resourceGroupName") String resourceGroupName,
            @PathParam("namespaceName") String namespaceName,
            @PathParam("queueName") String queueName,
            @PathParam("subscriptionId") String subscriptionId,
            SBQueue parameters) throws ApiException, ProcessingException;

    @DELETE
    @Path("/{queueName}")
    @Produces({ "application/json" })
    void queuesDelete(@PathParam("resourceGroupName") String resourceGroupName,
            @PathParam("namespaceName") String namespaceName,
            @PathParam("queueName") String queueName,
            @PathParam("subscriptionId") String subscriptionId) throws ApiException, ProcessingException;

    @GET
    @Path("/{queueName}")
    @Produces({ "application/json" })
    SBQueue queuesGet(@PathParam("resourceGroupName") String resourceGroupName,
            @PathParam("namespaceName") String namespaceName,
            @PathParam("queueName") String queueName,
            @PathParam("subscriptionId") String subscriptionId) throws ApiException, ProcessingException;
}
  1. The Retry mechanism: max 5 retries with a delay of 1 second, only on the TooManyRequestExeption
  2. Register the RestClient and provide the configKey we want to use.
  3. Register the AuthHeaderFactory which will add the Authorization header
  4. You may have noticed that the query parameter “api-version” is not provided. I created an APIVersionProvider which automatically adds this query parameter, to avoid passing it in every method
  5. Register the ApiExceptionMapper
  6. Register the TooManyRequestExeptionMapper

Here is the configuration for “azure-api” in the application.properties:

# APIS
azure-api/mp-rest/url=https://management.azure.com
azure-api/mp-rest/scope=javax.inject.Singleton

Here is what the APIVersionProvider looks like:

@ApplicationScoped
public class APIVersionProvider implements ClientRequestFilter {

    private static final String API_VERSION_KEY = "api-version";
    private static final String API_VERSION = "2017-04-01";

    @Override
    public void filter(ClientRequestContext requestContext) throws IOException {
        requestContext.setUri(UriBuilder.fromUri(requestContext.getUri())
                .queryParam(API_VERSION_KEY, API_VERSION)
                .build());
    }
}
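To see what the filter does to a request, here is the same transformation written with the JDK only. This is an illustrative sketch: `ApiVersionQuery` and `withApiVersion` are hypothetical names, and unlike `UriBuilder` this version does not encode the value and drops any URI fragment.

```java
import java.net.URI;

/** Hypothetical helper appending the api-version query parameter to a URI. */
final class ApiVersionQuery {

    static URI withApiVersion(URI uri, String apiVersion) {
        String param = "api-version=" + apiVersion;
        String query = uri.getRawQuery();
        // Keep existing query parameters, if any, and append ours at the end
        String newQuery = (query == null || query.isEmpty()) ? param : query + "&" + param;
        return URI.create(uri.getScheme() + "://" + uri.getRawAuthority() + uri.getRawPath() + "?" + newQuery);
    }
}
```

For example, a call to `/subscriptions/<subscription_id>/resourceGroups/<name>` leaves the client as `/subscriptions/<subscription_id>/resourceGroups/<name>?api-version=2017-04-01`.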

We can now create the Kubernetes Custom Resource.

First the spec. We will extend SBQueue and add the resourceGroupName and namespaceName.

@Getter
@Setter
public class ServiceBusQueueSpec extends SBQueue {

    private String resourceGroupName;
    private String namespaceName;
}

And then the Resource itself:

@Group("org.speedfl.io") // <1>
@Version("v1alpha1") // <2>
@ShortNames("azsbq") // <3>
@Getter
@Setter
public class ServiceBusQueue extends CustomResource<ServiceBusQueueSpec, BaseStatus>
        implements Namespaced {

    private static final long serialVersionUID = 1L;

    private ServiceBusQueueSpec spec;

    private BaseStatus status;
}
  1. The custom resource definition group
  2. The custom resource definition version
  3. The short name which can be used with kubectl:
$ kubectl get azsbq # will return the ServiceBusQueue

This is where the magic happens. The Java Operator SDK can automatically generate the Custom Resource Definition in the target/kubernetes/ folder, and the generated file can be used directly.

Finally let’s build our Resource Controller, which will manage the lifecycle of our Custom Resource:

@Controller // <1>
@Slf4j
public class QueuesController implements ResourceController<ServiceBusQueue> { // <2>

    @Inject
    @RestClient
    QueuesApi api; // <3>

    @Inject
    OperatorConfiguration configuration; // <4>

    @Override
    public UpdateControl<ServiceBusQueue> createOrUpdateResource(ServiceBusQueue resource,
            Context<ServiceBusQueue> context) { // <5>
        try {
            SBQueue payload = Utils.convertToParent(resource.getSpec(), SBQueue.class);
            api.queuesCreateOrUpdate(resource.getSpec().getResourceGroupName(),
                    resource.getSpec().getNamespaceName(),
                    resource.getSpec().getName(), configuration.getSubscriptionId(),
                    payload); // <6>
            return UpdateControl.noUpdate(); // <7>
        } catch (ProcessingException | ApiException | IllegalStateException e) {
            log.error("Error while creating {} {}", resource.getKind(), resource.getMetadata().getName(), e);
            resource.setStatus(new BaseStatus(e.getMessage()));
            return UpdateControl.updateCustomResourceAndStatus(resource); // <8>
        }
    }

    /**
     * In case of error do not remove the finalizer
     */
    @Override
    public DeleteControl deleteResource(ServiceBusQueue resource, Context<ServiceBusQueue> context) { // <9>
        try {
            api.queuesDelete(resource.getSpec().getResourceGroupName(),
                    resource.getSpec().getNamespaceName(),
                    resource.getSpec().getName(), configuration.getSubscriptionId());
        } catch (ProcessingException | ApiException e) {
            log.error("Error while deleting {} {}", resource.getKind(), resource.getMetadata().getName(), e);
            return DeleteControl.NO_FINALIZER_REMOVAL; // <10>
        }
        return DeleteControl.DEFAULT_DELETE; // <11>
    }
}
  1. The Controller annotation indicates to Quarkus that this class will be used to manage resources
  2. The Controller is typed with the Custom Resource you want to manage
  3. The QueuesApi rest client is injected
  4. The OperatorConfiguration is injected to provide the subscriptionId
  5. This method is invoked whenever a ServiceBusQueue is created or updated
  6. The QueuesApi.queuesCreateOrUpdate is called to create the queue on the Azure side
  7. If no error is detected the process is OK. No additional updates are required
  8. In case of error, the error message is stored in the Custom Resource status
  9. This method is invoked whenever a ServiceBusQueue is deleted
  10. In case of error the finalizer is not removed, to let the user investigate the issue
  11. If the deletion was successful on the Azure side the resource is deleted
Test

Let’s now test our Operator.

To deploy it we will use a Helm chart, which needs to install:

  • A ServiceAccount
  • A ClusterRole to work with our custom resources
  • A ClusterRoleBinding to attach the previous ClusterRole to our service account
  • A RoleBinding: to go faster we will make it admin (in a real deployment you must limit the rights you grant to the strict minimum)
  • A Deployment

To access the Helm resources you can go to: https://github.com/speedfl/azure-resource-operator

To deploy simply run:

helm install demo helm/

Let’s now create the different resources:

The ResourceGroup:

apiVersion: org.speedfl.io/v1alpha1
kind: ResourceGroup
metadata:
  name: azure-demo-rg
spec:
  name: AzureDemoResourceGroup
  location: westeurope
  tags:
    tag1: value1

The ServiceBusNamespace:

apiVersion: org.speedfl.io/v1alpha1
kind: ServiceBusNamespace
metadata:
  name: azure-demo-sbns
spec:
  resourceGroupName: AzureDemoResourceGroup
  name: AzureDemoServiceBusNamespace
  location: westeurope
  sku:
    name: Standard
    tier: Standard
    capacity: 2

The ServiceBusQueue:

apiVersion: org.speedfl.io/v1alpha1
kind: ServiceBusQueue
metadata:
  name: azure-demo-sbq
spec:
  resourceGroupName: AzureDemoResourceGroup
  namespaceName: AzureDemoServiceBusNamespace
  name: AzureDemoServiceBusQueue
  properties:
    enablePartitioning: true

To deploy them:

$ kubectl create -f examples/resource-group.yml 
resourcegroup.org.speedfl.io/azure-demo-rg created
$ kubectl create -f examples/namespace.yml
servicebusnamespace.org.speedfl.io/azure-demo-sbns created
$ kubectl create -f examples/queue.yml
servicebusqueue.org.speedfl.io/azure-demo-sbq created

Let’s now check on the Azure Portal if the resources exist:

We can see that all our resources have been created. Note that the tags have been added to the ResourceGroup and that partitioning has been enabled on the Queue.

We can now try to delete the resources:

First the Queue:

$ kubectl delete -f examples/queue.yml
servicebusqueue.org.speedfl.io "azure-demo-sbq" deleted

Then the Namespace:

$ kubectl delete -f examples/namespace.yml
servicebusnamespace.org.speedfl.io "azure-demo-sbns" deleted

We can see that the Namespace does not exist anymore.

Finally let’s remove the ResourceGroup:

$ kubectl delete -f examples/resource-group.yml
resourcegroup.org.speedfl.io "azure-demo-rg" deleted

You should see that your ResourceGroup does not exist anymore on Azure.

Conclusion

In this article we saw how simple it is to create a Kubernetes Operator using Quarkus to provision Azure resources. You can now create your own to automate your daily tasks!

In addition, Quarkus can be built in native mode, which reduces memory and CPU usage and improves boot time.

However, please note the following advice:

  • To reduce complexity, it is recommended to have a single instance of your Operator deployment manage all your resources across the cluster (it also reduces the footprint needed to manage your resources).
  • Your operator must be idempotent, to avoid duplicate actions during an Operator restart for example (use the status or, better, the remote configuration).
  • Try to protect your resources from drift, or at least find a way to monitor, track and recover from such situations.
  • Don’t forget to use Kubernetes Events to track and troubleshoot issues.

Last word: Azure is already working on an operator called Azure Service Operator. Don’t hesitate to give it a try!
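To illustrate the idempotence advice, here is a minimal sketch in which the desired configuration is compared with the remote one before writing. Everything in it is hypothetical (`IdempotentApply`, the in-memory map standing in for Azure, configurations represented as plain strings); the point is only that applying the same desired state twice results in a single write.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of an idempotent "create or update" against a remote system. */
final class IdempotentApply {

    /** Stand-in for the remote state (resource name -> configuration). */
    private final Map<String, String> remote = new HashMap<>();
    private int writes;

    /**
     * Writes only when the remote configuration differs from the desired one.
     *
     * @return true if a write was performed
     */
    boolean apply(String name, String desiredConfig) {
        if (Objects.equals(remote.get(name), desiredConfig)) {
            return false; // already in the desired state, nothing to do
        }
        remote.put(name, desiredConfig);
        writes++;
        return true;
    }

    int writes() {
        return writes;
    }
}
```

Comparing against the remote state rather than a locally cached status also gives a first building block for drift detection.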
