Building Cloud-Native Services with Dapr, Go, and Kubernetes — Part 4

Vladimir Vivien
Jul 24, 2024

Creating event-driven services with Dapr

This series explores how to get started with Dapr to build distributed Go services deployed on Kubernetes.

The source code for the series is hosted at
github.com/vladimirvivien/dapr-examples

In the previous installment of this series (part 3), we looked at how to create and invoke services using the Dapr API. In this post, the fourth in the series, we will expand our collection of services by creating an event-based service using the Dapr API and a publish/subscribe component.

The event-driven service

In this updated version of the application, we will add a new publish-subscribe component to delegate processing of the order to a new service called orderprocsvc.

The application now does the following:

  • The frontendsvc service receives an order request on its endpoint
  • Next, the frontend service uses genidsvc to create an order ID
  • The initial state of the order is then saved in Redis
  • The order ID is published to a topic for downstream processing
  • The orderprocsvc service receives the order ID for processing from a Redis stream broker
  • After processing, orderprocsvc updates the state of the order in Redis

The full source code and configuration files for this post can be found on GitHub at github.com/vladimirvivien/dapr-examples/03-pubsub

Pre-requisites

This example uses the same setup and resources that were configured in the previous examples. You will need the following:

  • A Kubernetes cluster (KinD used for this series)
  • Dapr running on Kubernetes
  • Redis deployed on Kubernetes

For a detailed walkthrough of how to set up your environment for the series, refer to part 1, part 2, and part 3.

Sending order events for processing

The frontend service needs to be updated so that it delegates order processing to the orderprocsvc service instead of processing it itself. The first code snippet declares variables for configuring the frontend service with pubsub topic and routing information.

var (
	appPort = os.Getenv("APP_PORT")
	pubsub  = os.Getenv("ORDERS_PUBSUB")
	topic   = os.Getenv("ORDERS_PUBSUB_TOPIC")
	...
)

The following snippet highlights the main change in the code for handling incoming orders. The new version of the code uses Dapr’s pubsub API to publish the order ID to the configured topic. The orderprocsvc service subscribes to that topic so that it can process incoming orders (see above).

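func postOrder(w http.ResponseWriter, r *http.Request) {
	var receivedOrder types.Order
	if err := json.NewDecoder(r.Body).Decode(&receivedOrder); err != nil {
		http.Error(w, "invalid order payload", http.StatusBadRequest)
		return
	}

	// generate order ID
	out, err := daprClient.InvokeMethod(r.Context(), genidsvcId, "genid", "post")
	if err != nil {
		http.Error(w, "unable to generate order ID", http.StatusInternalServerError)
		return
	}
	orderID := fmt.Sprintf("order-%s", string(out))
	receivedOrder.ID = orderID
	receivedOrder.Received = true

	// marshal order for downstream processing
	orderData, err := json.Marshal(receivedOrder)
	if err != nil {
		http.Error(w, "unable to encode order", http.StatusInternalServerError)
		return
	}

	// Save state of order
	if err := daprClient.SaveState(r.Context(), stateStore, orderID, orderData, nil); err != nil {
		http.Error(w, "unable to save order", http.StatusInternalServerError)
		return
	}

	// Publish order ID as an event for downstream processing
	if err := daprClient.PublishEvent(r.Context(), pubsub, topic, []byte(orderID)); err != nil {
		http.Error(w, "unable to publish order", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	fmt.Fprintf(w, `{"order":"%s", "status":"received"}`, orderID)
}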

In the snippet above, the daprClient.PublishEvent method is used to publish new order IDs as they are created. The application assumes that a pub/sub component is configured and that a service is subscribed to the topic to handle the orders.
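
To make the publish step more concrete, the sketch below performs an equivalent call against the Dapr sidecar's HTTP API (POST /v1.0/publish/<pubsub>/<topic>) using only the standard library. It is an illustration rather than this series' code: the publishURL and publish helpers are hypothetical, and the sidecar address http://localhost:3500 assumes Dapr's default HTTP port.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"net/url"
)

// publishURL builds the sidecar's publish endpoint for a pubsub component
// and topic.
func publishURL(sidecar, pubsub, topic string) string {
	return fmt.Sprintf("%s/v1.0/publish/%s/%s",
		sidecar, url.PathEscape(pubsub), url.PathEscape(topic))
}

// publish posts data as plain text to the topic through the sidecar.
func publish(c *http.Client, sidecar, pubsub, topic string, data []byte) error {
	resp, err := c.Post(publishURL(sidecar, pubsub, topic), "text/plain", bytes.NewReader(data))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("publish failed: %s", resp.Status)
	}
	return nil
}

func main() {
	// Assumption: a Dapr sidecar is listening on its default HTTP port.
	err := publish(http.DefaultClient, "http://localhost:3500",
		"orders-pubsub", "received-orders", []byte("order-123"))
	if err != nil {
		fmt.Println("publish error:", err)
	}
}
```

The SDK client hides these details, so application code only needs the component name and the topic.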

Handling order processing events

The orderprocsvc service is designed to process order events published upstream. Its code uses the Dapr API to create and start a service that subscribes to a pub/sub topic to receive incoming order IDs published by the frontend service.

First, let’s define some variables we will need to inject pub/sub configuration into the service (i.e. pubsub name and topic):

var (
	daprClient dapr.Client
	appPort    = os.Getenv("APP_PORT")
	pubsub     = os.Getenv("ORDERS_PUBSUB")
	topic      = os.Getenv("ORDERS_PUBSUB_TOPIC")
)

The next code snippet defines a subscription to the topic with order events. The code also sets up a service and registers the handler that will be triggered when an event arrives on the specified topic.

func main() {
	// define subscription
	rcvdSub := &common.Subscription{
		PubsubName: pubsub,
		Topic:      topic,
		Route:      topic,
	}
	// Create service
	s := daprd.NewService(fmt.Sprintf(":%s", appPort))
	if err := s.AddTopicEventHandler(rcvdSub, subHandler); err != nil {
		log.Fatalf("orderproc: topic subscription: %v", err)
	}
}
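
Behind a programmatic subscription like this one, the Dapr sidecar asks the application which topics it wants by calling GET /dapr/subscribe on the app port, and then delivers events to the advertised route. The standard-library sketch below shows roughly what such an endpoint returns. The subscription type and helper names are hypothetical, the route value is an assumption, and the SDK's actual implementation may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// subscription mirrors the JSON fields Dapr reads from /dapr/subscribe.
type subscription struct {
	PubsubName string `json:"pubsubname"`
	Topic      string `json:"topic"`
	Route      string `json:"route"`
}

// subscriptionsJSON renders the subscription list as a JSON array.
func subscriptionsJSON(subs []subscription) (string, error) {
	b, err := json.Marshal(subs)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// subscribeHandler returns a handler that advertises the given subscriptions.
func subscribeHandler(subs []subscription) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		body, err := subscriptionsJSON(subs)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, body)
	}
}

func main() {
	subs := []subscription{{
		PubsubName: "orders-pubsub",
		Topic:      "received-orders",
		Route:      "/received-orders", // assumed route for illustration
	}}
	mux := http.NewServeMux()
	mux.HandleFunc("/dapr/subscribe", subscribeHandler(subs))
	// A real service would now serve mux on the application port.
	body, err := subscriptionsJSON(subs)
	if err != nil {
		panic(err)
	}
	fmt.Println(body)
}
```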

Next, we’ll define a Dapr API client that will be used to update the order information in the state store (Redis) once the order is processed.

func main() {
	...
	// Set up Dapr client
	dc, err := dapr.NewClient()
	if err != nil {
		log.Fatalf("order proc: dapr client: %s", err)
	}
	daprClient = dc
	defer daprClient.Close()
}

Lastly, let’s start the service.

func main() {
	...
	// Start service component last
	if err := s.Start(); err != nil && err != http.ErrServerClosed {
		log.Fatalf("orderproc: starting: %v", err)
	}
}

As mentioned above, the service registers a pub/sub handler to handle incoming events on the specified topic. The following snippet shows the handler.

func subHandler(ctx context.Context, event *common.TopicEvent) (retry bool, err error) {
	// extract incoming event data; drop events that do not carry a string ID
	orderID, ok := event.Data.(string)
	if !ok {
		log.Printf("orders-pubsub: unexpected event data type: %T", event.Data)
		return false, fmt.Errorf("unexpected event data type: %T", event.Data)
	}

	// pull and update order
	orderItem, err := daprClient.GetState(ctx, stateStore, orderID, nil)
	if err != nil {
		log.Printf("orders-pubsub: getstate: %s", err)
		return true, err
	}

	// unmarshal order for processing
	var order types.Order
	if err := json.Unmarshal(orderItem.Value, &order); err != nil {
		log.Printf("orders-pubsub: unmarshal: %s: %s", err, orderItem.Value)
		return false, err
	}

	// process order
	// Update its state
	order.Completed = true

	// use daprClient to save order
	orderData, err := json.Marshal(order)
	if err != nil {
		log.Printf("orders-pubsub: marshal: %s", err)
		return false, err
	}
	if err := daprClient.SaveState(ctx, stateStore, orderID, orderData, nil); err != nil {
		log.Printf("orders-pubsub: save state: %s", err)
		return true, err
	}

	return false, nil
}

In the pubsub event handler above, notice that the following is done:

  • The order is pulled from the state store with daprClient.GetState
  • After processing, the order is updated with daprClient.SaveState

Building the source code

Next, let’s use ko to compile the source code and build it into OCI container images:

ko build --local -B ./frontendsvc
ko build --local -B ./orderprocsvc

Once the images are built, let’s load them into the local kind cluster named dapr-cluster:

kind load docker-image ko.local/frontendsvc:latest --name dapr-cluster
kind load docker-image ko.local/orderprocsvc:latest --name dapr-cluster

Deploying the application

This example will use three manifests:

Application Deployment configuration

First, let’s focus on the manifest for the new orderprocsvc service, as the others are unchanged from before.

kind: Deployment
metadata:
  name: orderprocsvc
spec:
  ...
  template:
    metadata:
      labels:
        app: orderprocsvc
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "orderprocsvc"
        dapr.io/app-port: "5050"
    spec:
      containers:
      - name: orderprocsvc
        image: ko.local/orderprocsvc:latest
        ports:
        - containerPort: 5050
        env:
        - name: APP_PORT
          value: "5050"
        - name: ORDERS_PUBSUB
          value: "orders-pubsub"
        - name: ORDERS_PUBSUB_TOPIC
          value: "received-orders"

As with other Dapr-backed services, the deployment for the orderprocsvc service includes annotations for its Dapr sidecar. The annotation dapr.io/app-port: "5050" (matching the container’s containerPort value) tells the sidecar which port the application listens on. The env: section includes application configuration that will be injected into the application at runtime. Because the service listens on the port given by APP_PORT, that value must match the port in the dapr.io/app-port annotation.

Pubsub component configuration

Because Dapr uses a uniform component model, the manifest that configures the Dapr pubsub component looks similar to that of the state store configured earlier in the series. The metadata.name attribute identifies the pubsub component so that it can be referenced at runtime. The spec.type attribute is set to pubsub.redis to indicate that it is a Redis-backed broker.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: orders-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: redis-master.default.svc.cluster.local:6379
  - name: redisPassword
    secretKeyRef:
      name: redis
      key: redis-password
auth:
  secretStore: kubernetes

Deploying the manifests

The next step deploys the application configuration to the Kubernetes cluster:

kubectl apply -f ./manifest

Now, let’s use the kubectl command to verify the deployment. First, ensure the Dapr components are deployed properly in the cluster:

kubectl get components

NAME            AGE
orders-pubsub   20s
orders-store    20s

We also should ensure that the Kubernetes deployments succeeded for each application. We will do that using kubectl to query each application:

$> kubectl get deployments -l app=frontendsvc -o wide
NAME          READY   UP-TO-DATE   AVAILABLE   AGE   CONTAINERS    IMAGES                        SELECTOR
frontendsvc   1/1     1            1           75m   frontendsvc   ko.local/frontendsvc:latest   app=frontendsvc

$> kubectl get deployments -l app=genidsvc -o wide
NAME       READY   UP-TO-DATE   AVAILABLE   AGE   CONTAINERS   IMAGES                     SELECTOR
genidsvc   1/1     1            1           46s   genidsvc     ko.local/genidsvc:latest   app=genidsvc

$> kubectl get deployments -l app=orderprocsvc -o wide
NAME           READY   UP-TO-DATE   AVAILABLE   AGE   CONTAINERS     IMAGES                         SELECTOR
orderprocsvc   1/1     1            1           19m   orderprocsvc   ko.local/orderprocsvc:latest   app=orderprocsvc

Finally, to be thorough and avoid surprises, let’s make sure that each application has two containers running in its pod (one for the application and the other for the Dapr sidecar):

$> kubectl get pods
NAME                            READY   STATUS    RESTARTS       AGE
frontendsvc-5dc48f44dc-n4xmj    2/2     Running   7 (111s ago)   20m
genidsvc-5645f6d8d-ppfwt        2/2     Running   4 (2m9s ago)   20m
orderprocsvc-5cbbb96688-4vn4v   2/2     Running   8 (81s ago)    20m

Running the application

At this point, the services are ready. To keep things simple, we’ll use the kubectl port-forward command to expose the container port of the frontend service:

kubectl port-forward deployment/frontendsvc 8080

Forwarding from 127.0.0.1:8080 -> 8080
Forwarding from [::1]:8080 -> 8080

Next, let’s use the curl command to post an order to the frontendsvc endpoint:

curl -i -d '{ "items": ["automobile"]}'  -H "Content-type: application/json" "http://localhost:8080/orders/new"
HTTP/1.1 200 OK
Content-Type: application/json
Date: Mon, 08 Apr 2024 13:01:31 GMT
Content-Length: 75

The result is a JSON payload showing the status of the order. Note the order status is set to “received”.

{"order":"order-e4e6240e-62a4-496d-9e2f-b20f4f5bab0b", "status":"received"}

Next, we will use curl to query the http://localhost:8080/orders/order/{id} endpoint for the order data.

curl -i  -H "Content-type: application/json" "http://localhost:8080/orders/order/order-1a768924-4d85-4e72-94df-98541f26225e"
HTTP/1.1 200 OK
Content-Type: application/json
Date: Sat, 27 Apr 2024 18:47:11 GMT
Content-Length: 108

The result reflects the updated status of the order after it has been processed by the orderprocsvc service:

{"ID":"order-1a768924-4d85-4e72-94df-98541f26225e","Items":["automobile"],"Received":true,"Completed":true}
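
A client can check for completion programmatically by decoding that payload. The following is a minimal sketch, assuming an Order struct whose fields match the JSON keys above. The series' types.Order is not shown in this post, so this definition and the isProcessed helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Order is a hypothetical struct matching the JSON keys in the response.
type Order struct {
	ID        string
	Items     []string
	Received  bool
	Completed bool
}

// isProcessed reports whether body describes an order that has been both
// received and completed.
func isProcessed(body []byte) (bool, error) {
	var o Order
	if err := json.Unmarshal(body, &o); err != nil {
		return false, err
	}
	return o.Received && o.Completed, nil
}

func main() {
	body := []byte(`{"ID":"order-1a768924-4d85-4e72-94df-98541f26225e","Items":["automobile"],"Received":true,"Completed":true}`)
	done, err := isProcessed(body)
	if err != nil {
		panic(err)
	}
	fmt.Println("processed:", done)
}
```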

Conclusion

In the fourth part of this series, we walked through the steps of creating an event-based service using the Dapr API. The post also showed how to configure a pub/sub broker and publish events to it using the Dapr client API. In future posts, we will continue to explore other aspects of Dapr and how you can use it to build distributed applications running on Kubernetes.
