Playing with Python Kubernetes API: Running Tasks in Jobs Generated by a Pod in Google Kubernetes Engine

Irvi Aini
Google Cloud - Community
5 min read · Dec 29, 2019

There are certain use cases where you can use a Job to run a one-time query and then persist the result in a cache to optimize your application.
That is what I had in mind when I tried using the Kubernetes API to run such a one-time Job.

So why Python? Because this project of mine is related to analytics, for which Python is a better fit: it already provides many statistical libraries such as Pandas and SciPy, and it has well-established libraries for interacting with the other components I might need.

Before jumping straight to the topic, I’d like to briefly explain the Kubernetes API Server, Kubernetes authentication methods, Kubernetes authorization, and Kubernetes clients.

Introducing API Server

The central management entity and the only component that has direct access to etcd, implemented as a RESTful API over HTTP, through which all other components interact…

The Kubernetes API is basically an HTTP API with JSON as its primary serialization schema. It also supports Protocol Buffers (mainly for internal communication within the cluster). For extensibility reasons, Kubernetes supports multiple API versions at different API paths. Now let’s have a look at how the HTTP API space is constructed. At the top level we distinguish between the core group (/api/v1), the named groups (/apis/$NAME/$VERSION), and system-wide entities (e.g. /metrics). Note that a version could be at the alpha level, such as v1alpha1 (disabled by default), at the beta level, such as v2beta3 (enabled by default), or stable, such as v1.

In general the Kubernetes API supports create, update, delete, and retrieve operations at the given path via the standard HTTP verbs POST, PUT, DELETE, and GET with JSON as the default payload.

An API Group, a Version, and a Resource (GVR) uniquely define an HTTP path:

Figure: what the HTTP path of a Job looks like.
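
To make the GVR-to-path mapping concrete, here is a minimal sketch that assembles the path from its parts. The helper name resource_path and its parameters are my own for illustration and are not part of any Kubernetes library; the path layout follows the core group (/api/v1) and named group (/apis/$NAME/$VERSION) convention described above.

```python
from typing import Optional


def resource_path(group: str, version: str, resource: str,
                  namespace: Optional[str] = None,
                  name: Optional[str] = None) -> str:
    """Build the REST path for a group/version/resource triple (illustrative helper)."""
    # The core group has an empty name and lives under /api;
    # every named group lives under /apis/<group>.
    prefix = f"/api/{version}" if group == "" else f"/apis/{group}/{version}"
    parts = [prefix]
    if namespace is not None:
        parts.append(f"namespaces/{namespace}")
    parts.append(resource)
    if name is not None:
        parts.append(name)
    return "/".join(parts)


# Jobs belong to the named group "batch", version "v1".
print(resource_path("batch", "v1", "jobs", namespace="example"))
# Pods belong to the core group, whose name is the empty string.
print(resource_path("", "v1", "pods", namespace="example", name="my-pod"))
```

A POST to the first path creates a Job in the example namespace, while a GET on the same path lists the Jobs there.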

When an HTTP request hits the Kubernetes API, it is first processed by a chain of filters registered in DefaultBuildHandlerChain() (config.go). What happens during this stage? At a high level, each filter either passes and attaches the respective information to ctx.RequestInfo, such as the authenticated user, or returns an appropriate HTTP response code. Now let’s look more closely at the filters set up inside DefaultBuildHandlerChain():

  • WithRequestInfo() as defined in requestinfo.go attaches a RequestInfo to the context
  • WithMaxInFlightLimit() as defined in maxinflight.go limits the number of in-flight requests
  • WithTimeoutForNonLongRunningRequests() as defined in timeout.go times out non-long-running requests like most GET, PUT, POST, DELETE requests in contrast to long-running requests like watches and proxy requests
  • WithPanicRecovery() as defined in wrap.go wraps a handler to recover and log panics
  • WithCORS() as defined in cors.go provides a CORS implementation; CORS stands for Cross-Origin Resource Sharing and is a mechanism that allows JavaScript embedded in an HTML page to make XMLHttpRequests to a domain different from the one the JavaScript originated from.
  • WithAuthentication() as defined in authentication.go tries to authenticate the given request as a user and stores the user info in the provided context. On success, the Authorization HTTP header is removed from the request.
  • WithAudit() as defined in audit.go decorates the handler with audit logging information for all incoming requests. The audit log entries contain information such as the source IP of the request, the user invoking the operation, and the namespace of the request.
  • WithImpersonation() as defined in impersonation.go handles user impersonation, by checking requests that attempt to change the user (similar to sudo).
  • WithAuthorization() as defined in authorization.go passes all authorized requests on to the multiplexer, which dispatches the request to the right handler, and returns a forbidden error otherwise.
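
The idea of a filter chain can be sketched in a few lines of Python. This is only an analogy and not the API server's Go code: the function names, the token table, and the allow-list are all made up for illustration. Each filter wraps the next handler and either enriches the request and passes it on, or returns an HTTP status code.

```python
from typing import Callable, Dict, Set, Tuple

Request = Dict[str, object]
Response = Tuple[int, str]
Handler = Callable[[Request], Response]

HTTP_TO_VERB = {"GET": "get", "POST": "create", "PUT": "update", "DELETE": "delete"}


def with_request_info(next_handler: Handler) -> Handler:
    """Attach basic request info (verb and path) to the request."""
    def handler(req: Request) -> Response:
        method = str(req["method"])
        req["request_info"] = {"verb": HTTP_TO_VERB.get(method, method.lower()),
                               "path": req["path"]}
        return next_handler(req)
    return handler


def with_authentication(next_handler: Handler, tokens: Dict[str, str]) -> Handler:
    """Resolve a bearer token to a user, or answer 401."""
    def handler(req: Request) -> Response:
        headers = req["headers"]
        value = headers.get("Authorization", "")
        token = value[len("Bearer "):] if value.startswith("Bearer ") else ""
        user = tokens.get(token)
        if user is None:
            return 401, "Unauthorized"
        req["user"] = user
        headers.pop("Authorization", None)  # removed on success
        return next_handler(req)
    return handler


def with_authorization(next_handler: Handler, allowed: Set[Tuple[str, str]]) -> Handler:
    """Let (user, verb) pairs from the allow-list through, or answer 403."""
    def handler(req: Request) -> Response:
        if (req["user"], req["request_info"]["verb"]) not in allowed:
            return 403, "Forbidden"
        return next_handler(req)
    return handler


def dispatch(req: Request) -> Response:
    """Stand-in for the multiplexer that routes to the real handler."""
    return 200, f"{req['user']} may {req['request_info']['verb']} {req['path']}"


def build_chain() -> Handler:
    tokens = {"secret-token": "alice"}   # hypothetical token table
    allowed = {("alice", "get")}         # hypothetical allow-list
    handler = with_authorization(dispatch, allowed)
    handler = with_authentication(handler, tokens)
    return with_request_info(handler)    # outermost filter runs first


chain = build_chain()
request = {"method": "GET", "path": "/apis/batch/v1/namespaces/example/jobs",
           "headers": {"Authorization": "Bearer secret-token"}}
print(chain(request))
```

Because every filter has the same shape, adding another one (say, an audit logger) only means wrapping the handler one more time.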

Kubernetes Authentication

In Kubernetes, there are two kinds of users:

  • service accounts, which are managed by the Kubernetes API, bound to specific namespaces, and created automatically by the API server or manually through API calls (their credentials are stored as Secrets and mounted into pods, allowing in-cluster processes to talk to the Kubernetes API);
  • normal users, which are typically managed by an outside, independent service.

Kubernetes uses client certificates, bearer tokens, an authenticating proxy, or HTTP basic auth to authenticate API requests through authentication plugins. When HTTP requests are made to the API server, the plugins attempt to associate a set of attributes (such as the username, UID, and groups) with the request, as defined in AuthenticateRequest(req *http.Request). More information can be found in the Authentication documentation. One of the available authentication strategies is X509 client certificates, which is also a prerequisite for understanding the authentication process in GKE.
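
As an illustration of the bearer token method for service accounts, the sketch below reads the token that Kubernetes mounts into a pod and turns it into an Authorization header. The helper name bearer_auth_headers is my own; the directory is the usual mount point for service account credentials inside a pod, and in practice a client library does this step for you.

```python
import os
from typing import Dict

# Usual mount point of the service account credentials inside a pod.
SERVICE_ACCOUNT_DIR = "/var/run/secrets/kubernetes.io/serviceaccount"


def bearer_auth_headers(sa_dir: str = SERVICE_ACCOUNT_DIR) -> Dict[str, str]:
    """Read the mounted service account token and build the auth header."""
    with open(os.path.join(sa_dir, "token"), encoding="utf-8") as token_file:
        token = token_file.read().strip()
    return {"Authorization": f"Bearer {token}"}


# Only attempt this when actually running inside a pod.
if os.path.exists(os.path.join(SERVICE_ACCOUNT_DIR, "token")):
    print(sorted(bearer_auth_headers()))
```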

GKE maintains its own control plane. In a GKE cluster, the control plane components run on GCE instances owned by Google, in a separate Google-managed project. Each instance runs these components for only one customer. Each cluster has its own root certificate authority (CA). An internal service manages root keys for this CA. Each cluster also has its own CA for etcd. Root keys for the etcd CA are distributed to the metadata of the VMs that run the Kubernetes API server. Communication between nodes and the Kubernetes API server is protected by TLS. For more information, see Cluster Trust.

Kubernetes Authorization

Kubernetes authorizes API requests using the API server. It evaluates all of the request attributes against all policies and allows or denies the request. All parts of an API request must be allowed by some policy in order to proceed. The function that handles this authorization is part of the API server source code. There are several authorization modes; one example is RBAC, the role-based access control model. kubectl provides a way to check whether a given action is authorized, using the auth can-i command.
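
To give a feel for how RBAC rules are evaluated, here is a toy model in Python. It is a deliberate simplification and not the real authorizer: PolicyRule and can_i are hypothetical names, and real rules have more fields (resource names, non-resource URLs). What it does capture is that RBAC rules are purely additive, so a request is allowed as soon as one rule matches its verb, API group, and resource.

```python
from dataclasses import dataclass
from typing import Iterable, Tuple


@dataclass(frozen=True)
class PolicyRule:
    """Simplified stand-in for one rule of a Role."""
    api_groups: Tuple[str, ...]
    resources: Tuple[str, ...]
    verbs: Tuple[str, ...]


def _matches(allowed: Tuple[str, ...], value: str) -> bool:
    # "*" is the wildcard that matches everything.
    return "*" in allowed or value in allowed


def can_i(rules: Iterable[PolicyRule], verb: str, api_group: str, resource: str) -> bool:
    """Return True if any rule allows the request; there are no deny rules."""
    return any(
        _matches(rule.verbs, verb)
        and _matches(rule.api_groups, api_group)
        and _matches(rule.resources, resource)
        for rule in rules
    )


rules = [PolicyRule(api_groups=("batch",), resources=("jobs",),
                    verbs=("create", "get", "list"))]
print(can_i(rules, "create", "batch", "jobs"))  # True
print(can_i(rules, "delete", "batch", "jobs"))  # False
```

Against a real cluster, the equivalent question would be asked with something like kubectl auth can-i create jobs --namespace example.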

Supported Kubernetes Client

Many client libraries are available, both officially supported and community-maintained; the full list can be found in the Kubernetes documentation.

Client libraries often handle common tasks such as authentication for you. Most client libraries can discover and use the Kubernetes Service Account to authenticate if the API client is running inside the Kubernetes cluster, or can understand the kubeconfig file format to read the credentials as well as the API Server address.

Using Kubernetes Python Client

import os

from kubernetes import client, config
from kubernetes.client.rest import ApiException


class Constants(object):
    NAMESPACE = 'example'


class KubernetesApiClient(object):
    def __init__(self):
        # Use the in-cluster service account when running inside a pod;
        # otherwise fall back to the local kubeconfig file.
        try:
            config.load_incluster_config()
        except config.ConfigException:
            config.load_kube_config()

    def create_batch_api_client(self):
        # BatchV1Api() picks up the configuration loaded in __init__.
        return client.BatchV1Api()

    def create_job_object(self, job_name, container_image, args):
        volume_name = ""  # volume inside of which you put your service account
        # Raises KeyError if GOOGLE_APPLICATION_CREDENTIALS is not set.
        google_app_credentials_path = os.environ['GOOGLE_APPLICATION_CREDENTIALS']
        # Mount the secret at the directory that contains the credentials file.
        volume_mount = client.V1VolumeMount(
            mount_path=os.path.dirname(google_app_credentials_path),
            name=volume_name
        )
        env = client.V1EnvVar(
            name='GOOGLE_APPLICATION_CREDENTIALS',
            value=google_app_credentials_path
        )
        container = client.V1Container(
            name=job_name,
            image=container_image,
            args=args,
            volume_mounts=[volume_mount],
            env=[env],
            image_pull_policy="Always")
        volume = client.V1Volume(
            name=volume_name,
            secret=client.V1SecretVolumeSource(secret_name='<secret-where-you-put-the-service-account>')
        )
        # Pod template: never restart the container in place; retries are
        # handled by the Job through backoff_limit.
        template = client.V1PodTemplateSpec(
            metadata=client.V1ObjectMeta(labels={"app": "sample"}),
            spec=client.V1PodSpec(restart_policy="Never",
                                  containers=[container],
                                  volumes=[volume]))
        # Retry up to 3 times and clean up the Job 60 seconds after it finishes.
        spec = client.V1JobSpec(
            template=template,
            backoff_limit=3,
            ttl_seconds_after_finished=60)
        job = client.V1Job(
            api_version="batch/v1",
            kind="Job",
            metadata=client.V1ObjectMeta(name=job_name),
            spec=spec)
        return job

Then you can call your program:

# job_name, container_image, and args are supplied by your application.
api_client = KubernetesApiClient()
job_api_client = api_client.create_batch_api_client()
job = api_client.create_job_object(job_name, container_image, args)
try:
    api_response = job_api_client.create_namespaced_job(
        namespace=Constants.NAMESPACE,
        body=job)
    print(str(api_response.status))
except ApiException as e:
    print(e)  # Handle the exception.
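
Right after creation the printed status is mostly empty, so you will usually want to read the Job again later and interpret its status. Below is a minimal sketch of such an interpretation. The helper job_state is my own, and it assumes the object passed in exposes the conditions and active fields of a Job status, the way the V1JobStatus returned by the Python client does; the SimpleNamespace objects only stand in for a real status so the example runs without a cluster.

```python
from types import SimpleNamespace
from typing import Any


def job_state(status: Any) -> str:
    """Map a Job status to 'succeeded', 'failed', 'running', or 'pending'."""
    # A finished Job carries a condition of type Complete or Failed
    # whose status is the string "True".
    for condition in (status.conditions or []):
        if condition.status != "True":
            continue
        if condition.type == "Complete":
            return "succeeded"
        if condition.type == "Failed":
            return "failed"
    # No terminal condition yet: running if at least one pod is active.
    return "running" if status.active else "pending"


# Stand-ins for real status objects.
fresh = SimpleNamespace(conditions=None, active=None)
done = SimpleNamespace(
    conditions=[SimpleNamespace(type="Complete", status="True")], active=None)
print(job_state(fresh), job_state(done))
```

Keep in mind that with ttl_seconds_after_finished=60 the Job is removed about a minute after it finishes, so the status has to be read before then.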

Before you deploy this application into the cluster, make sure the service account you’re using already has access to the namespace in which you’ll create the Kubernetes Job, especially if you’re using RBAC.
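
If RBAC is enabled, that access could be granted with a Role and a RoleBinding along these lines. This is a sketch under assumptions: the service account name job-launcher, the role name job-creator, and the exact list of verbs are placeholders that you should adapt to your setup. The script only prints the manifests as JSON, which you can review and then apply with kubectl.

```python
import json
from typing import Dict, List


def job_creator_manifests(namespace: str, service_account: str,
                          role_name: str = "job-creator") -> List[Dict]:
    """Build a Role allowing Job management and a RoleBinding for one service account."""
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": role_name, "namespace": namespace},
        "rules": [{
            "apiGroups": ["batch"],          # Jobs live in the batch group
            "resources": ["jobs"],
            "verbs": ["create", "get", "list", "watch"],
        }],
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": role_name, "namespace": namespace},
        "subjects": [{
            "kind": "ServiceAccount",
            "name": service_account,
            "namespace": namespace,
        }],
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "Role",
            "name": role_name,
        },
    }
    return [role, binding]


# "job-launcher" is a hypothetical service account name.
manifests = job_creator_manifests("example", "job-launcher")
print(json.dumps({"apiVersion": "v1", "kind": "List", "items": manifests}, indent=2))
```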

We hope you find it useful. 👋
