Using Tracking processors to replay events in Axon Framework 3

Replaying events is a crucial part of any event sourcing / CQRS application: it lets you rebuild projections, generate new ones, or seed external systems with data.

I’m a big fan of the Axon Framework. Even with its quirks and occasional (strange) bugs, it’s my go-to toolbox for my event sourcing & CQRS consulting and development work.

With the recent 3.0 release, Axon changed the way events can be replayed by introducing the Subscribing and Tracking event processors. The Subscribing processor follows the event stream in real-time, whereas the Tracking processor keeps track of events it has processed (using a token). This means that the Tracking processor can be stopped and resumed, and it will pick up processing where it left off.
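To make the token idea concrete, here is a minimal, framework-free sketch of a processor that remembers its position in an event stream. It is a conceptual illustration only: `PositionToken` and `SimpleTrackingProcessor` are hypothetical names, not Axon types, and a real token would be persisted in a token store rather than kept in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual sketch only; none of these types are part of Axon.
class TokenTrackingSketch {

    /** Stand-in for a persisted tracking token: the index of the next event to process. */
    static class PositionToken {
        private long position = 0;

        long get() { return position; }
        void advance() { position++; }
        void reset() { position = 0; } // resetting the token triggers a full replay
    }

    /** Hypothetical processor that reads an append-only list and records its progress in the token. */
    static class SimpleTrackingProcessor {
        private final List<String> eventStream;
        private final PositionToken token;
        private final List<String> handled = new ArrayList<>();

        SimpleTrackingProcessor(List<String> eventStream, PositionToken token) {
            this.eventStream = eventStream;
            this.token = token;
        }

        /** Processes at most maxEvents events, starting at the token's position. */
        int processBatch(int maxEvents) {
            int processed = 0;
            while (processed < maxEvents && token.get() < eventStream.size()) {
                handled.add(eventStream.get((int) token.get()));
                token.advance();
                processed++;
            }
            return processed;
        }

        List<String> handled() { return handled; }
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("AccountOpened", "MoneyDeposited", "MoneyWithdrawn");
        PositionToken token = new PositionToken();

        // Process two events, then "stop".
        new SimpleTrackingProcessor(events, token).processBatch(2);

        // A new instance sharing the same token resumes at the third event.
        SimpleTrackingProcessor resumed = new SimpleTrackingProcessor(events, token);
        resumed.processBatch(10);
        System.out.println(resumed.handled()); // [MoneyWithdrawn]

        // Resetting the token replays the stream from the start.
        token.reset();
        SimpleTrackingProcessor replay = new SimpleTrackingProcessor(events, token);
        replay.processBatch(10);
        System.out.println(replay.handled().size()); // 3
    }
}
```

The point of the sketch is that progress lives in the token, not in the processor: any instance holding the same token continues where the previous one stopped, and resetting the token is what makes a replay possible.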

In a recent project we frequently used both types of processors. To simplify switching from subscribing to tracking mode for (existing) projections, we added two classes: a TrackedProjection annotation and an accompanying Configuration that scans for beans that have the annotation applied.

package org.demo.projections;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface TrackedProjection { }

The configuration takes care of any (Axon) ProcessingGroup annotations (that change the registered name of the processor), and then invokes registerTrackingProcessor on Axon’s EventHandlingConfiguration.

package org.demo.configuration;

import org.axonframework.config.EventHandlingConfiguration;
import org.axonframework.config.ProcessingGroup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.demo.projections.TrackedProjection;
import javax.annotation.PostConstruct;
import java.util.Optional;

@Configuration
public class ProjectionsConfiguration {

    @Autowired
    private EventHandlingConfiguration eventHandlingConfiguration;

    @PostConstruct
    public void startTrackingProjections() throws ClassNotFoundException {
        // Disable the default filters so only @TrackedProjection classes are found.
        ClassPathScanningCandidateComponentProvider scanner =
                new ClassPathScanningCandidateComponentProvider(false);
        scanner.addIncludeFilter(new AnnotationTypeFilter(TrackedProjection.class));

        for (BeanDefinition bd : scanner.findCandidateComponents("org.demo")) {
            Class<?> aClass = Class.forName(bd.getBeanClassName());
            // Use the @ProcessingGroup name if present, otherwise fall back to the package name.
            ProcessingGroup processingGroup = aClass.getAnnotation(ProcessingGroup.class);
            String name = Optional.ofNullable(processingGroup)
                    .map(ProcessingGroup::value)
                    .orElseGet(() -> aClass.getPackage().getName());
            registerTrackingProcessor(name);
        }
    }

    private void registerTrackingProcessor(String name) {
        eventHandlingConfiguration.registerTrackingProcessor(name);
    }
}
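The name resolution in the configuration above can be tried in isolation. The following is a rough sketch using only the JDK: `Group` is a hypothetical stand-in for Axon's ProcessingGroup annotation, and the two projection classes are made up for the example. It also guards against `getPackage()` returning null, which can happen for classes in the unnamed package.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Optional;

// Sketch only: Group stands in for Axon's ProcessingGroup annotation.
class ProcessorNameSketch {

    @Retention(RetentionPolicy.RUNTIME) // must be visible to reflection at runtime
    @Target(ElementType.TYPE)
    @interface Group {
        String value();
    }

    @Group("accounts")
    static class AccountProjection { }

    static class UnannotatedProjection { }

    /** Returns the annotation value if present, otherwise the package name of the class. */
    static String processorNameFor(Class<?> type) {
        return Optional.ofNullable(type.getAnnotation(Group.class))
                .map(Group::value)
                .orElseGet(() -> packageNameOf(type));
    }

    /** Package name of the class, or an empty string when no package is available. */
    static String packageNameOf(Class<?> type) {
        Package p = type.getPackage();
        return p != null ? p.getName() : "";
    }

    public static void main(String[] args) {
        System.out.println(processorNameFor(AccountProjection.class));    // accounts
        System.out.println(processorNameFor(java.util.ArrayList.class));  // java.util
    }
}
```

Using `orElseGet` rather than `orElse` means the package lookup only runs when the annotation is absent.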

These snippets assume Spring Boot (1.5.x) running on Java 8.

This code can be further improved by adding progress tracking for the running processors. These metrics can then be exposed through a REST endpoint to indicate how much time processors still need to “catch up” with the live event stream.
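As a rough idea of what such a metric could look like, the sketch below estimates the remaining catch-up time from two position samples. It is not tied to any Axon API: the method name and parameters are hypothetical, and it assumes you can obtain the processor's position and the position of the newest event as plain numbers (for example, global sequence indexes).

```java
import java.time.Duration;
import java.util.Optional;

// Sketch only: how positions are obtained from the framework is left out on purpose.
class CatchUpEstimateSketch {

    /**
     * Estimates the time needed to reach headPosition, based on how many events
     * were processed between two samples taken `elapsed` apart.
     * Returns empty when no progress was observed, because no rate can be derived.
     */
    static Optional<Duration> estimateTimeRemaining(long previousPosition,
                                                    long currentPosition,
                                                    Duration elapsed,
                                                    long headPosition) {
        long remaining = Math.max(0, headPosition - currentPosition);
        if (remaining == 0) {
            return Optional.of(Duration.ZERO); // already caught up
        }
        long processed = currentPosition - previousPosition;
        long elapsedMillis = elapsed.toMillis();
        if (processed <= 0 || elapsedMillis <= 0) {
            return Optional.empty();
        }
        double millisPerEvent = (double) elapsedMillis / processed;
        return Optional.of(Duration.ofMillis(Math.round(millisPerEvent * remaining)));
    }

    public static void main(String[] args) {
        // 2000 events in 10 seconds, 7000 events still to go.
        Optional<Duration> eta =
                estimateTimeRemaining(1_000, 3_000, Duration.ofSeconds(10), 10_000);
        System.out.println(eta.get()); // PT35S
    }
}
```

The estimate assumes a constant processing rate and ignores events that arrive while the processor is catching up, so it is best treated as an indication rather than a promise.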

Let me know if this code is useful to you!


Originally published at fourscouts.nl on September 18, 2017 by Michiel Rook.