Data Forest에서 Spark, Hive로 데이터 처리하기 Part.2

Suewoon Ryu
NAVER CLOUD PLATFORM
17 min readJul 20, 2021

--

지난 글(Part.1)에서는 네이버 클라우드 플랫폼 Data Forest에서 Zeppelin 앱을 생성하여 Spark Job과 Hive 쿼리를 실행해 봤습니다.

이번에는 Zeppelin 노트북이 아니라 DEV앱을 생성하여 spark-submit beeline 과 같은 클라이언트를 사용해서 같은 작업을 진행해 보겠습니다.

DEV 앱에는 Data Forest에서 제공하는 모든 서비스에 대한 개발 환경이 구성되어 있습니다. 따라서 스크립트를 실행하기 위한 사전 준비가 따로 필요하지 않습니다. Data Forest 앱이 아닌 VPC VM에서도 개발 환경 구성을 하면 아래 과정을 동일하게 진행할 수 있습니다.

Data Forest App 생성하기

DEV앱은 개발 환경이 설정되어 있는 서버를 생성합니다. 네이버 클라우드 플랫폼(ncloud.com) 콘솔 [Data Forest > App] 에서 + App 생성을 클릭하여 앱을 생성할 수 있습니다.

앱이 STABLE 상태가 되면 생성이 완료된 것입니다.

데이터 처리하기

데이터 셋은 동일하며, 데이터 셋 업로드는 Part.1을 참고 부탁드립니다.

1. spark 클라이언트 실행하기

DEV앱에서 Spark 스크립트를 정상적으로 수행하려면 먼저 Kerberos 인증이 필요합니다. Keytab 사용과 인증은 가이드 참고 부탁드립니다. 인증이 정상적으로 되었다면 아래와 같이 확인할 수 있습니다.

⁎인증을 하지 않으면 org.apache.hadoop.security.AccessControlException 이 발생하므로 주의합니다.

$ klist 
Ticket cache: FILE:/tmp/krb5cc_p26244
Default principal: example@KR.DF.NAVERNCP.COM
Valid starting Expires Service principal
07/14/2021 17:38:02 07/15/2021 17:38:02 krbtgt/KR.DF.BETA.NAVERNCP.COM@KR.DF.NAVERNCP.COM
renew until 07/21/2021 17:38:02

hdfs 클라이언트 명령어로 데이터 셋도 확인할 수 있습니다.

$ hadoop fs -ls | grep .csv
-rw------- 3 example services 1493648 2021-07-12 15:25 movie.csv
-rw-r--r-- 3 example services 690353377 2021-07-09 18:11 rating.csv

인증 이외에 별도 설정은 필요하지 않습니다. 먼저 spark-shell 을 실행해 보겠습니다. Zeppelin 노트북과 마찬가지로 spark-shell 에서는 SparkContext , SparkSession 이미 생성되어 있는 것에 주의합니다.

Zeppelin 노트북에서 썼던 코드를 그대로 실행할 수 있습니다.

$ spark-shell

아래 코드는 Zeppelin 노트북에서 실행한 것과 동일합니다. :paste 로 paste 모드에 진입하여 코드를 쉽게 붙여넣기 할 수 있습니다. ctrl+D로 빠져나오면 됩니다.

import org.apache.spark.sql.types.{StructType,IntegerType}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions._
import java.sql.Timestampcase class Movie(movieId: Int,title: String, genres: String)
case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp
)
val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType]
val rating_schema = ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
import spark.implicits._val movie_df_temp = spark.read
.option("header", "true")
.schema(movie_schema)
.csv("hdfs://koya/user/example/movie.csv")
val movie_df = movie_df_temp
.select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre"))
.withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1))
.withColumn("releasedYear", $"releasedYear".cast(IntegerType))

val rating_df_temp = spark.read
.option("header", "true")
.schema(rating_schema)
.csv("hdfs://koya/user/example/rating.csv")
val rating_df_filtered = rating_df_temp.groupBy($"movieId")
.agg(count("*").as("count")).filter($"count" > 10)
val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))
.select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month"))
val movie_rating_df = movie_df.join(rating_df, Seq("movieId"), "inner")import org.apache.spark.sql.SaveMode
movie_rating_df.repartition($"year", $"month")
.write.partitionBy("year", "month")
.mode(SaveMode.Append)
.parquet("/user/example/movie_rating")

REPL 형태가 아니라, 소스코드를 jar로 빌드 하여 제출( spark-submit)할 수도 있습니다. 해당 작업은 1개의 argument만 필요로 합니다. 데이터 셋이 위치한 HDFS 경로를 지정하면 됩니다. 이후에 결과 파일도 해당 경로 아래 생성됩니다.

소스코드는 Github에서 확인할 수 있습니다.

$ wget http://dist.kr.df.naverncp.com/repos/release/df-env/media/movie-rating-assembly-0.1.jar 
$ spark-submit --class com.dataforest.example.MovieRatingWriter \
--master yarn --deploy-mode cluster --queue longlived \
--num-executors 20 ./movie-rating-assembly-0.1.jar \
'hdfs://koya/user/example'

작업을 제출하고 아래처럼 진행 상황이 콘솔에 출력됩니다.

Spark Job을 제출하고 나면 Resource Manager UI에서 작업의 상태와 진행 로그를 확인 할 수 있습니다.

Spark History Server UI에서 작업이 어떻게 실행되고 있는지 더 자세하게 알 수 있습니다.

작업이 완료되면 아래처럼 지정한 HDFS 경로 아래 movie_rating 디렉토리가 생기고, 그 아래로 결과 Parquet 파일이 생성되었음을 확인할 수 있습니다.

2. Hive 클라이언트 실행하기

Data Forest에서 제공하는 HiveServer2는 계정 및 패스워드 인증을 사용합니다. 따라서 위에서 진행한 Kerberos 인증과 별개로 인증이 필요합니다. 여기서 패스워드는 Data Forest Account를 생성할 때 지정한 패스워드입니다.

아래처럼 beeline 클라이언트를 실행합니다. PASSWORD 에 특수문자가 있을 경우 single quote로 감싸줘야 합니다.

$ beeline -u "jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n ${USERNAME} -p ${PASSWORD}

다음으로 External 데이터베이스를 생성해 보겠습니다. 보기 쉽게 output형식을 !set outputformat vertical 로 변경해 주었습니다. LOCATION 키워드로 경로 설정해 주셔야 하는 점을 유념해 주세요.

CREATE DATABASE example__db_movie_rating
COMMENT 'MovieLens 20M Dataset'
LOCATION '/user/example/warehouse/example__db_movie_rating'
WITH DBPROPERTIES ('creater'='suewoon', 'date'='2021-07-01');
DESCRIBE DATABASE example__db_movie_rating;db_name example__db_movie_rating
comment MovieLens 20M Dataset
location hdfs://koya/user/example/warehouse/example__db_movie_rating
owner_name example
owner_type USER
parameters

Spark Job에서 생성한 Parquet 파일로부터 테이블을 생성합니다.

USE example__db_movie_rating;DROP TABLE IF EXISTS movie_rating;CREATE EXTERNAL TABLE movie_rating (movieId int, title string, genre string, releasedYear int, userId int, rating double)
partitioned by (year int, month int) stored as parquet
location '/user/example/movie_rating';
MSCK REPAIR TABLE movie_rating;

테이블 내용을 확인해봅니다.

SHOW TABLES;tab_name  movie_ratingSHOW CREATE TABLE movie_rating;createtab_stmt  CREATE EXTERNAL TABLE `movie_rating`(
createtab_stmt `movieid` int,
createtab_stmt `title` string,
createtab_stmt `genre` string,
createtab_stmt `releasedyear` int,
createtab_stmt `userid` int,
createtab_stmt `rating` double)
createtab_stmt PARTITIONED BY (
createtab_stmt `year` int,
createtab_stmt `month` int)
createtab_stmt ROW FORMAT SERDE
createtab_stmt 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
createtab_stmt STORED AS INPUTFORMAT
createtab_stmt 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
createtab_stmt OUTPUTFORMAT
createtab_stmt 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
createtab_stmt LOCATION
createtab_stmt 'hdfs://koya/user/example/movie_rating'
createtab_stmt TBLPROPERTIES (
createtab_stmt 'bucketing_version'='2',
createtab_stmt 'transient_lastDdlTime'='1626253592')
SELECT * FROM movie_rating limit 1;movie_rating.movieid 1176
movie_rating.title Double Life of Veronique, The (Double Vie de V�ronique, La) (1991)
movie_rating.genre Romance
movie_rating.releasedyear 1991
movie_rating.userid 28507
movie_rating.rating 4.0
movie_rating.year 1995
movie_rating.month 1

생성한 테이블을 대상으로 쿼리를 수행할 수 있습니다. 각 연도에 개봉한 영화 중에서 평점 Top3 영화를 조회하는 쿼리입니다.

SELECT releasedYear, title, avgRating, row_num 
FROM (SELECT releasedYear, title, avg(rating) as avgRating , rank() over (partition by releasedYear order by avg(rating) desc) as row_num
FROM movie_rating
GROUP BY releasedYear, title
) T
WHERE row_num <= 3
ORDER BY releasedYear DESC, row_num LIMIT 6;
releasedyear 2015
title Louis C.K.: Live at The Comedy Store (2015)
avgrating 3.8
row_num 1
releasedyear 2015
title Kingsman: The Secret Service (2015)
avgrating 3.6320754716981134
row_num 2
releasedyear 2015
title The Second Best Exotic Marigold Hotel (2015)
avgrating 3.5714285714285716
row_num 3
releasedyear 2014
title Zero Motivation (Efes beyahasei enosh) (2014)
avgrating 4.5
row_num 1
releasedyear 2014
title Whiplash (2014)
avgrating 4.074750830564784
row_num 2
releasedyear 2014
title Interstellar (2014)
avgrating 4.023864289821737
row_num 3

다음은 연도별 사용자에게 가장 인기 많았던 영화 (평점 수가 많았던 영화)를 찾아보는 쿼리입니다.

SELECT year, title, popularity, row_num FROM
(SELECT year, title, count(distinct userId) as popularity, rank() over (partition by year order by count(distinct userId) desc) as row_num
FROM movie_rating GROUP BY year, title) T
WHERE row_num = 1
ORDER BY year desc, popularity desc LIMIT 5;
year 2015
title Matrix, The (1999)
popularity 1207
row_num 1
year 2014
title Shawshank Redemption, The (1994)
popularity 2405
row_num 1
year 2013
title Shawshank Redemption, The (1994)
popularity 2549
row_num 1
year 2012
title Inception (2010)
popularity 2411
row_num 1
year 2011
title Inception (2010)
popularity 3235
row_num 1

마치며

지금까지 Data Forest에서 Spark, Hive 작업을 클라이언트를 통해 제출하는 방법을 알아봤습니다. 사용자 필요에 실행 방법을 다르게 해서 사용할 수 있습니다.

데이터를 가지고 여러 테스트를 해볼 목적이라면 Zeppelin을, 운영환경에 주기적으로 실행할 작업이라면 실행 작업을 스크립트로 만드는 것이 더 적합합니다. 예를 들어, Oozie 스케줄러에 jar 파일을 제출하고, .sql을 실행하는 작업을 등록해 둘 수 있습니다.

계속해서 더욱 유익한 글로 찾아뵙겠습니다. 감사합니다.

누구나 쉽게 시작하는 클라우드 ncloud.com
NAVER Cloud Data Platform 류수운입니다.

--

--