Enhancing Transparency and Accountability: Implementing Entity Audit Logging in Java

Anuj Singh
8 min read · Apr 26, 2024


In the dynamic world of Java development, I embarked on a mission to fulfil a client’s request for comprehensive user activity logs across our company’s services. To achieve this, I turned to Ebean’s changelog feature, a robust tool for audit logging in Java applications.

Initially, I experimented with manual logging by modifying the save method of our model entities. However, this approach proved cumbersome and prone to errors, lacking the comprehensive tracking capabilities we required. I also explored Ebean's @History annotation, hoping it would streamline the audit logging process. Unfortunately, it didn't meet our needs, as it lacked the granularity and customization options necessary for our application.

Undeterred, I focused on leveraging Ebean’s changelog functionality to seamlessly integrate audit logging into our model entities. Each modification became a part of a detailed activity log, providing insight into users’ actions across our services.

As the development progressed, the application transformed into a testament to our commitment to data integrity. Stakeholders applauded the newfound visibility into user activity, while compliance requirements were effortlessly met. Through the power of Ebean’s Changelog and my dedication to meeting client needs, our Java application emerged as a beacon of transparency, paving the way for a more accountable future in our services.

BASIC ENTITY AUDIT LOGS

To enable audit logging for an entity, we first add the @ChangeLog annotation (from io.ebean.annotation) to the entity. By default, inserts are included; they can be excluded if required with the inserts = ChangeLogInsertMode.EXCLUDE option on the annotation.

@Entity
@ChangeLog
@Table(name = "datagroup")
public class DataGroup extends Model {
    ...
}

@ChangeLog(inserts = ChangeLogInsertMode.EXCLUDE)
@Entity
public class Dataset extends Model {
    ...
}

If we want to log an entity change only when certain fields change, we can use @ChangeLog(updatesThatInclude = {"field1","field2"}).
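
For instance (an illustrative sketch; the entity and field names here are hypothetical), only updates touching name or status would produce change log entries:

@Entity
@ChangeLog(updatesThatInclude = {"name", "status"})
public class Campaign extends Model {
    // updates that change only other fields are not logged;
    // inserts are still included by default
    ...
}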

Then we need to define a log appender for ChangeLogs in logback.xml. In our example we use a ConsoleAppender to log to STDOUT, but there are many options: sockets, files, databases, Kafka, SMTP, even Slack and other apps. See the Logback and Apache Log4j appender documentation for the full range; other libraries offer still more options.

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
        <pattern>%date{yyyy-MM-dd HH:mm:ss} %coloredLevel %logger{15} - %message%n%xException{10}</pattern>
    </encoder>
</appender>

<logger name="io.ebean.ChangeLog" level="TRACE" additivity="false">
    <appender-ref ref="STDOUT"/>
</logger>

This is sufficient to log entity changes (inserts and updates).

AUDIT LOGS WITH USER CONTEXT

To log user context or other additional info along with the changes to the entity, we implement the ChangeLogPrepare interface and add the extra info to the change set. The implementation of ChangeLogPrepare can be detected automatically if classpath scanning is on (just as entity beans are found); in that case we don't need to register it explicitly. Alternatively, we can register it explicitly on the DatabaseConfig, as in the sketch below.
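
A minimal sketch of explicit registration, assuming the Database is created programmatically (the surrounding setup is illustrative; it registers the MyChangeLogPrepare shown further below):

DatabaseConfig config = new DatabaseConfig();
config.loadFromProperties();
// register the ChangeLogPrepare implementation explicitly
// (not needed when classpath scanning picks it up automatically)
config.setChangeLogPrepare(new MyChangeLogPrepare());
Database database = DatabaseFactory.create(config);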

For our use-case, we'll keep the implementations inside the models folder.

class MyChangeLogPrepare implements ChangeLogPrepare {

    @Override
    public boolean prepare(ChangeSet changes) {
        // get user context information, typically from a
        // ThreadLocal or similar mechanism
        String currentUserId = ...;
        changes.setUserId(currentUserId);
        String userIpAddress = ...;
        changes.setUserIpAddress(userIpAddress);
        changes.setSource("myApplicationName");
        // add arbitrary user context information to the
        // userContext map
        changes.getUserContext().put("some", "thing");
        return true;
    }
}

STORING USER CONTEXT IN THREAD LOCALS

We can store user info in ThreadLocals and read it in the ChangeLogPrepare implementation to set it on the change set.

ThreadLocalManager

import java.util.HashMap;
import java.util.Map;

public class ThreadLocalManager {

    // per-thread map holding user context (authMap, orgId, etc.)
    private static ThreadLocal<Map<String, Object>> context = new ThreadLocal<>();

    public static void addToContext(String key, Object value) {
        Map<String, Object> currentContext = context.get() == null ? new HashMap<>() : context.get();
        currentContext.put(key, value);
        context.set(currentContext);
    }

    public static void setContext(Map<String, Object> contextMap) {
        context.set(contextMap);
    }

    public static Map<String, Object> getContext() {
        return context.get();
    }

    public static Object getFromContext(String key) {
        // Nullifier is an internal null-safe helper; returns "NA" when absent
        return Nullifier.get(() -> context.get().getOrDefault(key, "NA"));
    }

    public static void clearContext() {
        context.remove();
    }
}

Set the values in ThreadLocalManager before saving the entity.

ThreadLocalManager.addToContext("authMap", new HashMap<String, String>() {{
    put("userName", AsyncRbacAuthUtil.extractUserName(authMap).get());
    put("userEmail", AsyncRbacAuthUtil.extractUserEmailAddress(authMap).get());
}});
entity.save();

Custom ChangeLogPrepare

public class AuditLogPrepare implements ChangeLogPrepare {

    private final play.Logger.ALogger logger = Logger.of(this.getClass());

    @Override
    public boolean prepare(ChangeSet changes) {
        Map<String, String> authMap = Nullifier.get(
                () -> (Map<String, String>) ThreadLocalManager.getContext().get("authMap"), new HashMap<>());
        if (authMap.isEmpty()) {
            logger.warn("[ALERT] AuthMap is empty for changeset: " + changes.toString());
        }
        changes.getUserContext().put("userName", authMap.getOrDefault("userName", DEFAULT_INTERNAL_USER_NAME));
        changes.getUserContext().put("userEmail", authMap.getOrDefault("userEmail", DEFAULT_INTERNAL_USER_EMAIL));
        changes.setSource("MyApp");
        return true;
    }
}

For asynchronous applications, the ThreadLocal values are lost when execution switches from one thread (pool) to another. To propagate the context across threads and thread pools, we create custom dispatchers from a custom MessageDispatcherConfigurator, which uses a custom ThreadPoolExecutor to wrap submitted tasks so that they run on new threads with the submitting thread's context.

Create a utility class to wrap callables and runnables with thread contexts.

ContextUtility

import java.util.Map;
import java.util.concurrent.Callable;

public class ContextUtility {

    // capture the submitting thread's context and install it on the
    // worker thread for the duration of the task
    public static <T> Callable<T> wrapWithContext(Callable<T> task) {
        Map<String, Object> previousContext = ThreadLocalManager.getContext();
        if (previousContext == null)
            return task;
        else
            return () -> {
                ThreadLocalManager.setContext(previousContext);
                try {
                    return task.call();
                } finally {
                    ThreadLocalManager.clearContext();
                }
            };
    }

    public static Runnable wrapWithContext(Runnable task) {
        Map<String, Object> previousContext = ThreadLocalManager.getContext();
        if (previousContext == null) {
            return task;
        } else
            return () -> {
                ThreadLocalManager.setContext(previousContext);
                try {
                    task.run();
                } finally {
                    ThreadLocalManager.clearContext();
                }
            };
    }
}

Using the methods from ContextUtility, we create a CustomThreadPoolExecutor that overrides submit and execute to attach the thread context before tasks are handed to the pool.

CustomThreadPoolExecutor

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    public CustomThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    @NotNull TimeUnit unit,
                                    @NotNull BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public CustomThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    @NotNull TimeUnit unit,
                                    @NotNull BlockingQueue<Runnable> workQueue,
                                    @NotNull ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public CustomThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    @NotNull TimeUnit unit,
                                    @NotNull BlockingQueue<Runnable> workQueue,
                                    @NotNull RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public CustomThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    @NotNull TimeUnit unit,
                                    @NotNull BlockingQueue<Runnable> workQueue,
                                    @NotNull ThreadFactory threadFactory,
                                    @NotNull RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public <T> @NotNull Future<T> submit(@NotNull Callable<T> task) {
        return super.submit(ContextUtility.wrapWithContext(task));
    }

    @Override
    public <T> @NotNull Future<T> submit(@NotNull Runnable task, T result) {
        return super.submit(ContextUtility.wrapWithContext(task), result);
    }

    @Override
    public @NotNull Future<?> submit(@NotNull Runnable task) {
        return super.submit(ContextUtility.wrapWithContext(task));
    }

    @Override
    public void execute(@NotNull Runnable task) {
        super.execute(ContextUtility.wrapWithContext(task));
    }
}

Now we will use this executor in our custom MessageDispatcherConfigurator to allow creating custom dispatchers.

CustomDispatcherConfigurator

public class CustomDispatcherConfigurator extends MessageDispatcherConfigurator {

    private final CustomDispatcher instance;

    public CustomDispatcherConfigurator(Config config, DispatcherPrerequisites prerequisites) {
        super(config, prerequisites);
        Config threadPoolConfig = config.getConfig("thread-pool-executor");
        int fixedPoolSize = threadPoolConfig.getInt("fixed-pool-size");
        instance = new CustomDispatcher(
                this,
                config.getString("id"),
                config.getInt("throughput"),
                Duration.create(config.getDuration("throughput-deadline-time", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS),
                (id, threadFactory) -> () -> new CustomThreadPoolExecutor(fixedPoolSize,
                        fixedPoolSize,
                        threadPoolConfig.getDuration("keep-alive-time", TimeUnit.MILLISECONDS),
                        TimeUnit.MILLISECONDS,
                        new LinkedBlockingDeque<>(),
                        new ThreadFactory() {
                            private int threadId = 1;

                            @Override
                            public Thread newThread(@NotNull Runnable r) {
                                Thread thread = new Thread(r);
                                thread.setName(config.getString("name") + "-" + threadId++);
                                return thread;
                            }
                        }),
                Duration.create(config.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
        );
    }

    @Override
    public MessageDispatcher dispatcher() {
        return instance;
    }
}

class CustomDispatcher extends Dispatcher {

    public CustomDispatcher(MessageDispatcherConfigurator _configurator,
                            String id,
                            int throughput,
                            Duration throughputDeadlineTime,
                            ExecutorServiceFactoryProvider executorServiceFactoryProvider,
                            scala.concurrent.duration.FiniteDuration shutdownTimeout) {
        super(_configurator, id, throughput, throughputDeadlineTime, executorServiceFactoryProvider, shutdownTimeout);
    }
}

Then we can create dispatchers using the custom configurator and define their config in application.conf.

application.conf

db-io-dispatcher {
    type = "contexts.CustomDispatcherConfigurator"
    executor = "thread-pool-executor"
    thread-pool-executor {
        fixed-pool-size = 11
    }
    throughput = 1
    shutdown-timeout = 60s
}

web-io-dispatcher {
    type = "contexts.CustomDispatcherConfigurator"
    executor = "thread-pool-executor"
    thread-pool-executor {
        fixed-pool-size = 20
    }
    throughput = 1
    shutdown-timeout = 60s
}

DatabaseIODispatcher

public class DatabaseIODispatcher extends CustomExecutionContext {

    @Inject
    public DatabaseIODispatcher(ActorSystem actorSystem) {
        super(actorSystem, "db-io-dispatcher");
    }
}

This allows us to switch from one executor to another without losing the thread's context.
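
As a usage sketch (the service class and method names here are hypothetical), the dispatcher can be passed as the executor for async work so the audit context survives the hop:

public class DataGroupService {

    private final DatabaseIODispatcher dbDispatcher;

    @Inject
    public DataGroupService(DatabaseIODispatcher dbDispatcher) {
        this.dbDispatcher = dbDispatcher;
    }

    public CompletionStage<DataGroup> update(DataGroup entity) {
        // supplyAsync hands the task to the custom dispatcher;
        // CustomThreadPoolExecutor copies the caller's ThreadLocal context
        // onto the worker thread, so AuditLogPrepare still sees the authMap
        // when save() triggers the change log.
        return CompletableFuture.supplyAsync(() -> {
            entity.save();
            return entity;
        }, dbDispatcher);
    }
}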

Managing Context with RMQ

For RMQ messages, we are using message headers to populate thread context.

From the RMQ publisher, we publish the message with the user context from the current thread's ThreadLocal in the message headers.

RmqPublisher

public class RmqPublisher extends AbstractRMQComponent {

    private final Map<String, Connection> connections;

    @Inject
    public RmqPublisher() {
        this.connections = new HashMap<>();
    }

    public void publishMessage(String uri, String exchange, String message, String routingKey) throws IOException, TimeoutException {
        try {
            Connection connection = connections.containsKey(uri) ? connections.get(uri) : getTcpConnection(uri);
            connections.put(uri, connection);
            Channel channel = connection.createChannel();
            Map<String, Object> headers = new HashMap<>();
            setHeadersForAuditLog(headers);
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2) // persistent delivery
                    .headers(headers)
                    .build();
            channel.basicPublish(exchange, routingKey, properties, message.getBytes());
            channel.close();
        } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) {
            throw new IOException(e);
        }
    }

    public void publishMessage(String uri,
                               String exchange,
                               String message,
                               Map<String, Object> headers,
                               String routingKey) throws IOException, TimeoutException {
        try {
            Connection connection = connections.containsKey(uri) ? connections.get(uri) : getTcpConnection(uri);
            connections.put(uri, connection);
            Channel channel = connection.createChannel();
            setHeadersForAuditLog(headers);
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2) // persistent delivery
                    .headers(headers)
                    .build();
            channel.basicPublish(exchange, routingKey, properties, message.getBytes());
            channel.close();
        } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) {
            throw new IOException(e);
        }
    }

    private void setHeadersForAuditLog(Map<String, Object> headers) {
        // copy the audit-relevant entries from the current thread's context
        // into the message headers so the consumer can restore them
        Map<String, Object> threadContext = Nullifier.get(ThreadLocalManager::getContext, new HashMap<>());
        if (!threadContext.isEmpty()) {
            headers.put("authMap", threadContext.getOrDefault("authMap", DEFAULT_AUTH_MAP));
            headers.put("orgId", String.valueOf(threadContext.get("orgId")));
            headers.put("orgIdFromHeader", String.valueOf(threadContext.get("orgIdFromHeader")));
        } else {
            headers.put("authMap", DEFAULT_AUTH_MAP);
        }
    }
}

And in the consumer, we fetch the user context from the headers and set it on the thread context.

AbstractConsumer

public abstract class AbstractConsumer<T> extends AbstractRMQComponent {
    ...

    // in the processMessage method, set the context before handling the consumer task
    public void processMessage(Channel channel, Envelope envelope, AMQP.BasicProperties properties, String message, RMQueue queue) throws IOException {
        setThreadContextForAuditLog(properties.getHeaders());
        // requestMsg is the payload parsed from message into the consumer's
        // type T (parsing elided here); remember to clear the thread context
        // once processing completes (see Points to Remember below)
        handleConsumerTask(requestMsg, properties.getHeaders());
        channel.basicAck(envelope.getDeliveryTag(), true);
    }

    private void setThreadContextForAuditLog(Map<String, Object> headers) {
        Map<String, String> authMap = (Map<String, String>) headers.getOrDefault("authMap", DEFAULT_AUTH_MAP);
        ThreadLocalManager.addToContext("authMap", authMap);
    }
}

GCP PubSub Logger Appender

Create a logger appender class that publishes to pubsub.

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.slf4j.MDC;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

@EqualsAndHashCode(callSuper = true)
@Data
public class GoogleCloudPubsubAppender extends AppenderBase<ILoggingEvent> {

    private String projectId;
    private String topicId;
    private Publisher publisher;

    @Override
    public void start() {
        super.start();
        try {
            publisher = Publisher.newBuilder(ProjectTopicName.of(projectId, topicId)).build();
        } catch (IOException e) {
            addError("Failed to create publisher: " + e.getMessage(), e);
            stop();
        }
    }

    @Override
    protected void append(ILoggingEvent eventObject) {
        // carry MDC entries as Pub/Sub message attributes alongside the log payload
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8(eventObject.getFormattedMessage()))
                .putAllAttributes(eventObject.getMDCPropertyMap())
                .build();
        MDC.clear();
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
            @Override
            public void onSuccess(String messageId) {
                // message published successfully
            }

            @Override
            public void onFailure(Throwable t) {
                addError("Failed to publish message: " + t.getMessage());
            }
        }, Runnable::run);
    }

    @Override
    public void stop() {
        super.stop();
        if (publisher != null) {
            try {
                // shutdown() must be called before awaitTermination,
                // otherwise there is nothing to wait on
                publisher.shutdown();
                publisher.awaitTermination(10, TimeUnit.SECONDS);
            } catch (Exception e) {
                addError("Failed to shut down publisher: " + e.getMessage(), e);
            }
        }
    }
}

Using this appender class, define an appender in logback.xml and provide the project and topic id of the Pub/Sub topic to which the logs should be published.

<variable name="LOGGER_TOPIC_ID" value="${AUDIT_LOG_PUBSUB_TOPIC:-audit-logs-topic}"/>
<variable name="LOGGER_PROJECT_ID" value="${PROJECT_ID:-dcr-orchestration-qa}"/>

<appender name="PUBSUB_LOGGER" class="com.zeotap.cdp.logging.GoogleCloudPubsubAppender">
    <projectId>${LOGGER_PROJECT_ID}</projectId>
    <topicId>${LOGGER_TOPIC_ID}</topicId>
</appender>

<appender name="ASYNC_PUBSUB_LOGGER" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="PUBSUB_LOGGER"/>
</appender>

<logger name="io.ebean.ChangeLog" level="TRACE" additivity="false">
    <appender-ref ref="ASYNC_PUBSUB_LOGGER"/>
</logger>

To add additional info to the Pub/Sub message containing the logs, use Logback's MDC: put the extra info into the MDC map, and the appender attaches it to the message.
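
For example (a sketch; the keys and variables here are illustrative):

// entries put into the MDC before the save are captured on the logging
// event and attached to the Pub/Sub message via
// putAllAttributes(eventObject.getMDCPropertyMap())
MDC.put("orgId", String.valueOf(orgId));
MDC.put("requestId", requestId);
entity.save();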

Points to Remember

  1. In async implementations, if execution switches from the custom executors to the default ThreadPoolExecutor or ForkJoinPool (for example, inside a library call that uses the common pool), the thread context is lost. We need to ensure that no library method silently hands work to the default pools.
  2. We need to clear the thread context after each task completes, or else stale entries can cause memory leaks or OOM issues; see the sketch after this list.
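
A minimal sketch of the second point (the entity and auth map are placeholders):

try {
    // install the user context for this unit of work
    ThreadLocalManager.addToContext("authMap", authMap);
    entity.save();
} finally {
    // always clear, even on failure, so a pooled thread doesn't carry
    // stale user context into an unrelated request
    ThreadLocalManager.clearContext();
}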
