AWS EMR과 Airflow를 이용한 Batch Data Processing

Class101 데이터팀에서 Spark를 이용해 빅데이터를 ETL하는 법

Min Jo
18 min readAug 5, 2020

TL;DR

  • Apache Airflow (Workflow Scheduling)
  • Amazon EMR (Managed Spark Cluster)
  • PySpark 2.4.4 (Distributed Data Processing)
  • AWS Glue (Hive Metastore)
  • AWS S3 (Parquet File Object Storage)
  • Amazon Redshift (Data Warehouse)

를 사용하여 Class101의 빅데이터 ETL 플랫폼을 구축 😋

배경

클래스 101은 컨텐츠를 중심으로 사업을 다각화하는 조직이다. 클래스101의 데이터팀은 컨텐츠와 사용자간의 관계를 정확하게 분석하고 대응하기 위하여 초기에 RDB와 Pandas를 중심으로 데이터를 축적/처리하기 시작했다. 하지만 점차 늘어나는 유저 트래픽, 컨텐츠의 수, 마케팅 빈도 그리고 Analyst의 분석 수요로인해 데이터의 양이 많아짐에 따라 freshness가 지연되고 분석환경 과부하에 따라 데이터 사용성도 낮아지기 시작했다. 때문에 기존과는 다른 구조가 필요했다.

목표

  • 하루에 GB단위로 증가하는 데이터를 더욱 효율적이며 빠르게 처리하기 위해 분산처리 프레임워크 (Spark)를 도입한다.
  • 처리된 데이터를 SQL로 집계하여 사용성을 극대화하기 위해 MPP Database (Redshift)를 도입한다.
  • 전체 데이터 파이프라인의 동작을 관리하기 위하여 워크플로 관리툴 (Airflow)을 도입한다.

설계 및 구현

https://cdn-images-1.medium.com/max/1600/1*xfeHWFTFZEV_HKMCoqdaMA.png
Airflow <> EMR <> S3 Diagram

우리는 많은 POC를 통해 이 포스트의 맨 위에 명시되있는 스택들을 선택하여 배치 데이터 파이프라인과 웨어하우스를 구축했다. 우리의 선택기준은 비용과 사용성이다.

Airflow

우리는 Airflow를 Stable의 Helm Chart를 이용하여 Kubernetes에 배포함으로써 High Availability를 극대화함과 동시에 손쉬운 유지보수 환경을 구축할 수 있었다.

어떻게 Airflow에서 EMR로 PySpark job을 submit할까?

Airflow에선 다양한 Operator를 입맛대로 구현하여 원하는 Task를 수행할 수 있다. 우리는 EMR Cluster에 PySpark job을 submit하기 위하여 EMRSSHOperator라는 Custom Operator를 구축했다. EMRSSHOperator

  1. 이미 실행되고 있는 AWS EMR cluster 마스터 노드에 SSH 접속을 한 후
  2. 직접 shell에서 spark-submit커맨드를 실행한다. 이때 실행될 커맨드에 parameter로 PySpark job 이름과 그 job의 application code가 들어있는 Docker Image를 같이 제공한다.
  3. PySpark job이 완료되면 Airflow Task역시 완료되며 종료된다.

대략적으로 SSHOperator를 inherit하여 아래형식으로 구현하였다.

class EMRSSHOperator(ssh_operator.SSHOperator):
"""Operator that inherits SSHOperator to run spark-submit command on remote EMR cluster
"""

@decorators.apply_defaults
def __init__(
self,
spark_program_args: str,
driver_cores: str = "1",
driver_memory: str = "2G",
executor_cores: str = "1",
executor_memory: str = "3G",
spark_confs: list = [],
extra_spark_conf_overrides: typing.Optional[dict] = None,
image: str = None,
*args: typing.Any,
**kwargs: typing.Any,
) -> None:
self.driver_cores = driver_cores
self.driver_memory = driver_memory
self.executor_cores = executor_cores
self.executor_memory = executor_memory
self.image = image
self.spark_program_args = spark_program_args
super().__init__(
ssh_conn_id=f"class101_emr_conn_ssh", command="", *args, **kwargs,
)

def execute(self, context: typing.Dict[str, typing.Any]) -> None:
self.command = f"spark-submit \\
--master yarn \\
--deploy-mode cluster \\
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={self.image} \\
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={self.image} \\
--executor-memory {self.executor_memory} \\
--executor-cores {self.executor_cores} \\
--driver-memory {self.driver_memory} \\
--driver-cores {self.driver_cores} \\
{self.spark_confs} \\
run_task.py \\
{self.spark_program_args}"
super().execute(context)
self.cleanup()

위에서 “이미 실행되고 있는 EMR Cluster”에 PySpark job을 submit한다고 명시했다. 어째서 EMR Cluster가 Airflow Task가 시작되기 전에 이미 실행되고 있을까? 그럼 비용낭비가 아닐까? 오히려 비용최적화를 위해 여러 job들을 한 Spark 클러스터에서 수행하기 위함이다! 아래에서 좀 더 자세하게 알아보자.

EMR

비용 최적화와 Reliability의 극대화를 위해 Managed Spark Cluster (관리형 spark 클러스터) 솔루션을 사용했다. 여러 관리형 spark 클러스터 솔루션이 존재하지만 우린 Amazon EMR을 사용하기로 한다. 왜?

  1. Class101 개발팀의 Cloud스택이 AWS이다.
  2. 비용면에서 여러 솔루션 (Databricks, Qubole, DataProc,…)중 가장 저렴하다. (저렴한 만큼 설정에 손이 더 간다. 다른 솔루션들이 비싼 이유는 이 설정들을 유저대신, 혹은 유저에게 더 직관적으로 제공해주기 때문이다)

우리가 설계에서 제일 중요하게 고려해야할 점은 단 한가지, 각 Spark job마다 클러스터를 하나씩 만들고 내릴지, 아니면 한 24/7 standby 클러스터에서 여러 Spark job들을 실행할 지이다. 결론적으로 비용측면을 고려했을 때 한 클러스터에서 여러 잡을 동시에 수행하는게 유리하다.

이렇게 할 수 있도록 도와주는 Spark의 컴포넌트는 바로 클러스터 Resource Manager (클러스터 매니저)이다. 클러스터 매니저는 크게 3가지가 존재한다. Standalone/YARN/Mesos. 대중적으로 제일 많이 쓰는 건 Standalone 클러스터 매니저이다. Default이기도 하며 설정해줄 것이 별로 없어 손쉽게 도입할 수 있다. 허나 Standalone은 리소스 최적화를 크게 고려하지 않는다. Default로 하나의 Spark job을 돌리기 위해 클러스터 전체의 리소스를 사용한다. (이에 대해 저 자세하게 알고 싶다면 구글링)

우리는 여러 Spark job들을 한 클러스터에서 효율적으로 돌리고 싶다. 고로 YARN 클러스터 매니저를 선택한다. Hadoop라이브러리 안에 있는 YARN은 클러스터 리소스를 “컨테이너”단위로 쪼갤 수 있고 이를 각 Spark job들에게 동시에 분배해줄 수 있다.

이와 동시에 어떤 EMR의 feature가 비용 최적화를 도와줄까? 바로 Auto-scaling이다. 클러스터가 아무 Job도 실행하고 있지 않을 때 Worker Node들은 AutoScaling을 통해 0개로 줄어들며 Master Node만 Airflow에서 Schedule될 job들을 기다리며 대기하게 된다. 그러므로, 우리는 Master Node의 고정비용(24/7)을 발생시키지만 결론적으로 한 EMR 클러스터에서 여러 Job을 돌리게 됨으로써 전체적인 비용 측면에서 절약할 수 있다. (물론 Job들의 갯수와 그들의 빈도가 낮다면 당연히 한 task가 마다 클러스터를 올리고 내리는게 더 효율적일 수 있다. 클래스 101은 하루에 약 >100개의 Spark job을 수행한기 때문에 standby설계가 알맞다.) 아래는 위의 설명을 그림으로 표현했다.

https://cdn-images-1.medium.com/max/1600/1*tHCAlrSndIvTLxLsozfmlg.png
https://cdn-images-1.medium.com/max/1600/1*Th33aeZN6dKdtp_pXIu1UQ.png
빨강과 보라는 각각 서로 다른 Spark Job을 의미한다.

주의!

하지만 여기서 고려할 Painpoint가 존재한다. PySpark job은 Python dependency들을 내재하고 있기 때문에 Spark 클러스터에 Python dependency들을 미리 설치해 Python환경을 조성해주어야 한다. 만약 PySpark Job들이 서로 다른 Python library들을 사용한다면 같은 머신 (클러스터)에서 Job들을 실행할 수 없을 것이다. 이를 해결해주기 위해 Amazon EMR이 6.0.0 release에서 Docker Image Support를 발표했다 (참조). 이제 각 PySpark Job들이 같은 클러스터이지만 서로 다른 Docker 컨테이너 위에 실행될 수 있다! Thanks to Amazon EMR

그럼 우리는 어떤 방식으로 PySpark 애플리케이션 코드를 구성했을까?

PySpark Codebase

우리의 Airflow Repo에는 PySpark Job들을 Scheduling 해줄 코드만이 존재할 뿐 PySpark코드를 일체 포함하지 않는다. PySpark 애플리케이션 코드들은 다른 Repo에 구성되어있다. 아래는 그 Repo의 디렉토리 구성이다.

pyspark-etl/
│ README.md
│ task_runner.py
│ Dockerfile

└───etl_codes/
│ │
│ │ users_transform.py
│ │ marketing_transform.py


└───cli_etl_codes/
│ cli_users_transform.py
│ cli_marketing_transform.py

결론적으로, 위의 리포에 정의한 pyspark 코드를 EMR에서 실행시키려면 정의한 method를 CLI커맨드 형태로 실행시킬 수 있어야 한다. 바로 Python의 Click이라는 라이브러리를 활용할 수 있다. Pyspark를 코드를 CLI로 정의하는 방법을 users_transform.py를 예로 들면 아래와 같다.

  1. 먼저 etl_codes/에 method를 정의한다. (users_transform.py)
def users_transform(
spark: SparkSession,
dest_db_prefix: str,
dest_table_name: str,
execution_datetime_tag: Union[date, datetime],
) -> None:
"""
1. Read "raw_users" and "raw_user_infos" tables
2. preprocess "raw_users"
3. preprocess "raw_user_infos"
4. join on "user_id"
5. write "users"
"""
# some pyspark codes

s3_path = os.path.join(
S3_ROOT_PATH,
f"{dest_db_prefix}_{db_suffix}",
f"{dest_table_name}",
f"{execution_datetime_tag.strftime('%Y_%m_%d_%H_%M_%S')}",
)
df.repartition(partition_num).write.mode("overwrite").option(
"path", s3_path
).saveAsTable(f"{dest_db_prefix}_{db_suffix}.{dest_table_name}")

2. cli_etl_codes/에 cli 커맨드를 정의한다. (cli_users_transform.py)

@click.command(name="users_transform")
@click.option("--dest_db_prefix", type=click.STRING, required=True)
@click.option("--dest_table_name", type=click.STRING, required=True)
@click.option(
"--airflow_execution_datetime_tag",
type=click_utils.DateTimeParamType(),
required=True,
help="Airflow Execution Datetime",
)
def cli_users_transform(
dest_db_prefix: str,
dest_table_name: str,
airflow_execution_datetime_tag: Union[date, datetime],
) -> None:
""" Join raw lectures and raw steps table, transform, write lectures
"""
with spark_session(app_name="users_transform") as spark:
users_transform(
spark=spark,
dest_db_prefix=dest_db_prefix,
dest_table_name=dest_table_name,
execution_datetime_tag=airflow_execution_datetime_tag,
)

3. task_runner.pyrun_task라는 main method에 위에 형성한 cli를 더해준다.

from cli_etl_codes import cli_users_transform@click.group()
def run_task():
"""Simple group task that will group all sub-tasks together."""
pass
# pyspark jobs
run_task.add_command(cli_users_transform)

4. Docker Image를 build하여 push해준다.

이제 EMR의 master 노드 shell에서 아래와 같은 커맨드를 실행할 수 있다.

spark-submit                 
--master yarn
--deploy-mode cluster
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=class101/pyspark-etl:latest
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=class101/pyspark-etl:latest
--executor-memory 3G
--executor-cores 1
--driver-memory 2G
--driver-cores 1
run_task.py users_transform --dest_db_prefix='etl_production'
--dest_table_name='users' --execution_datetime_tag='2020-07-19T00:00:00+00:00'

5. Airflow에서 이 PySpark job을 Airflow Task로 정의해준다.

dag = DAG(
"transform-dag",
schedule_interval="30 15 * * *", # every 00:30 AM KST
catchup=False,
max_active_runs=1,
default_args={
**PYSPARK_ETL_DEFAULT_ARGS,
"start_date": datetime(2020, 8, 6),
},
)
users_transform = EMRSSHOperator(
dag=dag,
task_id="users_transform",
image="class101/pyspark-etl:latest",
spark_program_args=shell_command_args(
"users_transform",
dest_db_prefix="etl",
dest_table_name="users",
execution_datetime_tag="{{execution_date}}",
),
)

이렇게 users라는 ETL 결과 테이블을 S3에 write하였다. 이 테이블을 어떻게 SQL을 통해 쿼리할 수 있을까?

Redshift <> Glue <> S3

위의 users_transform.py에서 pyspark dataframe을 write하는 모습을 봤다.

s3_path = os.path.join(
"s3://class101-data-lake",
f"{dest_db_prefix}_{db_suffix}",
f"{dest_table_name}",
f"{execution_datetime_tag.strftime('%Y_%m_%d_%H_%M_%S')}",
)
df.repartition(partition_num).write.mode("overwrite").option(
"path", s3_path
).saveAsTable(f"{dest_db_prefix}_{db_suffix}.{dest_table_name}")

위의 코드를 통해 우리는 users라는 테이블을 s3://class101-data-lake/etl_production/users/2020-08_06_00_00_00/ 에 parquet형태로 write하였다. 동시에 Redshift/Athena에서 쿼리가 가능하도록 이 테이블에 대한 metadata를 Glue에 싱크하고 테이블을 etl_production이라는 external schema에 등록시켜준다. 이는 etl_production이라는 external schema가 Redshift에 존재하고 EMR클러스터가 hive metastore로 Glue Catalog를 사용하다면 Pyspark의 saveAsTableoperation에서 자동으로 이루어진다. 이에 대한 자료는 **여기**서 참조해주기 바란다.

이제 이 테이블은 Redshift에서 etl_production.users의 형태로 존재하게 되어 SQL통하여 여러가지 분석을 할 수 있게 되었다.

이런식으로 pyspark 코드베이스에 원하는 pyspark코드를 계속 추가해줄 수 있다

쓰다보니 길어졌다.

궁금한 사항은 abe@101.inc로 문의해주길 바란다.

linkedin: https://www.linkedin.com/in/mingujo/

클래스 101 데이터팀에서 데이터 엔지니어/데이터 분석가를 모집합니다!

링크: https://www.notion.so/class101/101-29162e4b76564bbe8329da95bf83447a

Next Up

클래스 101 데이터팀이 실시간 유저 이벤트를 트래킹하여 인사이트를 얻는 법!

--

--