Reactive DynamoDB

bschandramohan, published in TechieConnect · 3 min read · Feb 1, 2021

The enhanced DynamoDB client library, which includes a reactive (async) client, was released in 2020, and you are missing out if you aren't using it to communicate with DynamoDB.


This is a follow-up to my earlier post on basic interaction with DynamoDB.

We now have reactive APIs for DynamoDB. The enhanced client library in the AWS SDK for Java v2 is the better one to use now, and this article delves into how to use it. The examples come from the GitHub project linked in the Ref lines below.

New “Enhanced” Library

Gradle dependencies required

implementation(platform("software.amazon.awssdk:bom:2.15.22"))
implementation("software.amazon.awssdk:dynamodb-enhanced")
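
The Mono/Flux and Flow examples later in this post also need Reactor and the coroutine bridge libraries on the classpath. A possible set of extra entries is sketched below, assuming a Spring Boot WebFlux project whose dependency management supplies the versions (that setup is my assumption, not something stated in this article):

```kotlin
dependencies {
    // Assumed: Spring WebFlux brings in Reactor (Mono/Flux) and WebClient
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    // Assumed: bridges Reactor/Reactive Streams publishers and Kotlin Flow (asFlow)
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
    // Assumed: provides await() on CompletableFuture; newer coroutine
    // releases ship this in the core artifact, as far as I know
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8")
}
```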

Enhanced Sync Mode

The DynamoDB Enhanced client is more robust and easier to use than the com.amazonaws.* classes of SDK v1. For synchronous access, first configure a DynamoDbEnhancedClient:

@Bean
fun dynamoDbSyncClient(): DynamoDbClient = DynamoDbClient.builder()
    .region(Region.of(dynamoConfigProperties.region))
    .endpointOverride(URI.create(dynamoConfigProperties.endpoint))
    .credentialsProvider(DefaultCredentialsProvider.builder().build())
    .build()

@Bean
fun dynamoDbEnhancedSyncClient(): DynamoDbEnhancedClient = DynamoDbEnhancedClient
    .builder()
    .dynamoDbClient(dynamoDbSyncClient())
    .build()

Ref: https://github.com/bschandramohan/AWSConnect/blob/main/DynamodbConnect/src/main/kotlin/com/bschandramohan/learn/awsconnect/dynamodb/dynamodbconnect/config/DynamodClientSyncConfig.kt

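You then use that client to get a table reference. The table() method takes a TableSchema, which can be built from the bean class:

dynamoDbEnhancedSyncClient.table("Name_of_table", TableSchema.fromBean(ClassName::class.java))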

Once you have the table, you can call:

table.putItem(item)
table.getItem(key)

There is no need for a separate mapper to convert the types, as there was in previous versions.
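
To make the steps above concrete, here is a minimal sketch of an annotated bean and a put/get round trip. The Schedule fields, the table name "Schedule", and the saveAndLoad function are hypothetical and chosen only for illustration; the project's real classes may differ.

```kotlin
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable
import software.amazon.awssdk.enhanced.dynamodb.Key
import software.amazon.awssdk.enhanced.dynamodb.TableSchema
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey

// Hypothetical bean. The bean schema needs a no-arg constructor plus
// getters and setters, which default values and `var` properties provide.
@DynamoDbBean
data class Schedule(
    @get:DynamoDbPartitionKey
    var userId: String = "",
    var description: String = ""
)

// Writes one item and reads it back by partition key.
// Assumes a table named "Schedule" already exists (hypothetical name).
fun saveAndLoad(client: DynamoDbEnhancedClient): Schedule? {
    val table: DynamoDbTable<Schedule> =
        client.table("Schedule", TableSchema.fromBean(Schedule::class.java))

    table.putItem(Schedule(userId = "user-1", description = "Daily standup"))

    val key = Key.builder().partitionValue("user-1").build()
    return table.getItem(key)
}
```

The annotation sits on the getter (`@get:`) because the bean schema inspects getter methods rather than Kotlin properties.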

Enhanced Async Mode with Spring Reactive Mono/Flux

One drawback of this library for Spring Reactive users is that its async methods return Java's older CompletableFuture.

Spring Reactive has Mono/Flux and Kotlin has Flow, both of which are more succinct to work with, so the examples below convert the CompletableFuture to these types.

We need to configure the new DynamoDbEnhancedAsyncClient as shown below:

@Bean
fun dynamoDbAsyncClient(): DynamoDbAsyncClient {
    return DynamoDbAsyncClient.builder()
        .region(Region.of(dynamoConfigProperties.region))
        .endpointOverride(URI.create(dynamoConfigProperties.endpoint))
        .credentialsProvider(DefaultCredentialsProvider.builder().build())
        .build()
}

@Bean
fun dynamoDbEnhancedAsyncClient(): DynamoDbEnhancedAsyncClient {
    return DynamoDbEnhancedAsyncClient.builder()
        .dynamoDbClient(dynamoDbAsyncClient())
        .build()
}

Ref: https://github.com/bschandramohan/AWSConnect/blob/main/DynamodbConnect/src/main/kotlin/com/bschandramohan/learn/awsconnect/dynamodb/dynamodbconnect/config/DynamoClientAsyncConfig.kt

You get the table reference from the enhanced async client in the same way as from the sync client:

dynamoDbEnhancedAsyncClient.table("Name_of_table", TableSchema.fromBean(ClassName::class.java))

Once you have the table, the calls are the same:

table.putItem(item)
table.getItem(key)

The only difference from sync mode is that the return types are now CompletableFuture. Let's convert them to Mono so they are easy to handle from Spring Reactive web apps.

Mono.fromFuture(table.putItem(item))
    // putItem completes without a value, so map only changes the type to Mono<Unit>
    .map { }
    .doOnError { logger.error("Exception saving item - $it") }

That covers put. For get:

val key = Key.builder().partitionValue(userId).build()
val itemFuture = table.getItem(key)
return Mono.fromFuture(itemFuture)
    .doOnError { logger.error("Exception fetching schedule for user=$userId - $it") }
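
One detail worth knowing about this conversion: Mono.fromFuture(future) wraps a future that has already been started, so the request begins when the method is called rather than when something subscribes. If per-subscription execution matters, wrapping the call in Mono.defer is one option. The sketch below uses a hypothetical fakeGetItem stand-in for table.getItem(key) so that it runs without AWS; findEager and findDeferred are illustrative names as well.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger

// Counts how many times the fake "request" has been started.
val calls = AtomicInteger()

// Hypothetical stand-in for table.getItem(key).
fun fakeGetItem(userId: String): CompletableFuture<String> {
    calls.incrementAndGet()
    return CompletableFuture.completedFuture("schedule-for-$userId")
}

// The future is created as soon as this function is called.
fun findEager(userId: String): Mono<String> =
    Mono.fromFuture(fakeGetItem(userId))

// The future is created only when the Mono is subscribed to.
fun findDeferred(userId: String): Mono<String> =
    Mono.defer { Mono.fromFuture(fakeGetItem(userId)) }

fun main() {
    findEager("u1")
    println(calls.get())          // 1: the request started without a subscriber

    val deferred = findDeferred("u1")
    println(calls.get())          // still 1: nothing has started yet

    println(deferred.block())     // schedule-for-u1
    println(calls.get())          // 2: subscribing triggered the request
}
```

For a single request handled by WebFlux the difference is rarely visible, but it starts to matter once a Mono is retried or subscribed to more than once.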

Enhanced Async Mode with Spring Reactive Kotlin Flow

The only change needed to return a Kotlin Flow instead of a CompletableFuture or Mono/Flux is to wrap the CompletableFuture in a Flow.

Unfortunately, unlike Mono, Flow has no fromFuture() factory for this conversion. I use this:

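val itemFuture = table.getItem(key)
return flow {
    // await() comes from kotlinx.coroutines.future and suspends
    // instead of blocking the thread as get() would
    emit(itemFuture.await())
}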
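
If this wrapper is needed in several repositories, it could be packaged as a small extension function. The sketch below is one way to do that, assuming kotlinx-coroutines is on the classpath. The name asSingleItemFlow is hypothetical. It suspends with await() rather than blocking, and it emits nothing when the future completes with null, which I believe is how getItem reports a missing key.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture

// Hypothetical helper: turns a future into a Flow of zero or one element.
fun <T : Any> CompletableFuture<T?>.asSingleItemFlow(): Flow<T> = flow {
    val value = await()              // suspends until the future completes
    if (value != null) emit(value)   // a null result becomes an empty flow
}

fun main() = runBlocking {
    val found = CompletableFuture.completedFuture<String?>("schedule-1")
        .asSingleItemFlow()
        .toList()
    val missing = CompletableFuture.completedFuture<String?>(null)
        .asSingleItemFlow()
        .toList()

    println(found)    // [schedule-1]
    println(missing)  // []
}
```

With a helper like this, the repository method would reduce to `table.getItem(key).asSingleItemFlow()`.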

getAll has a more interesting implementation:

val scanResults: PagePublisher<Schedule> = table.scan()
// Test code to print the items
// scanResults.items().subscribe { item -> logger.info("Inside repository; $item") }

return scanResults.items().asFlow()

Because items() returns a Publisher, we can convert it to a Flow with asFlow() from kotlinx.coroutines.reactive.

Important step: for streaming endpoints, make sure you use MediaType.APPLICATION_NDJSON_VALUE as the media type. Otherwise the data is buffered before being sent to the client, which defeats the purpose of reactive results.

@GetMapping("/", produces = [MediaType.APPLICATION_NDJSON_VALUE])
suspend fun getAll(): ResponseEntity<Any> {
    return try {
        ResponseEntity.ok(service.getAll())
    } catch (e: Exception) {
        ApiServerError(entityName, "getAll", e)
    }
}

Verification of Async Flows

Spring Reactive provides WebClient for calling reactive APIs, and we can use it for tests and verification. Configure it as below to collect the results from a Flow:

inline fun <reified T : Any> getFlowResults(
    uriParam: String,
    mediaType: MediaType = MediaType.APPLICATION_NDJSON
): List<T?> = runBlocking {
    val itemList = mutableListOf<T>()
    client
        .get()
        .uri(uriParam)
        .accept(mediaType)
        .retrieve()
        .onStatus(HttpStatus::isError, ::renderErrorResponse)
        .bodyToFlow<T>()
        .collect { item -> itemList.add(item) }

    return@runBlocking itemList
}

Tests that use the helper above are here for reference:

https://github.com/bschandramohan/AWSConnect/blob/main/DynamodbConnect/src/test/kotlin/com/bschandramohan/learn/awsconnect/dynamodb/dynamodbconnect/integrationtests/ScheduleFlowTest.kt

Let me know if you are stuck at any point in the comments below.
