데이터분석 인프라 구축기 (3/4)

Spark on YARN Cluster

이번 포스팅에서는 많이(?) 사랑받는 분석도구 중에 하나인 Spark의 설치 및 실행 환경을 다뤄보겠다. 지난번에 구축한 Dockerized Hadoop Cluster를 기반으로 하여 PySpark을 실행하고 간단한 예제를 수행해보겠다.

참고로 Spark은 Scala 로 개발된 만큼 Spark Job의 구현도 Scala 가 기본이지만 Scala Spark 라이브러리를 Python으로 Wrapping 해서 Python을 사용할 수 있도록 PySpark 도 제공한다.

Dockerized Spark

https://github.com/sangwonl/docker-spark

일단 여기에 Spark 실행 환경을 갖춘 Dockerfile이 있는데 line by line 으로 몇 가지만 살펴보겠다.

FROM sangwonl/hadoop-base:2.7.1

베이스 이미지는 지난 포스팅에서 사용한 sangwonl/docker-hadoop-cluster 의 base 이미지로 한다. hadoop 바이너리와 설정을 주입할 수 있는 환경이 갖춰져 있어서 좋다.

ENV SPARK_VERSION 2.1.0
ENV SPARK_URL http://d3kbcqa49mib13.cloudfront.net/spark-$SPARK_VERSION-bin-hadoop2.7.tgz
RUN set -x \
&& curl -fSL "$SPARK_URL" -o /tmp/spark.tgz \
&& tar -xvf /tmp/spark.tgz -C /opt/ \
&& rm /tmp/spark.tgz \
&& mv /opt/spark-$SPARK_VERSION-bin-hadoop2.7 /opt/spark
ENV SPARK_HOME=/opt/spark
RUN cp $SPARK_HOME/conf/spark-defaults.conf.template $SPARK_HOME/conf/spark-defaults.conf

Spark 버전 2.1.0 바이너리를 받아서 /op/spark 에 풀어두고 이 Path 를 SPARK_HOME 환경변수에 설정해주자. 그리고 spark-defaults.conf 파일을 template 파일로부터 복사해서 생성해두자. 사실 이게 끝이다.

RUN curl -fSL http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar -o $SPARK_HOME/jars/hadoop-aws-2.7.3.jar \
&& curl -fSL http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o $SPARK_HOME/jars/aws-java-sdk-1.7.4.jar

이 부분은 Optional 이긴 한데 Spark 에서 데이터 소스를 S3(AWS)로 하려고 하면 AWS에서 개발하고 배포한 하둡 관련 jar 라이브러리가 필요하다고해서 hadoop-aws.jar 와 의존하고 있는 aws-java-sdk.jar 를 받아서 $SPARK_HOME/jars 에 설치해두었다.

PySpark 실행

Dockerized hadoop cluster 가 실행 중이라고 하고(`hadoop` 이라는 docker bridge network에 떠있다라고 가정) PySpark Shell 을 띄워보자.

$ docker run -it --network hadoop --env-file hadoop.env sangwonl/hadoop-spark:2.1.0 ./bin/pyspark --master yarn
Configuring core
- Setting fs.defaultFS=hdfs://namenode:8020
Configuring hdfs
Configuring yarn
- Setting yarn.resourcemanager.resource-tracker.address=resourcemanager:8031
- Setting yarn.resourcemanager.scheduler.address=resourcemanager:8030
- Setting yarn.resourcemanager.address=resourcemanager:8032
Configuring httpfs
Configuring kms
Configuring for multihomed network
Python 2.7.12 (default, Nov 19 2016, 06:48:10)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/04/30 18:30:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/04/30 18:30:58 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
17/04/30 18:32:25 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/04/30 18:32:25 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
17/04/30 18:32:28 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
>>>

Spark이 Hadoop을 이용하려면 하둡 설정을 참조해야하는데 기본적으로 HADOOP_CONF_DIR 환경변수를 찾는다. 우리는 베이스이미지에서 이미 이 설정을 해두었기 때문에 원하는 설정 값들을 docker 실행시에 환경변수 형태로 주입해주면 된다. hadoop.env 에 해당 내용이 있다.

그리고 pyspark 실행할때 master 를 yarn 으로 주어서 하둡 2.x의 YARN을 통해 Spark Job을 수행하도록 설정한다. 이렇게 실행하면 위처럼 Spark 로고와 함께 Shell 이 뜬다. 이때 Hadoop Resource Manager 콘솔에 들어가보면 PySparkShell 이 Running 중인걸 확인할 수 있다.

Spark Example

간단한 문장들을 배열로 정의하고 이를 RDD 형태로 parallelize 한 후에 word count 를 하는 예제다.

Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
>>> lines = [
... "crazy crazy fox jumped",
... "crazy fox jumped",
... "fox is fast",
... "fox is smart",
... "dog is smart"
... ]
>>> lines_rdd = sc.parallelize(lines)
>>> lines_rdd
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475
>>> frequencies = lines_rdd.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
>>> frequencies.collect()
[('crazy', 3), ('fast', 1), ('fox', 4), ('is', 3), ('smart', 2), ('jumped', 2), ('dog', 1)]
>>> frequencies.count()
7
>>>