Building Cloud-Native Services with Dapr, Go, and Kubernetes — Part 4
Creating event-driven services with Dapr
This series explores how to get started with Dapr to build distributed Go services deployed on Kubernetes.
- Part 1 — Getting started with Dapr on Kubernetes
- Part 2 — Building stateful applications with Dapr
- Part 3 — Creating and invoking Dapr-enabled services
- Part 4 — Creating event-based services with Dapr
The source code for the series is hosted at github.com/vladimirvivien/dapr-examples.
In the previous installment of this series (part 3), we looked at how to create and invoke services using the Dapr API. In this post, the fourth in the series, we will expand our collection of services by creating an event-based service using the Dapr API and a publish/subscribe component.
The event-driven service
In this updated version of the application, we will add a new publish/subscribe component to delegate processing of the order to a new service called orderprocsvc, as illustrated below.
The application will now do the following:
- The frontendsvc service will receive an order request on its endpoint
- Next, the frontend service will use genidsvc to create an order ID
- The initial state of the order is then saved in Redis
- The order ID is published to a topic for downstream processing
- Service orderprocsvc receives the order ID for processing from a Redis stream broker
- After processing, orderprocsvc updates the state of the order in Redis
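To make the flow above concrete before wiring it into Dapr, here is a minimal in-memory sketch. It is not the code from the series: a Go channel stands in for the pub/sub topic, a mutex-guarded map stands in for the Redis state store, and the names store, receiveOrder, processOrders, and runFlow are made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Order mirrors the fields that appear in this post's JSON output.
type Order struct {
	ID        string
	Items     []string
	Received  bool
	Completed bool
}

// store is a hypothetical in-memory stand-in for the Redis state store.
type store struct {
	mu     sync.Mutex
	orders map[string]Order
}

func newStore() *store { return &store{orders: make(map[string]Order)} }

func (s *store) save(o Order) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.orders[o.ID] = o
}

func (s *store) get(id string) (Order, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	o, ok := s.orders[id]
	return o, ok
}

// receiveOrder plays the role of frontendsvc: it saves the initial state
// of the order, then publishes only the order ID to the topic.
func receiveOrder(s *store, topic chan<- string, id string, items []string) {
	s.save(Order{ID: id, Items: items, Received: true})
	topic <- id
}

// processOrders plays the role of orderprocsvc: it consumes order IDs,
// loads each order from the store, and marks it as completed.
func processOrders(s *store, topic <-chan string, done *sync.WaitGroup) {
	for id := range topic {
		if o, ok := s.get(id); ok {
			o.Completed = true
			s.save(o)
		}
		done.Done()
	}
}

// runFlow pushes one order through the pipeline and returns its final state.
func runFlow(id string, items []string) Order {
	s := newStore()
	topic := make(chan string, 1)
	var done sync.WaitGroup
	done.Add(1)
	go processOrders(s, topic, &done)
	receiveOrder(s, topic, id, items)
	done.Wait() // wait until the subscriber has handled the event
	close(topic)
	o, _ := s.get(id)
	return o
}

func main() {
	fmt.Printf("%+v\n", runFlow("order-1", []string{"automobile"}))
}
```

Note that only the order ID travels through the topic; the order itself lives in the store, which is the same split the Dapr version of the application uses.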
The full source code and configuration files for this post can be found on GitHub at github.com/vladimirvivien/dapr-examples/03-pubsub
Pre-requisites
This example uses the same setup and resources that were configured in the previous examples. You will need the following in place:
- A Kubernetes cluster (KinD used for this series)
- Dapr running on Kubernetes
- Redis deployed on Kubernetes
For a detailed walkthrough of how to set up your environment for the series, refer to part 1, part 2, and part 3.
Sending order events for processing
The frontend service needs to be updated so that it delegates order processing to the orderprocsvc service instead of processing it itself. The first code snippet declares variables for configuring the frontend service with pubsub topic and routing information.
var (
	appPort = os.Getenv("APP_PORT")
	pubsub  = os.Getenv("ORDERS_PUBSUB")
	topic   = os.Getenv("ORDERS_PUBSUB_TOPIC")
	...
)
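Since os.Getenv returns an empty string for an unset variable, a misconfigured deployment would only fail later, when the first event is published. One possible safeguard is to check the required variables at startup. The sketch below assumes a hypothetical helper named requireEnv (it is not part of the example repository); only the three variable names come from the snippet above.

```go
package main

import (
	"fmt"
	"os"
)

// requireEnv looks up each named variable with the given lookup function
// and returns an error listing every variable that is unset or empty.
// The lookup function is injected so the helper can be exercised without
// touching the real process environment.
func requireEnv(lookup func(string) string, names ...string) (map[string]string, error) {
	values := make(map[string]string, len(names))
	var missing []string
	for _, name := range names {
		v := lookup(name)
		if v == "" {
			missing = append(missing, name)
			continue
		}
		values[name] = v
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf("missing required environment variables: %v", missing)
	}
	return values, nil
}

func main() {
	cfg, err := requireEnv(os.Getenv, "APP_PORT", "ORDERS_PUBSUB", "ORDERS_PUBSUB_TOPIC")
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("listening on :%s, publishing to %s/%s\n",
		cfg["APP_PORT"], cfg["ORDERS_PUBSUB"], cfg["ORDERS_PUBSUB_TOPIC"])
}
```

In a real service you would probably exit with a non-zero status on a configuration error, so that Kubernetes reports the pod as failing instead of running with empty settings.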
The following snippet highlights the main change in the code for handling incoming orders. The new version of the code uses Dapr’s pubsub API to publish the order ID to the configured topic. Service orderprocsvc will subscribe to that topic so that it can process incoming orders (see above).
func postOrder(w http.ResponseWriter, r *http.Request) {
	var receivedOrder types.Order
	json.NewDecoder(r.Body).Decode(&receivedOrder)

	// generate order ID
	out, _ := daprClient.InvokeMethod(r.Context(), genidsvcId, "genid", "post")
	orderID := fmt.Sprintf("order-%s", string(out))
	receivedOrder.ID = orderID
	receivedOrder.Received = true

	// marshal order for downstream processing
	orderData, _ := json.Marshal(receivedOrder)

	// save state of order
	daprClient.SaveState(r.Context(), stateStore, orderID, orderData, nil)

	// publish orderID as an event for downstream processing
	daprClient.PublishEvent(r.Context(), pubsub, topic, []byte(orderID))

	w.Header().Set("Content-Type", "application/json")
	fmt.Fprintf(w, `{"order":"%s", "status":"received"}`, orderID)
}
In the snippet above, method daprClient.PublishEvent is used to publish new order IDs as they are created. The application assumes there is a configured component that is subscribed to the topic to handle the orders.
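The handler above discards the errors returned by its calls to keep the listing short. As a rough sketch of how a publish failure could be reported back to the caller, the example below replaces the Dapr client with a hypothetical narrow interface (eventPublisher) and a fake implementation, so it runs without a sidecar. ID generation and state saving are left out, and eventPublisher, fakePublisher, and runPostOrder are names invented for this illustration, not part of the Dapr SDK.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// eventPublisher is a hypothetical stand-in for the publishing side of
// the Dapr client; a thin adapter would be needed to wrap the real client.
type eventPublisher interface {
	Publish(ctx context.Context, topic string, data []byte) error
}

// fakePublisher records published payloads, or fails when fail is set.
type fakePublisher struct {
	fail bool
	sent []string
}

func (f *fakePublisher) Publish(_ context.Context, _ string, data []byte) error {
	if f.fail {
		return errors.New("broker unavailable")
	}
	f.sent = append(f.sent, string(data))
	return nil
}

// postOrder returns a handler that rejects malformed payloads and reports
// a publish failure instead of answering "received" unconditionally.
func postOrder(pub eventPublisher, topic string, newID func() string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var order struct {
			Items []string `json:"items"`
		}
		if err := json.NewDecoder(r.Body).Decode(&order); err != nil {
			http.Error(w, "invalid order payload", http.StatusBadRequest)
			return
		}
		orderID := "order-" + newID()
		if err := pub.Publish(r.Context(), topic, []byte(orderID)); err != nil {
			http.Error(w, "could not queue order", http.StatusServiceUnavailable)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprintf(w, `{"order":"%s", "status":"received"}`, orderID)
	}
}

// runPostOrder sends one request through the handler in memory and
// returns the response status code and body.
func runPostOrder(body string, failPublish bool) (int, string) {
	pub := &fakePublisher{fail: failPublish}
	h := postOrder(pub, "received-orders", func() string { return "123" })
	req := httptest.NewRequest(http.MethodPost, "/orders/new", strings.NewReader(body))
	rec := httptest.NewRecorder()
	h(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := runPostOrder(`{"items":["automobile"]}`, false)
	fmt.Println(code, body)
	code, _ = runPostOrder(`{"items":["automobile"]}`, true)
	fmt.Println(code)
}
```

Whether a failed publish should return an error to the client or be retried in the background is a design decision; the sketch only shows the simplest option.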
Handling order processing events
Service orderprocsvc is designed to process order events published upstream. Its code uses the Dapr API to create and start a service that subscribes to a pub/sub topic to receive incoming order IDs published by the frontend service.
First, let’s define some variables we will need to inject pub/sub configuration into the service (i.e. pubsub name and topic):
var (
	daprClient dapr.Client
	appPort    = os.Getenv("APP_PORT")
	pubsub     = os.Getenv("ORDERS_PUBSUB")
	topic      = os.Getenv("ORDERS_PUBSUB_TOPIC")
)
The next code snippet defines a subscription to the topic with order events. The code also sets up a service and registers the handler that will be triggered when an event arrives on the specified topic.
func main() {
	// define subscription
	rcvdSub := &common.Subscription{
		PubsubName: pubsub,
		Topic:      topic,
		Route:      topic,
	}

	// create service
	s := daprd.NewService(fmt.Sprintf(":%s", appPort))
	if err := s.AddTopicEventHandler(rcvdSub, subHandler); err != nil {
		log.Fatalf("orderproc: topic subscription: %v", err)
	}
}
Next, we’ll define a Dapr API client that will be used to update the order information in the state store (Redis) once the order is processed.
func main() {
	...
	// set up Dapr client
	dc, err := dapr.NewClient()
	if err != nil {
		log.Fatalf("order proc: dapr client: %s", err)
	}
	daprClient = dc
	defer daprClient.Close()
}
Lastly, let’s start the service.
func main() {
	...
	// start service component last
	if err := s.Start(); err != nil && err != http.ErrServerClosed {
		log.Fatalf("orderproc: starting: %v", err)
	}
}
As mentioned above, the service registers a pub/sub handler to handle incoming events on the specified topic. The following snippet shows the handler.
func subHandler(ctx context.Context, event *common.TopicEvent) (retry bool, err error) {
	// extract incoming event data; an event without a string ID cannot be retried usefully
	orderID, ok := event.Data.(string)
	if !ok {
		log.Printf("orders-pubsub: unexpected event data type %T", event.Data)
		return false, fmt.Errorf("unexpected event data type %T", event.Data)
	}

	// pull and update order
	orderItem, err := daprClient.GetState(ctx, stateStore, orderID, nil)
	if err != nil {
		log.Printf("orders-pubsub: getstate: %s", err)
		return true, err
	}

	// unmarshal order for processing
	var order types.Order
	if err := json.Unmarshal(orderItem.Value, &order); err != nil {
		log.Printf("orders-pubsub: unmarshal: %s: %s", err, orderItem.Value)
		return false, err
	}

	// process order and update its state
	order.Completed = true

	// use daprClient to save order
	orderData, err := json.Marshal(order)
	if err != nil {
		log.Printf("orders-pubsub: marshal: %s", err)
		return false, err
	}
	if err := daprClient.SaveState(ctx, stateStore, orderID, orderData, nil); err != nil {
		log.Printf("orders-pubsub: save state: %s", err)
		return true, err
	}
	return false, nil
}
In the pubsub event handler above, notice the following:
- The order is pulled from the state store with daprClient.GetState
- After processing, the order is updated with daprClient.SaveState
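The handler's two return values also matter: they tell Dapr what to do with the event. As far as I understand the Go SDK's behavior, a nil error acknowledges the event, an error with retry set to true asks for redelivery, and an error with retry set to false drops the event. The sketch below restates that mapping in plain Go, along with a checked version of the type assertion on the event data. The function names (outcome, orderIDFrom) are hypothetical; the status strings are the ones described in the Dapr pub/sub documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Status values a subscriber can report back to the Dapr sidecar.
const (
	statusSuccess = "SUCCESS"
	statusRetry   = "RETRY"
	statusDrop    = "DROP"
)

// errTransient is a sample error used to demonstrate the mapping.
var errTransient = errors.New("state store unavailable")

// outcome maps a handler's (retry, err) result to a delivery status.
// This mirrors my reading of the SDK and is an assumption, not its code.
func outcome(retry bool, err error) string {
	switch {
	case err == nil:
		return statusSuccess
	case retry:
		return statusRetry
	default:
		return statusDrop
	}
}

// orderIDFrom extracts the order ID from an event payload and reports an
// error when the payload is not a non-empty string.
func orderIDFrom(data interface{}) (string, error) {
	id, ok := data.(string)
	if !ok || id == "" {
		return "", fmt.Errorf("unexpected event data of type %T", data)
	}
	return id, nil
}

func main() {
	fmt.Println(outcome(false, nil))
	fmt.Println(outcome(true, errTransient))
	fmt.Println(outcome(false, errTransient))
	_, err := orderIDFrom(42)
	fmt.Println(err)
}
```

This is why the handler returns true for state store errors, which may be transient, and false for a payload that cannot be decoded, since redelivering it would fail the same way.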
Building the source code
Next, let’s use ko to compile and build the source code into OCI container images:
ko build --local -B ./frontendsvc
ko build --local -B ./orderprocsvc
Once the images are built, let’s load them into the local kind cluster, dapr-cluster:
kind load docker-image ko.local/frontendsvc:latest --name dapr-cluster
kind load docker-image ko.local/orderprocsvc:latest --name dapr-cluster
Deploying the application
This example will use five manifests:
- A Kubernetes Deployment manifest for frontendsvc
- A Kubernetes Deployment manifest for genidsvc
- A Kubernetes Deployment manifest for orderprocsvc
- A Dapr Component for a Redis data store
- A Dapr Component for a Redis pub/sub
Application Deployment configuration
First, let’s focus on the manifest for the new service orderprocsvc, as the others are unchanged from before.
kind: Deployment
metadata:
  name: orderprocsvc
spec:
  ...
  template:
    metadata:
      labels:
        app: orderprocsvc
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "orderprocsvc"
        dapr.io/app-port: "5050"
    spec:
      containers:
      - name: orderprocsvc
        image: ko.local/orderprocsvc:latest
        ports:
        - containerPort: 5050
        env:
        - name: APP_PORT
          value: "5050"
        - name: ORDERS_PUBSUB
          value: "orders-pubsub"
        - name: ORDERS_PUBSUB_TOPIC
          value: "received-orders"
As with other Dapr-backed services, the deployment for the orderprocsvc service includes annotations for its Dapr sidecar. The annotation dapr.io/app-port: "5050" (matching the container’s containerPort value) tells the sidecar which port the application listens on. Because the service listens on the port given by APP_PORT, that value should match the annotation. The env: section includes application configuration that will be injected into the application at runtime.
Pubsub component configuration
Because Dapr uses a uniform component model, the manifest that configures the Dapr pubsub component looks similar to that of the state store configured earlier in the series. The metadata.name attribute identifies the pubsub component so that it can be referenced at runtime. The spec.type attribute is set to pubsub.redis to indicate that it is a Redis-backed broker.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: orders-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: redis-master.default.svc.cluster.local:6379
  - name: redisPassword
    secretKeyRef:
      name: redis
      key: redis-password
auth:
  secretStore: kubernetes
Deploying the manifests
The next step deploys the application configuration to the Kubernetes cluster:
kubectl apply -f ./manifest
Now, let’s use the kubectl command to verify the deployment. First, ensure the Dapr components are deployed properly in the cluster:
kubectl get components
NAME AGE
orders-pubsub 20s
orders-store 20s
We also should ensure that the Kubernetes deployments succeeded for each application. We will do that using kubectl to query each application:
$> kubectl get deployments -l app=frontendsvc -o wide
NAME READY UP-TO-DATE AVAILABLE AGE CONTAINERS IMAGES SELECTOR
frontendsvc 1/1 1 1 75m frontendsvc ko.local/frontendsvc:latest app=frontendsvc
$> kubectl get deployments -l app=genidsvc -o wide
NAME READY UP-TO-DATE AVAILABLE AGE CONTAINERS IMAGES SELECTOR
genidsvc 1/1 1 1 46s genidsvc ko.local/genidsvc:latest app=genidsvc
$> kubectl get deployments -l app=orderprocsvc -o wide
NAME READY UP-TO-DATE AVAILABLE AGE CONTAINERS IMAGES SELECTOR
orderprocsvc 1/1 1 1 19m orderprocsvc ko.local/orderprocsvc:latest app=orderprocsvc
Finally, to be thorough and avoid surprises, let’s make sure that each application has two containers running in its pod (one for the application and the other for the Dapr sidecar):
$> kubectl get pods
NAME READY STATUS RESTARTS AGE
frontendsvc-5dc48f44dc-n4xmj 2/2 Running 7 (111s ago) 20m
genidsvc-5645f6d8d-ppfwt 2/2 Running 4 (2m9s ago) 20m
orderprocsvc-5cbbb96688-4vn4v 2/2 Running 8 (81s ago) 20m
Running the application
At this point, the services are ready. To keep things simple, we’ll use the kubectl port-forward command to expose the container port of the frontend app:
kubectl port-forward deployment/frontendsvc 8080
Forwarding from 127.0.0.1:8080 -> 8080
Forwarding from [::1]:8080 -> 8080
Next, let’s use the curl command to post an order to the frontendsvc endpoint:
curl -i -d '{ "items": ["automobile"]}' -H "Content-type: application/json" "http://localhost:8080/orders/new"
HTTP/1.1 200 OK
Content-Type: application/json
Date: Mon, 08 Apr 2024 13:01:31 GMT
Content-Length: 75
The result is a JSON payload showing the status of the order. Note the order status is set to “received”.
{"order":"order-e4e6240e-62a4-496d-9e2f-b20f4f5bab0b", "status":"received"}
Next, we will use curl to query the http://localhost:8080/orders/order/{id} endpoint for the order data.
curl -i -H "Content-type: application/json" "http://localhost:8080/orders/order/order-1a768924-4d85-4e72-94df-98541f26225e"
HTTP/1.1 200 OK
Content-Type: application/json
Date: Sat, 27 Apr 2024 18:47:11 GMT
Content-Length: 108
The result reflects the updated status of the order after it’s been processed by the orderprocsvc service:
{"ID":"order-1a768924-4d85-4e72-94df-98541f26225e","Items":["automobile"],"Received":true,"Completed":true}
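Because the order is processed asynchronously, a client that queries the order immediately after posting it may still see "Completed" set to false. One way to deal with that is to poll the order endpoint until the flag flips. The sketch below is only an illustration of that idea: the field names come from the JSON response above, while orderStatus, httpFetch, waitForCompletion, and stubFetch are hypothetical names. Its main function uses a stub instead of calling a real cluster.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// orderStatus matches the fields of the order JSON returned by the frontend.
type orderStatus struct {
	ID        string
	Items     []string
	Received  bool
	Completed bool
}

// httpFetch returns a function that GETs the order at url and decodes it.
func httpFetch(url string) func() (orderStatus, error) {
	return func() (orderStatus, error) {
		var o orderStatus
		resp, err := http.Get(url)
		if err != nil {
			return o, err
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			return o, fmt.Errorf("unexpected status: %s", resp.Status)
		}
		err = json.NewDecoder(resp.Body).Decode(&o)
		return o, err
	}
}

// waitForCompletion calls fetch up to attempts times, pausing for delay
// between calls, and returns as soon as the order is marked completed.
func waitForCompletion(fetch func() (orderStatus, error), attempts int, delay time.Duration) (orderStatus, error) {
	var last orderStatus
	for i := 0; i < attempts; i++ {
		o, err := fetch()
		if err != nil {
			return o, err
		}
		if o.Completed {
			return o, nil
		}
		last = o
		time.Sleep(delay)
	}
	return last, fmt.Errorf("order %q not completed after %d attempts", last.ID, attempts)
}

// stubFetch simulates an order that becomes completed on the given call.
func stubFetch(completeOn int) func() (orderStatus, error) {
	calls := 0
	return func() (orderStatus, error) {
		calls++
		return orderStatus{
			ID:        "order-1",
			Items:     []string{"automobile"},
			Received:  true,
			Completed: calls >= completeOn,
		}, nil
	}
}

func main() {
	// Against the port-forwarded service, httpFetch would be pointed at
	// http://localhost:8080/orders/order/{id} with a real order ID.
	o, err := waitForCompletion(stubFetch(3), 5, 10*time.Millisecond)
	fmt.Println(o.Completed, err)
}
```

Injecting the fetch function keeps the polling logic separate from HTTP, which makes it easy to try out without a running cluster.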
Conclusion
In the fourth part of this series, we walked through the steps of creating an event-based service using the Dapr API. The post also showed how to configure a broker and publish events to it using the Dapr client API. In future posts, we will continue to explore other aspects of Dapr and how you can use it to build distributed applications running on Kubernetes.
References
- Blog post series GitHub repo github.com/vladimirvivien/dapr-examples
- The Distributed Application Runtime (Dapr)
- Part 1 — Getting started with Dapr on Kubernetes
- Part 2 — Building stateful applications with Dapr
- Part 3 — Creating and invoking Dapr-enabled services
- Part 4 — Creating event-based services with Dapr