Adding custom queues in Datadog’s Data Streams Monitoring in dotnet

Raphaël Vandon
Dec 13, 2024

Data Streams Monitoring (DSM for short) is a Datadog product that gives visibility into the flow of data through pipelines. It natively supports the most commonly used queues, like Kafka, RabbitMQ or Amazon SQS, but it can also be set up manually to track any data streaming system. In this article, I’m going to explain how to do this when writing dotnet apps.

Intro: my custom queuing system

For the purpose of this exercise, I’m going to use a very basic queue-like system that uses a file to pass data. The goal is just to represent a custom queue that’d be unknown to Datadog.

I’m giving it the ability to have headers and a body:

class Message
{
    public readonly List<string> Headers = new();
    public string Body = string.Empty;

    ...
}

and the message can be sent to a file, or read from there:

public void Send(string dest) => File.AppendAllText(dest, this.ToString() + Environment.NewLine);

public static Message Receive(string dest)
{
    var payload = File.ReadLines(dest).Last().Split(BodySeparator);
    var msg = new Message { Body = payload[1] };
    msg.Headers.AddRange(payload[0].Split(HeadersSeparator));
    return msg;
}

I’m providing this just as context for what follows.
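For readers who want to run the snippets, here is a minimal sketch of what the elided parts of Message could look like. The article does not show ToString or the separators, so the two separator values below are my assumptions (control characters, picked because injected values may contain common punctuation), not the original implementation.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Round trip through a temporary file to check that what we send is what we read back
var queueFile = Path.GetTempFileName();
var sent = new Message { Body = "Hello" };
sent.Headers.Add("sent by me");
sent.Send(queueFile);

var received = Message.Receive(queueFile);
Console.WriteLine(string.Join(", ", received.Headers)); // sent by me
Console.WriteLine(received.Body);                        // Hello
File.Delete(queueFile);

class Message
{
    // ASSUMED separators (not shown in the article). Neither may appear
    // in a header or in the body, and the body must fit on a single line.
    public const char HeadersSeparator = '\u001f';
    public const char BodySeparator = '\u001e';

    public readonly List<string> Headers = new();
    public string Body = string.Empty;

    // One message per line: all the headers, then the body
    public override string ToString() =>
        string.Join(HeadersSeparator, Headers) + BodySeparator + Body;

    public void Send(string dest) =>
        File.AppendAllText(dest, this.ToString() + Environment.NewLine);

    public static Message Receive(string dest)
    {
        // Reads the most recent line; this toy queue never removes messages
        var payload = File.ReadLines(dest).Last().Split(BodySeparator, 2);
        var msg = new Message { Body = payload[1] };
        msg.Headers.AddRange(
            payload[0].Split(HeadersSeparator, StringSplitOptions.RemoveEmptyEntries));
        return msg;
    }
}
```

Since Receive always returns the last line of the file, this only behaves like a queue when there is a single message in flight, which is enough for the demo.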

Manually instrumenting the Send operation

We’re going to do 2 things here: first, we’re going to create a span around the operation, and then we’re going to inject this span’s info into the message.

Creating the span is fairly straightforward: after adding a reference to Datadog.Trace, we can just surround the code we want to trace with using (var scope = Tracer.Instance.StartActive("Sending")).

Then we want to inject the span info into the message. This is done by using a SpanContextInjector. It provides 2 methods, one to just inject the trace info to get context propagation (i.e. have the receiving span in the same trace as the sending span), and the other to also send data to DSM. We provide it with a callback to set key/values. Here it is, with the span creation:

using (var scope = Tracer.Instance.StartActive("Sending"))
{
    var message = new Message { Body = $"Hello #{new Random().Next(1000)}" };
    message.Headers.Add("sent by me"); // Just a sample header

    new SpanContextInjector().InjectIncludingDsm(
        message.Headers,          // can be anything that's being sent
        SetHeader,
        scope.Span.Context,
        messageType: "FileQueue", // The name of the queue technology
        target: queueFile);       // The topic, or equivalent

    message.Send(queueFile);
}

static void SetHeader(List<string> headers, string key, string value)
{
    // /!\ don't use = as a separator because DSM writes in base64, and so it can write an '='
    headers.Add($"{key}{KeyValueSeparator}{value}");
}

Here, I chose to represent the headers separately in my message, but in a queuing technology without headers, it’d be totally possible to pass the body instead, as long as we can write to it with the callback and, most importantly, retrieve the info we wrote later on (we’ll see that in the next section).
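To make the "body as carrier" idea concrete, here is a small sketch with no Datadog dependency. Everything in it is hypothetical (the BodyCarrier name, the "#meta " marker, the tab separator): it only shows a setter and a getter with the same shapes as the callbacks used in this article, writing key/value pairs as prefix lines of the body and reading them back.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// The carrier is the body itself: each key/value pair becomes a prefix line
var body = new StringBuilder("Hello");
BodyCarrier.Set(body, "some-key", "dmFsdWU9PQ=="); // base64 values can contain '='

Console.WriteLine(BodyCarrier.Get(body.ToString(), "some-key").Single()); // dmFsdWU9PQ==
Console.WriteLine(BodyCarrier.Payload(body.ToString()));                  // Hello

static class BodyCarrier
{
    // Hypothetical conventions, chosen for this sketch only
    const string Marker = "#meta ";
    const char KeyValueSeparator = '\t';

    // Same shape as the setter callback: (carrier, key, value)
    public static void Set(StringBuilder body, string key, string value) =>
        body.Insert(0, $"{Marker}{key}{KeyValueSeparator}{value}\n");

    // Same shape as the getter callback: (carrier, key) -> values
    public static IEnumerable<string?> Get(string body, string key)
    {
        var prefix = $"{Marker}{key}{KeyValueSeparator}";
        return body.Split('\n')
            .TakeWhile(line => line.StartsWith(Marker, StringComparison.Ordinal))
            .Where(line => line.StartsWith(prefix, StringComparison.Ordinal))
            .Select(line => (string?)line.Substring(prefix.Length));
    }

    // The original payload, once the metadata lines are stripped
    public static string Payload(string body) =>
        string.Join('\n', body.Split('\n')
            .SkipWhile(line => line.StartsWith(Marker, StringComparison.Ordinal)));
}
```

The obvious limitation is that a real body starting with the marker would be misread as metadata, so an actual implementation would need an unambiguous framing (a length prefix, for instance).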

Manually instrumenting the Receive operation

Here, we need to do roughly the same thing, but it’s a bit trickier. There are 2 things to do again: creating the span, of course, and extracting the relevant info from the headers.

Why is it trickier? Because when we receive the message, we cannot know which parent span we’re going to use before actually having received the message, so we cannot just do a StartActive around the code like we did for sending. The trick we’re going to use here is to capture the time when we start the operation, and set it as the span start time later on, when we have the necessary info to create it.

Reading the headers is done with a SpanContextExtractor. It’s only a tiny bit more complex than setting them because the values are expected as an IEnumerable (because uniqueness of the keys is not guaranteed), but we handle that easily with yield. Here is the code for all that:

var startTime = DateTimeOffset.UtcNow;

var message = Message.Receive(queueFile);

var parentContext = new SpanContextExtractor().ExtractIncludingDsm(
    message.Headers,
    GetHeader,
    messageType: "FileQueue",
    source: queueFile);

// Now we can create the span from the extracted context
using var scope = Tracer.Instance.StartActive(
    "Receiving",
    new SpanCreationSettings
    {
        Parent = parentContext,
        StartTime = startTime // Starting the span in the past
    });
// Be sure to close the scope if necessary

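static IEnumerable<string?> GetHeader(List<string> headers, string key)
{
    // Match on "key + separator" so a key that is a prefix of another one isn't picked up by mistake
    var prefix = key + KeyValueSeparator;
    foreach (var header in headers)
    {
        if (header.StartsWith(prefix, StringComparison.Ordinal))
        {
            // Keep everything after the separator, even if the value contains the separator itself
            yield return header.Substring(prefix.Length);
        }
    }
    // A missing key simply yields nothing
}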

Notice that we don’t have anything special to do to handle DSM: the method takes care of everything using the information it gets from the headers.

Running the code

Now we just need to set up the right environment variables for regular Datadog dotnet instrumentation, as described in the doc. We must not forget to enable DSM, of course, with DD_DATA_STREAMS_ENABLED=true, and as usual the tracer needs a local Datadog agent to talk to (I’m running one on my laptop, which makes it really easy to run sample code like this).

We should be able to see the spans being created in APM (clicking on one shows the "Sending" and "Receiving" spans in the same trace).

And if we go to the Data Streams Monitoring page, we can see our service and the queue being used (it looks a bit silly here because my service is both the sender and the receiver).

And that’s it!