Extending the OpenTelemetry Collector with your own components
We’ve seen previously that the OpenTelemetry Collector was built with the explicit goal of being easy to use in downstream distributions, so that you can pick only the components you need for your specific use case.
Now it’s time to see what’s needed in order to create your own component to extend the collector. Get your Go tooling ready, we’ll need it!
Component types
There are four types of components in the collector:
- Extensions are components which aren’t meant to be used as part of a pipeline. For instance, debugging interfaces are a good fit for this.
- Receivers are components that are responsible for gathering the data from the environment or receiving it from external sources. They are typically HTTP/gRPC endpoints or daemon-like processes.
- Processors are components that have the chance of looking into the data flowing through a pipeline, potentially changing it in the process.
- Exporters are components that have the responsibility of sending the data out of the collector to actual backends, like your tracing, metrics or logging backend.
For this blog post, we’ll implement a simple processor for trace data based on a real requirement: “Split traces from batch”.
Bootstrapping our component
We start by creating a new Go module for our component:
$ go mod init github.com/jpkrohling/splitbatchprocessor
A typical component has the following files:
- config.go, which holds the data structure(s) used in the configuration file.
- factory.go, containing a function that builds a new instance of the component.
- exporter.go, processor.go, receiver.go or extension.go, containing the code for the actual component.
- testdata/config.yaml, with a config file that is used both to show your users how to use the component and to test the parsing of the configuration into the struct from config.go.
- metrics.go, with the definitions for the metrics used by the component.
- *_test.go, for the unit tests.
Our processor’s role will be to look into an incoming trace batch and split it into several batches, one for each trace. For the moment, we don’t foresee any configuration options, so our Config structure is quite simple:
Each component type has a specific base Settings type, and the right one for us is the ProcessorSettings. With that in place, we can create our factory:
Technically, we don’t need to use the processorhelper.NewFactory function: if our processor implementation satisfies the ProcessorFactory interface, we can just return a new instance of our factory. But most of the time, the implementations differ only in a couple of aspects: the component name (typeStr), the default configuration for the component, and the actual processor code. We’ve left out this last part for now, but our processor already has a valid factory!
Before we create our first test, we need a testdata/config.yaml, which could look like this:
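Something along these lines, with examplereceiver and exampleexporter standing in for the no-op test components shipped with the collector (the exact names depend on the collector’s test helpers):

```yaml
receivers:
  examplereceiver:

processors:
  splitbatch:

exporters:
  exampleexporter:

service:
  pipelines:
    traces:
      receivers: [examplereceiver]
      processors: [splitbatch]
      exporters: [exampleexporter]
```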
As we are creating a processor, we don’t really care about the receivers and exporters, so we can just use no-op ones.
Our tests are very simple, but should already assert that we have a processor that responds to the splitbatch name in the configuration file:
Once we confirm that our code is working fine so far, we can start with a bare bones implementation of the processor:
Note that we assert, with a var _ declaration, that our splitBatch type should always be a component.TraceProcessor. This is useful, as we’ll get a compilation failure if we use a newer collector version with interface methods that we haven’t implemented yet.
Similar to what happens with the factory, we could have used processorhelper.NewTraceProcessor to avoid writing methods that are similar across implementations, like Start or Shutdown, but it requires the processor to return the resulting pdata.Traces, as the helper will call the nextConsumer on our behalf. In our case, we need to call the nextConsumer multiple times ourselves, so we need our own type to satisfy the full interface.
Our processor won’t need to create long-lived resources, nor to perform any cleanup. Therefore, our Start and Shutdown functions are empty. It’s worth explaining GetCapabilities a bit, though: we are saying here that we are mutating the data that we are receiving. If your processor won’t mutate the incoming data, the collector will use this as a hint to possibly apply some optimizations.
We have now enough to write our main test, asserting how we want our processor to work in general terms. Something like:
Note that we have one batch, with one ResourceSpan and one InstrumentationLibrarySpan containing two Span objects, each belonging to a different trace. We expect the next consumer to receive two pdata.Traces, each with one ResourceSpan, one InstrumentationLibrarySpan and one Span. Note that the instrumentation library’s name is the same in both cases.
With our main requirement written, we can come up with code that will satisfy the test. Our processor may assume that we’ll have a splitTrace function that takes a pdata.ResourceSpans and returns a set of pdata.ResourceSpans:
And finally, we can create the missing function as part of another source file, split.go, containing our main logic:
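Since the pdata API has changed across collector versions, here is the core grouping logic sketched with plain stand-in types (TraceID and SpanStub are hypothetical); the real split.go maps this same loop onto pdata.ResourceSpans and its nested span collections:

```go
package main

import "fmt"

// TraceID and SpanStub are hypothetical stand-ins for pdata's span type,
// used only to illustrate the grouping logic in split.go.
type TraceID [16]byte

type SpanStub struct {
	TraceID TraceID
	Name    string
}

// splitTrace groups the spans of one batch by trace ID, returning one
// group of spans per trace, in the order the traces were first seen.
func splitTrace(spans []SpanStub) [][]SpanStub {
	byTrace := map[TraceID][]SpanStub{}
	var order []TraceID
	for _, span := range spans {
		if _, seen := byTrace[span.TraceID]; !seen {
			order = append(order, span.TraceID)
		}
		byTrace[span.TraceID] = append(byTrace[span.TraceID], span)
	}

	out := make([][]SpanStub, 0, len(order))
	for _, id := range order {
		out = append(out, byTrace[id])
	}
	return out
}

func main() {
	batch := []SpanStub{
		{TraceID: TraceID{1}, Name: "span-a"},
		{TraceID: TraceID{2}, Name: "span-b"},
		{TraceID: TraceID{1}, Name: "span-c"},
	}
	for i, group := range splitTrace(batch) {
		fmt.Printf("batch %d: %d span(s)\n", i, len(group))
	}
	// prints:
	// batch 0: 2 span(s)
	// batch 1: 1 span(s)
}
```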
Our test can ensure that we have our main requirement covered:
$ go test .
ok github.com/jpkrohling/splitbatchprocessor 0.004s
The only remaining piece is to tell the factory that we are creating a trace processor. We do that by adding a function that creates our processor, and referring to this function in the NewFactory:
Integration with OpenTelemetry Collector
We have now our processor ready to be used, but we need to integrate it with an OpenTelemetry Collector distribution. From here, we have two options to explore:
- Create a cmd/main.go that bootstraps a collector with our custom component
- Use the OpenTelemetry Collector Builder to assemble a distribution
The first option is useful for local development, as we can start it from our IDE and set up debugging breakpoints if we need. It’s not very scalable, though: we can’t have one binary for each custom component we use. Our cmd/main.go is very simple:
We’ll need a minimal configuration file, example.yaml, to start our collector:
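For instance, assuming the OTLP receiver and the logging exporter are available in the distribution:

```yaml
receivers:
  otlp:
    protocols:
      grpc:

processors:
  splitbatch:

exporters:
  logging:
    loglevel: debug

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [splitbatch]
      exporters: [logging]
```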
With that in place, we can run it with:
$ go run ./cmd/ --config example.yaml
If all goes well, we’ll see a log statement saying:
Everything is ready. Begin running and processing data.
The second option is more scalable and allows us to use this processor with other custom components we might have. We can get started by having a builder.yaml like this:
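The field names below follow the builder’s early configuration format and may vary by builder version; the gomod reference points the builder at our module:

```yaml
dist:
  name: otelcol-custom
  description: Custom OpenTelemetry Collector with the splitbatch processor
  output_path: /tmp/dist

processors:
  - gomod: github.com/jpkrohling/splitbatchprocessor v0.0.1
```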
Once we get a binary as the output from the builder, we can start our distribution with this configuration file to test it out:
$ opentelemetry-collector-builder --config builder.yaml
...
$ /tmp/otelcol-distribution312862503/otelcol-custom --config example.yaml
Wrapping up
We’ve seen that the OpenTelemetry Collector has made it easy to extend its core functionality by exposing hooks where we can plug our own extensions, receivers, processors and exporters. We’ve seen as well that writing a custom component isn’t hard, especially for seasoned Go developers: in simple cases, a couple of factories and your logic wrapped in a function is all it takes. And finally, we’ve seen how to integrate our component with a custom collector distribution, both using a custom main.go file and using the OpenTelemetry Collector Builder.
We left out a couple of topics, which we leave as an exercise to the reader: making your component more observable by creating and exposing metrics, and the testing infrastructure provided by the collector, which makes it easy to test our components.
The final result for this blog post can be found on this repository: https://github.com/jpkrohling/splitbatchprocessor.
And finally: if you end up needing a processor just like this, you might want to just use the github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpertrace module 😉