스트림 데이터를 SQL로 다룰수 있는 Flink 살펴보기

Kafka가 대중화되면서 스트림데이터를 다룰 수 있게 되었지만, 스트림데이터를 다양한 차원과 조건으로 집계 하는 건 쉬운 일이 아닙니다.

오늘은 저희 조직에서 Kafka에 저장된 광고로그를 Flink 이용해서 SQL 기반으로 쉽고 빠르게 만들어낸 경험을 소개하려고 합니다.

들어가기전에

사실 우리 팀에서 먼저 도입한 건 Kafka Streams 였습니다. 라이브러리 기반이라 가볍고, 심플하며 훌륭했습니다. 그래서 Kotlin DSL 기반으로 표현하도록 사용성을 개선해서 사용했지만 요구사항은 빠르게 늘어갔고,

KafkaStreams 를 쓰다보면 Schema Registry 와 Kafka Connect 는 사실상 필수

시스템은 커지면서 스키마를 다루기 위해서는 Schema Registry 가 사실상 필수였고, 다양한 스토리지에 저장하기 위해서는 Kafka Connector 를 사용하면서 시스템이 복잡해지는 상황이었습니다.

그래서 오픈소스 중에 SQL 형태로 표현이 가능하고, 다양한 스토리지의 Sink를 지원하는 솔루션 중에서 Flink 가 가장 적합하다고 판단해서 사용해 보았습니다.

Flink 의 도입

Flink 는 스트림 처리를 위한 플랫폼입니다. 다양한 스토리지는 Connectors라는 개념으로 라이브러리를 추가하면 연결 가능하고, SQL Client 를 사용하면 쿼리를 이용해 데이터를 다룰 수 있습니다. (기본은 DataStream API, Table API 를 사용하는 것이 기본이지만 SQL 만으로도 대부분 표현이 가능합니다)

스트림데이터 SQL 로 다루기

이 글에서는 얼마나 쉽게 데이터를 다룰 수 있는지에 이야기하고 싶기 때문에 간단한 예시를 들어 설명하고자 합니다. 스트림 데이터를 분석할 때는 윈도우 단위로 집계하고 그 데이터를 저장하는 형태로 많이 사용하기 때문에 쿼리로 쉽게 표현이 어떻게 되는지 알려드리고자 합니다.

데이터 SINK 하기 (Kafka to Elasticsearch)

Flink는 Connector 라이브러리를 lib 폴더에 미리 복사해두면 바로 테이블로 맵핑해서 데이터를 다룰 수 있습니다.

예를 들어, Kafka의 스트림 데이터를 분석해보기 위해서 Elasticsearch로 데이터를 옮겨야 한다면 다음과 같이 테이블을 선언하면 손쉽게 데이터를 복사할 수 있습니다.

----------------------------------
-- SOURCE TABLE:
-- KAFKA + AVRO + SCHEMA-REGISTRY
----------------------------------
CREATE TABLE IF NOT EXISTS KafkaInputLog(
the_kafka_key STRING,
logTime TIMESTAMP(0),
eventType STRING,
custId STRING,
adId STRING,
mediaId STRING,
schKwd STRING,
WATERMARK FOR logTime AS logTime - INTERVAL '5' MINUTE
) WITH (
'connector' = 'kafka',
'topic' = 'ad-log-data',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'test.kafka.domain.com:9092',
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://test.schema.domain.com:8081',
'value.avro-confluent.schema-registry.subject' = 'subject-ad-log-data',
'value.fields-include' = 'EXCEPT_KEY'
);
----------------------------------
-- SINK TABLE
----------------------------------
CREATE TABLE IF NOT EXISTS EsInputLog(
logTime STRING,
eventType STRING,
custId STRING,
adId STRING,
mediaId STRING,
schKwd STRING
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://test.es.domain.com:10200',
'index' = 'ad-log-data-index'
);
----------------------------------
-- KafkaInputLog -> EsInputLog
----------------------------------
INSERT INTO EsInputLog
SELECT
DATE_FORMAT(logTime, 'yyyyMMdd HH:mm:ss') AS logTime,
eventType,
custId,
adId,
mediaId,
schKwd
FROM
KafkaInputLog
WHERE
eventType in ('AD-VIEW', 'AD-CLICK')
;

여기서 놀라운 건 INSERT INTO의 SELECT 절에서 WHERE 조건을 추가하거나 함수를 써서 데이터를 가공하거나 필터링을 할 수 있다는 점입니다.

스트림 데이터는 무한의 데이터를 다루기 때문에 종료되지 않는 특성이 있기 때문에, 이 쿼리는 JOB으로 등록되고 대시보드상에서 진행상황을 확인할 수 있게 됩니다.

Flink Jobs DashBoard

Windowing Aggregate 표현

시간은 무한히 흐르지만 하루를 24시간이라는 개념으로 나누듯

스트림 데이터도 무한히 흐르지만 분석을 위해서 어떤 단위로 나눠서 분석하는 것이 일반적입니다. 그 어떤 단위를 “윈도우”라고 부르고 어떻게 형태에 따라 윈도우는 다양한 종류가 존재합니다.

텀블링윈도우

쉽게 생각해서 1시간 배치를 만든다고 할때 00:00~01:00 / 01:00~02:00 와 같이 동일한 윈도우크기(1시간) 에 시간대가 안겹쳐지는 형태를 의미합니다.

텀블링 윈도우의 표현

겹쳐지는 데이터가 없고, 배치에서 사용하는 파티션 전략과 유사하기 때문에 가장 일반적으로 많이 쓰이는 윈도우 종류라고 생각하시면 됩니다.

flink 에서는 다음과 같이 텀블링 윈도우를 쿼리로 표현하고 요약할 수 있습니다.

INSERT INTO esTumbleCust
SELECT
custId,
mediaId
SUM(IF(eventType = 'AD-VIEW', 1, 0)) AS viewCnt,
SUM(IF(eventType = 'AD-CLICK', 1, 0)) AS clkCnt,
DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ymd,
DATE_FORMAT(window_start, 'HHmm') AS hh24
FROM TABLE(
TUMBLE(
DATA => TABLE KafkaInputLog,
TIMECOL => DESCRIPTOR(logTime),
SIZE => INTERVAL '1' HOUR))
group by
window_start, window_end,-- 일종의 예약어 필드
custId, mediaId;

호핑윈도우 (슬라이드윈도우)

윈도우 크기는 동일하지만 이동하는 간격시간이 다른 형태를 의미합니다. 창문을 밀어내는 모습과 비슷하다고해서 “슬라이드 윈도우” 라고 더 많이 알려져있습니다.

윈도우간의 데이터가 겹쳐지기 때문에 최근 1시간 데이터의 최근 데이터를 보고싶은 패턴에서 주로 사용하게 됩니다

호핑윈도우(슬라이드윈도우) 는 창문이 밀리는것처럼 일정간격으로 윈도우를 이동하는 형태이다

Flink 에서 쿼리로 표현할때 “SLIDE” 파라미터가 추가로 있는데 이 값이 “SIZE” 와 같다면 텀블링 윈도우랑 같은 결과를 만들어 냅니다.

INSERT INTO esHopCust
SELECT
custId,
mediaId
SUM(IF(eventType = 'AD-VIEW', 1, 0)) AS viewCnt,
SUM(IF(eventType = 'AD-CLICK', 1, 0)) AS clkCnt,
DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ymd,
DATE_FORMAT(window_start, 'HHmm') AS hh24
FROM TABLE(
HOP(
DATA => TABLE KafkaInputLog,
TIMECOL => DESCRIPTOR(logTime),
SLIDE => INTERVAL '1' MINUTES, -- 윈도우가 옮겨지는 간격시간
SIZE => INTERVAL '1' HOUR))
group by
window_start, window_end,-- 일종의 예약어 필드
custId, mediaId;

마무리

과거에는 “빅데이터”를 어떻게 다룰것인가? 이런 문제로 고민하던 시절이 있었습니다. 이 난제는 하둡과 스파크 같은 <분산처리 배치플랫폼>이 나오면서 해결되었습니다. 그리고 Hive 와 Presto 가 같이 쿼리기반으로 더 쉽게 처리가능해 지면서 “빅데이터” 라는 단어는 이제는 촌스러운 과거의 기술단어가 되버린지 오래입니다.

“스트림 데이터”를 어떻게 다룰것인가?

이 문제는 Kafka 가 나오면서 기반을 만들어 놨다고 이야기 할 수 있습니다. 하지만 쉽게 다룰수 있는가? 에 대한 질문에는 대해서는 아직 “예” 라고 대답하기는 어려운 수준이 아닌가 싶습니다.

역사는 반복되는것 처럼 스트림데이터도 쿼리기반으로 쉽게 처리해야 할 수 있어야 더 대중화 될수 있고, Flink 는 훌륭한 도구가 될 수 있다고 이야기 드릴수 있겠습니다.

--

--