Implementing a Camunda Custom Job Handler

As described in my previous post, one of the implementation strategies for Batch Processing is the creation of a custom job in Camunda.

To do so, you implement a custom job handler. A job handler has a type and works with a job handler configuration. To start a job, a message carrying that type and the configuration of the job instance is passed to the internal Camunda API using the command pattern. The following diagram depicts the relationships between these concepts.

Concept map Camunda Job Handler
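
To make the relationships in the concept map a bit more tangible, here is a minimal JDK-only sketch of the idea. It does not use the Camunda API: all names in it (SketchJobHandler, SketchMessage, SketchJobExecutor, GreetingHandler) are hypothetical stand-ins, and unlike a real job handler the execute method returns a String so that the result is easy to inspect. It only shows that a message carries a handler type and a serialized configuration, and that the executor picks the handler by type and lets it rebuild the configuration.

```java
import java.util.HashMap;
import java.util.Map;

final class JobDispatchSketch {

  /** Hypothetical stand-in for a job handler: it has a type and rebuilds its configuration from a string. */
  interface SketchJobHandler<C> {
    String getType();
    C newConfiguration(String canonicalString);
    String execute(C configuration);
  }

  /** Hypothetical stand-in for the stored message: handler type plus serialized configuration. */
  static final class SketchMessage {
    final String handlerType;
    final String canonicalConfiguration;

    SketchMessage(String handlerType, String canonicalConfiguration) {
      this.handlerType = handlerType;
      this.canonicalConfiguration = canonicalConfiguration;
    }
  }

  /** Hypothetical stand-in for the job executor: finds the handler by type and passes it the configuration. */
  static final class SketchJobExecutor {
    private final Map<String, SketchJobHandler<?>> handlersByType = new HashMap<>();

    void register(SketchJobHandler<?> handler) {
      handlersByType.put(handler.getType(), handler);
    }

    String run(SketchMessage message) {
      SketchJobHandler<?> handler = handlersByType.get(message.handlerType);
      if (handler == null) {
        throw new IllegalStateException("No handler registered for type " + message.handlerType);
      }
      return dispatch(handler, message.canonicalConfiguration);
    }

    // Generic helper that captures the handler's configuration type once.
    private static <C> String dispatch(SketchJobHandler<C> handler, String canonicalConfiguration) {
      C configuration = handler.newConfiguration(canonicalConfiguration);
      return handler.execute(configuration);
    }
  }

  /** Example handler whose "configuration" is just the trimmed recipient name. */
  static final class GreetingHandler implements SketchJobHandler<String> {
    @Override public String getType() { return "Greeting"; }
    @Override public String newConfiguration(String canonicalString) { return canonicalString.trim(); }
    @Override public String execute(String recipient) { return "Hello, " + recipient; }
  }

  public static void main(String[] args) {
    SketchJobExecutor executor = new SketchJobExecutor();
    executor.register(new GreetingHandler());
    System.out.println(executor.run(new SketchMessage("Greeting", " kermit "))); // prints "Hello, kermit"
  }
}
```

In the real engine, the type, the configuration and the handler are the Camunda classes shown in the rest of this post; the sketch only mirrors their roles.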

The configuration of the job instance supplied by the message is serialized to a Java String when the job is started; later, the Job Executor de-serializes it and passes it to the Job Handler. This is how the parameters of a job instance reach the Job Handler. There are many ways to serialize a Java object to a String and back, but I prefer to encode it as JSON, since Jackson is already available on the classpath if you use Spring Boot and Camunda. To make things easier, you can introduce a helper class that deals with serialization. Here is a possible implementation using an ObjectMapper passed in from the outside:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

class JacksonJobHandlerConfigurationSerializer<T extends JacksonJobHandlerConfiguration> {

  private final ObjectMapper mapper;
  private final Class<T> clazz;

  JacksonJobHandlerConfigurationSerializer(final ObjectMapper mapper, final Class<T> clazz) {
    this.mapper = mapper;
    this.clazz = clazz;
  }

  // Serializes the configuration to its JSON representation.
  String toCanonicalString(final T instance) {
    try {
      return mapper.writeValueAsString(instance);
    } catch (JsonProcessingException e) {
      throw new IllegalArgumentException("Error during serialization of job handler configuration of type " + clazz.getName(), e);
    }
  }

  // Restores the configuration from its JSON representation.
  T fromCanonicalString(final String canonicalString) {
    try {
      return mapper.readValue(canonicalString, clazz);
    } catch (IOException e) {
      throw new IllegalArgumentException("Error during de-serialization of " + canonicalString + " to job handler configuration of type " + clazz.getName(), e);
    }
  }
}

The corresponding job handler configuration looks as follows:

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.camunda.bpm.engine.impl.jobexecutor.JobHandlerConfiguration;

/**
 * Base class for any job handler configuration which should be subclassed in order to get serialized.
 */
public abstract class JacksonJobHandlerConfiguration implements JobHandlerConfiguration {

  // The serializer is a helper and must not become part of the JSON payload.
  @JsonIgnore
  private final JacksonJobHandlerConfigurationSerializer<JacksonJobHandlerConfiguration> serializer;

  protected JacksonJobHandlerConfiguration(final ObjectMapper mapper) {
    this.serializer = new JacksonJobHandlerConfigurationSerializer<>(mapper, JacksonJobHandlerConfiguration.class);
  }

  public String toCanonicalString() {
    return serializer.toCanonicalString(this);
  }
}

Implementation of our own configuration is as simple as subclassing this class and adding fields for the payload. In our notification example, the configuration consists of three fields: recipient, subject and body of the message:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.holunda.spike.camunda.job.JacksonJobHandlerConfiguration;

public class NotificationJobHandlerConfiguration extends JacksonJobHandlerConfiguration {

  private String recipient;
  private String subject;
  private String body;

  public NotificationJobHandlerConfiguration(final ObjectMapper mapper, final String recipient, final String subject, final String body) {
    super(mapper);
    this.body = body;
    this.subject = subject;
    this.recipient = recipient;
  }
}

In order to work with Jackson, you have to add annotated getters and setters to the class and provide a factory method:

  @JsonCreator
  @JsonInclude(JsonInclude.Include.NON_EMPTY)
  static NotificationJobHandlerConfiguration fromJson(
      @JsonProperty("recipient") final String recipient,
      @JsonProperty("subject") final String subject,
      @JsonProperty("body") final String body) {
    return new NotificationJobHandlerConfiguration(new ObjectMapper(), recipient, subject, body);
  }

  @JsonProperty("recipient")
  String getRecipient() {
    return recipient;
  }

  // further getter and setter...

Once the configuration is implemented, the Job Handler can be implemented. To simplify this, you could create a helper adapter which uses the serializer, similar to the Job Handler Configuration:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.camunda.bpm.engine.impl.interceptor.CommandContext;
import org.camunda.bpm.engine.impl.jobexecutor.JobHandler;
import org.camunda.bpm.engine.impl.persistence.entity.ExecutionEntity;
import org.camunda.bpm.engine.impl.persistence.entity.JobEntity;

public abstract class JacksonJobHandlerAdapter<T extends JacksonJobHandlerConfiguration> implements JobHandler<T> {

  private final JacksonJobHandlerConfigurationSerializer<T> serializer;

  protected JacksonJobHandlerAdapter(final ObjectMapper mapper, final Class<T> clazz) {
    this.serializer = new JacksonJobHandlerConfigurationSerializer<>(mapper, clazz);
  }

  @Override
  public String getType() {
    return getClass().getName();
  }

  @Override
  public void execute(final T configuration, final ExecutionEntity execution, final CommandContext commandContext, final String tenantId) {
    // no implementation
  }

  @Override
  public T newConfiguration(final String canonicalString) {
    return this.serializer.fromCanonicalString(canonicalString);
  }

  @Override
  public void onDelete(final T configuration, final JobEntity jobEntity) {
    // empty implementation
  }
}

A concrete handler is then a simple Spring component that subclasses the adapter and overrides the execute method. Just inject the transmission service and delegate sending the notification to it:

@Component
public class NotificationJobHandler extends JacksonJobHandlerAdapter<NotificationJobHandlerConfiguration> {

  private final TransmissionService service;

  public NotificationJobHandler(ObjectMapper mapper, TransmissionService service) {
    super(mapper, NotificationJobHandlerConfiguration.class);
    this.service = service;
  }

  @Override
  public void execute(NotificationJobHandlerConfiguration configuration,
                      ExecutionEntity execution,
                      CommandContext commandContext,
                      String tenantId) {
    this.service.transmitNotification(configuration.getRecipient(), configuration.getSubject(), configuration.getBody());
  }

  @Override
  public String getType() {
    return SendNotificationCommand.TYPE;
  }
}

The implementation of the handler is ready. Now you need a way to create jobs. To do so, let's create a Command that encapsulates the Message:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.holunda.spike.example.notification.handler.NotificationJobHandlerConfiguration;
import org.camunda.bpm.engine.impl.interceptor.Command;
import org.camunda.bpm.engine.impl.interceptor.CommandContext;
import org.camunda.bpm.engine.impl.persistence.entity.MessageEntity;

public class SendNotificationCommand implements Command<String> {

  public static final String TYPE = "SendNotification";
  private final NotificationJobHandlerConfiguration payload;

  public SendNotificationCommand(final ObjectMapper mapper, final String recipient, final String subject, final String body) {
    this.payload = new NotificationJobHandlerConfiguration(mapper, recipient, subject, body);
  }

  @Override
  public String execute(final CommandContext commandContext) {
    final MessageEntity entity = new MessageEntity();
    entity.init(commandContext);
    entity.setJobHandlerType(TYPE);
    entity.setJobHandlerConfiguration(payload);
    commandContext.getJobManager().send(entity);
    return entity.getId();
  }
}

In order to send a command, a Command Executor is required. It can be obtained from the ProcessEngineConfigurationImpl instance, which can be injected:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.interceptor.CommandExecutor;
import org.springframework.stereotype.Service;

@Service
public class NotificationService {

  private final ObjectMapper mapper;
  private final ProcessEngineConfigurationImpl config;

  public NotificationService(final ObjectMapper mapper, final ProcessEngineConfigurationImpl config) {
    this.mapper = mapper;
    this.config = config;
  }

  public void sendNotificationInAJob(String recipient, String subject, String body) {
    final CommandExecutor executor = config.getCommandExecutorTxRequired();
    executor.execute(new SendNotificationCommand(mapper, recipient, subject, body));
  }
}
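
If the command pattern is new to you, the interplay between a command, its context and the executor can be pictured with a small JDK-only model. This is only a sketch under simplifying assumptions, not the Camunda implementation: SketchContext, SketchCommand, SketchCommandExecutor and CreateJobCommand are hypothetical names, and the transaction handling that a real executor takes care of is reduced to a comment.

```java
import java.util.ArrayList;
import java.util.List;

final class CommandExecutorSketch {

  /** Hypothetical stand-in for the command context: here it only collects the created jobs. */
  static final class SketchContext {
    final List<String> createdJobs = new ArrayList<>();
  }

  /** Hypothetical stand-in for a command: a unit of work that runs inside a context and returns a result. */
  interface SketchCommand<T> {
    T execute(SketchContext context);
  }

  /** Hypothetical stand-in for the command executor: it supplies the context and runs the command. */
  static final class SketchCommandExecutor {
    private final SketchContext context = new SketchContext();

    <T> T execute(SketchCommand<T> command) {
      // A real executor would open a transaction here and commit or roll back afterwards.
      return command.execute(context);
    }

    List<String> createdJobs() {
      return context.createdJobs;
    }
  }

  /** Example command that records a job payload and returns a generated id. */
  static final class CreateJobCommand implements SketchCommand<String> {
    private final String payload;

    CreateJobCommand(String payload) {
      this.payload = payload;
    }

    @Override
    public String execute(SketchContext context) {
      context.createdJobs.add(payload);
      return "job-" + context.createdJobs.size();
    }
  }

  public static void main(String[] args) {
    SketchCommandExecutor executor = new SketchCommandExecutor();
    System.out.println(executor.execute(new CreateJobCommand("notify kermit"))); // prints "job-1"
  }
}
```

The caller never touches the context directly; it hands a command to the executor and gets the result back, which is the same shape as the service above.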

If you have the helper and adapter classes in place, creating a custom job is as easy as subclassing a Job Handler Configuration and a Job Handler. To start the job, a Command needs to be sent to the Command Executor. As a result, Camunda persists the job in its database (in the ACT_RU_JOB table). It is accessible via the Job API and can be manipulated like any other Camunda job (such as a Timer or an Asynchronous Continuation). Retry strategies, the number of retries and the job priority apply to it just as they do to other Camunda jobs. If the job fails and the number of retries drops to zero, an incident is created inside Camunda and persisted in the ACT_RU_INCIDENT database table.
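
The retry behaviour described above can be illustrated with a simplified JDK-only model. It is a sketch, not engine code: RetrySketch, JobOutcome and runWithRetries are hypothetical names, the starting value of three retries is just an example, and details such as retry time cycles or job locking are left out. It only shows the countdown: every failed attempt decrements the retries, and once they reach zero an incident is raised instead of another attempt.

```java
import java.util.function.BooleanSupplier;

final class RetrySketch {

  /** Hypothetical result holder describing how the job ended. */
  static final class JobOutcome {
    final boolean succeeded;
    final int retriesLeft;
    final boolean incidentCreated;

    JobOutcome(boolean succeeded, int retriesLeft, boolean incidentCreated) {
      this.succeeded = succeeded;
      this.retriesLeft = retriesLeft;
      this.incidentCreated = incidentCreated;
    }
  }

  /**
   * Runs the attempt until it succeeds or the retries are used up.
   * The attempt returns true on success and false on failure.
   */
  static JobOutcome runWithRetries(int retries, BooleanSupplier attempt) {
    int retriesLeft = retries;
    while (retriesLeft > 0) {
      if (attempt.getAsBoolean()) {
        return new JobOutcome(true, retriesLeft, false);
      }
      retriesLeft--; // a failed attempt costs one retry
    }
    // No retries left: the job is not attempted again and an incident is recorded.
    return new JobOutcome(false, 0, true);
  }

  public static void main(String[] args) {
    JobOutcome outcome = runWithRetries(3, () -> false); // a job that always fails
    System.out.println(outcome.incidentCreated); // prints "true"
  }
}
```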