네이버 클라우드 플랫폼 서비스를 활용한 Spark ETL 및 데이터 시각화-1부

MyeongSu Kim
NAVER CLOUD PLATFORM
14 min readSep 1, 2020

이번 시간에는 네이버 클라우드 플랫폼의 상품들을 활용하여 데이터를 저장하고, 저장된 데이터를 오픈소스 Apache Spark를 활용하여 ETL 작업을 하고, 이를 분석 및 시각화를 해보도록 하겠습니다.

이번 컨텐츠는 총 2부로 구성되어 있습니다. 1부에서는 오픈 데이터를 활용하여 Object Storage 데이터를 추출하고 다차원 데이터 분석을 위한 스키마로 변환하여 저장하는 작업을 수행하고, 2부에서는 1부에서 저장한 다차원 데이터를 분석하고 시각화하는 부분으로 나뉩니다.

주요 상품 설명

▶Cloud Hadoop 상품은 Hadoop, HBase, Spark, Hive, Presto 등의 오픈 소스 프레임워크를 손쉽게 구축할 수 있습니다.

▶Object Storage 상품은 안전한 보관이 필요하거나, 대용량 데이터를 저장하는데 활용할 수 있으며, 제공되는 API를 사용하여 서버 데이터의 백업 및 복구 용도로도 활용할 수 있습니다.

※더욱 자세한 서비스 소개는 아래의 링크에서 확인 가능합니다!
Cloud Hadoop: https://www.ncloud.com/product/analytics/cloudHadoop
Object Storage: https://www.ncloud.com/product/storage/objectStorage

Cloud Hadoop 클러스터 생성

Console에서 Cloud Hadoop 상품에 접속하여 클러스터 생성을 클릭합니다.

클러스터 이름, 버전, Type 및 관리자 계정정보를 입력합니다.

클러스터 버전을 Cloud Hadoop 1.2를 선택하고, Type을 Spark로 선택합니다.

작업자 노드 개수를 3개로 늘립니다.

클러스터에 접속하기 위한 인증키를 선택합니다.

클러스터를 생성합니다.

Cloud Hadoop 클러스터가 운영중 상태가 될때까지 기다립니다.

Data Set준비

Data Set이 저장될 Object Storage 버킷 및 폴더를 생성합니다.

bucket_name/movielens/raw/

여기서는 학습용으로 많이 사용하는 MovieLens 영화 평가 오픈 데이터 세트를 활용합니다. csv 파일로 zip으로 압축하여 제공되며, 데이터 관련 자세한 정보는 아래 URL에서 확인 합니다.

URL : https://grouplens.org/datasets/movielens/latest/

MovieLens의 여러 정보들중 movies, tags, ratings 데이터를 활용합니다.

생성한 Cloud Hadoop 클러스터의 edge 노드에 접속하여 zip으로 압축된 데이터 셋을 다운로드 받습니다. (edge 노드 접속 방법은 Cloud Hadoop 가이드 문서 참고)

wget http://files.grouplens.org/datasets/movielens/ml-latest.zip

다운로드 받은 zip 파일을 압축 해제합니다.

unzip ml-latest.zip

Object Storage에 데이터를 업로드하기 위해 aws cli를 설치합니다.

sudo yum install python3 -ysudo pip3 install awscli==1.15.85aws configure

aws configure 를 통해 access key, secret key를 입력하여 aws s3 API를 사용하여 네이버 클라우드 플랫폼의 Object Storage로 접근하기 위한 준비를 합니다.

자세한 방법은 Object Storage cli 가이드를 참고 합니다.

Object Storage cli 사용 가이드 : https://cli.ncloud-docs.com/docs/guide-objectstorage

Object Storage에 movies, ratings, tags csv 파일을 업로드 합니다.

aws --endpoint-url=https://kr.object.ncloudstorage.com s3 cp ./ml-latest/ratings.csv s3://ncp-spark/movielens/raw/ratings/ratings.csvaws --endpoint-url=https://kr.object.ncloudstorage.com s3 cp ./ml-latest/tags.csv s3://ncp-spark/movielens/raw/tags/tags.csvaws --endpoint-url=https://kr.object.ncloudstorage.com s3 cp ./ml-latest/movies.csv s3://ncp-spark/movielens/raw/movies/movies.csv

Object Storage에 데이터가 정상적으로 업로드 되었는지 확인합니다.

ETL 작업

Cloud Hadoop의 Spark를 활용해서 ETL 작업을 수행합니다.

이번 블로그에서는 zeppelin을 활용해서 spark client 모드로 작업을 수행합니다. (spark job 워크플로우를 오케스트레이션하는 부분은 다루지 않습니다.)

Zeppelin Spark Interpreter Properties

Spark Submit 참고 : https://spark.apache.org/docs/latest/submitting-applications.html

Cloud Hadoop 클러스터 Application중 9995포트로 Zeppelin에 접속합니다.

*접근을 위해서는 ACG 설정을 해야 하며 여기서는 해당 부분은 생략합니다.

Zeppelin에 로그인 합니다.

Create new note를 눌러 note를 하나 생성합니다.

Cloud Hadoop 1.2 버전에서 제공되는 Spark 2.x 에서는 csv 파일을 읽을 수 있도록 관련 패키지가 내장되어 있으나 Spark 1.6에서는 별도로 csv파일을 읽기 위한 패키지 설정이 필요합니다.

여기서는 Spark 1.6을 사용하고, Spark Submit시 패키지를 읽고, MySQL 접속을 위한 jar를 참고하도록 하겠습니다.

먼저 8080 포트로 Ambari에 접속하여 Zeppelin Notebook을 선택하고 Configs 탭을 눌러 Advanced zeppelin-env 에서 zeppelin_env_content에 아래 옵션을 추가 합니다.

export SPARK_SUBMIT_OPTIONS="--packages com.databricks:spark-csv_2.10:1.2.0  --jars /usr/share/java/mysql-connector-java.jar"

Save 하고 Restart 합니다.

다시 Zeppelin에 접속하여 위에서 생성한 note를 오픈하여 다차원 데이터 분석을 위한 스키마 변환을 합니다. (여기서 date, time 차원 데이터 생성은 생략하였습니다. 관련 데이터를 검색하거나 코드를 작성하여 데이터를 생성하시면 됩니다.)

%spark
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.functions._
val ratingsDf = sqlContext.read.format("com.databricks.spark.csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load("s3a://ncp-spark/movielens/raw/ratings/ratings.csv")
val tagsDf = sqlContext.read.format("com.databricks.spark.csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load("s3a://ncp-spark/movielens/raw/tags/tags.csv")
val moviesDf = sqlContext.read.format("com.databricks.spark.csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load("s3a://ncp-spark/movielens/raw/movies/movies.csv")
val movieDf = moviesDf.select("movieid","title").distinct
movieDf.write.parquet("s3a://ncp-spark/movielens/parquet/dim_movie/")

val genretmpDf= moviesDf.select("genres").distinct
val genreDf = genretmpDf.withColumn("genreid",row_number().over(Window.orderBy("genres")))
genreDf.write.parquet("s3a://ncp-spark/movielens/parquet/dim_genre/")
val joinedDf = moviesDf.join(genreDf, Seq("genres"), "inner")
val assocMovieGenreDf = joinedDf.select("movieid", "genreid").distinct
assocMovieGenreDf.write.parquet("s3a://ncp-spark/movielens/parquet/assoc_moviegenre/")
val ratingtmpdf = ratingsDf.select("userid").distinct
val tagtmpdf =tagsDf.select("userid").distinct
val userDf = ratingtmpdf.join(tagtmpdf, Seq("userid"), "outer")
userDf.write.parquet("s3a://ncp-spark/movielens/parquet/dim_user/")
val dimDateDf = sqlContext.read.parquet("s3a://ncp-spark/movielens/parquet/dim_date/")
val dimTimeDf = sqlContext.read.parquet("s3a://ncp-spark/movielens/parquet/dim_time/")
val tagtmpdf = tagsDf.select($"userid",$"movieid",$"tag",$"timestamp",year(from_unixtime($"timestamp")).as("year"), month(from_unixtime($"timestamp")).as("month"), dayofmonth(from_unixtime($"timestamp")).as("day"),hour(from_unixtime($"timestamp")).as("hour"),minute(from_unixtime($"timestamp")).as("minute"),second(from_unixtime($"timestamp")).as("second"))
tagtmpdf.join(dimDateDf, Seq("year", "month", "day"), "inner")
.join(dimTimeDf, Seq("hour", "minute", "second"), "inner")
.select($"userid",$"movieid",$"dateid",$"timeid",$"tag",$"timestamp")
.write.parquet("s3a://ncp-spark/movielens/parquet/fact_tags/")
val ratingtmpdf = ratingsDf.select($"userid",$"movieid",$"rating",$"timestamp",year(from_unixtime($"timestamp")).as("year"), month(from_unixtime($"timestamp")).as("month"), dayofmonth(from_unixtime($"timestamp")).as("day"),hour(from_unixtime($"timestamp")).as("hour"),minute(from_unixtime($"timestamp")).as("minute"),second(from_unixtime($"timestamp")).as("second"))
ratingtmpdf.join(dimDateDf, Seq("year", "month", "day"), "inner")
.join(dimTimeDf, Seq("hour", "minute", "second"), "inner")
.select($"userid",$"movieid",$"dateid",$"timeid",$"rating",$"timestamp")
.write.parquet("s3a://ncp-spark/movielens/parquet/fact_ratings/")

Object Storage에 데이터가 정상적으로 저장이 되었는지 확인합니다.

이제 movie, tags, ratings csv 파일을 아래와 같은 다차원 분석을 위한 스키마형태로 변환하였습니다. (다차원 데이터 분석을 위한 스키마 관련하여 궁금하신 부분은 Data Warehouse schema를 검색하여 확인해보시면 도움이 됩니다.)

Before
After

마무리하며…

Cloud Hadoop의 오픈소스 Spark를 활용하여 ETL 작업을 수행하여 다차원 데이터 분석을 위한 스키마 변환을 하였습니다.

2부에서는 변환된 데이터를 분석하고 시각화를 하겠습니다.

끝까지 읽어 주셔서 감사합니다.

--

--