Интеграция Spring с Yandex Data Streams на примере сервиса аналитики

Vitaxa
6 min readMay 14, 2024

Если вы знакомы с AWS Kinesis, то Yandex Data Streams (YDS) для вас не будет чем-то новым. Для тех, кто не знает, YDS — это сервис для управления потоками данных. С его помощью вы можете, в режиме реального времени, обмениваться данными в вашей микросервисной среде, а также перегонять данные из одного источника в какой-либо потребитель.

Расскажу о том, как мы интегрировали Spring с YDS, в контексте той проблемы, которую решали, а именно создание шины данных для сбора информации с маркетплейсов, и о том, на какой баг наткнулись со стороны YDS при использовании AWS Kinesis Client.

В этой статье я не ставлю целью сравнить те или иные инструменты. Хочу лишь продемонстрировать, на реальном примере, в каком случае YDS может принести пользу, как с ним интегрироваться и на какие грабли не наступать.

На что способен Yandex Data Streams?

Пример процесса переноса данных в системы хранения

Для интеграции с YDS можно использовать протоколы:

  • Kafka API
  • AWS Kinesis Data Streams (его мы будем использовать)

YDS не поддерживает полностью ни один из представленных протоколов, но ключевые функции покрывает. Однако можно сказать спасибо команде Яндекса — реализация широко известных протоколов сильно упрощает интеграцию и миграцию кода (если вы по какой-то причине переходите с AWS Kinesis или Kafka). Кроме того, примеры использования и библиотеки будет проще найти под уже известные протоколы.

Как мы пришли к использованию Yandex Data Streams?

Схема на которой изображен процесс загрузки данных из маркетплейса в нашу OLAP базу данных
Схема загрузки данных в OLAP базу

У меня есть pet-project (MarketDB), где мы, нашей небольшой командой, работаем над сервисом который предоставляет аналитику продавцам на маркетплейсах. Чтобы работала аналитика, мы собираем данные по средством парсинга страниц того или иного маркетплейса. Раньше данные собирались и анализировались в одном монолитном приложении, но с началом миграции в микросервисную архитектуру мы стали собирать и сохранять данные в разных сервисах. Для этой цели мы решили найти относительно недорогое и эффективное решение, и таким образом мы обратили внимание на Yandex Data Streams.

Как использовать Yandex Data Steams в Spring?

Создание стрима в YDS

Первым делом нам надо создать поток данных в нашем Yandex Cloud. Не буду повторяться, ведь у Яндекса уже есть понятная и последовательная документация.

Кроме создания stream, необходимо создать Yandex Data Base (YDB). Создание YDB необходимо, потому что YDB — это ядро YDS, и именно он дает нам возможность реализовать message broker. Возможно, вам даже подойдет использовать лишь YDB, ведь в нем есть механизм топиков. Подробнее про топики YDB.

Чтение данных из YDS

Подключаем зависимость AWS Kinesis. Выше, я упомянул, что YDS поддерживает протокол AWS KDS.

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.15.0</version>
</dependency>

Дальше сконфигурируем Worker. Worker — это высокоуровневый класс который менеджит процесс загрузки данных из сегмента (шарда). Сегмент (shard) — это признак параллелизма который используется в YDS для горизонтального масштабирования, как партиции в Kafka.

@Bean
fun dataStreamWorker(): Worker {
val awsCredentials = BasicAWSCredentials(awsStreamProperties.accessKey, awsStreamProperties.secretKey)
val consumerConfig = KinesisClientLibConfiguration(
applicationName,
streamName,
kinesisEndpoint,
dinamoDbEndpoint,
InitialPositionInStream.LATEST,
WSStaticCredentialsProvider(awsCredentials),
AWSStaticCredentialsProvider(awsCredentials),
AWSStaticCredentialsProvider(awsCredentials),
awsStreamProperties.paymentStream.failOverTimeMillis,
"${awsStreamProperties.paymentStream.consumerName}-${UUID.randomUUID()}",
awsStreamProperties.paymentStream.maxRecords,
KinesisClientLibConfiguration.DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
KinesisClientLibConfiguration.DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
KinesisClientLibConfiguration.DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
KinesisClientLibConfiguration.DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
KinesisClientLibConfiguration.DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
ClientConfiguration(),
ClientConfiguration(),
ClientConfiguration(),
KinesisClientLibConfiguration.DEFAULT_TASK_BACKOFF_TIME_MILLIS,
KinesisClientLibConfiguration.DEFAULT_METRICS_BUFFER_TIME_MILLIS,
KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE,
KinesisClientLibConfiguration.DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
awsStreamProperties.region,
KinesisClientLibConfiguration.DEFAULT_SHUTDOWN_GRACE_MILLIS,
BillingMode.PAY_PER_REQUEST,
SimpleRecordsFetcherFactory(),
Duration.ofMinutes(1).toMillis(),
Duration.ofMinutes(5).toMillis(),
Duration.ofMinutes(30).toMillis(),
)
consumerConfig.withTimeoutInSeconds(awsStreamProperties.paymentStream.timeoutInSec)

return Worker.Builder()
.recordProcessorFactory(dataEventStreamProcessorFactory)
.config(consumerConfig)
.metricsFactory(NullMetricsFactory())
.build()
}

Настроек в классе KinesisClientLibConfiguration много, разберем ключевые из них.

Просмотр параметров потока данных

Сначала настроим подключение к потоку данных. Для этого откроем созданный stream и посмотрим его параметры. Если смотреть на приведенный пример кода, то параметры нам надо выставить следующие:
streamName — Путь к базе данных
kinesisEndpoint — Эндпоинт
dinamoDbEndpoint — Document API эндпоинт. Его можно найти если открыть параметры созданной базы YDB.

Document API эндпоинт = dinamoDbEndpoint

Помимо настроек подключения, необходимо предоставить данные для авторизации. Эту роль выполняет BasicAWSCredentials.

BasicAWSCredentials(awsStreamProperties.accessKey, awsStreamProperties.secretKey)

Где взять accessKey и secretKey? Для этого лучше всего обратиться к документации Yandex. В ней говорится, что необходимо создать идентификатор ключа сервисного аккаунта (accessKey) и секретный ключ сервисного аккаунта (secretKey).

BasicAWSCredentials передаем в качестве значения параметров: kinesisCredentialsProvider, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider.

С параметрами подключения закончили, теперь упомяну те настройки которые, на мой взгляд, важно проговорить.
initialPositionInStream — позиция с которой Worker будет читать поток данных.
workerId — уникальный идентификатор для каждого экземпляра вашего приложения. Он нужен с целью масштабирования и отказоустойчивости, когда вы читаете поток данных с разных инстансов вашего приложения.
billingModeвыставляем PAY_PER_REQUEST и никак иначе, только так будет работать с YDS.
metricsFactory — отключаем метрики в CloudWatch, лишний функционал в нашем случае
recordProcessorFactory — Обработчик для получения данных из shard’а

Пример реализации чтения потока данных

@Component
class EventStreamInitializer(
private val eventStreamRunner: EventStreamRunner,
) {
@PostConstruct
fun initialize() {
eventStreamRunner.runDataSteam()
}
}

@Component
class EventStreamRunner(
private val dataStreamWorker: Worker,
) {
@Async
fun runDataSteam() {
dataStreamWorker.run()
}

}

@Component
class DataEventStreamProcessorFactory : IRecordProcessorFactory {
override fun createProcessor(): IRecordProcessor {
return DataEventStreamListener()
}
}

@Component
class DataEventStreamListener : IRecordProcessor {

override fun initialize(initializationInput: InitializationInput) {
}

override fun processRecords(processRecordsInput: ProcessRecordsInput) {
val records = processRecordsInput.records
...some logic with the records
}

override fun shutdown(shutdownInput: ShutdownInput) {
}
}

В классе EventStreamInitializer инициализируем Worker чтобы слушать поток данных.
EventStreamRunner выполняет роль запуска отдельного потока для цикла, который будет переодически запрашивать данные из потока. DataEventStreamProcessorFactory отвечает за создание экземпляра обработчика (processor), который будет обрабатывать поступающие данные.
DataEventStreamListener выполняет некую заданную логику над поступающими данными.

Запись данных в YDS

@Bean
fun amazonKinesis(
@Value("\${aws-stream.endpoint}") endpoint: String,
@Value("\${aws-stream.accessKey}") accessKey: String,
@Value("\${aws-stream.secretKey}") secretKey: String,
@Value("\${aws-stream.region}") region: String
): AmazonKinesis {
val awsCredentials = BasicAWSCredentials(accessKey, secretKey)
return AmazonKinesisClientBuilder.standard()
.withEndpointConfiguration(AwsClientBuilder.EndpointConfiguration(endpoint, region))
.withCredentials(AWSStaticCredentialsProvider(awsCredentials))
.build()
}

Про accessKey и secretKey, а также BasicAWSCredential я упомянул выше, в consumer’е данных.

Напишем класс для отправки сообщений в stream.

@Component
class AwsStreamClient(private val amazonKinesis: AmazonKinesis) {

fun sendRecord(streamName: String, entries: List<PutRecordsRequestEntry>): PutRecordsResult {
val createRecordsRequest = PutRecordsRequest().apply {
streamName = streamName
records = entries
}
return amazonKinesis.putRecords(createRecordsRequest)
}
}

Класс AwsStreamClient содержит метод sendRecord, который в качестве аргументов принимает название потока (steamName) и список записей (entries), который мы отправляем в steam. При создании экземпляра класса PutRecordsRequestEntry, обратите внимание на переменную partitionKey, она вам понадобится для гарантий порядка сообщений и распределения нагрузки по shard’ам.

С какой проблемой столкнулись?

На сервисе, который читает данные из YDS, мы столкнулись с тем, что YDS, по какой-то причине, закрывал чтение шарда и переводил его в статус SHARD_END. При этом в логах мы видели ошибки:

Caught ResourceNotFoundException when fetching records for shard shard-000000

Reached end of shard shard-000000. Found childShards: []

Shard shard-000000: Mark for shutdown with reason TERMINATE

Invoking shutdown() for shard shard-000000, concurrencyToken: 68eca944-4edc-4d8a-8bb5-3b6867874ac5, original Shutdown reason: TERMINATE. childShards:null

Shard shard-000000: Shutting down consumer with SHARD_END reason without creating leases for child shards.

No child shards returned from service for shard shard-000000

Unable to cleanup lease for shard shard-000000 : No child shards found for this supposedly closed shard in both local DDB and in service shard-000000

Unable to clean up lease shard-000000 due to LeaseCleanupManager.LeaseCleanupResult(cleanedUpCompletedLease=false, cleanedUpGarbageLease=false, wereChildShardsPresent=false, wasResourceNotFound=false, cleanupFailureReason=No child shards found for this supposedly closed shard in both local DDB and in service shard-000000)

Чинилось только пересозданием Worker с новым workedId. На текущий момент такой проблемы не случается, мы инициировали общение с поддержкой Yandex Cloud и они, впоследствии, исправили данный баг.

Резюме

Для нас YDS стал решением проблемы недорогой и быстрой шины данных, соответствующей нашим требованиям к цене и производительности. Мы не пишем много данных — за сутки отправляем в YDS около 5 миллионов записей. В конечном итоге, при выборе подходящего инструмента ориентируйтесь на свои потребности, будь то YDS, Kafka или RabbitMQ. Кстати, для сценариев аналогичных RabbitMQ, можно рассмотреть Yandex Message Queue.

Интеграция c YDS получилась достаточно простой, если знать как настроить подключение, надеюсь тем кто будет использовать amazon-kinesis-client для интеграции — эта статья поможет быстрее разобраться.

--

--