Spark Part III — Performance Tricks & Config
Spark series
總共有三個Part,Part III是一些Performance Tricks & Config Setting~
Table of Contents
Performance Tricks
╴Cache & Persist
╴Broadcast Join
╴Key Salting
Spark Config
Performance Tricks
Cache & Persist
Spark提供了cache/persist的功能,可以保存intermediate data,適當的cache可以讓application的運算效率獲得大幅的提升。
舉個簡單的例子,假設今天我想要對信用卡交易資料分別對消費類別consumption_category、商店類別merchant_category這兩個欄位做group by sum:
groupBy消費類別:
groupby_consump = spak.sql("SELECT * FROM cc_txn") \
.where(F.col('date').between('2021-09-01', '2021-09-15') \
.groupBy(F.col('consumption_category')) \
.agg(F.sum('txn_amt').alias('sum_txn_nmt')) \
.orderBy(F.col('sum_txn_nmt').desc())
groupby_consump.show()
groupBy商店類別:
groupby_merchant = spak.sql("SELECT * FROM cc_txn") \
.where(F.col('date').between('2021-09-01', '2021-09-15') \
.groupBy(F.col('merchant_category')) \
.agg(F.sum('txn_amt').alias('sum_txn_nmt')) \
.orderBy(F.col('sum_txn_nmt').desc())
groupby_merchant.show()
會發現這兩段程式前面兩行的操作都是一樣的,先從hive select data、filter,所以spark在執行groupby_consump.show()
跟groupby_merchant.show()
這兩個action時會重複運算兩次scan data、filter,這時候我們可以先把這個itermediate data cache 起來:
df= spak.sql("SELECT * FROM cc_txn") \
.where(F.col('date').between('2021-09-01', '2021-09-15') \
.cache()
再分別去groupBy
consumption_category、merchant_category:
groupby_consump = df.groupBy(F.col('consumption_category')) \
.agg(F.sum('txn_amt').alias('sum_txn_nmt')) \
.orderBy(F.col('sum_txn_nmt').desc())
groupby_consump.show()groupby_merchant = df.groupBy(F.col('merchant_category')) \
.agg(F.sum('txn_amt').alias('sum_txn_nmt')) \
.orderBy(F.col('sum_txn_nmt').desc())
groupby_merchant.show()
當我們多加了cache()
這個動作,可以看到在Storage Tab中多了一個我們cache住的RDD:
Storage Tab會呈現所有cached RDD跟他的詳細資訊:
- Storage Level: Memory or Disk Deserialized, replication數
- Cached Partitions: RDD的partition數
- Fraction Cached: Cache百分比 (即使不是100%,也可以提升job效率,因為spark只需要process剩下的partitions)
- Size in Memory: Cache在memory的大小
- Size in Disk: Cache在memory的大小
RDD Name的超連結點進去一樣是進到Cached RDD的Details Page,主要就是寫這個Cached RDD存在那些executor、每個partitions的資訊:
來看看兩段有cache & 沒有cache的差異,下圖是剛剛4個Job (action show()
)的DAG:
可以看到有cache的DAG少了Scan hive table、Filter的運算,直接從InMemoryTableScan處理已經cache住的RDD,也可以看到cache前filter後的RDD row數跟cache後的InMemoryTableScan row數是相同的。
再來看看cache前後的運算時間:
Job 0~3分別代表groupby_consump.show()
w/o cache, groupby_merchant.show()
w/o cache, groupby_consump.show()
w/ cache, groupby_merchant.show()
w/ cache,可以看到有cache之後,groupby_consump.show()
的運算時間差異不大,但groupby_merchant.show()
卻從5s大幅減少到0.4s,差了12.5倍,這是因為cache()
也同樣有lazy evaluation的特性,所以會等到第一個action (groupby_consump.show()
w/ cache)被執行後,才會運算RDD並cache住。
cache()
跟persist()
的差別:
- cache只會將data存在memory
- persist會存在memory或disk,可以透過參數設定Storage Level
- cache跟persist基本上是一樣的,cache只是提供了persist的一個shortcut
雖然cache可以提升運算效率,但也因為cache將data存在memory,這也代表了拿去caching的這些memory就沒辦法做運算了,所以過多或不必要的cache也會導致運算速度變差或executor OOM。如果cache RDD占了太多memory,可以考慮改用persist()
,確定後續計算不會再使用的這個cached RDD也可以用unpersist()
釋放資源。
左圖可以看到No cache, MEMORY_ONLY (cache), DISK_ONLY (persist), MEMORY_AND_DISK (persist)效率的差異,可以看到Iteration (reuse次數) 超過2次以上,無論是使用cache或persist都比No cache來的快,雖然DISK_ONLY效率較MEMORY_ONLY跟MEMORY_AND_DISK差一些,但畢竟memory是很珍貴的,可以視情況trade-off要使用哪一種Storage Level。
Broadcast Join
在Part II中介紹Job、Stage、Task時有提到有許多transformation需要shuffle,而join
就是其中一個。如下圖,在join時,我們將兩個dataframe相同key值的data join起來,而在spark中parallelism的機制中,我們要確保兩個dataframe有相同key值的資料被分在相同的partition,才能join出正確的結果,因此join也是一個需要shuffle的transformation。
但shuffle其實是一個成本非常高的動作,他牽涉到資料寫入硬碟、在network之間交換資料,因此如果能盡量避免shuffle,就能提升運算效率。而Broadcast Join其實就是一個避免shuffle的作法,如下圖,Broadcast Join不將dataframe做shuffle,而是將其中一個dataframe broadcast到每個executors上,讓每個executor都能看到這個dataframe全部的key,來達到避免shuffle。
然而braodcast join算是一個利用空間換取時間的方法,broadcast dataframe也意味著executors需要花較多的memory存放這個dataframe,因此broadcast在join一大一小的dataframe時就是一個很好的使用情境,例如有一個很巨大的log資料去join一個較小的key-value mapping table,這時候就可以broadccast那個較小的dataframe,避免shuffle。
Spark其實有自動broadcast較小dataframe的機制,如果dataframe的大小小於10MB,Spark就會自動broadcast這個較小的dataframe,如果想braodcast大於10MB的data,可以透過設置spark.sql.autoBroadcastJoinThreshold
這個參數,也可以自己手動broadcast: df1.join(broadcast(df2))
假設今天我要將信用卡交易資料left join卡種資料,來知道每筆刷卡交易使用哪種信用卡:
cc_txn = spark.sql("SELECT id, card_type, txn_amt FROM cc_txn")
card_type = spark.sql("SELECT card_type, card_name FROM card_type") cc_txn.count() #Output: 19109633
card_type.count() #Output: 741
cc_txn.join(card_type, on="card_type", how="left").show()
下圖左邊是沒有broadcast的DAG,可以看到他分別要對兩張表shuffle (Exchange)再進行join,如果我們今天先對比較小的card_type
broadcast:
cc_txn.join(F.broadcast(card_type), on="card_type_code", how="left").show()
下圖右邊是有broadcast的DAG,他並沒有對兩張表作shuffle,只有對card_type
做BroadcastExchange
下圖可以再看到sort merge join有3個stage,而broadcast join只有一個stage,越少的stage也代表者shuffle的次數越少,執行時間也越短,改為broadcast join後,原本的執行時間從9s減少到0.1s,加快了90倍!
Key Salting
在Part II中我們介紹Stages Tab裡時,有提到可以透過Summary Metrics for Tasks這個Table來觀察是不是有Data Skew的發生,假設我們今天遇到了Data Skew,又應該如何解決呢?
造成Data Skew的主要的原因其實是每個key的資料量有非常大的差異,而這在真實世界其實也是非常普遍的現象,例如商店的銷售紀錄總是會集中在其中幾項非常熱銷的商品,而大多數的商品被購買次數則很少。
為了展示Data Skew,我製造了一個極端的假資料:
#Output:
+-----+--------+
| key| count|
+-----+--------+
|key-0| 1|
|key-1| 1|
|key-2|99999998|
+-----+--------+
還有另一個比較小一點的假資料:
#Output:
+-----+-----+
| key|count|
+-----+-----+
|key-0| 1|
|key-1| 1|
|key-2| 18|
+-----+-----+
這個假資料有99.99%以上的key都集中在key-2這個key,如果我們join這兩個dataframe:
df1.join(df2, on="key", how="left").count()
透過上圖的Summary Metrics for Tasks可以發現,每個Tasks的loading差非常多,Data Skew的問題非常嚴重,有75%的Task的Shuffle Read Size都是0,而loading最大的Task則是處理了100000018個Records,上方的Event Time也可以看到所有的Task都很快就執行完了,只有其中一個Task執行了非常久,這是因為df1
中我們有99.99%的key都是相同的,
在前面broadcast時有提到,在join時相同key值的資料會被分在相同的partition,如上圖所示,因為key-2的資料非常多,所以partition 3需要處理的資料量會遠遠多於partition 1 & 2,造成Task間非常不平衡的狀況。
這時候我們就可以考慮用key salting這個小技巧,來解決Data Skew的問題!
key salting的想法其實就是把原本的key隨機加上0~n的random value,把原本的key的分布變得比較uniform,如下圖的步驟(1)。在原本的key隨機加上0~n的random value雖然可以將原本集中在key-2的資料分散成n個key,不過要join df2
時就沒辦法對應到要join的key了,所以df2
也是必須要對原本的key作手腳的,不過df2
的key就不能像df1
一樣隨機加上0~n就解決了,假設在df1
的key-0
加上一個隨機數字後變成key-0–0
,而在df2
這邊加上一個隨機數字後變成key-0–2
,那原本這兩個key應該要join的就沒辦法join到了,因此,為了確保在df1
中被隨機更改的key在df2
都能夠對應到,我們要對df2
的key做explode,也就是說,把原本的key-0展開成key-0–0, key-0–1, …, key-0-n
,如下圖的step(2),因為不知道在df1
中key-0
會隨機被加上0~n的哪個數字,所以就df2
就把所有可能都列舉出來,來保證join的正確性~
key salting在pyspark的實現如下:
可以看到Event Timeline中每個Task的執行時間比原本平均很多了,不會只集中在其中一個Task,Summary Metrics for Tasks中也變得比較平均,再來看看執行時間,Job 27是一般的join,Job 28是key salting join,執行時間分別是5.1 mins跟3.8 mins,差了1.34倍!
我們剛剛舉的例子中對df2
的key做了展開的動作,而這個展開的動作會增加原本的資料量,在我們的例子中設定n=50 (參數coarseness
),df2
就膨脹了50倍,n設定的越大,key在df1
的分布就越uniform,data skew的狀況也越會越緩和,但相對的,df2
的資料量也會膨脹越大,因此這個n的設定也非常重要,n過小沒辦法把將分布變得更uniform,n過大則會造成資料過度膨脹。
我們通常會選擇比較小的dataframe來做explode,讓資料膨脹的情況沒有那麼嚴重,不過如果其中一個dataframe足夠小到能夠broadcast,直接broadcast應該會是最快的作法,直接省去了shuffle的操作。
Spark Config
Spark config (Spark properties)控制了spark application的設定,而且每個application的config都是獨立且分開被設置的。
Spark config有非常多的參數可以設定,詳細可以參考官方的docs,這裡介紹一些我有用過&常見的config:
Core settings
spark.app.name
: application的名稱,會顯示在Spark UI上spark.master
: Cluster Manager的設定,可以是"local"
: run on local,或是"yarn"
,"mesos"
等Cluster Managerspark.submit.deployMode
: driver program的deploy方式,可以是"client"
or"cluster"
Driver
spark.driver.cores
: driver的core數,default=1spark.driver.maxResultSize
: 傳回driver (collect()
等action)的資料上限大小,default=1gspark.driver.memory
: drive的memory大小,default=1g
Executor
spark.executor.cores
: executors的core數,default=1spark.executor.memory
: executor的memory大小,default=1g
Netwwoking
spark.rpc.message.maxSize
: executors間傳遞資料大小的最大上限,default=128m,有時Spark會跳出超過這個max size,再調大即可(上限2047m)
Dynamic Allocation
Dynamic Allocation在多人使用的Cluster上還蠻重要的,有時候我們可能會先用Spark處理資料,處理完後可能就使用tensorflow或xgboost來訓練模型,這時候就用不到Cluster上的資源,而設置dynamic allocation能夠自動把資源釋放出來給需要的人使用。
spark.dynamicAllocation.enabled
: 是否啟用dynamic allocation,如果啟用,Spark會依照目前application的workload動態配置executors的數量,default=falsespark.dynamicAllocation.maxExecutors
: dynamic allocation下最多executor數的上限,default=infinity,所以有設置dynamic allocation一定要設置maxExecutors
這個參數呀! 如果是在一個多人共用的cluster上,沒有把這個參數設好,一下子就把整個叢集的資源吃光光,影響到其他同事的工作就不好了~
SQL
spark.sql.shuffle.partitions
: shuffle完後的partition數,default=200spark.sql.adaptive.enabled
: 是否開啟adaptive query,Spark會自動優化query plan
還有一些adaptive query的參數像是spark.sql.adaptive.skewJoin.enabled
, spark.sql.adaptive.skewJoin.skewedPartitionFactor
可以adaptive控制data skew的狀況,不過這兩個參數都是再Spark 3.0+才有的,我也沒有實際使用過,如果在使用的本版本是Spark 3以上,又有遇到Data Skew的問題,或許可以試看看調整這兩個參數~
以上是我一些工作上使用Spark的心得還有一些網路課程跟網絡資料的整理,如果有任何寫的不正確的地方,還請多多指正 :)