Snowflake 와 Spark on EMR 연동하기

MJ Lee
Snowflake Korea
Published in
8 min readAug 10, 2022

Snowflake Connector for SparkJDBC drive 를 사용하면 Spark 작업을 Snowflake 환경으로 오프로드해서 실행할 수 있습니다.

이번 글에서는 Snowflake 와 EMR 을 연동한 후 Spark DataFrame 을 Snowflake 에 Write 하는 작업과 반대로 Snowflake 테이블을 Spark DataFrame 으로 Read 하는 작업 절차를 상세히 순서대로 설명하겠습니다. 이번 글에서 사용한 버전은 EMR 5.36.0 / Spark 2.4.8 / Zeppelin 0.10.0 / Snowflake connector for Spark 2.9.3 / Snowflake JDBC driver 2.13.14 입니다.

우선 가장 먼저 snowflake-jdbc-3.13.14.jarspark-snowflake_2.11–2.9.3-spark_2.4.jar 를 다운로드 합니다.

이제 AWS 웹 화면에서 EMR 서비스를 선택한 후 Create cluster 버튼을 클릭합니다. 그 다음 아래와 같이 Software configuration 에서 Spark 를 선택하고 Security and access 에서는 EC2 key pair 에서 미리 생성해둔 key pair 를 선택합니다. (key pair 생성 방법은 여기 참고) 노드 개수 등등 나머지 옵션은 원하는 대로 변경합니다. 이후 화면 하단 Create cluster 버튼을 클릭합니다.

[ EMR 클러스터 생성 화면 ]

Master 와 Core 가 Running 상태가 되면 Cluster 는 Waiting 상태가 됩니다. 그러면 Connect to the Master Node Using SSH 링크를 클릭합니다.

[ EMR 클러스터 생성 후 화면 ]

화면에 명시된 명령어를 통해 EMR Master 노드와 커넥션을 생성합니다. 이때 pem 키는 key pair 생성 시 만들었던 키 입니다.

[ Connect to the Master Node Using SSH 링크 클릭 후 화면 ]

다운로드 받은 spark 커넥터와 jdbc 드라이버를 Spark 마스터 노드에 업로드합니다.

scp -i ~/.aws/mjkey.pem snowflake-jdbc-3.13.14.jar spark-snowflake_2.11–2.9.3-spark_2.4.jar hadoop@ec2–xxx–xxx–xxx–xxx.ap-northeast-2.compute.amazonaws.com:./

그 다음으로 Zeppelin UI 에 접속합니다. EMR 설정 화면에서 Application user interface 탭을 클릭하면 Zeppelin URL 을 확인할 수 있습니다. 그런데 로컬 웹 화면에서 Zeppelin URL 에 접속하려면 Zeppelin 을 호스팅하는 Master 노드와 SSH 터널링 설정을 해야합니다. 클러스터 설정 화면 중간에 Application user interface 섹션 아래 Enable an SSH Connection 을 클릭하시면 자세한 설정 방법이 명시되어 있습니다.

[ Application user interface 섹션 ]

가이드에 따라 설정을 완료하셨으면 Zeppelin 웹 화면으로 접속합니다. Zeppelin 웹 화면 우측 상단 Anonymous -> Interpreter 를 클릭합니다. 스크롤를 가장 아래로 내리면 spark config 화면이 보입니다. edit 버튼을 클릭합니다.

[ Zeppelin spark config 설정 화면 ]

두 가지 properties 를 추가합니다. 먼저 spark.jars 에는 초기 단계에서 업로드한 커넥터와 드라이버의 경로를 명시하고, spark.jars.packages 에는 jar 파일의 패키지명을 groupId:artifactId:version 형식으로 아래와 같이 명시한 후 하단에 save 버튼을 클릭합니다.

[ Zeppelin spark config 설정 화면 중 jar property 설정 값 ]

Zeppelin 화면 상단 Notebook -> Create new note 를 클릭해서 새로운 노트를 시작합니다. 아래와 같이 Snowflake 접속 정보를 담은 객체를 생성합니다. URL 은 Snowflake 접속 URL 을 명시해주세요.

val SNOWFLAKE_SOURCE_NAME = “net.snowflake.spark.snowflake”
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
import org.apache.spark.sql.DataFrame
var sfOption = Map(
“sfURL” -> “xxxxx_yyyyy.snowflakecomputing.com”,
“sfUser” -> “admin”,
“sfPassword” -> “my_password”,
“sfDatabase” -> “sales_db”,
“sfWarehouse” -> “load_m_wh”
)

Spark DataFrame 으로 Snowflake 에 저장된 데이터를 가져옵니다.

val df: DataFrame = sqlContext.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOption)
.option(“query”, “SELECT COUNT(*) FROM orders GROUP BY O_ORDERSTATUS”)
.load()

아래와 같이 show() 로 결과를 확인할 수 있습니다.

[ Zeppelin 화면에서 DataFrame 을 로드한 결과 ]

Snowflake 에서 확인해보면 아래와 같이 쿼리가 실행된 것을 확인할 수 있습니다.

[ Snowflake 로 오프로드된 쿼리 ]

이제 Write 작업 차례입니다. 아래처럼 Spark DataFrame 데이터를 Snowflake 테이블에 Write 합니다.

df.write
.format(SNOWFLAKE_SOURCE_NAME)
.options(sfOption)
.option(“dbtable”, “orders_count”)
.save()

Spark connector 를 통해 세션이 만들어지면 Snowflake 에 임시로 internal stage 가 아래와 같이 생성되고 여기에 DataFrame 데이터가 쓰여집니다.

[ Snowflake 에 임시로 생성된 internal stage 에 업로드된 Spark DataFrame ]

그런 다음 타켓 테이블이 없으면 아래처럼 테이블이 생성되고

[ Snowflake 에 자동 생성된 타켓 테이블 ]

마지막으로 internal stage 데이터가 Snowflake 에 Write 됩니다.

[ Internal stage 에 업로드된 데이터를 Snowflake 테이블로 적재 ]

이번 글에서는 Snowflake connector for Spark 을 사용해서 Spark on EMR 작업을 Snowflake 환경으로 오프로드하여 실행하는 방법에 대해 알아봤습니다. 내부 테스트 결과 Spark 에서 사용한 리소스보다 적은 메모리를 사용해서 실행 시간이 약 40% 빨라진 결과를 확인할 수 있었습니다.

다만 DataFrame 사용 시 select 쿼리만 가능하다는 점, Spark UDF 는 Snowflake 로 오프로드가 되지 않는다는 점으로 인해 Spark 작업을 100% Snowflake 로 오프로드 하기는 어렵습니다. 따라서 여전히 EMR 클러스터 운영 및 비용에 대한 부담이 있을 수 밖에 없는 구조인데요. 이보다 더 나은 방안으로 다음 글에서는 Snowpark 를 사용해 Spark 워크로드를 Snowflake 로 100% 오프로드 함으로써 성능을 향상하고 비용을 절약할 수 있는 방법에 대해서 알아보겠습니다.

--

--