--

Spark 스트리밍과 Session Window 소개

  1. 지난 10월 13일 스파크 3.2.0 버전이 릴리즈 되었습니다.

스파크 3.1 이 나온 지 불과 넉달만에 새로운 버전이 출시된 셈인데요. 비록 짧은 기간을 두고 발표된 새 버전이지만 Scala 3.1 지원, Rocks DB 기반 State store 제공, ANSI SQL compliance, Pandas API 지원등 중요하고 흥미있는 변경 사항들을 다수 포함하고 있는 것이 이번 버전의 특징이라고 할 수 있습니다.

이번 글에서는 새로운 스파크 버전에 포함된 다양한 변경 사항중에서 Structured Streaming의 ‘EventTime based sessionization’ 기능에 대해 소개해 보려고 합니다.

2. 스파크와 Session Window

사실 세션 윈도우 자체는 새로운 개념이 아니고 flink나 kafka streams 와 같은 일부 제품에서는 이미 제공하고 있던 기능이기 때문에 익숙한 분들이 많을 것입니다.

Spark 에서 사용하는 세션 윈도우 개념 역시 다른 라이브러리에서 제공하는 것과 크게 다르지 않지만 Structured Streaming의 워터마크(water mark)나 출력모드(output mode)에 따른 제약 사항이 있다는 점에 대해서는 잘 알아둘 필요가 있습니다.

3. Window
시작하기에 앞서 이번 글에서 자주 언급하게될 ‘윈도우(window)’라는 용어에 대해 알아 보도록 하겠습니다.

먼저, SQL에 익숙하신 분들이라면 select sum(col1) over(order by col2 rows between ….) 과 같은 window 함수에 대해 들어보셨을 텐데요. 스트리밍 시스템에서 윈도우라는 용어는 일정한 시간 간격으로 구분한 데이터 집합을 가리키는 용도로 사용되는 경우가 많습니다.

뿐만 아니라 SQL의 window 함수는 다소 특별한 집계 방식이 필요한 상황에서만 주로 사용되는데 반해 스트리밍 시스템에서의 window는 거의 대부분의 집계(aggregation) 연산이 필요한 상황에서 사용된다는 점도 그 차이라고 할 수 있습니다.

4. 이처럼 스트리밍 시스템에서 윈도우 개념이 중요하고 자주 사용되는 이유는 무엇일까요?

여러가지 이유를 들 수 있겠지만 그 중 하나로 꼽을 수 있는 것은 우리가 사용하고 있는 대부분의 데이터 처리 방법이 특정 시점의 데이터 스냅샷, 즉 변하지 않고 크기가 고정된 데이터를 다루는데 적합한 방식이기 때문입니다.

우리가 실시간 처리라고 부르는 방법도 실제로는 수많은 작은 데이터 스냅샷에 대한 처리를 다양한 방식으로 반복하면서 수행하는 경우가 대부분이고요

이런 사례는 (굳이 스트리밍 시스템까지 언급하지 않더라도) 우리 주변에서 쉽게 찾아볼 수 있는데요. 예를 들면 수십 테라바이트에 달하는 대용량 데이터 처리와 같은 경우를 생각해 볼 수 있습니다.

우리가 이미 잘 알고 있듯이 서버 한 대로 처리할 수 없는 대용량 데이터를 처리하는 가장 평범하면서도 확실한 방법은 전체 데이터를 처리 가능한 작은 단위로 나눈 후 여러번의 소규모 배치를 반복 수행해서 각 그룹의 데이터를 처리하고 그 결과들을 다시 하나로 합쳐서 최종 결과물로 만들어 내는 것입니다. (이 과정에서 파티션이니 셔플이니 하는 개념들도 등장하게 됩니다)

이와 같은 처리 방식은 스트리밍 처리의 경우에도 예외가 아니어서 처리하는 데이터 스냅샷의 크기(레코드 한 두개도 스냅샷으로 볼 수도 있습니다)와 이를 처리하기 위해 생성되는 배치 프로세스의 갯수, 각 배치를 처리하는데 소요되는 시간의 차이가 있을 뿐 한번에 처리 불가능한 대량의 데이터를 처리 가능한 소규모 그룹으로 나눠서 반복된 배치 처리를 통해 결과를 생성하는 방식은 여전히 동일하다고 볼 수 있습니다.

5. 하지만 그렇다고 해서 기존 배치처리와 스트리밍 처리 방식이 완전히 동일하다고는 할 수 없습니다.

그 이유는 데이터를 처리하는 시점(프로세싱 타임)과 데이터가 생성된 시점(이벤트 타임)간의 차이를 다루는 방식이 다르기 때문인데요. 일반적인 배치 처리의 경우 이 두 시각을 엄격하게 구분해서 사용하는 경우가 많지 않지만 스트리밍 처리의 경우는 그 차이를 꼼꼼히 따져봐야 하는 상황이 자주 발생하기 때문입니다.

대부분의 배치 처리에서 집계 기준으로 사용하는 시각은 데이터 자체에 기록되어 있는 시각, 즉 이벤트 타임(event time)입니다. 일반적인 배치 처리가 다루는 데이터는 대부분 이미 어딘가에 저장된, 변하지 않는 데이터를 처리하기 때문에 데이터를 언제 집계하는가는 결과에 영향을 주지 않게 됩니다.

예를 들어 날짜별 클릭수 집계 결과를 얘기할때 “11월 17일 시간별 클릭수”라고 하지 “12월 25일 13시 5분 10초에 집계한 11월 17일 시간별 클릭수”라고 언급하지 않는것과 같습니다.

6. 하지만 스트리밍 처리의 경우는 상황이 달라질 수 있습니다.

그 이유는 우리가 짧은 주기의 배치(보통 마이크로 배치라고 부릅니다)를 반복 실행시켜서 매 시점마다의 데이터 집합(스냅샷)을 처리한다고 해도 이는 어디까지나 프로세싱 시각을 기준으로 하는 데이터 스냅샷을 처리한 것일 뿐이며 이벤트 시각 관점에서 봤을때는 여전히 변경중인 데이터를 임의의 한 시점에 처리한 셈이 되기 때문입니다.

7. 예를 들어 온라인 광고 클릭 정보를 실시간으로 수집하고 가공해서 리포트를 발행해주는 시스템이 있다고 가정해 보겠습니다.

A라는 사용자가 오후 1시 5분에 핸드폰으로 어떤 광고를 클릭 했습니다. 하지만 네트워크에 문제가 생기는 바람에 해당 이벤트 정보가 로그를 기록하는 서버에 전달되지 못했고 이로 인해 약 2시간이 지난 오후 3시 5분이 되어서야 로그에 기록되는 상황이 일어났습니다.

이때 만약 우리의 스트리밍 시스템이 실제로 클릭을 한 시각을 기준으로 ‘13:00 ~ 14:00 시 동안의 클릭수’ 결과를 집계해서 출력하고 있었다면 어떤 일이 일어날까요?

해당 시스템이 제공하는 클릭수는 늦게 도착한 A 사용자의 클릭 정보로 인해 3시 5분 이전에 조회한 결과와 3시 5분 이후에 조회한 결과가 달라지는 현상이 나타나게 될 것입니다.

통합 리포트 서비스에서 여러 개의 리포트가 각각 다른 주기에 따라 스트리밍 시스템과 통신하여 결과를 낸다면, 사용자의 클릭 정보를 3시 5분 이전에 조회 했는지, 그 이후에 조회 했는지에 따라 리포트 수치들이 달라지는 문제가 발생할 수 있습니다.

이런 이유로 스트리밍 방식으로 데이터를 처리할때는 이벤트 타임과 프로세싱 타임간의 차이에 의해 발생 가능한 여러 이슈들을 주의 깊게 다룰 필요가 있습니다.

8. 그렇다면 이런 상황에서는 어떻게 처리를 하는 것이 좋을까요?

이벤트 타임과 프로세싱 타임이 달라지는 것은 어떤 면에서 보면 매우 자연스러운 현상이고 이에 대한 최적의 방법은 보는 관점에 따라 달라질 수 있습니다.

결국 우리가 할 수 있고 또 해야 할 일은 주어진 업무와 상황에 가장 적합한 방법을 잘 찾아서 선택하는 것일것 같습니다.

그럼 선택 가능한 방법에는 어떤것들이 있을까요?

9. 우선 생각해 볼 수 있는 첫번째 방법으로는 클릭수 정보가 항상 변경 가능하다는 것을 인정하고 전체 시스템을 구성하는 방법입니다.

즉 스트리밍 시스템 자체는 매번 다른 결과(클릭수)를 출력하고 이로 인해 발생 가능한 이슈는 그 데이터를 읽어가서 사용하는 각 클라이언트 시스템들이 알아서 조치하도록 하는 것입니다.

이렇게 할 경우 데이터 자체는 왜곡 없이 전달할 수 있다는 장점이 있지만 스트리밍 시스템의 출력 결과를 사용하는 시스템 입장에서는 매번 변경 될수도 있는 데이터에 대한 별도 기능 구현이 필요하게 됩니다.

일단 위 아이디어를 전제로 시스템을 구성 한다고 할때 스트리밍 시스템 입장에서는 아래 3가지 방법중 하나를 선택할 수 있을것 같습니다.

10. 첫번째는 모든 데이터를 프로세싱 타임 기준으로 처리하는 것입니다.

이는 가장 직관적인 방법으로 이벤트 타임에 대한 처리는 데이터를 사용하는쪽에 맡겨두고 스트리밍 시스템은 매번 새로 들어오는 데이터만 가공해서 결과를 내는 것입니다

예를 들어 10초마다 짧은 배치를 수행하여 1시간 간격의 클릭수를 집계하는 A 시스템이 있다고 가정해 보겠습니다.

15:00:00 에 수행된 배치가 읽어들인 데이터에 오후 2시 클릭이 10건, 오후 3시 클릭이 5건 포함되어 있을 경우 해당 배치의 출력 결과는 14시 10건, 15시 5건이 됩니다.

다시 10초 후인 15:00:10 에 수행된 배치가 읽어 들인 데이터에 오후 2시 클릭이 10건 들어올 경우 해당 배치의 출력 결과는 14시 10건이 됩니다.

따라서 3시 30분 배치 결과와 3시 40분 배치 결과를 합쳐서 최종 합계(00시 00건, 01시 00건… 14시 10건, 15시 5건…)를 만드는 것은 이 스트리밍 시스템을 사용하는 다른 시스템측에서 수행해야 합니다

스파크를 사용해 보신 분들은 스파크가 Spark Streaming과 Structured Streaming 이라는 두 가지 라이브러리를 제공하는 것을 알고 있을텐데요, 이중 Spark Streaming의 처리 방식이 바로 이와 같은 방식이라고 할 수 있습니다.

11. 두번째는 새로운 데이터가 들어올때마다 전체 결과를 모두 다시 계산해서 출력해주는 방식입니다.

위에서 말한 A 시스템에 이 방식을 적용한다면 3시 30분 배치의 결과는 00시 00건, 01시 00건, …, 14시 10건, 15시 5건 으로 3시 40분 결과는 00시 00건, 01시 00건… 14시 20건, 15시 5건… 과 같이 출력됩니다

첫 번째 방식의 경우 2시 데이터가 3시에 들어오게 되면 이전 결과를 찾아서 늦게 들어온 데이터 합계를 이전 합계에 더해주는 과정이 따로 필요 하지만 이 방식을 사용하게 되면 단순히 기존 결과를 모두 지워버리고 새로운 결과로 대체해 넣으면 되기 때문에 스트리밍 시스템을 사용하는 클라이언트 시스템 입장에서는 작업이 조금 더 단순해 질수 있다는 장점을 가질 수 있습니다. (이 방식은 Spark Structured Streaming 을 통해 제공되며 출력 모드를 Complete mode 로 설정하면 됩니다)

12. 마지막은 새로운 데이터가 들어올때마다 전체 결과를 모두 출력해 주는 대신 변경된 부분만 출력해 주는 방식입니다.

위에서 말한 A 시스템에 이 방식을 적용한다면 3시 30분 배치의 결과는 14시 10건, 15시 5건 으로 3시 40분 결과는 14시 20건과 같이 출력됩니다

클라이언트 입장에서는 여전히 업데이트 작업이 필요하지만 스트리밍 시스템 입장에서는 내부에 유지하고 있어야 하는 상태값(데이터의 건수)이 줄어들기 때문에 더 효율적인 방식이라고 할 수 있습니다.
(이 방식은 Spark Structured Streaming을 통해 제공되며 Update mode 라고 불립니다)

13. 하지만 어떤 방식이든 한번 출력된 결과가 변경되는 것 자체가 마음에 들지 않는다면 클릭수 집계를 최종 결과가 나올 때까지 미뤄두는 것도 하나의 대안이 될 수 있습니다.

위 예제의 경우라면 13:00 ~ 14:00 구간의 클릭수를 하루 또는 이틀이 지난 후에 출력해 줌으로써 뒤늦게 도착하는 데이터의 영향을 최소화 하는 방법입니다.

하지만 이 방법을 사용할 경우 데이터가 나올때까지 매우 오랜 시간을 기다려야 하는 데다가 아무리 오랜 시간동안 기다린다고 해도 그 후에 과거 데이터가 더 이상 안 들어온다는 보장을 하기도 쉽지 않기 때문에 바로 적용하기는 어렵습니다.

때문에 ‘얼마동안 기다릴 것인가?’를 정하는 방법이 필요하게 되며 대부분이 스트리밍 시스템은 ‘워터마크(water mark)’라는 개념을 이용하여 이런 유형의 정보를 설정할 수 있도록 지원하고 있습니다.

(이 방식은 Spark Structured Streaming을 통해 제공되며 Append mode 라고 불립니다. Append 모드를 단순히 ‘출력 결과가 더해지는 모드’라고 이해하기 보다는 ‘한번 출력된 결과가 더 이상 변경되지 않음을 보장 하는 모드’로 기억해 두시는 것이 좋습니다. 단, 현재 최신 버전인 Spark3.2.0 까지는 water mark와 근사한 차이로 도착한 데이터의 경우는 최종 결과에 포함될 가능성이 있으므로(즉 100% 보장되는 것이 아니므로) 이 부분을 주의해야 합니다)

(append 모드와 함께 워터마크를 소개했지만 워터마크 자체가 append 모드에서만 사용되는 것은 아닙니다. 각 출력모드와 워터마크의 조합은 아래 14번 항목의 도표를 참고해 보시기 바랍니다)

14. 이처럼 간단한 상황만 생각해 봐도 시간과 관련된 복잡한 이슈가 있다는 것을 쉽게 알 수 있습니다. (실제로는 이 글에서는 언급하지 않은 성능 및 안정성, 상태 관리, 모니터링, 소스나 싱크 타입 별 이슈등 고민해야 할 주제가 많습니다)

때문에 대부분의 스트리밍 데이터 시스템들은 이런 이슈에 대응하기 위한 적절한 대응 방법을 사용하고 있고 이를 위해 자주 활용되는 대표적인 개념이 바로 앞서 소개한 window 입니다 (놀라실지 모르겠지만 사실 지금까지의 긴 설명은 모두 윈도우에 대한 이 한 줄의 설명을 위한 것이었습니다…)

15. 앞에서도 설명했지만 spark는 이벤트 타임에 기반한 윈도우(window)와 워터마크(water mark), 출력모드(output mode) 개념을 제공하고 있고 이들의 조합을 통해 앞서 언급한 이슈들에 대한 해결책을 사용자가 원하는 대로 설정할 수 있도록 지원하고 있습니다.

(아래 그림은 현재 버전(Spark 3.2.0) 기준으로 쿼리 유형과 워터마크, 출력 모드를 어떻게 조합할 수 있는지에 대한 설명입니다. 앞에서는 대표적인 케이스만 설명했지만 실제로는 더 다양한 경우가 많기 때문에 다소 복잡해 보이는데요. 무조건 암기하거나 표만 참고해서 사용하기 보다는 위에 설명한 내용을 토대로 그 이유를 찬찬히 생각해 보면 왜 어떤 조건들은 조합이 되고 어떤 조합들은 안되는지 이해하실 수 있을 것입니다)

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

이후로 소개할 session window 역시 다른 window와 동일하게 워터마크(water mark), 출력모드(output mode)와 조합하여 사용할 수 있습니다.

16. 그럼 지금부터 예제를 통해 스파크가 제공하는 윈도우 동작을 확인해 보겠습니다. 먼저 테스트를 위한 간단한 코드입니다.

(아래 그림은 스파크 공식 문서에서 제공하는 window 연산 관련 그림입니다. 이 글의 예제 코드와 세부적인 내용은 다르지만 앞으로 다루고자 하는 내용과 같은 개념을 알기 쉽게 설명하고 있으므로 함께 참고해 보시기 바랍니다)

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time

17. 예제에서 사용하는 코드는 스파크가 제공하는 socket 소스를 사용합니다.

이를 위해서 다음과 같이 간단한 서버를 먼저 실행한 뒤 위 예제 코드를 실행합니다 (소켓 소스의 사용법은 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources 을 참고하시기 바랍니다)

nc -lk 9999

18. 서버가 준비되면 아래와 같은 데이터를 순서대로 입력하고 출력을 살펴봅니다

{“prod”:”note”, “price”: 2000, “ts”:”2021–11–14 20:53:08"}

+ — — — — — — — — — — — — — — — — — — — — — + — — + — — +
|window |prod|sum |
+ — — — — — — — — — — — — — — — — — — — — — + — — + — — +
|{2021–11–14 20:50:00, 2021–11–14 20:55:00}|note|2000|
+ — — — — — — — — — — — — — — — — — — — — — + — — + — — +

예제 코드에서 window(col(“ts”), “5 minutes”, “5 minutes”) 부분은 이벤트 타임을 나타내는 “ts” 컬럼의 값을 기준으로 매 5분 간격(세번째 파라미터)으로 5분 길이의 윈도우(두번째 파라미터)를 설정하라는 의미입니다.

코드를 실행하고 결과를 살펴 보면 실제로 이벤트 타임(데이터 자체에 기록된 시각)을 기준으로 window가 설정된 것을 확인할 수 있습니다.

이번에는 윈도우를 window(col(“ts”), “10 minutes”, “5 minutes”) 으로 변경하고 같은 테스트를 수행해 보겠습니다

+ — — — — — — — — — — — — — — — — — — — — — + — — + — — +
|window |prod|sum |
+ — — — — — — — — — — — — — — — — — — — — — + — — + — — +
|{2021–11–14 20:45:00, 2021–11–14 20:55:00}|note|2000|
|{2021–11–14 20:50:00, 2021–11–14 21:00:00}|note|2000|
+ — — — — — — — — — — — — — — — — — — — — — + — — + — — +

결과를 보면 5분 간격으로 윈도우를 설정하되 그 길이가 10분으로 늘어난 것을 볼 수 있습니다.

우리가 입력한 데이터의 이벤트 타임은 20시 53분이기 때문에 45분~55분, 50분~00분 사이의 두 개 윈도우에 모두 속한 것을 알수 있습니다.

물론 ts 라는 컬럼 값을 이용해 직접 윈도우 컬럼을 추가하고 이 컬럼을 기준으로 groupBy를 수행할 수도 있습니다. 하지만 일반적인 텀블링 윈도우나 슬라이딩 윈도우까지 이런 방식으로 설정하기에는 귀찮은 연산이 더 많이 필요할 수 있어서 제공되는 함수를 사용하는 것을 추천합니다.

(방금 설명한 윈도우 설정 방법과 텀블링 윈도우, 슬라이딩 윈도우에 대한 자세한 내용은 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time 에서 확인하실 수 있습니다)

19. 기본적인 테스트를 마쳤으니 이번에는 편의상 윈도우를 처음처럼 5분 마다 5분 간격으로 되돌려 두고 데이터를 몇개 더 넣어 보도록 하겠습니다.

{“prod”:”note”, “price”: 2000, “ts”:”2021–11–14 20:53:08"}{“prod”:”note”, “price”: 2000, “ts”:”2021–11–14 20:53:08"}

아래 결과는 위와 같이 동일한 데이터를 두번 연속 입력하고 난 후의 결과입니다. 출력된 결과를 보면 알수 있듯이 이전 데이터와 두번째 데이터의 금액이 합쳐진 결과가 출력된 것을 알 수 있습니다

+ — — — — — — — — — — — — — — — — — — — — — + — — + — — +
|window |prod|sum |
+ — — — — — — — — — — — — — — — — — — — — — + — — + — — +
|{2021–11–14 20:50:00, 2021–11–14 20:55:00}|note|4000|
+ — — — — — — — — — — — — — — — — — — — — — + — — + — — +

즉 이런 방식으로 데이터를 처리하게 되면 동일 윈도우, 동일 상품(prod)에 속하는 데이터가 들어올때마다 매번 새로 갱신된 데이터가 출력될것을 예상할 수 있습니다.

Spark에서는 이런 방식을 update 모드라고 하며 이는 앞에서 얘기한 “출력 결과가 항상 변할 수 있음을 가정하고 시스템을 구성하는 방식”을 위한 것이라고 할 수 있습니다.

20. 하지만 앞에서 언급했듯이 매번 출력 결과가 달라지는 이런 방식을 원치 않을 수 있습니다.

따라서 두번째 방법인 “데이터가 다 올때까지 결과를 출력하지 않고 기다리는 것” 을 선택할 수 있는데요 문제는 얼마 동안 기다려야 할지를 정하는 것인데 이는 기존 코드의 일부를 다음과 같이 수정해서 처리할 수 있습니다.

이제 위와 같이 코드를 변경하고 실행 한 후 데이터를 입력해 보겠습니다

{“prod”:”note”, “price”: 2000, “ts”:”2021–11–14 20:53:08"}

이번에는 아까와 다르게 데이터를 입력해도 아무런 결과가 나오지 않습니다.

+ — — — + — — + — -+
|window|prod|sum|
+ — — — + — — + — -+
+ — — — + — — + — -+

대신 콘솔에 아래와 같이 워터마크 값이 변경된것을 확인할 수 있습니다.

{watermark=2021–11–14T20:43:08.000Z}

위 결과는 첫번째 데이터 입력으로 워터마크 값이 20시 43분 08초로 설정되었다는 의미입니다. 이게 무슨 의미일까요?

일단 두번째 데이터를 넣어보겠습니다.

{“prod”:”note”, “price”: 2000, “ts”:”2021–11–14 20:59:55"}

결과는 여전히 아무것도 출력되지 않습니다. 하지만 워터마크를 보면 20시 43분이던 시간이 20시 49분으로 변경된것을 볼 수 있습니다.


+ — — — + — — + — -+
|window|prod|sum|
+ — — — + — — + — -+
+ — — — + — — + — -+

{watermark=2021–11–14T20:49:55.000Z}

그럼 이 상태에서 3번째 데이터를 입력해 보겠습니다.

{“prod”:”note”, “price”: 2000, “ts”:”2021–11–14 21:09:05"}

— — — — — — — — — — — — — — — — — — — — — -
Batch: 6
— — — — — — — — — — — — — — — — — — — — — -
+ — — — — — — — — — — — — — — — — — — — — — + — — + — — +
|window |prod|sum |
+ — — — — — — — — — — — — — — — — — — — — — + — — + — — +
|{2021–11–14 20:50:00, 2021–11–14 20:55:00}|note|2000|
+ — — — — — — — — — — — — — — — — — — — — — + — — + — — +
{watermark=2021–11–14T20:59:05.000Z}

이번에는 기다리던 결과가 나왔습니다. 설정된 윈도우를 보니 20시 50분 ~ 20:55분 사이로 되어 있습니다.

그리고 이 구간에 속하는 데이터는 우리가 맨 처음 입력한 20시 53분 데이터라는 것을 알 수 있습니다.

이런 결과가 나온 이유는 우리가 이번에 채택한 방식이 “데이터가 다 들어올때까지 충분히 기다렸다가 결과를 내어주는 방식”을 선택했기 때문입니다.

워터마크란 “데이터가 다 들어올때까지 얼마나 기다릴 것인지”를 설정하는 것인데 예제처럼 withWatermark(“ts”, “10 minutes”) 라고 지정하게 되면 현재까지 들어온 데이터중 가장 마지막에 들어온 데이터의 이벤트 타임보다 10분 빠른 시각을 워터마크 값으로 설정하라는 뜻이됩니다.

즉 맨처음 입력한 데이터가 속하는 구간인 20:50 ~ 20:55분 사이의 데이터가 확정되는 시점은 워터마크가 20:55분을 넘어서는 시점이 되고 워터마크가 그 시점에 도달한 것이 세번째 데이터를 입력했던 시점 (이때 워터마크가 20시 59분 05초로 설정됩니다)이기 때문에 그 시점에 해당 윈도우의 데이터가 출력된 것입니다.

21. 세션 윈도우

여기까지 읽으시느라 고생 많으셨습니다. 이제 드디어 오늘의 주제인 세션 윈도우를 소개할 순서가 되었습니다.

스트리밍 처리가 처음이신 분들은 지금까지의 내용이 많이 어려우실 수도 있는데요. 사실 위 내용만 잘 이해하면 세션 윈도우 처리 자체는 쉽게 이해하실 수 있을 것입니다

먼저 세션 윈도우 관련 코드를 살펴 보겠습니다.

코드를 보면 알수 있듯이 지금까지 본것과 거의 동일한 코드를 사용하고 있습니다.

달라지는 점이라면 앞에서 window 라고 처리했던 부분이 session_window 라는 내용으로 바뀐 정도가 전부입니다.

변경된 코드의 내용 역시 매우 직관적인데요 session_window($”ts”, when($”age” < 30, “5 minutes”).otherwise(“30 minutes”)) 라고 하면 이벤트 타임을 나타내는 “ts” 라는 컬럼을 기준으로 세션 윈도우를 만들되 age 컬럼의 값이 30보다 작으면 5분 윈도우를 적용하고 그렇지 않으면 30분을 적용하라는 의미입니다

여기서 기존 윈도우와 다른 점이 있다면 기존 윈도우는 시간 간격이 5분, 10분 등으로 고정되어 있는 반면에 세션 윈도우는 데이터가 들어올때마다 코드에 지정한 시간 만큼 윈도우를 증가시켜 준다는 차이가 있습니다.

session_window($”ts”, “5 minutes”) 라고 하면 첫번째 데이터가 들어오면 5분짜리 윈도우를 설정했다가 두번째 데이터가 들어오면 그 데이터의 이벤트 시각으로 부터 또 5분을 설정하고 이런식으로 윈도우를 증가시켜가다가 더 이상 데이터가 들어오지 않으면 세션을 닫게 되는 (윈도우 크기를 더 이상 증가시키지 않는) 방식으로 처리를 수행하는 것입니다.

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

그외 water mark와 출력모드(update, append) 특성은 앞에서 살펴본 것과 동일하게 작용하게 되기 때문에 세션 윈도우로 윈도우의 동적 크기를 설정하고 출력모드와 그에 따른 워터 마크 설정으로 원하는 처리 방식을 결정한다고 이해하면 됩니다.

그럼 실제 데이터를 입력해서 어떤 식으로 데이터가 출력되는지 살펴 보겠습니다.

먼저 아래와 같이 3번의 데이터를 연속 입력합니다.

{“id”:”user2", “age”:26, “ts”:”2021–11–14 20:05:00"}
{“id”:”user2", “age”:26, “ts”:”2021–11–14 20:08:00"}
{“id”:”user2", “age”:26, “ts”:”2021–11–14 20:11:00"}

그 결과 세션 윈도우는 마지막 데이터 입력 시점이 20:11분으로 부터 5분 후인 20:16분으로 설정되었겠지만 워터마크가 20시 9분으로 설정되어 있기 때문에(20시 9분 이후의 데이터는 여전히 들어올수 있다는 의미) 아무런 데이터도 출력되지 않고 있는 상태가 됩니다.

+ — — — — — — — + — -+ — — -+
|session_window|id |count|
+ — — — — — — — + — -+ — — -+
+ — — — — — — — + — -+ — — -+
{watermark=2021–11–14T20:09:00.000Z}

이제 이 데이터가 출력될 수 있도록 아래와 같은 데이터를 입력해 보겠습니다.

{“id”:”user2", “age”:26, “ts”:”2021–11–14 20:21:00"}

위 데이터의 입력으로 워터마크가 20시 19분으로 설정되었고 이는 20시 19분 이전의 데이터는 출력해도 된다는 의미가 되므로 아래와 같이 20:16분을 끝으로 하는 세션 윈도우 결과가 출력된 것을 확인 할 수 있습니다

+ — — — — — — — — — — — — — — — — — — — — — + — — -+ — — -+
|session_window |id |count|
+ — — — — — — — — — — — — — — — — — — — — — + — — -+ — — -+
|{2021–11–14 20:05:00, 2021–11–14 20:16:00}|user2|3 |
+ — — — — — — — — — — — — — — — — — — — — — + — — -+ — — -+

{watermark=2021–11–14T20:19:00.000Z}

22. 지금까지 스파크에 새로 추가된 세션 윈도우에 대해 알아보았습니다.

아마도 스파크나 스트리밍 처리의 경험이 많지 않으신 분들은 세션 윈도우를 설명한다고 하면서 왜 워터마크나 출력 모드 얘기를 꺼내서 더 혼란스럽게 하느냐고 생각하실 수도 있을것 같은데요, 현재 버전의 세션 윈도우를 제대로 사용하기 위해서는 워터마크와 update를 제외한 출력 모드의 사용이 필수적이기 때문에 어쩔수 없이 함께 다루게 되었습니다. (현재 버전의 세션 위도우는 업데이트 모드를 지원하지 않으며 groupBy 절에는 session_window 컬럼외에 적어도 하나의 컬럼이 더 지정되어야 하는등의 제약이 존재합니다)

23. 짧은 글을 통해 소개하다보니 내용도 부족하고 많은 부분을 생략하는 바람에 궁금한 점도 많으실것 같습니다.

사실 이글에서 언급한 스트리밍 시스템의 많은 기존 이슈들은 최근에 주목을 끌고 있는 delta lake(https://delta.io/) 기술을 접목하면 전혀 다른 접근 방식을 통해 문제를 해결할 수 있습니다. 비록 이글에서는 다루지 못했지만 관심있는 분들은 한번쯤 참고해 보실것을 추천드립니다 (현재(2011년 11월 18일) Delta lake 는 Spark 3.2.0 을 공식 지원하지 않고 있지만 github issue를 확인해 보면 곧 새 버전이 릴리즈될 예정이라고 합니다)

마지막으로 이 글에서 미처 다루지 못한 내용들을 데이터브릭스의 https://databricks.com/blog/2021/10/12/native-support-of-session-window-in-spark-structured-streaming.html
참고하셔서 함께 살펴보시면 좋을듯 하여 함께 소개해드립니다

감사합니다.

--

--