Creating User Session for Kafka Consumer

Priyanka
Deskera Engineering
3 min read · May 18, 2020

Managing External Requests

Injecting an HttpServletRequest bean from the Spring application context is straightforward inside a web request, but obtaining it while handling an external request, such as a message received by a Kafka consumer, is not. If you try to access the bean from the consumer thread, you get an error: java.lang.IllegalStateException: No thread-bound request found.
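
To make the cause of that error concrete, here is a minimal plain-Java sketch of a thread-bound context. It is a simplified stand-in, not Spring's actual implementation; the class name ThreadBoundContextSketch and its methods are hypothetical, and the error message simply mirrors the one quoted above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-in for a thread-bound context holder (hypothetical, not Spring code).
final class ThreadBoundContextSketch {

    // Each thread sees only the value it bound itself.
    private static final ThreadLocal<Map<String, Object>> HOLDER = new ThreadLocal<>();

    static void bind(Map<String, Object> attributes) {
        HOLDER.set(attributes);
    }

    static void reset() {
        HOLDER.remove();
    }

    // Fails when the calling thread has no context bound.
    static Map<String, Object> current() {
        Map<String, Object> attributes = HOLDER.get();
        if (attributes == null) {
            throw new IllegalStateException("No thread-bound request found");
        }
        return attributes;
    }

    // Runs the lookup on a fresh thread (playing the consumer thread) and reports the outcome.
    static String lookupOnNewThread(boolean bindFirst) {
        AtomicReference<String> outcome = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                if (bindFirst) {
                    bind(new HashMap<>());
                }
                current();
                outcome.set("found");
            } catch (IllegalStateException e) {
                outcome.set(e.getMessage());
            } finally {
                reset();
            }
        });
        consumer.start();
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the consumer thread", e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        bind(new HashMap<>());                        // the "web" thread has a context
        System.out.println(lookupOnNewThread(false)); // prints: No thread-bound request found
        System.out.println(lookupOnNewThread(true));  // prints: found
        reset();
    }
}
```

The context bound on the first thread is invisible to the second one, which is why a consumer thread has to bind a context of its own before it can resolve anything request-scoped.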

When a request originates outside the servlet environment, something must supply the context from which the request-scoped bean is resolved. This can be done with the following steps:

1. Providing a RequestContextHolder and Setting Its Request Attributes:

RequestContextHolder is a Spring class that exposes the current request's attributes as a thread-bound RequestAttributes object. RequestAttributes stores values under a scope that is passed explicitly as an integer: SCOPE_REQUEST (0) or SCOPE_SESSION (1); older Spring versions also defined a global session scope. When no actual request exists, as in a Kafka consumer, we can bind our own RequestAttributes implementation with RequestContextHolder.setRequestAttributes so that request-scoped beans can be resolved on the consumer thread.

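public void setUserSession(final String token) {
    try {
        // Bind custom request attributes to the current (consumer) thread so that
        // request-scoped beans can be resolved outside a servlet request.
        RequestContextHolder.setRequestAttributes(new KafaRequestAttribute());
        // Resolving the request-scoped bean now succeeds on this thread.
        final UserSession user = applicationCtx.getBean(UserSession.class);
        // Decoding the token and populating the session is omitted here;
        // that step is presumably what can throw the IOException handled below.
    } catch (final IOException e) {
        log.error("Exception occurred while decoding the token.", e);
        throw new UnauthorizedException(UNAUTHORIZED_USER_ERR);
    }
}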
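
Kafka listener threads are typically reused for many messages, so it may be worth unbinding the attributes once a message has been handled. The plain-Java sketch below shows the bind/handle/unbind pattern with try/finally. Everything in it (ConsumerContextCleanupSketch, withFreshContext, the "user" key) is hypothetical; in Spring, the unbinding call would be RequestContextHolder.resetRequestAttributes().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of per-message binding and cleanup (not Spring code).
final class ConsumerContextCleanupSketch {

    private static final ThreadLocal<Map<String, Object>> HOLDER = new ThreadLocal<>();

    static Map<String, Object> current() {
        Map<String, Object> attributes = HOLDER.get();
        if (attributes == null) {
            throw new IllegalStateException("No thread-bound request found");
        }
        return attributes;
    }

    static boolean isBound() {
        return HOLDER.get() != null;
    }

    // Binds a fresh attribute map for one message and always unbinds it afterwards,
    // even when the handler throws.
    static <R> R withFreshContext(Supplier<R> handler) {
        HOLDER.set(new HashMap<>());
        try {
            return handler.get();
        } finally {
            HOLDER.remove();
        }
    }

    public static void main(String[] args) {
        // Two messages handled back to back on the same thread.
        withFreshContext(() -> current().put("user", "alice"));
        Object seenBySecond = withFreshContext(() -> current().get("user"));
        System.out.println(seenBySecond); // prints: null
        System.out.println(isBound());    // prints: false
    }
}
```

Because each message gets a new map and the binding is removed in the finally block, state stored while handling one message is not visible to the next.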

2. Defining RequestAttributes for User Session with Request Scope:

We need a RequestAttributes implementation that supports only the request scope. With a request-scoped bean definition, the Spring container creates a new instance of the UserSession bean for every request handled under @KafkaSession, just as it would for every HTTP request. You can change the internal state of that instance; instances created from the same 'userSession' bean definition for other requests do not see those changes, because the state belongs to an individual request. When the request has finished processing, the request-scoped bean is discarded.

The request scope is backed by a thread-local attribute in RequestContextHolder, which DispatcherServlet initializes for every web request by default. An injected HttpServletRequest is a dynamic JDK proxy that, on every method call, looks up the real request object through that thread-local attribute and delegates to it. The request itself is registered as a resolvable dependency in the WebApplicationContext.

import java.util.HashMap;
import java.util.Map;
import org.springframework.web.context.request.RequestAttributes;

// Map-backed RequestAttributes for non-servlet (Kafka) threads; supports only the request scope.
public class KafaRequestAttribute implements RequestAttributes {

    private final Map<String, Object> requestAttributeMap = new HashMap<>();

    @Override
    public Object getAttribute(String name, int scope) {
        return scope == SCOPE_REQUEST ? this.requestAttributeMap.get(name) : null;
    }

    @Override
    public void setAttribute(String name, Object value, int scope) {
        if (scope == SCOPE_REQUEST) {
            this.requestAttributeMap.put(name, value);
        } else {
            throw new ApplicationException("Scope of external request should be : Request.");
        }
    }

    @Override
    public void removeAttribute(String name, int scope) {
        if (scope == SCOPE_REQUEST) {
            this.requestAttributeMap.remove(name);
        } else {
            throw new ApplicationException("Scope of external request should be : Request.");
        }
    }

    @Override
    public String[] getAttributeNames(int scope) {
        return scope == SCOPE_REQUEST ? this.requestAttributeMap.keySet().toArray(new String[0]) : new String[0];
    }

    @Override
    public void registerDestructionCallback(String name, Runnable callback, int scope) {
        throw new ApplicationException("Operation not supported.");
    }

    @Override
    public Object resolveReference(String key) {
        throw new ApplicationException("Operation not supported.");
    }

    // A Kafka message has no HTTP session, so there is no id or mutex to return.
    @Override
    public String getSessionId() {
        return null;
    }

    @Override
    public Object getSessionMutex() {
        return null;
    }
}
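
The proxy behaviour described in this step can be illustrated with a small plain-Java sketch: one shared JDK dynamic proxy that, on every call, delegates to whatever target the calling thread has bound. The names here (ThreadBoundProxySketch, CurrentUser) are hypothetical, and this is only an approximation of the idea, not Spring's implementation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical sketch of a proxy that resolves its target per call (not Spring code).
final class ThreadBoundProxySketch {

    // Stand-in for a request-like interface.
    interface CurrentUser {
        String name();
    }

    private static final ThreadLocal<CurrentUser> BOUND = new ThreadLocal<>();

    static void bind(CurrentUser user) {
        BOUND.set(user);
    }

    static void reset() {
        BOUND.remove();
    }

    // One shared proxy; each call is forwarded to the target bound to the calling thread.
    static CurrentUser proxy() {
        InvocationHandler handler = (proxyInstance, method, args) -> {
            CurrentUser target = BOUND.get();
            if (target == null) {
                throw new IllegalStateException("No thread-bound request found");
            }
            return method.invoke(target, args);
        };
        return (CurrentUser) Proxy.newProxyInstance(
                CurrentUser.class.getClassLoader(),
                new Class<?>[] {CurrentUser.class},
                handler);
    }

    public static void main(String[] args) {
        CurrentUser shared = proxy();
        bind(() -> "alice");
        System.out.println(shared.name()); // prints: alice
        bind(() -> "bob");
        System.out.println(shared.name()); // prints: bob
        reset();
    }
}
```

The proxy itself can be a singleton and injected once, while the object it delegates to changes with the thread-bound state. That is why binding attributes on the consumer thread is enough to make an already-injected dependency work.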

3. Creating @KafkaSession Custom Annotation:

We now need to create a custom annotation named @KafkaSession and, using Spring AOP, an aspect whose advice runs before every method annotated with it (the join point).

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface KafkaSession {
}

@Aspect
@Component
@Log4j2
public class KafkaSessionAnnotationProcessor {

    @Autowired
    private ExternalSessionProvider externalSessionProvider;

    // Runs before every method annotated with @KafkaSession.
    @Before("@annotation(com.util.annotation.KafkaSession)")
    public void evaluate(final JoinPoint joinPoint) {
        final MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        final Object[] args = joinPoint.getArgs();
        final Method method = methodSignature.getMethod();
        // One row of annotations per parameter, in the same order as args.
        final Annotation[][] parameterAnnotations = method.getParameterAnnotations();
        // Business implementation
    }
}
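
The business implementation is not shown above. One plausible use of parameterAnnotations and args is to locate the argument that carries the access token. The plain-Java sketch below does this with reflection only; the @AccessToken annotation, the onMessage method, and findTokenArgument are hypothetical stand-ins, so treat it as an illustration of the lookup rather than the author's code.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical sketch of finding an annotated argument (not the author's implementation).
final class TokenArgumentSketch {

    // Marker standing in for a header annotation on the token parameter.
    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    @interface AccessToken {
    }

    // Listener-shaped method: a payload plus an annotated token argument.
    static void onMessage(String payload, @AccessToken String token) {
    }

    static Method listenerMethod() {
        try {
            return TokenArgumentSketch.class.getDeclaredMethod("onMessage", String.class, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns the argument whose parameter carries @AccessToken, or null if none does.
    static Object findTokenArgument(Method method, Object[] args) {
        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
        for (int i = 0; i < parameterAnnotations.length; i++) {
            for (Annotation annotation : parameterAnnotations[i]) {
                if (annotation instanceof AccessToken) {
                    return args[i];
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Object token = findTokenArgument(listenerMethod(), new Object[] {"payload", "token-123"});
        System.out.println(token); // prints: token-123
    }
}
```

In the aspect, the value found this way could then be handed to the session setup from step 1, assuming the token is what identifies the user.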

4. Using Custom Annotation for Kafka Listener:

We now have a custom annotation that creates a session for every external request coming from a Kafka consumer. We only need to add @KafkaSession to the listener method:

@KafkaSession
@KafkaListener(topics = "kafkaTopics", containerFactory = "kafkaListenerContainerFactory", groupId = "kafkaGroupIds")
public void kafkaConsumer(@Payload final KafkaMessage message,
        @Header(HEADER.X_ACCESS_TOKEN) final String token) {
    // Business implementation
}

With that, the annotation handles everything: any @KafkaListener method that also carries @KafkaSession gets a user session before its body runs.
