Dapr: pubsub under the hood with RabbitMQ

Anvar Nurmatov
5 min read · Nov 17, 2022


In this post we’ll continue playing with Dapr (with a .NET 7 app) and dive into the RabbitMQ implementation of the pub/sub component (see the detailed documentation on the RabbitMQ pubsub component).

In the first post of the series we built an app and pushed and received our first message. Here we’ll try to understand how the pub/sub component works, look at some of its interesting aspects, and consider a few use cases. Let’s begin.

Resiliency

So, what happens when we add the pub/sub component and launch only our sidecar (without the app running)? You can see that the sidecar creates a connection with one channel to the RabbitMQ node:

One of the advantages of using Dapr is resiliency, e.g. we can forcibly close the established connection or even restart the broker and see it in action (note: a new connection is re-established automatically if we have subscribed to a topic with this pub/sub component; if our app only publishes messages, a connection is established once a new message is published):

The reconnectWait option instructs the sidecar how long (in seconds) to wait before reconnecting:

# pubsub.yaml
- name: reconnectWait
  value: 1 # reconnect will be performed 1 sec after failure
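For reference, these entries live under the metadata section of the pub/sub component definition. A minimal sketch might look like the following (the component name pubsub matches the code in this series; the host field and its value are assumptions for a local broker, so check the RabbitMQ component reference for your Dapr version):

# pubsub.yaml — minimal sketch, assuming a local RabbitMQ broker
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host                 # broker address (assumed local defaults)
    value: "amqp://guest:guest@localhost:5672"
  - name: reconnectWait
    value: 1                   # reconnect 1 second after a failure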

Auto-delete

We can notice that when the connection fails or is not established, no queue is created. This means that when we push a message and there are no consumers, the message is delivered to an exchange but not routed on to a queue, since nobody is subscribed to the topic.

In most cases we do not want to lose pushed messages; instead we want to process them later. This is where the deletedWhenUnused option can help us:

# pubsub.yaml
- name: deletedWhenUnused
  value: false

And once we run our application, we can see that our queue is no longer marked with the “AD” (auto-delete) flag. Even if we stop the application, the queue will persist.

And if we push messages while there are no consumers, the messages will be routed to our queue nevertheless:

And once a consumer is up and running, the messages will be delivered to it.
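For completeness, here is a minimal consumer sketch, assuming the HelloRecord type and the pipeline setup from the first post (the /checkout route and property names are illustrative):

// Program.cs — minimal consumer sketch (Dapr.AspNetCore package assumed)
using Dapr;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

app.UseCloudEvents();      // unwrap the CloudEvents envelope
app.MapSubscribeHandler(); // expose /dapr/subscribe so the sidecar can discover topics

// Bound to the "message" topic of the "pubsub" component
app.MapPost("/checkout", [Topic("pubsub", "message")] (HelloRecord record) =>
{
    Console.WriteLine($"Received: {record.Message}");
    return Results.Ok();
});

app.Run();

public record HelloRecord(Guid Id, string Message);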

CAUTION: Setting this option to false is an irreversible change: if we later remove the option or set it back to true, the connection will fail and cannot be established. The sidecar will constantly try to reconnect:
“rabbitmq pub/sub: error in subscriber for daprappsampleid-message in ensureSubscription: channel not initialized”
This can be fixed by deleting the queue manually.

Requeues

What if our consumer fails while processing a message? By default, the Dapr component instructs the broker to ignore negative acknowledgments and do nothing further once a message has been delivered. This can easily be simulated by:

app.MapPost("/checkout", [Topic("pubsub", "message")] (HelloRecord record) =>
{
    throw new Exception("Something failed");
});

And here the requeueInFailure option comes into play. If we set it to true, the broker will requeue the message until it is processed successfully.

# pubsub.yaml
- name: requeueInFailure
  value: true
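To see the requeue in action, here is a purely illustrative variation of the handler above that fails the first couple of deliveries and then acknowledges (the counter is not thread-safe and exists only to make the behaviour visible in the RabbitMQ management UI):

// Illustrative only: fail the first two deliveries, then succeed,
// so the requeueInFailure behaviour can be observed.
var attempts = 0;

app.MapPost("/checkout", [Topic("pubsub", "message")] (HelloRecord record) =>
{
    attempts++;
    if (attempts <= 2)
        throw new Exception("Something failed"); // non-2xx response -> nack -> requeued

    return Results.Ok();                         // 2xx response -> ack -> removed from the queue
});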

There is another option (a backoff policy) that can be useful here; this quote is from the Dapr documentation:

But in some cases, you may want dapr to retry pushing message with an (exponential or constant) backoff strategy until the message is processed normally or the number of retries is exhausted. This maybe useful when your service breaks down abnormally but the sidecar is not stopped together. Adding backoff policy will retry the message pushing during the service downtime, instead of marking these message as consumed.
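As a sketch only (the backOffPolicy and backOffMaxRetries names are my assumption of the metadata described in that section of the documentation, so verify them against the RabbitMQ component reference for your Dapr version), such a policy would be configured through component metadata along these lines:

# pubsub.yaml — assumed backoff metadata, verify the names for your Dapr version
- name: backOffPolicy
  value: exponential   # or "constant"
- name: backOffMaxRetries
  value: 3             # stop retrying after 3 attempts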

Dead-lettering

As another option we can have a dead-letter queue: once a message cannot be handled normally, it will be forwarded to this queue. So let’s add a new option:

# pubsub.yaml
- name: enableDeadLetter
  value: true

And now we can see that our queue is marked with the DLX feature and a new dead-letter queue has been created (and we can see our failed message sitting in it):

CAUTION: if the queue doesn’t have the AD (auto-delete) feature, it has to be re-created before the dead-letter feature is enabled, otherwise Dapr will fail to establish a connection: “rabbitmq pub/sub error: handling message from topic ‘message’, error from app channel while sending pub/sub event to app: dial tcp4 127.0.0.1:5001: connect: connection refused”
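Putting together the options touched on in this post, the metadata section of our component now looks roughly like this (values as used above):

# pubsub.yaml
- name: reconnectWait
  value: 1
- name: deletedWhenUnused
  value: false
- name: requeueInFailure
  value: true
- name: enableDeadLetter
  value: true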

CloudEvents

Dapr gives us great observability into everything that is happening inside. Whether it’s a service-to-service invocation or asynchronous communication through messaging, we can trace the whole path from start to finish (thanks to OpenTelemetry, which Dapr embraces), but it comes at a price: each message is wrapped in a CloudEvents envelope. For example, our message looks as follows:

{
  "data": {
    "id": "4295a101-124c-4873-b0a0-d1510a0c4640",
    "message": "Hello Dapr!"
  },
  "datacontenttype": "application/json",
  "id": "b468133e-d5ff-4b70-b74d-068865af971e",
  "pubsubname": "pubsub",
  "source": "daprappsampleid",
  "specversion": "1.0",
  "time": "2022-11-17T17:16:33+06:00",
  "topic": "message",
  "traceid": "00-fb2ca2f94562aef2651c2021d0d2b1e1-9970b8bffd998998-01",
  "traceparent": "00-fb2ca2f94562aef2651c2021d0d2b1e1-9970b8bffd998998-01",
  "tracestate": "",
  "type": "com.dapr.event.sent"
}

Let’s imagine we want to integrate Dapr gradually, service by service; at some point we’ll have a “hybrid” environment with services running both with and without Dapr.
Obviously, there will be an issue with messaging between these services. The rawPayload metadata can solve this problem (to the detriment of Dapr’s traceability, of course): it tells the sidecar to omit the CloudEvents envelope. Our publishing code will look like this:

app.MapGet("/hello", async () =>
{
    var daprClient = app.Services.GetRequiredService<DaprClient>();
    await daprClient.PublishEventAsync("pubsub",
        "message",
        new HelloRecord(Guid.NewGuid(), "Hello Dapr!"),
        new Dictionary<string, string> {
            { "rawPayload", "true" }
        });
    return Results.Ok("Hello Dapr!");
});

And the payload now looks like this:

{"id":"b2119fea-d6af-408c-994b-dd327e16eb09","message":"Hello Dapr!"}

Therefore our message can be consumed by non-Dapr applications without adjusting the model.
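As an illustration, a plain RabbitMQ.Client consumer could bind its own queue to the topic exchange and read this raw JSON directly. The sketch below assumes the fanout exchange named after the topic (message) already exists (i.e. our Dapr app has published at least once) and that the broker runs locally; the queue name is illustrative:

// Non-Dapr consumer sketch using the RabbitMQ.Client package.
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Bind our own queue to the "message" exchange created by the Dapr component
channel.QueueDeclare("non-dapr-consumer", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind("non-dapr-consumer", "message", routingKey: "");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (_, ea) =>
{
    var json = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine($"Raw payload: {json}"); // {"id":"...","message":"Hello Dapr!"}
    channel.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume("non-dapr-consumer", autoAck: false, consumer);

Console.ReadLine(); // keep the process alive while consuming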

But what if we’d like to consume messages published by a non-Dapr application? Our application will fail with:

Microsoft.AspNetCore.Http.BadHttpRequestException: Expected a supported JSON media type but got "".

This happens because of the CloudEvents middleware (app.UseCloudEvents()). To solve the problem we can simply remove this middleware from the pipeline, and the message will be processed successfully.

There are plenty of other options that deserve special attention, but the ones mentioned above are the most important.

Article series about Dapr

References:
- https://dapr.io/
