Ktor with Websocket and Coroutine Flow

Denny Chen
4 min readDec 25, 2023

--

Using Websocket and flow to achieve data exchange and request control.

Photo by Christopher Robin Ebbinghaus on Unsplash

Follw up on previous article, it was talking about use Keycloak as SSO service. This article will mainly share how to use websocket and kotlin flow to achieve data exchange between two services and request controls.

The basic skeleton is:

  1. B service will push data actively to A service when starts up.
  2. A service will request data if can’t get data from redis.
  3. Using kotlin flow to control request.

Start with Websocket

First, we take B service as websocket client and A service as websocket server. Therefore, we need to install websocket on both services.

To install websocket server on service A

install(WebSockets) {
pingPeriod = Duration.ofSeconds(20)
contentConverter = JacksonWebsocketContentConverter()
}

To install websocket client on service B, I create a httpClient and using DI to get the client.

companion object {
fun create(): HttpClient {
return HttpClient(CIO) {
install(WebSockets) {
pingInterval = 30.seconds.inWholeMilliseconds
contentConverter = JacksonWebsocketContentConverter()
}
}
}
}

Therefore you will have a client something like

class WebScoektClient(
private val client: HttpClient,
private val url:String,
)

Handle the WebSocket on service A

After install websocket on both services, now we can implement the communication logic between two services.

On A service, I create a class calledWebSocketService and inside this class, it will have a private val globalWebSocketSessionReference to store the server session of websocket.

private val globalWebSocketSessionReference = AtomicReference<DefaultWebSocketServerSession?>(null)

You may ask why we use AtomicReference here.

Let me explain this. We want to make sure that we only take one and the latest session if we encounter many requests at the same time. AtomicReferenceensures thread security. At service B. we will use kotlin flow to deal with this.

We have mentioned that service A will receive data when service B starts up or request for data.

Here we’re going to handle this. Hence we create a funtion called webSocketHandler. In this function we need to determine what to do when we receive data.

suspend fun webSocketHandler(session: DefaultWebSocketServerSession) {
globalWebSocketSessionReference.set(session)
while (session.closeReason.,isActive) {
when (val receiveData = session.receiveDeserialized<Payload>()) {
is Payload.typeA -> {}
is Payload.typeB -> {}
}
}
}

As you can see, we have defined a payload and deserialized when we get the session. We can decide different action based on the type.

Before requesting data, we want to make sure we get the only and the latest live session. We create a function called getActiveSession.

private fun getActiveSession(): DefaultWebSocketServerSession? {
return globalWebSocketSessionReference.get()
?.takeIf {
it.isActive
}
}

Therefore we can use above function inside our request logic.

suspend fun requestForTypeA() {
getActiveSession()?.sendSerialized(Payload(typeA))
}

It just that simple after you understand.

Handle websocket on serivce B

We will create a class called webSocketClient. Inside this class, we will use coroutinscope to handle the sessionstate and provide a function to get the lastest session.

class WebSocketClient(
private val client: HttpClient
private val url: String
) : Closeable {

private val coroutineScope = CoroutineScope(EmptyCoroutineContext)

private val sessionState: StateFlow<DefaultClientWebSocketSession?> = flow {
while (currentCoroutineContext().isActive) {
try {
val session = client.webSocketSession(url)
emit(session)
} catch (ex: ConnectException) {
logger.warn("create WebSocket session failure.", ex)
}

if (currentCoroutineContext().isActive) {
delay(reconnectInterval)
}
}
}.stateIn(coroutineScope, SharingStarted.Eagerly, null)paulses

val sessionFlow = sessionState.filterNotNull().filter { it.isActive }
skeleton
suspend fun waitForSession(): DefaultClientWebSocketSession {
return sessionFlow.first()
}

suspend fun DefaultClientWebSocketSession.sendTypeA(payload) {
sendSerialized(Payload.typeA)
}

suspend fun DefaultClientWebSocketSession.sendTypeB(payload) {
sendSerialized(Payload.typeB)
}

}

As you can see, we use stateFlow to control the session and make sure we only get one and the latest session.

In our webSocketService, we also use coroutineScope to control the flow.

In this service, we will handle the flow of sending data.

class WebSocketService(coroutineScope: CoroutineScope) {
private val sendDataFlow = MutableSharedFlow<EventWorkType>()

init {
with(
coroutineScope,
) {
launch {
sendDataFlow
.filter {
TypeA == it
}
.conflate()
.collect {
doSendTypeA()
}
}
launch {
sendDataFlow
.filter {
TypeB == it
}
.conflate()
.collect {
doSendTypeB()
}
}
// you can launch as many tasks as you want, just based on your need.paulses
}
}

suspend fun start() {
WebSocketClient.sessionFlow.collect { session ->
sendDataFlow.emit(TypeA)
sendDataDlow.emit(TypeB)

try {
while (session.isActive) {
when (session.receiveDeserialized<Payload>().type) {
TypeA -> {}
TypeB -> {}
}
}
}
}
}

suspend fun doSendTypeA() {
try {
waitForSession().sendTypeA()
}
}

suspend fun doSendTypeB() {
try {
waitForSession().sendTypeB()
}
}
}

As you can see the service above, when the service starts, it will automatically emit two sessions with type A and B. Also it will determine the type when receiving sessions from service A.

Besides sending data automatically, if service B receive request sessions from service A. it will only process the latest session due to the task flow.

In the flow, we will filter the type first, so we can tell which data needs to be sent. Second use conflate function. The function aims to keep the latest session, it will drop other sessions. Therefore, we can always get the latest one then do the related action.

Conclusion

Above is a brief skeleton of websocket and flow, it still need some detailed implementations such as the design of the payload and what actions needs to be done differ from types. Of course we can use flow on service A to control the session. However given two different ways, we can tell the difference between using flow and AtomicReference.

This is the example of using websocket to exchange data between two ktor services and utilize the coroutine provided by kotlin.

See you in next article. Glad to leave comments or give me applauses.

--

--