스트림 프로세싱의 긴 여정을 위한 이정표 (w. Apache Flink)
이 글은 스트림 프로세싱이란 개념이 생소하고 Flink를 현업에서 도입하고자 하는 분들이 어디서 부터 시작해야 하는지, 앞으로 무엇을 공부해야하는 지 알 수 있는 이정표 역할을 할 수 있길 기대하면서 작성했습니다.
저도 당분간 업무 영역이 이쪽으로 바꾸어서, 급하게 공부한거라 제가 맞는 말을 하고 있는건지 확신이 없습니다, 잘 아시는 분이 계시다면 공격적인 코멘트 부탁드립니다.
목차
- 스트림 프로세싱 (Stream Processing)
- 윈도우 (Window)
- 워터마크 (Watermark)
- 정확히 한 번 처리 (Excatly Once Semantic)
- 한계 (Limitation)
- Flink 설치 및 플레이
스트림 프로세싱
2000년대 초반부터 많은 곳에서 도입한 대규모 데이터를 다루는 방법은, MapReduce 기반의 대규모 배치 시스템입니다. 대표적으로는 Hadoop 생태계의 제품군들이 있습니다. 배치 시스템은 기본적으로 대규모 데이터에 복잡한 연산을 수행하기 때문에 지연시간이 상대적으로 크구요. 스트림 처리 패러다임에서는 단순한 연산을 짧은 지연시간 내에 처리하는 것을 목표로 합니다.
스트림과 배치를 구분하는 많은 특징들이 있지만, 학자들이 대표적으로 구분하는 방식은 데이터의 범위가 고정되어 있는지 (bounded), 고정되어 있지 않은지 (unbounded) 를 기준으로 삼고 있습니다. 배치 시스템은 대규모의 데이터를 고정된 시간 범위 안에 있는 ‘유한’ 데이터에 사용하는 시스템으로 보고, 스트림 프로세싱은 시간 범위가 고정되어 있지 않은 ‘무한’ 대의 데이터를 처리하는 것을 목표로 합니다.
요즘, 어떤 사람들은 스트리밍 시스템과 배치 시스템을 통합하고자 하는 시도를 하는데요. 아직 까지는 여러 한계점이 존재합니다. 이는 [#5. 한계]
에서 좀 더 다루겠습니다.
이런, 스트림 프로세싱을 어디에 쓸 수 있을까요? 먼저 스트림 프로세싱이 관심을 가지는 데이터는 어떤 모양일까 상상해봅시다.
전체 회원 목록 같은 데이터는 스트림 프로세싱에서 관심을 가지는 데이터가 아닙니다. 이런 데이터는 특정 시간에 데이터베이스에서 질의 했을 때 반환되는 전체 목록이지 스트림으로 표현할 수 없습니다. 스트림이란 “새로운 회원이 생성됨”과 같은 이벤트 입니다.
스트림 데이터의 예시
- 쇼핑몰 고객의 구매 요청
- 항공사 예약 발생
- 보험금 청구
- 은행 트랜잭션 발생
- 클릭 이벤트
- 서버 로그
- 현재 IoT 장비의 위치
- 기타 등등
스트림 데이터를 활용하는 대표적인 예시는 다음이 있습니다.
- 은행에서 이상 거래를 탐지
- 쇼핑몰에서 구매자의 수요에 맞춰 동적 가격 계산
- 온라인 사용자의 행동 패턴 분석
- 실시간 추천 시스템
- 기타 등등
요즘에 마이크로소프트 아키텍처(MSA) 때문에 이벤트 소싱과 CQRS에 대한 이야기를 많이하죠. 이런 패턴들도 이벤트 데이터를 입력으로 받기 때문에, Flink와 같은 스트리밍 시스템을 단순히 분석용으로 쓰지 않고 비즈니스에 접목시키려는 움직임도 있습니다[7][10].
위에서 회원 목록을 가진 데이터베이스는 스트림이 아니다라고 했는데, 관계형 데이터베이스의 변경 내역을 이벤트로 수신해서 스트림 프로세싱을 수행할 수 있는 카프카 커넥트(Kafka Connect)같은 오픈소스들도 있어서 사용할 수 있는 영역이 꽤 넓습니다.
좋은 스트림 시스템의 조건
좋은 스트림 시스템은 어떤 걸까요? 이 질문에 대해서 답을 할 수 있다면 아마 스트림 시스템이 무엇을 해줄 수 있고 무엇을 못하는지도 이해할 수 있을 겁니다.
① 입력 데이터가 어느 빈도로 발생하는지 예측할 수 없습니다. 즉, 발생하는 부하를 정확히 예측하기 힘듭니다
- 부하에 유동적으로 대응할 수 있어야 하며, 데이터 유입이 급증해도 성능이 저하되지 않아야 합니다
- 급증한 데이터에 대해 스트림 프로세싱을 수행하지 못한다면, 데이터는 계속 지연되기 때문에 실시간이라고 부르기 힘들겠죠
② 데이터는 잘못된 순서로 들어오거나, 데이터가 발생한 시간과 시스템이 데이터를 인식한 시간이 다를 수 있습니다. 전자를 이벤트 시간(Event Time) 후자를 인입 시간(Processing Time) 이라고 부릅니다
- 데이터가 잘못된 순서로 들어오거나 지연되어서 들어온 경우에도 스트림 프로세싱을 수행할 수 있어야 하거나, 적어도 얼마나 지연되면 데이터를 버릴 것인지 기준은 제시할 수 있어야 합니다. 이를 스트림 프로세싱 생태계에서는 이런 기준을 워터마크라고 부릅니다
- 워터마크에 대한 내용은 따로 세션을 따서 설명합니다.
③ 시스템은 내결함성을 가져야 합니다.
- 현대 시스템이라면 당연히 갖춰야 할 덕목입니다
④ 정확히 한 번 수행
- 스트림 시스템을 보다 보면 정확히 한 번 처리(Exactly Once)를 보장한다는 이야기가 자주 나옵니다. 이는 스트림 데이터 특징 때문에 그런데요.
- 스트림 데이터를 처리할 때, 외부 리소스에 의존하는 코드가 있다면 (현재 외부 온도, 주식 가격 등) 데이터를 처리 할 때마다 값이 변하게 됩니다. 이를 비결정적(non-determinism) 이라고 부릅니다.
- 이 이야기는
[정확히 한 번 처리]
에서 더 자세하게 다뤄보겠습니다.
데이터 플로우 (Data Flow)
데이터 플로우는 구글 클라우드 플랫폼(GCP)에서 스트림 분석 시스템을 담당하는 제품입니다. 이 제품명은 구글에서 2015년 발표한 The Dataflow Model[3] 이라는 논문에서 유래가 되었는데 현대 스트림 프로세싱의 핵심 이론들이 잘 설명되어 있습니다.
개발자 커뮤니티에서는, 웹 개발은 개념만 알고 있으면 언어나 프레임워크가 달라져도 진입장벽이 낮다는 말이 있는데요. 스트림 프로세싱의 세계도 마찬가지입니다. Apache Flink, Beam 등 다양한 제품군들은 사실 공통된 개념을 차용해서 만들어졌기 때문에 코딩 스타일, 용어, 시스템의 가정사항 등이 유사합니다.
그래서 본 글에서 설명하는 모든 개념들은 DataFlow Model에서 말하는 용어와 동일하구요. 지금부터는 현대 스트림 프로세싱에서 통용되는 일반적인 개념들을 한데 묶어서 DataFlow Model 이라고 칭하겠습니다. 실제 역사적으로는 각 용어의 등장 시기가 조금 상이합니다.
윈도우 라이프사이클
DataFlow Model의 여러 개념중에서 첫 번째로 윈도우에 대해서 설명하려고 하는데요. 사실 스트리밍 시스템과 배치 시스템은 큰 차이가 없습니다. 배치 시스템은 유한(bounded) 데이터를, 스트리밍 시스템은 무한(unbounded) 데이터를 다룬다고 했습니다.
어떻게 무한대의 데이터를 처리할까요? 스트리밍 시스템은 무한 데이터 속에서 유한개의 데이터를 선정해서 하나의 시점을 만드는데요. 그것이 윈도우 입니다. 윈도우를 사용한다는 건 무한 데이터를 유한개의 데이터로 바꿔서 처리한다는 아이디어로 출발합니다.
Apache Flink에서 윈도우는 위 그림과 같은 라이프 사이클을 가집니다. 하나의 윈도우는 입력 스트림(Input Stream)을 소스로 받고 출력 스트림(Output Stream)을 생성하는 큰 구조로 이루어져 있고, 이렇게 생성된 출력 스트림은 또 다른 곳에서 입력 스트림이 됩니다.
윈도우 할당자 (Window Assigner)
들어온 데이터를 하나 이상의 윈도우에 할당하는 역할을 합니다. Flink에는 기본적으로 텀블링 윈도우, 슬라이딩 윈도우, 세션 윈도우 글로벌 윈도우 4개의 기본 윈도우 할당자를 제공합니다.
윈도우 할당자는 입력 데이터를 보고 현재 조건에 만족하는 윈도우가 없으면 새로운 윈도우를 생성하고 데이터를 삽입합니다. 하나의 윈도우는 데이터가 모두 담겨있다는 확신이 들면 윈도우 함수를 먹여서 데이터에 연산을 수행하여 데이터를 변환하고 윈도우를 삭제합니다.
트리거 (Trigger)
윈도우는 하나의 트리거
와 윈도우 함수(Window Function)
를 가지고 있습니다. 함수에는 윈도우에 들어온 데이터에 대해 어떤 연산을 적용할 것인지 정의하고, 트리거는 언제 윈도우에 있는 데이터로 연산을 수행할 지 결정하는 역할을 합니다. 예를 들면, 윈도우 안에 있는 데이터가 4개 이상이라면 혹은 워터마크가 윈도우 종점을 통과했다면 이라는 조건을 달 수 있습니다.
소멸자 (Evictor)
트리거가 발생해서 윈도우를 구체화 하기 전에, 데이터 일부를 연산에서 제외시키도록 결정하는 역할을 하고 있습니다. 일종의 필터라고 볼 수 있겠네요.
Keyed or Non-Keyed Window
Flink에서는 keyBy
함수로 데이터의 특정 값을 키로 잡을 수 있습니다. 만약 키를 잡지 않으면 하나의 세션에서 처리하구요. 키를 잡으면 병렬로 처리됩니다. 카프카에서 PartitionKey를 생각하시면 되겠습니다.
4가지 기본 윈도우
지금부터 대표적인 윈도우 4가지를 소개하겠습니다. 이번 장에 첨부된 내용과 그림은 Flink 공식문서[4]만을 인용했습니다.
텀블링 윈도우 (Tumbling Window)
고정 크기 윈도우라고도 부르며 일정한 크기로 윈도우를 할당합니다. 데이터의 개수가 될수도 있고 시간이 될수도 있습니다.
input = ... # type: DataStream
# tumbling event-time windows
input \
.key_by(<key selector>) \
.window(TumblingEventTimeWindows.of(Time.seconds(5))) \
.<windowed transformation>(<window function>)
# tumbling processing-time windows
input \
.key_by(<key selector>) \
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) \
.<windowed transformation>(<window function>)
# daily tumbling event-time windows offset by -8 hours.
input \
.key_by(<key selector>) \
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) \
.<windowed transformation>(<window function>)
윈도우의 기준을 이벤트 시간으로 나눌지 프로세싱 시간으로 나눌지도 결정할 수 있고 시간도 밀리초 단위까지 세분화 해서 윈도우를 자를 수 있습니다. 위 코드에서 마지막 예시는 Offset을 TumblingEventTimeWindows.of
함수의 두번째 인자로 주었는데요. 시간을 나누는 기준을 의미합니다. 예를 들어 Time.days(1)
을 사용하면 기본값은 00시 부터 23:59:59 인데, 이를 아침 8시부터 다음날 아침 8시로 지정하고 싶다면? 두 번째 파라미터로 +8 시간을 넘겨주면 됩니다. (위 함수의 두번 째 파라미터의 이름은 Offset 입니다)
슬라이딩 윈도우(Sliding Window)
슬라이딩 윈도우 또한 고정 크기의 윈도우를 가지는데, window slide parameter
를 지정해서 윈도우가 생성되는 빈도를 결정합니다. 즉, 크기와 생성 빈도수를 결정하기 때문에 빈도수를 크기보다 작은 값으로 주면 데이터가 여러 윈도우에 겹치게 되겠죠?
input = ... # type: DataStream
# sliding event-time windows
input \
.key_by(<key selector>) \
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) \
.<windowed transformation>(<window function>)
# sliding processing-time windows
input \
.key_by(<key selector>) \
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) \
.<windowed transformation>(<window function>)
# sliding processing-time windows offset by -8 hours
input \
.key_by(<key selector>) \
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) \
.<windowed transformation>(<window function>)
윈도우를 생성하는 방법 자체는 비슷하고, window
함수로 윈도우 할당자의 성격을 지정하는게 인상적이죠. 변수 input
은 데이터 스트림 객체라고만 알고 있으시면 됩니다.
세션 윈도우 (Session Window)
세션 윈도우는 특이하게 특정 시간 동안 데이터가 나타나지 않으면 구체화 하는 윈도우입니다. 예를 들면, 유저 단위로 프로세싱하려고 할 때 사용할 수 있습니다. (e.g. 특정 유저가 10분간 활동이 없으면, 윈도우를 구체화 한다)
글로벌 윈도우(Global Window)
글로벌 윈도우는 같은 키를 가진 모든 데이터를 하나의 전역 윈도우에 담습니다. 글로벌 윈도우는 반드시 커스텀 트리거를 정의해서 사용해야 하구요, 그러지 않으면 아무런 역할도 하지 않습니다. 왜냐면 윈도우에 끝 지점이라는게 존재하지 않아서 기본 트리거는 아무런 Firing 도 안하기 때문입니다.
Allowed Lateness와 Side Output
스트리밍 시스템에서는 크게 두가지 시간 도메인이 중요합니다. 이벤트가 실제로 발생된 시점인 이벤트 시간과 시스템이 데이터를 인지한 인입 시간이죠. 이벤트 시간과 인입 시간이 완전히 동일하면 참 좋겠지만, 실제로는 괴리가 있습니다. 스트리밍 시스템이란 책[5] 에서는 다소 극단 적인 예시를 듭니다.
비행기 안에서 스마트폰 앱에서 터치를 하고 곧바로 비행기 모드로 돌린 다음, 2시간 뒤 비행기에서 내리고 비행기 모드를 풀면 서버는 2시간 이전의 데이터를 받게 된다.
책에서 든 예시는 극단적이지만 서버-to-서버 통신에서도 크고 작은 지연은 항상 발생됩니다. 여기서 중요한 건 스트리밍 시스템의 트리거는 어쨋든 윈도우를 언제 닫을지를 결정해야 한다는 겁니다. 언제 들어올지 모르는 데이터를 위해 윈도우를 구체화 하지 않으면 성능이 저하됩니다.
Flink에서는 Allowed Lateness 라고 불리는 수치를 결정해서 데이터가 늦으면 언제까지 늦는지를 지정합니다. 기본값은 0이구요. 기본값을 이용하면 윈도우를 지나서 도착한 데이터는 누락(Drop)됩니다.
input = ... # type: DataStream
input \
.key_by(<key selector>) \
.window(<window assigner>) \
.allowed_lateness(<time>) \
.<windowed transformation>(<window function>)
실제로는 데이터가 누락되는게 용인 될 수 없는 요구사항이 많을 겁니다. 그 때는 SideOutput으로 선언해서 늦게 들어온 데이터를 별도의 스트림으로 다룰 수 있습니다. 이를테면 해당 스트림을 이용해서 카프카의 dead letter 토픽으로 전송해서 누락된 데이터를 수신받을 수 있습니다
late_output_tag = OutputTag("late-data", type_info)
input = ... # type: DataStream
result = input \
.key_by(<key selector>) \
.window(<window assigner>) \
.allowed_lateness(<time>) \
.side_output_late_data(late_output_tag) \
.<windowed transformation>(<window function>)
late_stream = result.get_side_output(late_output_tag)p
워터마크 (Watermark)
위에서 이벤트 시간과 인입 시간에 대해 지속적으로 언급했었는데요. 워터마크에 대해 이야기 할 때 중요한 용어이기 때문에 다시 한 번 정리 할 필요가 있습니다. Flink 공식 문서에 아주 좋은 그림이 있어서 인용해봅니다[6].
이벤트 시간 (Event Time)
은 데이터를 발생시킨 주체의 시간을 말하고, 인입 시간 (Processing Time)
은 윈도우가 데이터를 받은 시간을 말합니다. 실시간 스트리밍에서 이런 인식이 중요한 이유는 이 두 시간 사이에는 반드시 왜곡이 발생하기 때문입니다.
윈도우라는 개념 자체가 ‘무한’ 데이터를 ‘유한’개의 집합으로 나눠서 다루는 개념이라고 했었죠. 10분 단위의 텀블링 윈도우를 사용한다면, 10:00 ~ 10:10
사이에 존재하는 모든 데이터가 도착하고 윈도우 함수를 먹여야 제대로 된 값이 계산될 겁니다. 실제 세상에서는 10:09 의 데이터가 10:11 에 도착할 수 있기 때문에 인입 시간을 기준으로 윈도우를 구체화 하는 건 데이터가 누락 될 위험이 굉장히 커집니다.
여기서 워터마크가 사용됩니다. 워터마크는 F(P) -> E
의 형태로 인입 시간을 받으면 이벤트 시간을 리턴하는 함수로 표현할 수 있습니다. 반환된 E는 지금 이 순간 부터 E 이전에 존재하는 모든 데이터를 받았다는 의미로, 다르게 말하면 앞으로는 E 이전의 시간을 가진 데이터가 입력으로 주어지는 일이 없다는 말이 됩니다.
인입 시간과 이벤트 시간의 왜곡이 존재할 때 아주 완벽한 워터마크 함수는 위 그래프에서 굵은 실선을 그리는 함수가 될 겁니다.
💡 음.. 근데 뭔가 이상합니다. 이러한 워터마크를 정확히 계산할 수 있나요? 스트리밍 시스템은 입력 소스에 대한 어떤 정보도 없기 때문에 완벽한 워터마크를 만드는 건 애초에 불가능합니다. 그래서 워터마크 함수는 휴리스틱할 수 밖에 없습니다.
완벽한 워터마크를 만드는 게 불가능 한 건 아닙니다. 제가 참고한 책 스트리밍 시스템[5] 에서는 완벽한 워터마크를 만들기 위한 두 가지 조건을 아래처럼 설명했습니다.
- Apache Kafka 처럼 부분 순서를 보장하는 시스템이 존재하고 이벤트를 완벽히 시간 순서대로 순차적으로 쌓은 경우
- 그냥 프로세싱 시간을 기준으로 윈도우를 구체화 한다.
[조건 1]을 보면 명확하죠, 애초에 순서대로 데이터가 도착한다는 보장이 있으면 완벽한 워터마크를 쉽게 만들 수 있습니다. 10:00~10:10 의 텀블링 윈도우를 가지고 있었는데 이벤트 시간이 10:11인 데이터를 받았다면 윈도우를 구체화 해도 된다는 명확한 신호입니다. [조건 2]는 그냥 이벤트 시간을 무시하는 것이기 때문에 완벽한 워터마크를 만들 수 있습니다.
즉, 위 두가지 경우가 아니라면 워터마크는 항상 휴리스틱 할 수 밖에 없고 완벽한 계산을 요구하는 비즈니스라면 늦게 들어온 데이터(lateness)를 어떻게 처리해야 하는가를 고민해야 합니다. 실제로 Flink를 이용해서 비즈니스 로직을 구현하신 다면 관련 유관부서와 이런 이야기를 하게 될겁니다.
데이터가 발생한 시점부터 최대 30분 이내로는 카프카로 메시지가 전달받을 것을 가정했다. 그 이상의 괴리가 발생하면 데이터가 누락될 수 있습니다.
이것이 워터마크에 대한 기본적인 개념이었다면, 실제로 시스템을 구축할 때, 워터마크가 어떻게 생성되고 스트림이 병렬로 생성되었을 때, 각 스트림으로 전파되는지 알 필요가 있습니다. 이 내용은 이 논문[8]에 잘 나와있습니다. 구글 직원들이 작성한 논문이구요. 자사 제품인 Google Cloud DataFlow와 Flink를 비교하는 내용을 담고 있습니다.
정확히 한 번 처리 (Exactly Once Semantic)
스트리밍 시스템들을 보면 정확히 한 번 처리(Exactly Once)하는 것을 아주 중요하게 강조하고 있습니다.
데이터 처리에 있어서 누락이 없어야 하는 건 굉장히 중요하죠. 윈도우에서 연산을 먹이는 게 누락되면 안되고요. 정당한 시점에 데이터를 받았는데 윈도우 할당자가 적절한 윈도우에 포함시키지 않고 누락시켰다면 그것도 문제입니다. 정확히 한 번 처리에 있어서 두 가지 관점을 볼 수 있습니다. ① 윈도우가 정확히 한 번 구체화 되었는가?, ② 데이터가 중복으로 들어가진 않았는가?
만약, 스트리밍 시스템이 이걸 제대로 하지 못한다면 우리가 지금까지 공부한 모든 것들이 쓰레기가 됩니다. 그냥 배치 시스템의 결과를 기다리면 되는거죠. 스트리밍 시스템을 쓸 이유가 없습니다. 다행히도 구글 데이터 플로우는 이에 대한 방법을 제공하고 있습니다.
① 윈도우가 정확히 한 번 구체화 되었는가?
애초에 윈도우가 정확히 한 번만 구체화 되야 하는게 왜 중요할까요? 왜냐면 윈도우 함수 자체는 멱등(idempotent)하지 않을 수 있기 때문입니다. 예를 들어, 은행에서 이상 거래 감지 프로세스를 돌리고 있는데 윈도우 함수에 외부 정보를 인용하면 어떨까요? 예를 들면, 해당 고객의 현재 위치나 거래 기록은 API를 호출하는 시점에 따라 달라지는 값입니다.
윈도우가 2번 이상 구체화 되면 윈도우가 생성한 출력 스트림의 데이터가 완전히 달라질 수 있습니다. 이게 어떤 문제를 일으킬지 모르는 거죠. Google Cloud DataFlow 에서는 이런 비결정적인 처리를 체크포인트(Checkpoint)를 사용해서 결정적인 처리로 치환해서 문제를 해결합니다. 각 변환에서 나오는 출력 스트림을 고유 ID를 붙여서 체크포인트에 저장한 다음에 이후에 입력 스트림은 이 체크포인트를 재사용 해서 정확히 한 번 처리를 보장합니다.
그래서 Apache Flink에서는 이 체크포인트 정보를 관리하는게 굉장히 중요합니다. 그래서 Amazon S3에 체크포인트를 저장하고 관리하는게 마음이 아주 편안해지겠죠. 아마존 제품군을 쓰신다면 Amazon Kinesis Data Analytics (Amazon KDA)도 좋은 선택입니다. 이런 고민을 할 필요 없이 행복하게 코딩만 하면 됩니다.
② 데이터가 중복으로 들어가진 않았는가?
이는 간단하게 데이터에 고유 ID를 부여해서 윈도우에 중복된 데이터가 들어가 있는지 아닌지만 검증해서 해결할 수 있습니다.
스트리밍 시스템[5] 에선, 위 두가지 사항을 구현하기 위해 어떻게 성능을 최적화를 했는지에 대한 깊은 이야기도 공유하고 있습니다. 교환법칙이 성립하는 연산에 대해서는 스트림을 나누지 않고 하나로 합쳐서 체크포인트가 생성되는 지점을 줄이기도 하구요. 블룸 필터를 사용해서 데이터의 중복 체크를 효율적으로 하기도 합니다. 더 많은 내용이 궁금하시다면 서적 [5]를 추천합니다.
한계 (Limitation)
스트리밍 시스템 [5] 책의 서문에서는 잘 만들어진 스트리밍 시스템이 있다면, 더 이상 배치 시스템은 필요하지 않다라는 꽤 대범한 멘트로 독자를 유혹하고 있습니다. 과연 그럴까요? 이 세션에서는 제가 현업에 스트리밍 시스템을 적용하면서 아직까지 해결하지 못한 고민들을 한계점이라고 적었습니다. 거대한 지구 어디선가 누군가는 이 문제를 해결했을 수도 있고, 문제라고 인식하지 않을 수도 있습니다.
- 재처리 (Re-processing)
윈도우 함수의 연산이 변경되었다면 어떻게 할까요? 이미 지나간 입력 스트림에 대해서 처음부터 연산을 수행할 수 있을까요? 일단, 너무 느리겠죠. Map/Reduce기반의 배치 시스템에서는 단 번에 대규모 클러스터를 운용해 수초내에 처리할 수 있는게 스트림 프로세싱에서는 조금 버거울 수 있습니다. 또한, 모든 데이터가 카프카에 저장되어 있다면 아주 좋겠지만.. 카프카에서 제거된 데이터가 있으면 아주 까다로워 집니다. 시스템을 구축하고 시간이 몇 개월 지나서 스키마에 새로운 필드를 추가한다거나 집계 하려는 필드를 변경하려는 요구사항이 들어오면 어떨까요? 상상만 해도 벌써 퇴근하고 싶어집니다. - 누락 데이터
추천 시스템, 이상 탐지 등의 요구사항에서는 일부 데이터가 누락되어도 경향성을 계산하는데는 큰 문제가 없을 수 있습니다. 표본 집단에 대해서 연산을 실행한거라고 보면 되니까요. 반면, 집계 처리와 같이 데이터 누락이 발생되면 곤란한 상황도 있을 겁니다. 스트리밍 시스템이 워터마크를 이용해서 어느정도 휴리스틱하게 윈도우를 확정짓긴 하지만 여전히 데이터가 누락될 수 있다는 가정이 존재하기 때문에 이를 슬기롭게 대처할 수 있는 방안도 필요합니다. - 메모리 문제
스트림 프로세싱이 무한 데이터를 다루는 컨셉인데, 메모리는 무한하지 않기 때문에 생기는 문제들이 있습니다. 윈도우를 무작정 크게 1시간 단위로 잡는다면 메모리나 디스크에 1시간 단위의 데이터를 모두 가지고 있어야 합니다. 사실 1시간 단위의 윈도우를 쓴다면 그냥 배치 시스템을 쓰는게 더 좋은 대안일 수 있습니다. - Blocking Operation
스트리밍 코드 어디선가 외부 리소스에서 정보를 받아오는 HTTP 호출을 하면, 출력 스트림이 나오기 까지 지연이 계속 추가될 수 있습니다. 먼저, HTTP 호출을 예시로 들었지만 2개 이상의 스트림을 조인해서 연산을 하는 경우에도 처리율이 낮으면 지연시간이 또 느려집니다.
다른 분야에서는 여러 고수들과 업계 선배분들이 이미 매를 먼저 맞아놔서, 후발대로 개발을 시작한 우리가 편하게 일할 수 있는 환경이 갖추어져 있는데요. 대표적으로 도커와 쿠버네티스가 있겠죠.. 저는 이제 쿠버네티스 없는 환경에서는 일 못할 것 같습니다.
그런데 스트리밍 시스템은 아직 도입한 사례도 찾기 힘들고, 업계의 경험이 미성숙한 상태라서 매를 맞는 대상이 내가 될 수 있습니다. 스트리밍 시스템 도입을 위해 이 글까지 찾아오신 분들이 계시면 같이 화이팅 하자는 말씀 드립니다.
Flink 설치 및 플레이
마지막으로, 제가 스트리밍 프로세싱을 직접 코딩하기 위해 만들었던 인프라 환경을 공유하고 마치겠습니다. Apache Flink 환경을 구축해보고 파이썬으로 테스트 해볼 수 있습니다. 소스 코드는 이 링크에 있습니다.
환경 구축은 쿠버네티스를 사용했고 쿠버네티스에서 다양한 서비스를 설치해서 다양한 환경의 인프라를 세팅해보고 코딩해보실 수 있습니다. 아래는 기본으로 설치하는 서비스들입니다.
- Apache Flink
Apache Flink Kubernetes Operator를 이용해서 설치합니다. Flink는 태스크를 관리하는 JobManager와 실제 태스크를 수행하는 TaskManager 두 개의 인스턴스로 구성되는데요. 이 오퍼레이터를 이용하면 새로운 태스크가 등록될 때 동적으로 TaskManager를 생성하고 할당해줍니다. - Apache Kafka
대부분의 스트리밍 시스템들은 입력 데이터 소스로 아파치 카프카를 이용합니다. 데이터의 영속성을 보장하면서 메시지 큐 역할을 합니다. - Apache Kafka UI
카프카의 메시지를 웹 인터페이스로 확인하기 위해 사용합니다. - MinIO
Amazon S3의 API와 완벽히 호환되는 쿠버네티스 네이티브 오브젝트 스토리지입니다. 로컬에서 MinIO로 테스트 해보고 이를 S3 레벨까지 올려서 프로덕션 할 수 있습니다.
이 외에도 필요에 따라 MongoDB, MySQL등을 추가로 설치해서 사용하시면 됩니다. 코드에서는 Helm Chart를 이용해서 서비스들을 관리하는데요. Helm Chart를 이용하면 복잡한 인프라도 선언적으로 쓸 수 있고 관리가 편합니다.
로컬 쿠버네티스 설치
쿠버네티스를 로컬에 설치하기 위해 k3d를 이용했습니다.
$ k3d cluster create local-cluster \
--api-port 6550 \
-p "30000-30010:30000-30010@server:0" \
--agents 2
위 명령어를 도커가 실행중인 상태에서 실행하셔야 됩니다. 도커 위에 가상의 쿠버네티스 환경을 설치하는 명령어 입니다. -p 30000–30010:30000–30010
옵션으로 쿠버네티스의 NodePort와 호스트의 포트를 바인딩 했습니다. 이걸 설정함으로써 쿠버네티스의 올라간 서비스를 로컬에서 접근할 수 있습니다.
Flink Cluster 생성
Flink Kubernetes Operator로 Flink를 설치합니다.
# Insall Flink Kubernetes Operator
helm repo add flink-operator-repo \
https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/
helm install -f flink/values.yml \
flink-kubernetes-operator \
flink-operator-repo/flink-kubernetes-operator
# Deploy Flink Server
kubectl apply -f flink/deployment.yml
helm 으로 시작하는 두 명령어는 오퍼레이터를 설치하는 명령어이구요. 실제 Flink Seerver는 kubectl 명령어로 설치했습니다.
컴포넌트 설치
Kafka, MinIO 등의 필요 컴포넌트를 설치합니다. 이는 헬름 차트를 이용했습니다. 차트 이름이 반드시 play 여야 한다는 점에 유의해주세요.
$ helm install play ./helm-charts
모두 설치하고 나서 kubectl get pods
명령어로 확인해보면 아래 처럼 서비스가 설치된 모습을 볼 수 있습니다.
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
flink-kubernetes-operator-545c768689-m8kpv 1/1 Running 0 11m
play-minio-78fd77d9cd-zw7qh 0/1 Pending 0 2m21s
play-minio-post-job-crwz7 1/1 Running 0 2m21s
play-kafka-faker-68bc96b949-9xrth 1/1 Running 0 2m21s
play-kafka-0 1/1 Running 0 2m21s
play-kafka-ui-6d464f9fd-4mg5w 1/1 Running 0 2m21s
여기 까지 설치하고 나면 localhsot:30000
으로 접속해서 Flink Dashboard가 나오는 걸 보실 수 있습니다.
마지막으로 Job을 등록하려면 flink 명령어를 설치해야되는데요. 하나의 바이너리로 만들어진게 없어서 Flink에서 압축 파일을 직접 다운로드 받아서 해제하고 .bashrc 나 .bash_profile 에 명령어를 PATH 환경변수로 등록해야 합니다. 자세한 설치 가이드는 이곳에 있습니다.
버전에 민감한 친구라서 아래 버전을 반드시 맞춰주셔야 합니다.
- Python 3.9
- Java 11
- Flink 1.17.0
이 글을 작성하는 시점에 1.17.0이 최신버전이라서 이렇게 맞추었지만, 소스 코드에서 버전을 적절히 변경하시면 이전 버전에서도 동작합니다. Apache Flink Kubernetes Operator에서 지원하기만 하면 잘 동작했습니다.
$ flink --version
Version: 1.17.0
여기 까지 인내심을 가지고 잘 따라오셨으면 축하드립니다! 이제 드디어 Flink를 이용해서 스트리밍 시스템을 코딩해볼 수 있는 단계까지 오셨습니다. 다음으로는, 아주 간단한 예시와 무엇부터 공부하시면 좋을지 소개하고 글을 마무리하겠습니다.
Table API vs DataStream API
Flink는 크게 Table API와 DataStream API를 사용해서 코딩할 수 있는데요. DataStream이 조금 더 저수준이고 TableAPI가 고수준의 인터페이스를 제공합니다. 즉, 처음 시작하시는 분들은 TableAPI만 쓰고도 왠만한 요구사항은 전부 커버할 수 있구요. 조금 더 딥하게 봐야겠다 하면 그 때 DataStream API를 공부해도 무방합니다.
TableAPI는 스트리밍 SQL이라고 하는 특수한 SQL문법을 사용하는데, Apache Calcite 기반이라고 합니다. 아래 코드는 가짜 데이터를 생성하는 출력 스트림을 만들고 그걸 카프카에 저장하는 예제입니다.
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
t_env.get_config().set('pipeline.jars', known_args.dep)
t_env.get_config().set('pipeline.classpaths', known_args.dep)
# define source table with datagen
t_env.execute_sql("""
CREATE TABLE bank_transfers(
transaction_id BIGINT,
from_account_id BIGINT,
to_account_id BIGINT,
amount DECIMAL(32, 2)
) WITH (
'connector' = 'datagen'
)
""")
# define sink table with kafka connector
t_env.execute_sql("""
CREATE TABLE bank_transfers_sink(
transaction_id BIGINT,
from_account_id BIGINT,
to_account_id BIGINT,
amount DECIMAL(32, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'bank_transfers',
'properties.bootstrap.servers' = 'play-kafka-headless:9092',
'format' = 'json'
)
""")
t_env.sql_query("SELECT * FROM bank_transfers") \
.execute_insert("bank_transfers_sink").wait()
bank_transfers
테이블은 datagen이라는 커넥터를 사용해서 내가 이런 데이터를 만들 거라는 스키마를 선언합니다. 그리고 INSERT INTO A SELECT * FROM B
문법으로 B의 테이블에서 A로 저장한다는 쿼리를 실행합니다.
여기서 커넥터(Connector)는 입출력 데이터 소스를 지정하는 방식을 말하는데요. Flink에서는 Kafka, ElasticSearch, MongoDB, FileSystem, JDBC 등의 커넥터를 기본적으로 지원합니다. 여기서 지원하지 않는 커넥터를 입출력 소스로 쓸려고 하면 매우 슬픈 길로 들어가야 합니다.
마지막으로 이렇게 작성한 코드를 Flink 서버로 업로드 하면 자동으로 태스크가 실행되는 걸 볼 수 있습니다.
flink run -py ../jobs/helloworld/main.py -m localhost:30000
레퍼런스
- [1] A Survey on the Evolution of Stream Processing Systems
- [2] Use Cases | Apache Flink
- [3] The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing
- [4] Windows | Apache Flink
- [5] 스트리밍 시스템 | Yes24
- [6] Timely Stream Processing
- [7] Online Event Processing | Achieving consistency where distributed transactions have failed
- [8] Watermarks in Stream Processing Systems: Semantics and Comparative Analysis of Apache Flink and Google Cloud Dataflow
- [9] PyFlink Playground
- [10] Towards a Reliable Device Management Platform | Netflix