Workflow and implementation of a custom controller.

Senjuti De
17 min read · Feb 24, 2023


The basic steps a beginner can follow to write a custom Kubernetes controller:

1. Choose a programming language: Kubernetes controllers can be written in any language that can talk to the Kubernetes API. Some popular choices are Go, Python, and Java. In my case it was Go.

2. Set up the development environment: You’ll need to have a development environment set up with the necessary dependencies installed. This includes the Kubernetes API libraries for your chosen programming language, as well as tools like kubectl, minikube or a remote cluster to test on.

3. Familiarize yourself with the Kubernetes API: Before you start writing your custom controller, you need to understand how the Kubernetes API works and what resources it exposes. This will help you write code that interacts with the API correctly.

4. Define your Custom Resource Definition (CRD): Custom controllers in Kubernetes are typically tied to Custom Resource Definitions (CRDs), which define the custom resource type your controller will manage. You’ll need to create a CRD before your controller can watch its resources.

5. Write the controller logic: The next step is to write the code for your controller. The logic of a controller typically includes:

i) Watching for changes to the custom resource objects.

ii) Performing actions in response to changes.

iii) Updating the custom resource objects in the Kubernetes API to reflect the current state.

6. Deploy the controller: Once you’ve written the controller logic, you’ll need to deploy it to your Kubernetes cluster.

7. Test your controller: Finally, test your controller to ensure it’s working as expected. You can do this by creating custom resource objects and checking that your controller performs the desired actions in response to changes.

These are the basic steps to write a custom Kubernetes controller. Writing a controller can be complex, so it’s recommended that you have a solid understanding of Kubernetes and programming concepts before attempting to write one.

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

Before moving on to the implementation of a controller, we need to clearly understand a few terms.

1. Kubeconfig file: The Kubernetes configuration file contains the details of how to access a cluster’s API server. It is typically named config and is stored in the ~/.kube/ directory. It is used by the kubectl command-line tool, as well as by other tools and libraries that interact with the Kubernetes API.

The file specifies how to connect to a specific cluster, including the API server’s URL, the certificate authority (CA) used to secure communication with the API server, and the authentication credentials to use when connecting. It is written in YAML and typically contains one or more contexts, each of which names a cluster, a user, and (optionally) a namespace. You might have a single configuration file with contexts for multiple clusters, or a separate configuration file for each cluster. The active (current) context determines which cluster is being targeted by kubectl and other tools.
2. Kubernetes API: The Kubernetes API is the primary means of communicating with a Kubernetes cluster, and is responsible for managing the state of the cluster and the objects within it. It is a RESTful API: it uses HTTP methods (such as GET, POST, PUT, and DELETE) to manipulate data and communicates using the JSON format.

The Kubernetes API exposes a number of resources, which represent the objects and data that make up a cluster. Some of the most important resources include:

i) Nodes: Represent the individual machines in a cluster and the resources they have available, such as CPU, memory, and storage.

ii) Pods: Represent the smallest deployable units in a cluster and contain one or more containers.

iii) Services: Provide stable, load-balanced network connections to pods in a cluster.

iv) Replication Controllers: Ensure that a specified number of replicas of a pod are running at all times (in modern clusters this role is largely filled by ReplicaSets, usually managed through Deployments).

v) Deployments: Provide a high-level way to manage and update replicas of pods.

vi) ConfigMaps: Store configuration data for use by pods and other objects in a cluster.

vii) Secrets: Store sensitive data, such as passwords or tokens, for use by pods and other objects in a cluster.

The Kubernetes API is responsible for managing the state of these objects, ensuring that the desired state is maintained in the cluster. For example, if a deployment is created to manage a set of replicas of a pod, Kubernetes will ensure that the desired number of replicas are running and healthy. If a node in the cluster fails, the affected pods are rescheduled onto another node.

To communicate with the Kubernetes API, you can use the kubectl command-line tool or a client library in a programming language such as Go or Python. These clients interact with the API by making HTTP requests and parsing the responses, allowing you to manage the state of your cluster and the objects within it.

In summary, the Kubernetes API is a RESTful API that exposes a number of resources representing the objects and data in a Kubernetes cluster, and it is responsible for maintaining the desired state of those objects.

3. Kubernetes client and Custom Resource client: The Kubernetes client is a software library that allows you to interact with a Kubernetes cluster by making HTTP requests to the Kubernetes API. The client provides a high-level, programming language-specific interface for managing the state of a cluster and the objects within it.

The custom resource client is a type of Kubernetes client that allows you to interact with custom resources, which are custom objects that you can define and manage in a Kubernetes cluster. Custom resources are used to extend the Kubernetes API with your own custom objects and data, and the custom resource client provides an interface for managing these custom resources. A custom resource client provides a higher-level, object-oriented interface for working with custom resources, allowing you to interact with them in a way that is similar to the built-in Kubernetes resources. For example, you might use a custom resource client to retrieve, create, update, or delete instances of a custom resource, and to watch for changes to those resources.

In summary, the Kubernetes client is a software library that allows you to interact with a Kubernetes cluster, and the custom resource client is a type of Kubernetes client that allows you to interact with custom resources.

4. Work Queue and Informer:

i) Work Queue: A work queue is a simple data structure used to manage and process work items asynchronously. In a Kubernetes custom controller, a work queue is used to manage and process custom resource changes. The custom controller adds resource changes to the work queue as they occur, and a worker goroutine processes the items in the queue, performing the necessary actions to reconcile the custom resource with the desired state. This allows the custom controller to process resource changes in an efficient and scalable manner.

ii) Informer: An informer is a pattern used to cache and retrieve information about resources in a Kubernetes cluster. In a custom controller, an informer caches information about the custom resources that the controller is managing. It performs an initial list against the API server to populate an in-memory cache, then opens a watch and updates the cache as change notifications arrive; a periodic resync replays the cached state to the registered handlers as a safety net. This allows the custom controller to efficiently retrieve and process information about its custom resources without having to repeatedly query the API server, while still being notified when changes occur.

One of the key benefits of using an informer is that it reduces the load on the API server: clients are notified of changes from the in-memory cache rather than having to periodically poll the API server for updates.

Another benefit is a consistent view of the state of resources: reads are served from the same cache that the change notifications keep up to date, rather than from ad-hoc queries against the API server.

Overall, informers are a critical component in Kubernetes custom controllers, providing a way to watch for changes in the state of resources, and providing notifications to clients when changes occur. They help to reduce the load on the API server, and provide a consistent view of the state of resources. In combination, a work queue and an informer allow a Kubernetes custom controller to efficiently manage and process custom resources in a scalable and robust manner.

5. Goroutines and Go channels: Goroutines and channels are key concepts in the Go programming language. Goroutines are lightweight threads of execution that run concurrently with other goroutines. They are used much like threads in other languages, but are far cheaper, so a program can run many thousands of them. Goroutines are scheduled by the Go runtime, allowing multiple tasks to be executed concurrently. Go channels are the mechanism goroutines use to communicate with each other: a channel is a typed conduit through which you can send and receive values between goroutines, and it can be used for both communication and synchronization.

In the context of Go custom controllers for Kubernetes, goroutines and channels can be used to perform background processing, watch for changes in the state of resources, and manage communication between different parts of the controller. For example, a custom controller might use a goroutine to watch for changes to a custom resource, and a channel to notify other parts of the controller when changes occur.

6. Defer keyword in Go: In Go, defer schedules a function call to run when the surrounding function returns, whether it returns normally or is unwinding because of a panic. It is most often used to release resources — closing files, unlocking mutexes, shutting down work queues — so that the cleanup happens regardless of how the function exits.

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

Explaining the controller code written in the repository : https://github.com/Senjuti256/customcluster/tree/master/pkg/controller

Here we have defined a new struct named Controller with the following fields:

i) kubeClient (kubernetes.Interface): used to interact with the Kubernetes API.

ii) cpodClient (cClientSet.Interface): used to interact with the custom resources.

iii) cpodSync (cache.InformerSynced): indicates whether the informer for the custom resources has successfully synced with the API server.

iv) cpodlister (cLister.CustomclusterLister): provides an interface to list the custom resources.

v) wq (workqueue.RateLimitingInterface): a work queue that stores the work that needs to be processed.

Overall, this Controller struct seems to be defining a controller that interacts with custom resources in Kubernetes. It uses a kubeClient to interact with the Kubernetes API, and a cpodClient to interact with the custom resources. It also has a work queue (wq) to store the work that needs to be processed, and fields (cpodSync and cpodlister) related to the informer for the custom resources.

This code defines a function named NewController that returns a pointer to a Controller object. It takes three arguments, of types kubernetes.Interface, cClientSet.Interface, and cInformer.CustomclusterInformer. The function:

i) Logs an informational message (using the klog package) to indicate that NewController has been called.

ii) Creates a new Controller object and initializes its fields from the arguments: cpodSync and cpodlister are obtained from the cpodInformer argument, and wq is initialized with a new rate-limiting work queue from the workqueue package.

iii) Logs that the new Controller object has been created and that event handlers are being set up.

iv) Registers event handlers on the informer via its AddEventHandler method. These handlers, handleAdd and handleDel, are methods of the Controller and are called when custom resources are added or deleted.

v) Logs that the Controller object is being returned, and returns it.

This function Run is a method of the Controller struct and takes a channel ch as an input parameter. Here’s a breakdown of what the function does, line by line:

defer c.wq.ShutDown(): This line schedules a call to the ShutDown method of the workqueue.RateLimitingInterface used by the controller when the Run method returns.

klog.Info(“Starting the Customcluster controller\n”): This line logs an informational message indicating that the controller is starting.

klog.Info(“Waiting for informer caches to sync\n”): This line logs an informational message indicating that the controller is waiting for the caches to sync.

if ok := cache.WaitForCacheSync(ch, c.cpodSync); !ok { : This line waits for the cache to sync by calling the WaitForCacheSync function of the cache package. The WaitForCacheSync function blocks until all registered Informers are synced. The function returns true if the cache has synced successfully and false if it fails to sync for some reason. The if statement checks whether the cache has synced successfully or not. If it has not synced, then the function logs a message indicating the failure to sync.

go wait.Until(c.worker, time.Second, ch): This line starts a goroutine that will execute the worker method of the Controller struct periodically. The wait.Until function from the k8s.io/apimachinery/pkg/util/wait package is used to achieve this. The worker method processes the items from the workqueue.RateLimitingInterface used by the controller.

klog.Info(“Started workers\n”): This line logs an informational message indicating that the workers have started.

<-ch: This line blocks until a message is received on the ch channel. The purpose of this line is to keep the Run method running until the channel receives a message. This allows the controller to run continuously until it is interrupted externally.

klog.Info(“Shutting down the worker”): This line logs an informational message indicating that the workers are shutting down.

return nil: This line returns a nil error, indicating that the Run method has completed successfully.

worker is a long-running function that will continually call the processNextItem function in order to read and process an item on the workqueue.

The processNextItem() method is part of the Controller struct and is used as a worker function to process items from the workqueue. Here’s an explanation of each line:

item, shutdown := c.wq.Get(): retrieves an item from the workqueue and a boolean indicating whether the workqueue is shutting down.

defer c.wq.Forget(item): defers a call to the workqueue’s Forget() method, which tells the rate limiter to stop tracking the item so it will not be re-queued with a back-off delay. (Note that it is Done(), not Forget(), that marks an item as finished processing; Forget() only clears its rate-limiting history.)

key, err := cache.MetaNamespaceKeyFunc(item): retrieves the key for the item, which is used to identify the corresponding resource in the API server.

ns, name, err := cache.SplitMetaNamespaceKey(key): splits the key into its namespace and name components.

cpod, err := c.cpodlister.Customclusters(ns).Get(name): retrieves the corresponding resource from the cache (if it exists) using the cpodlister and the namespace and name components of the key.

c1, err := c.cpodClient.SamplecontrollerV1alpha1().Customclusters(ns).Get(context.TODO(), name, metav1.GetOptions{}): retrieves the corresponding resource from the API server (if it exists) using the cpodClient and the namespace and name components of the key.

labelSelector := metav1.LabelSelector{…}: constructs a LabelSelector object that is used to filter pods by their labels.

listOptions := metav1.ListOptions{…}: constructs a ListOptions object that is used to specify the label selector when listing pods from the API server.

pList, _ := c.kubeClient.CoreV1().Pods(cpod.Namespace).List(context.TODO(), listOptions): lists pods from the API server using the specified ListOptions and stores them in the pList variable.

err := c.syncHandler(cpod, pList): calls the syncHandler function with the cpod and pList arguments to reconcile the current and desired states of the resource.

err = c.waitForPods(cpod, pList): waits for the pods to be in a ready state before proceeding with the next step.

err = c.updateStatus(cpod, cpod.Spec.Message, pList): updates the status of the resource with the current progress and the number of running pods.

return true: indicates that the item was processed successfully and that the worker should continue processing items.

This function, totalRunningPods, takes a pointer to a Customcluster object and returns the total number of running pods for that particular Customcluster. Here’s what each line does:

The function signature specifies that it takes a pointer to a Customcluster object as the input parameter and returns an integer value.

Creates a labelSelector object with the MatchLabels field set to a map containing a single key-value pair. The key is “controller” and the value is the name of the Customcluster object passed as the input parameter.

Sets the MatchExpressions field of the labelSelector object to an empty slice of LabelSelectorRequirement objects. This is done to ensure that only labels specified in the MatchLabels field are used to filter the list of pods.

Creates a ListOptions object with the LabelSelector field set to the string representation of the MatchLabels field of the labelSelector object.

Invokes the List method of the Kubernetes client’s Pods interface with the namespace of the Customcluster object passed as the input parameter and the ListOptions object created earlier. This returns a list of pods that match the label selector.

Initializes a variable runningPods to 0.

Loops over each item in the list of pods returned by the previous step.

Checks if the DeletionTimestamp field of the pod’s metadata is zero, indicating that the pod has not been deleted.

Checks if the Phase field of the pod’s status is set to “Running”, indicating that the pod is currently running.

If both the above conditions are satisfied, increments the runningPods counter by 1.

Returns the runningPods counter.

This function, syncHandler, syncs the number of running pods with the desired number specified in a custom resource. The function takes a custom cluster object (cpod) and a list of pods (pList) as input parameters. The cpod object contains the desired number of pods to be running (cpod.Spec.Count) and a message (cpod.Spec.Message). The pList parameter contains a list of pods that are currently running.

The function first checks whether the number of running pods matches the desired number of pods specified in the custom resource. If they do not match, then the function checks if the Message field in the cpod object has been modified since the last sync. If it has been modified, then the function deletes all the running pods and creates new ones according to the updated cpod object.

If the Message field has not been modified, then the function checks whether there are fewer running pods than the desired number. If so, it creates the required number of new pods. If there are more running pods than the desired number, the function deletes the extra pods.

The function also checks if any manually created pods are present and deletes them. Finally, the function returns an error if any of the pod creations or deletions failed, or else it returns nil.

This function creates a new pod object based on the specifications defined in a custom cluster object (cpod). The function takes the cpod object as an input parameter and returns a new corev1.Pod object.

The function first creates a set of labels for the new pod, with the key “controller” and the value set to the name of the custom cluster object (cpod.Name). It then creates a new corev1.Pod object and sets its metadata, including the labels, a unique name based on the custom cluster name and a random number (cpod.Name + “-” + strconv.Itoa(rand.Intn(10000000))), the namespace, and an owner reference to the cpod object.

The function then sets the pod’s specification, including a single container definition that runs the latest version of the nginx image. The container receives an environment variable named “MESSAGE” with a value set to the Message field in the cpod object. The container runs the /bin/sh command and passes it a script that echoes the MESSAGE variable and sleeps for 100 seconds.

Finally, the function returns the new pod object.

This is a function called waitForPods that takes in a Customcluster object and a list of pods. It waits until the number of running pods matches the desired count specified in the Customcluster object.

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute): creates a context with a timeout of 5 minutes and a cancel function to cancel the context.

return poll.Wait(ctx, func(ctx context.Context) (bool, error) { : starts a loop that waits for the condition to be met or for the context to be canceled.

runningPods := c.totalRunningPods(cpod) : gets the number of running pods.

if runningPods == cpod.Spec.Count { : checks if the number of running pods is equal to the desired count.

return true, nil : if the number of running pods is equal to the desired count, returns true to indicate the condition is met and no error.

return false, nil : if the number of running pods is not equal to the desired count, returns false to indicate the condition is not met and no error.

The remaining code includes three functions:

updateStatus: This function is used to update the status of a custom resource Customcluster. It takes three parameters — cpod, progress, and pList. The function first gets the latest version of the custom resource by calling Get function of the cpodClient. It then calculates the total number of running pods using the totalRunningPods function and updates the Count and Message fields of the custom resource’s status. Finally, it calls the UpdateStatus function of the cpodClient to update the status of the custom resource.

handleAdd: This function is used to handle the addition of a new custom resource instance. It takes one parameter — obj. The function adds the obj to the work queue (wq).

handleDel: This function is used to handle the deletion of a custom resource instance. It takes one parameter — obj. The function signals the work queue (wq) that the obj is done processing.

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

Hope I could explain my implementation details of the custom controller :)
