Kafka Streams Internal State Management

nayoung
11 min readJan 11, 2024

--

흐르는 데이터에 filter 작업만 진행하는 것처럼 이벤트 상태를 유지하지 않아도 되는 것을 stateless processing이라고 한다. 이벤트는 어떤 일이 벌어진 사실이며 불변이다. 즉, stateless processing은 fact-driven이다.

사실들을 모으면 행동을 감지할 수 있는 것처럼 이벤트를 모아 상태를 관리하는 것을 stateful processing이라고 한다.

내부 상태가 계속 커지면 메모리 문제가 발생하므로 더 이상 사용하지 않는 레코드를 tombstone 레코드로 변경해 OOM 에러 발생을 방지했다. 내부 상태가 사용하는 메모리를 관리하기 위해 공부한 내용을 작성했으며, 내부 상태를 관리하지 않았을 때 어떤 일들이 발생하며 해결 방법에 대해 파악할 수 있다.

Internal state

KTable은 파티셔닝된 테이블의 추상화다. KTable을 통해 key에 대한 최신 상태에 접근할 수 있는데 이 최신 상태를 internal state(local state)라고 한다.

할당된 파티션을 추상화한 개념이므로 내부 상태는 특정 인스턴스만 접근할 수 있다. 여러 인스턴스로 동작하고 있다면 내부 상태는 전체 애플리케이션의 일부 상태만 표현한 것임을 의미한다.

카프카 스트림즈에서 데이터 상태를 저장하고 관리하는 저장소를 추상화한 것이 state store이다. state store는 다음과 같은 특징을 갖는다.

Embedded

카프카 스트림즈 애플리케이션 내의 task에 state store가 내장되어 있으며 기본 엔진으로 RocksDB를 사용한다. 외부 저장 엔진을 사용하지 않으므로 네트워크 호출을 요구하지 않아 네트워크 호출로 인한 지연과 처리 병목 현상이 발생하지 않는다.

또한 task에 state store가 내장되어 있으니 state에 접근할 때 발생할 수 있는 동시성 문제가 차단된다.

기본 엔진으로 RocksDB는 key-value 저장소로 key-value를 저장할 때 바이트 스트림 사용을 지원한다. 구글에서 개발한 LevelDB 코드를 fork해 적용한 많은 최적화 덕분에 읽기, 쓰기 작업이 빠르다.

Persistent VS In-Memory Stores

Stream processing that includes an external data source / https://www.confluent.io/resources/kafka-the-definitive-guide/

In-memory에서 상태를 관리하면 disk에 접근하는 것보다 더 빠른 속도로 상태를 파악할 수 있다. stream 처리 과정에서 external 데이터와의 integration에 대해 confluent kafka 가이드는 다음과 같이 설명한다.

The problem with this obvious idea is that an external lookup adds significant latency to the processing of every record — usually between 5–15 milliseconds. In many cases, this is not feasible. Often the additional load this places on the external datastore is also not acceptable — stream-processing systems can often handle 100K-500K events per second, but the database can only handle perhaps 10K events per second at reasonable performance.

https://www.confluent.io/resources/kafka-the-definitive-guide/

속도 측면에서 in-memory가 더 낫다고 state store를 무작정 in-memory에서 사용하면 안 된다. 높은 전송률, 운영 단순화, 실패 시 빠른 복구 등 여러 기준에 따라 상태 저장소를 어떻게 구성해야 하는지 고민해야 한다.

stateful applications의 resilient to failure 보장

카프카 스트림즈에서는 RocksDB를 이용해 추상화한 KTable을 통해 상태에 접근할 수 있다. 카프카 스트림즈는 stateful applications의 resilient to failure을 보장하기 위해 changelog 토픽과 standby replicas를 이용한다.

changelog 토픽으로 상태를 재구축하고 checkpoint 파일이 있는 경우 상태의 일부만 복구하므로 복구 시간을 더 단축할 수 있다. standby replicas를 이용하면 state store를 처음부터 초기화하는 과정을 생략할 수 있어 큰 상태를 가진 애플리케이션의 downtime을 줄일 수 있다.

In-memory 상태 저장소의 장애 복구

장애 복구 탄력성을 보장하지만, In-memory 상태 저장소의 장애 복구가 persistent state store보다 더 느릴 수 있다.

state store는 압축 또는 삭제할 값을 즉시 반영하지만, active segment는 압축 대상이 아니므로 changelog topic에 대한 압축은 즉시 반영되지 않는다. 그러므로 카프카 스트림즈에서 state store를 재구축할 때 active segment에 많은 non-compacted records를 가지고 있다면 In-memory 상태 저장소는 장애 복구에 많은 시간이 소요될 수 있다.

그래서 In-memory state store가 눈에 띌 만한 성능 향상이 있거나, 복구 시간을 줄이기 위해 standby replica를 사용해 빠른 복구를 해야 하는 경우에만 In-memory 저장소로 전환하는 것을 권장한다고 한다. (mastering kafka streams and ksqldb 에서)

Rebalancing

상태를 다시 초기화하는 가장 큰 원인은 rebalancing이다.

task는 카프카 스트림즈 애플리케이션에서 병렬로 수행할 수 있는 가장 작은 작업 단위로 stream thread는 1개 이상의 task를 처리할 수 있다.

카프카 스트림즈는 depth-first 전략을 사용해 이벤트를 수신하면 다른 이벤트가 처리되기 전에 topology의 각 stream processor로 routing 된다. 이로 인해 스트림 처리 연산이 느리면 대기 중인 이벤트 처리가 지연된다.

대기 중인 이벤트 처리가 지연되는 문제를 해결하고 싶다면 partition 개수를 늘려 task 수를 증가시키고 task와 스레드의 관계를 일대일로 만들어 병렬 처리하면 된다. 하지만 rebalancing으로 인한 비용이 아주 크다.

State Migration

group coordinator가 특정 인스턴스의 다운을 감지하면 group leader는 파티션을 재할당한다. 파티션을 재할당한다는 것은 internal state를 다른 인스턴스로 migration 하는 과정이 발생할 수 있음을 의미한다.

또한, standby replica를 가지고 있지 않은 인스턴스로 migration 하려면 모든 상태를 초기화해야 하므로 비용이 상당한데, 이를 해결하는 방법은 다음과 같다.

Static Membership

rolling restart(rolling bounce) 시 인스턴스가 restart 될 때마다 member ID가 초기화되면 rebalancing이 발생한다. 대량의 local state를 가진 애플리케이션이 dynamic membership을 사용하는 경우 rebalancing이 발생하면 내부 상태를 재할당해야 한다.

이를 해결하기 위해 인스턴스에게 고정 ID를 부여하는 static membership을 제공한다. 인스턴스가 다운되어도 session.timeout.ms 동안 rebalancing을 수행하지 않고 다운된 인스턴스가 활성화되길 기다린다.

static membership을 통해 rebalancing을 줄일 수 있지만, session.timeout.ms 동안 이벤트를 처리하지 못하며 group coordinator가 실제 장애를 늦게 감지할 수 있다는 단점이 있다.

Incremental Cooperative rebalancing

Eager Rebalancing의 문제는 rebalancing이 발생하면 모든 인스턴스가 할당된 자원을 포기해야 한다. Sticky Assignor를 사용해 최초에 할당된 파티션을 최대한 유지해도 Eager protocol 기반이므로 모든 파티션이 해제된다.

자원을 포기할 때 stop-the-world 현상이 발생하는데 모든 인스턴스의 데이터 처리가 중단된다. 이를 해결하기 위해 Incremental Cooperative rebalancing 리밸런싱 프로토콜을 사용한다.

  • global round of rebalancing을 여러 개의 작은 리밸런싱으로 교체
  • 소유자 변경이 필요 없으면 자원을 유지하고, migration 해야 할 task만 처리 중단

리밸런싱이 시작되어도 정상적인 애플리케이션 인스턴스들은 자원을 포기하지 않아 데이터 처리를 지속할 수 있다.

Controlling State Size

DB와 Kafka의 속도 차이를 해결하기 위해 in-memory DB에서 내부 상태를 관리하겠다고 했지만, 압축 대상이 아닌 active segment가 많은 non-compacted records를 가지고 있으면 in-memory DB의 복구 과정이 persistent store보다 오래 걸릴 수 있음을 파악했다.

많은 이벤트를 병렬 처리하기 위해 파티션 개수를 증가시키면 되지만, stateful processing에서 rebalancing이 발생하면 로컬 상태를 migration 하는 과정에서 큰 비용이 발생한다는 것도 파악했다.

결국 state store 크기가 무한으로 커지지 않게 잘 관리해야 한다. state store 사이즈 관리를 위해 불필요한 데이터를 삭제하면 내부 상태로 인한 비용을 줄일 수 있으며 방법은 다음과 같다.

Tombstones

delete marker라고도 하는 tombstone record는 key에 대한 value를 null로 설정해 state store에서 삭제되어야 할 상태임을 나타낸다.

@Payload(required = false) 설정 후 {key, null} 로 넣어야 NullPointerException이 발생하지 않는다.

Aggressive topic compaction

Kafka 토픽과 해당 파티션의 기본 데이터 구조는 write-ahead log 구조다. 이벤트가 active segment에 추가되는데 active segment는 압축 대상에서 제외된다.

changelog의 대부분 state persistence store는 active segment 파일에 residing 하므로 non-compacted change-log events, 즉 많은 redundant entries를 읽어야 한다. 다음 설정을 잘 조절하면 적은 레코드만 replay 할 수 있다.

  • segment.bytes 설정으로 세그먼트 파일 크기 제어
  • segment.ms 설정으로 세그먼트 파일이 가득 차지 않아도 압축 또는 삭제될 수 있도록 로그를 강제 정리하는 시간 간격 조절
  • min.cleanable.dirty.ms 설정으로 compactor의 로그 정리 비율 제어 (비율이 낮을수록 압축이 자주 일어나지만, 그만큼 브로커 부담)

Window Retention

return stream
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(5), Duration.ofSeconds(5)))
.reduce(Long::sum,
Materialized
.<String, Long, WindowStore<Bytes, byte[]>>as("total-quantity")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
.withRetention(Duration.ofMinutes(1)))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().shutDownWhenFull()))
.toStream()

Materialized withRetention 메서드를 사용해 카프카 스트림즈가 windowed store에 레코드를 얼마나 유지할지 지정한다. window retention period를 줄이면 windowed state store 사이즈가 작아져 recovery time을 줄일 수 있다.

--

--