Deploy Pydantic K8s Kafka consumers

Sirsh Amarteifio
5 min read · Sep 23, 2023

Let's have some fun building Kafka consumers from Pydantic types that run on Kubernetes. I have become increasingly dependent on Pydantic to interface parts of my infrastructure, and the more I use it the more I like it. Today I'm just going to create the sandbox project and deploy something on a Pod, and then in another post I'll play a bit more with the deployment pipeline.

I'll use ChatGPT to generate an Avro/JSON schema from a Pydantic one and then show some of the plumbing needed to consume messages into a Pydantic handler. This acts as a teaser for how we might build platform tooling: start with Pydantic types and generate the code that integrates with other systems.

Start with the type! The idea here is to see how much we can generate, using templating, from the Pydantic types alone. We will start simple: we will ask ChatGPT to generate an Avro schema from the Pydantic type and then push that definition to our schema registry. Later we will deploy consumers. Here is a fairly generic parent-child relationship:

from typing import List, Optional

from pydantic import BaseModel, Field


class OrderPiecesStatus(BaseModel):
    id: str
    created_at: str
    updated_at: str
    code: str
    contracts_failed: List[str] = Field(default_factory=list)
    node: Optional[str]
    observed_at: Optional[str]
    make_instance: int


class OrderStatus(BaseModel):
    created_at: str
    updated_at: str
    order_number: str
    code: str
    number: Optional[str]
    sku: str
    pieces: List[OrderPiecesStatus] = Field(default_factory=list)

We prompt ChatGPT

Starting from the following Pydantic types, generate the Kafka avro schema definitions {{PydanticTypesHere}}

Dutifully, ChatGPT returns the Avro schema. Now that we have this we can create a Kafka topic.
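For reference, the kind of schema it hands back looks roughly like the one below. I have written it as a Python dict rather than raw JSON so it can be passed straight to the registry helper in the next section; the exact field mapping and the namespace are my reconstruction of a typical answer, not ChatGPT's verbatim output.

# A plausible Avro schema for OrderStatus (with nested OrderPiecesStatus records),
# expressed as a Python dict. Optional Pydantic fields become unions with "null".
order_status_avro_schema = {
    "type": "record",
    "name": "OrderStatus",
    "namespace": "test",
    "fields": [
        {"name": "created_at", "type": "string"},
        {"name": "updated_at", "type": "string"},
        {"name": "order_number", "type": "string"},
        {"name": "code", "type": "string"},
        {"name": "number", "type": ["null", "string"], "default": None},
        {"name": "sku", "type": "string"},
        {
            "name": "pieces",
            "type": {
                "type": "array",
                "items": {
                    "type": "record",
                    "name": "OrderPiecesStatus",
                    "fields": [
                        {"name": "id", "type": "string"},
                        {"name": "created_at", "type": "string"},
                        {"name": "updated_at", "type": "string"},
                        {"name": "code", "type": "string"},
                        {
                            "name": "contracts_failed",
                            "type": {"type": "array", "items": "string"},
                            "default": [],
                        },
                        {"name": "node", "type": ["null", "string"], "default": None},
                        {"name": "observed_at", "type": ["null", "string"], "default": None},
                        {"name": "make_instance", "type": "int"},
                    ],
                },
            },
            "default": [],
        },
    ],
}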

I use the Confluent Schema Registry on Kubernetes. We can post the schema returned from ChatGPT to the registry to create or update the Kafka topic's value schema. Obviously this requires some consideration of schema evolution. If we trust our Pydantic types, and we trust the map from those to the Avro schema, we can increasingly rely on an automated mapping between Pydantic types and Kafka topics, perhaps using a git action to upsert the schema. After all, why manage two kinds of schema?

Here is a really bare version of the schema updater to give the idea. With it we can register the topic's value schema and set its compatibility to NONE (because we are feeling trusting towards our AI overlords). I'm using localhost because I'm port forwarding to my K8s-based Kafka schema registry.

import json

import requests


def update_topic_schema(
    fqname,
    avro_schema,
    set_compatibility_to="NONE",
    registry_url="http://localhost:8001",
):
    # post the schema to the subject for this topic's value
    schema = {"schema": json.dumps(avro_schema)}
    response = requests.post(
        f"{registry_url}/subjects/{fqname}-value/versions",
        json=schema,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    )
    # check the status code here and take action if the post failed

    if set_compatibility_to:
        # relax (or tighten) compatibility checking for this subject
        response = requests.put(
            f"{registry_url}/config/{fqname}-value",
            json={"compatibility": set_compatibility_to},
            headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        )
        # check the status code
    return response
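With the Avro dict from earlier, wiring it up is then a single call; the topic name below is just the example used throughout this post:

# Register the generated schema under the example topic's value subject
# and relax compatibility checking.
response = update_topic_schema("test.order_status", order_status_avro_schema)
print(response.status_code, response.text)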

Now that we have the Kafka topic, we can use suitable libraries to read from and write to Kafka. I'm using the Python Confluent libraries, and I have added a minimal client here.

We will create a library of types (just one to illustrate here). This is unapologetically a monolithic type library in principle. You may disagree, but I'm not in favour of creating completely distinct microservices, each with its own types, utils, code and so on. I prefer to share core utils, types and even business logic in one place, and then use only my deployment infrastructure (on K8s) to separate my microservices. This is because I believe the real advantage of microservices is not complete code isolation (I think that is a liability, and not very DRY) but rather isolation and independence of resources, scaling, versioning (to a point), security and the like, i.e. all the stuff Kubernetes is good at!

Once we have our library of type(s), we will create a special build action that manages a zoo of consumers; for each type, there can be a Kafka consumer running on Kubernetes, running some handler function to process the messages. That build pipeline will be the stuff of a later article. For now I'll just test the code on one pod.

We will have a single Docker image with our library code, and we will create a deployment that execs in and runs a particular handler for a particular topic. I created a simple CLI and dispatcher in the sample code to illustrate. We can run…

cli kafka consume -t "test.order_status" 
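The CLI itself can be tiny. Here is a minimal sketch of what it might look like, assuming Typer; the dispatcher import and its consume function are placeholders standing in for the sample code's own modules.

import typer

from monolith.common import dispatcher  # hypothetical import matching the repo layout

app = typer.Typer()
kafka_app = typer.Typer()
app.add_typer(kafka_app, name="kafka")


@kafka_app.command("consume")
def consume(topic: str = typer.Option(..., "-t", "--topic")):
    """Consume the given topic forever, dispatching each message to its handler."""
    dispatcher.consume(topic)


if __name__ == "__main__":
    app()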

The consumer will loop continuously, consuming from a topic, parsing the messages into the Pydantic type to be handled by some logic.
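A stripped-down version of that loop might look like the following. This sketch assumes JSON-encoded message values and the plain confluent-kafka Consumer; the real KafkaClient in the sample code wraps this up, and an Avro deserializer would slot into the same place.

from confluent_kafka import Consumer
from pydantic import ValidationError


def consume_forever(topic, model, handler, bootstrap_servers="localhost:9092"):
    """Poll a topic forever, parse each message into `model`, and hand it to `handler`."""
    consumer = Consumer(
        {
            "bootstrap.servers": bootstrap_servers,
            "group.id": f"{topic}-consumer",
            "auto.offset.reset": "earliest",
        }
    )
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("consumer error:", msg.error())
                continue
            try:
                # Pydantic v1 style; use model_validate_json in Pydantic v2.
                record = model.parse_raw(msg.value())
            except ValidationError as ex:
                print("dropping message that failed validation:", ex)
                continue
            handler(record)
    finally:
        consumer.close()

Calling something like consume_forever("test.order_status", OrderStatus, handler) is roughly what the CLI command above resolves to.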

The deployment entity we would ultimately want here is an Argo CD ApplicationSet with many Deployments, one per consumer; it will manage the zoo of consumers. For today, to keep things simple, let's just use a Pod.

The Kafka client and other bits are in this sample code. The approximate folder structure is shown below. The so-called Pydantic Kafka client consumes into a Pydantic type or complains if it cannot. Keep in mind, the real role of Pydantic here is not to provide yet another way to parse and validate Kafka messages, but rather to provide a clearly typed interface between different systems and parts of the architecture.

└── deployment-patterns
    ├── Dockerfile
    └── monolith
        ├── __init__.py
        ├── cli.py
        ├── common
        │   ├── KafkaClient.py
        │   └── dispatcher.py
        └── modules
            └── test
                ├── __init__.py
                └── order_status
                    ├── __init__.py
                    ├── schema.avsc
                    └── schema.py
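Notice that the topic name doubles as a module path under modules, which is what keeps the dispatcher almost trivial. Here is a sketch of that idea; it assumes each module's schema.py exposes its Pydantic model as Model plus a handler function, and those names are my guesses rather than the sample code's exact conventions.

import importlib

from monolith.common.KafkaClient import consume_forever  # hypothetical, as sketched earlier


def resolve(topic):
    """Map a topic like 'test.order_status' to monolith.modules.test.order_status.schema."""
    module = importlib.import_module(f"monolith.modules.{topic}.schema")
    return module.Model, module.handler  # assumed attribute names


def consume(topic):
    """Entry point used by the CLI: look up the type and handler, then consume forever."""
    model, handler = resolve(topic)
    consume_forever(topic, model, handler)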

Now let's build the Docker image and deploy. I've used environment variables from a config map not shown here (this is loaded in the envFrom part of the Pod spec). After I push the Docker image and run the Pod, I see the consumer waiting for messages; I then send a test message, which gets processed below.

watching the consumer pod

Thus ends part I. I wanted to create this basic sample code so I could build out the ApplicationSet deployment stuff later. Since testing Kafka consumers can be a bit annoying, I thought I’d get that out of the way first.

I have become very fond of “type driven development” with Pydantic. What I mean by this is to always start with the Pydantic types and derive everything else from them. This is not because I think Python is weak (in practice) for not having types (as some folks claim), but because I have found it to be a very good way to organize code and projects. For example, Pydantic validation removes a tonne of otherwise horrible-looking defensive code. It allows for a common interface or schema by which to build other integrations or CRUD. It makes the interfaces in your system very clear. It has many advantages.

Next up I want to think more about the deployment side of things.
