Spark Part III — Performance Tricks & Config

Melissa Kuo
18 min readOct 1, 2021

--

Spark series
總共有三個Part,Part III是一些Performance Tricks & Config Setting~

Table of Contents

Performance Tricks
Cache & Persist
Broadcast Join
Key Salting
Spark Config

Performance Tricks

Cache & Persist

Spark提供了cache/persist的功能,可以保存intermediate data,適當的cache可以讓application的運算效率獲得大幅的提升。

舉個簡單的例子,假設今天我想要對信用卡交易資料分別對消費類別consumption_category、商店類別merchant_category這兩個欄位做group by sum:

groupBy消費類別:

groupby_consump = spak.sql("SELECT * FROM cc_txn") \          
.where(F.col('date').between('2021-09-01', '2021-09-15') \
.groupBy(F.col('consumption_category')) \
.agg(F.sum('txn_amt').alias('sum_txn_nmt')) \
.orderBy(F.col('sum_txn_nmt').desc())
groupby_consump.show()

groupBy商店類別:

groupby_merchant = spak.sql("SELECT * FROM cc_txn") \          
.where(F.col('date').between('2021-09-01', '2021-09-15') \
.groupBy(F.col('merchant_category')) \
.agg(F.sum('txn_amt').alias('sum_txn_nmt')) \
.orderBy(F.col('sum_txn_nmt').desc())
groupby_merchant.show()

會發現這兩段程式前面兩行的操作都是一樣的,先從hive select data、filter,所以spark在執行groupby_consump.show()groupby_merchant.show()這兩個action時會重複運算兩次scan data、filter,這時候我們可以先把這個itermediate data cache 起來:

df= spak.sql("SELECT * FROM cc_txn") \          
.where(F.col('date').between('2021-09-01', '2021-09-15') \
.cache()

再分別去groupBy consumption_categorymerchant_category:

groupby_consump = df.groupBy(F.col('consumption_category')) \           
.agg(F.sum('txn_amt').alias('sum_txn_nmt')) \
.orderBy(F.col('sum_txn_nmt').desc())
groupby_consump.show()
groupby_merchant = df.groupBy(F.col('merchant_category')) \
.agg(F.sum('txn_amt').alias('sum_txn_nmt')) \
.orderBy(F.col('sum_txn_nmt').desc())
groupby_merchant.show()

當我們多加了cache()這個動作,可以看到在Storage Tab中多了一個我們cache住的RDD:

Storage Tab

Storage Tab會呈現所有cached RDD跟他的詳細資訊:

  • Storage Level: Memory or Disk Deserialized, replication數
  • Cached Partitions: RDD的partition數
  • Fraction Cached: Cache百分比 (即使不是100%,也可以提升job效率,因為spark只需要process剩下的partitions)
  • Size in Memory: Cache在memory的大小
  • Size in Disk: Cache在memory的大小

RDD Name的超連結點進去一樣是進到Cached RDD的Details Page,主要就是寫這個Cached RDD存在那些executor、每個partitions的資訊:

Details for RDD

來看看兩段有cache & 沒有cache的差異,下圖是剛剛4個Job (action show())的DAG:

可以看到有cache的DAG少了Scan hive table、Filter的運算,直接從InMemoryTableScan處理已經cache住的RDD,也可以看到cache前filter後的RDD row數跟cache後的InMemoryTableScan row數是相同的。

再來看看cache前後的運算時間:

Job 0~3分別代表groupby_consump.show() w/o cache, groupby_merchant.show() w/o cache, groupby_consump.show() w/ cache, groupby_merchant.show() w/ cache,可以看到有cache之後,groupby_consump.show()的運算時間差異不大,但groupby_merchant.show()卻從5s大幅減少到0.4s,差了12.5倍,這是因為cache()也同樣有lazy evaluation的特性,所以會等到第一個action (groupby_consump.show() w/ cache)被執行後,才會運算RDD並cache住。

cache()persist()的差別:

  • cache只會將data存在memory
  • persist會存在memory或disk,可以透過參數設定Storage Level
  • cache跟persist基本上是一樣的,cache只是提供了persist的一個shortcut

雖然cache可以提升運算效率,但也因為cache將data存在memory,這也代表了拿去caching的這些memory就沒辦法做運算了,所以過多或不必要的cache也會導致運算速度變差或executor OOM。如果cache RDD占了太多memory,可以考慮改用persist(),確定後續計算不會再使用的這個cached RDD也可以用unpersist()釋放資源。

Performance for persist (Source)

左圖可以看到No cache, MEMORY_ONLY (cache), DISK_ONLY (persist), MEMORY_AND_DISK (persist)效率的差異,可以看到Iteration (reuse次數) 超過2次以上,無論是使用cache或persist都比No cache來的快,雖然DISK_ONLY效率較MEMORY_ONLY跟MEMORY_AND_DISK差一些,但畢竟memory是很珍貴的,可以視情況trade-off要使用哪一種Storage Level。

Broadcast Join

在Part II中介紹Job、Stage、Task時有提到有許多transformation需要shuffle,而join就是其中一個。如下圖,在join時,我們將兩個dataframe相同key值的data join起來,而在spark中parallelism的機制中,我們要確保兩個dataframe有相同key值的資料被分在相同的partition,才能join出正確的結果,因此join也是一個需要shuffle的transformation。

sort merge join

但shuffle其實是一個成本非常高的動作,他牽涉到資料寫入硬碟、在network之間交換資料,因此如果能盡量避免shuffle,就能提升運算效率。而Broadcast Join其實就是一個避免shuffle的作法,如下圖,Broadcast Join不將dataframe做shuffle,而是將其中一個dataframe broadcast到每個executors上,讓每個executor都能看到這個dataframe全部的key,來達到避免shuffle。

然而braodcast join算是一個利用空間換取時間的方法,broadcast dataframe也意味著executors需要花較多的memory存放這個dataframe,因此broadcast在join一大一小的dataframe時就是一個很好的使用情境,例如有一個很巨大的log資料去join一個較小的key-value mapping table,這時候就可以broadccast那個較小的dataframe,避免shuffle。

Spark其實有自動broadcast較小dataframe的機制,如果dataframe的大小小於10MB,Spark就會自動broadcast這個較小的dataframe,如果想braodcast大於10MB的data,可以透過設置spark.sql.autoBroadcastJoinThreshold這個參數,也可以自己手動broadcast: df1.join(broadcast(df2))

假設今天我要將信用卡交易資料left join卡種資料,來知道每筆刷卡交易使用哪種信用卡:

cc_txn = spark.sql("SELECT id, card_type, txn_amt FROM cc_txn")
card_type = spark.sql("SELECT card_type, card_name FROM card_type") cc_txn.count() #Output: 19109633
card_type.count() #Output: 741
cc_txn.join(card_type, on="card_type", how="left").show()

下圖左邊是沒有broadcast的DAG,可以看到他分別要對兩張表shuffle (Exchange)再進行join,如果我們今天先對比較小的card_type broadcast:

cc_txn.join(F.broadcast(card_type), on="card_type_code", how="left").show()

下圖右邊是有broadcast的DAG,他並沒有對兩張表作shuffle,只有對card_type做BroadcastExchange

下圖可以再看到sort merge join有3個stage,而broadcast join只有一個stage,越少的stage也代表者shuffle的次數越少,執行時間也越短,改為broadcast join後,原本的執行時間從9s減少到0.1s,加快了90倍!

Key Salting

在Part II中我們介紹Stages Tab裡時,有提到可以透過Summary Metrics for Tasks這個Table來觀察是不是有Data Skew的發生,假設我們今天遇到了Data Skew,又應該如何解決呢?

造成Data Skew的主要的原因其實是每個key的資料量有非常大的差異,而這在真實世界其實也是非常普遍的現象,例如商店的銷售紀錄總是會集中在其中幾項非常熱銷的商品,而大多數的商品被購買次數則很少。

為了展示Data Skew,我製造了一個極端的假資料:

#Output:
+-----+--------+
| key| count|
+-----+--------+
|key-0| 1|
|key-1| 1|
|key-2|99999998|
+-----+--------+

還有另一個比較小一點的假資料:

#Output:
+-----+-----+
| key|count|
+-----+-----+
|key-0| 1|
|key-1| 1|
|key-2| 18|
+-----+-----+

這個假資料有99.99%以上的key都集中在key-2這個key,如果我們join這兩個dataframe:

df1.join(df2, on="key", how="left").count()

透過上圖的Summary Metrics for Tasks可以發現,每個Tasks的loading差非常多,Data Skew的問題非常嚴重,有75%的Task的Shuffle Read Size都是0,而loading最大的Task則是處理了100000018個Records,上方的Event Time也可以看到所有的Task都很快就執行完了,只有其中一個Task執行了非常久,這是因為df1中我們有99.99%的key都是相同的,

在前面broadcast時有提到,在join時相同key值的資料會被分在相同的partition,如上圖所示,因為key-2的資料非常多,所以partition 3需要處理的資料量會遠遠多於partition 1 & 2,造成Task間非常不平衡的狀況。

這時候我們就可以考慮用key salting這個小技巧,來解決Data Skew的問題!

key salting的想法其實就是把原本的key隨機加上0~n的random value,把原本的key的分布變得比較uniform,如下圖的步驟(1)。在原本的key隨機加上0~n的random value雖然可以將原本集中在key-2的資料分散成n個key,不過要join df2時就沒辦法對應到要join的key了,所以df2也是必須要對原本的key作手腳的,不過df2的key就不能像df1一樣隨機加上0~n就解決了,假設在df1key-0加上一個隨機數字後變成key-0–0,而在df2這邊加上一個隨機數字後變成key-0–2,那原本這兩個key應該要join的就沒辦法join到了,因此,為了確保在df1中被隨機更改的key在df2都能夠對應到,我們要對df2的key做explode,也就是說,把原本的key-0展開成key-0–0, key-0–1, …, key-0-n ,如下圖的step(2),因為不知道在df1key-0會隨機被加上0~n的哪個數字,所以就df2就把所有可能都列舉出來,來保證join的正確性~

key salting在pyspark的實現如下:

可以看到Event Timeline中每個Task的執行時間比原本平均很多了,不會只集中在其中一個Task,Summary Metrics for Tasks中也變得比較平均,再來看看執行時間,Job 27是一般的join,Job 28是key salting join,執行時間分別是5.1 mins跟3.8 mins,差了1.34倍!

我們剛剛舉的例子中對df2的key做了展開的動作,而這個展開的動作會增加原本的資料量,在我們的例子中設定n=50 (參數coarseness),df2就膨脹了50倍,n設定的越大,key在df1的分布就越uniform,data skew的狀況也越會越緩和,但相對的,df2的資料量也會膨脹越大,因此這個n的設定也非常重要,n過小沒辦法把將分布變得更uniform,n過大則會造成資料過度膨脹。

我們通常會選擇比較小的dataframe來做explode,讓資料膨脹的情況沒有那麼嚴重,不過如果其中一個dataframe足夠小到能夠broadcast,直接broadcast應該會是最快的作法,直接省去了shuffle的操作。

Spark Config

Spark config (Spark properties)控制了spark application的設定,而且每個application的config都是獨立且分開被設置的。

Spark config有非常多的參數可以設定,詳細可以參考官方的docs,這裡介紹一些我有用過&常見的config:

Core settings

  • spark.app.name: application的名稱,會顯示在Spark UI上
  • spark.master: Cluster Manager的設定,可以是"local" : run on local,或是"yarn" , "mesos" 等Cluster Manager
  • spark.submit.deployMode: driver program的deploy方式,可以是"client" or "cluster"

Driver

  • spark.driver.cores: driver的core數,default=1
  • spark.driver.maxResultSize: 傳回driver (collect()等action)的資料上限大小,default=1g
  • spark.driver.memory: drive的memory大小,default=1g

Executor

  • spark.executor.cores: executors的core數,default=1
  • spark.executor.memory: executor的memory大小,default=1g

Netwwoking

  • spark.rpc.message.maxSize: executors間傳遞資料大小的最大上限,default=128m,有時Spark會跳出超過這個max size,再調大即可(上限2047m)

Dynamic Allocation

Dynamic Allocation在多人使用的Cluster上還蠻重要的,有時候我們可能會先用Spark處理資料,處理完後可能就使用tensorflow或xgboost來訓練模型,這時候就用不到Cluster上的資源,而設置dynamic allocation能夠自動把資源釋放出來給需要的人使用。

  • spark.dynamicAllocation.enabled: 是否啟用dynamic allocation,如果啟用,Spark會依照目前application的workload動態配置executors的數量,default=false
  • spark.dynamicAllocation.maxExecutors: dynamic allocation下最多executor數的上限,default=infinity,所以有設置dynamic allocation一定要設置maxExecutors這個參數呀! 如果是在一個多人共用的cluster上,沒有把這個參數設好,一下子就把整個叢集的資源吃光光,影響到其他同事的工作就不好了~

SQL

  • spark.sql.shuffle.partitions: shuffle完後的partition數,default=200
  • spark.sql.adaptive.enabled: 是否開啟adaptive query,Spark會自動優化query plan

還有一些adaptive query的參數像是spark.sql.adaptive.skewJoin.enabled, spark.sql.adaptive.skewJoin.skewedPartitionFactor可以adaptive控制data skew的狀況,不過這兩個參數都是再Spark 3.0+才有的,我也沒有實際使用過,如果在使用的本版本是Spark 3以上,又有遇到Data Skew的問題,或許可以試看看調整這兩個參數~

以上是我一些工作上使用Spark的心得還有一些網路課程跟網絡資料的整理,如果有任何寫的不正確的地方,還請多多指正 :)

References

--

--

Melissa Kuo

老是念了什麼忘了什麼,整理一下腦袋的東東。👇𝑪𝒐𝒏𝒕𝒂𝒄𝒕 𝒎𝒆 𝒐𝒏 𝑳𝒊𝒏𝒌𝒆𝒅𝑰𝒏👇 https://www.linkedin.com/in/melissakou