Apache Spark เบื้องต้น Part 1

หมายเหตุ ผู้อ่านสามารถดู table of contents ของ Data Engineering from Noob to Newbie ได้ที่ http://bit.ly/2P7isEw

สำหรับบทความนี้เราจะมาทำความรู้จัก 1 ใน tools ยอดฮิตในสายงานของ data engineer กันซึ่งนั้นก็คือ apache spark โดย spark เป็น tool สำหรับ distributed processing ซึ่งเป็นส่วนสำคัญมากใน big data เพราะโดยส่วนใหญ่นั้นพื้นฐานการทำ big data project คือการจัดเก็บข้อมูลแบบ distributed storage หรือการจัดเก็บข้อมูลแบบกระจายและ 1 ใน tools สำหรับงานด้านนี้ก็คือ apache hadoop ที่เราได้เคยกล่าวถึงไปแล้วในบทความก่อนหน้า

หลักการทำงานของ spark จะสามารถแบ่ง environment ออกได้เป็น 2 แบบใหญ่ๆคือ standalone mode และ cluster mode ในบทความนี้เราจะมาทำความคุ้นเคยกับ standalone mode กันก่อนเพื่อความง่ายในการเริ่มต้น โดยส่วนประกอบของกลไกการทำงานใน spark แบบ standalone mode จะประกอบไปด้วย 4 ส่วนคือ

spark main component (อ้างอิงจากหนังสือ Learning Spark โดย Matei Zaharia, Patrick Wendell, Andy Konwinski, Holden Karau)
  • Driver Program
  • Worker Node
  • Executor
  • Task

Driver Program และ Spark Context จะอยู่คู่กันเสมอเพราะ driver คือโปรแกรมที่ทำหน้าที่สร้าง spark context object ขึ้นมา และตัว spark context นี้มีหน้าที่แปลง code ของขั้นตอนการประมวลผลที่เราเขียนให้อยู่ในรูปแบบของ gragh และควบคุมการประมวลผลของแต่ละขั้นตอน หรือพูดอีกอย่างได้ว่า เมื่อเราเขียนโปรแกรม spark เสร็จแล้ว ตัว spark context จะทำหน้าที่ดูแลการทำงานของโปรแกรมนั้นตั้งแต่ต้นจนจบ นอกจาก driver จะทำหน้าที่สร้าง spark context แล้ว driver ยังทำหน้าที่ส่งผลลัพธ์ของการประมวลผลกลับมาให้กับเราในกรณีที่ต้องการอีกด้วย

Worker Node จะเป็นเสมือน container ที่เราจะใช้ในการประมวลผลใน spark ส่วน task นั้นคือ unit ที่ประมวลผลจริง กล่าวโดยสรุปคือเมื่อเราเขียนโปรแกรม spark เสร็จเรียบร้อยแล้วนั้นตัว spark context จะทำหน้าที่แตก code เราเป็นส่วนคำสั่งย่อยๆแล้วร้อยเรียงลำดับของคำสั่งดังกล่าวให้อยู่ในรูปแบบ gragh จากนั้นก็ส่งแต่ละคำสั่งย่อยไปประมวลผลใน worker node ซึ่งคำสั่งย่อยๆดังกล่าวก็คือ task นั้นเอง ตัวอย่างจากภาพด้านบนสมมุติว่า spark context ได้ดู code ของเราแล้วแตกชุดคำสั่งการทำการออกเป็น 8 tasks แล้วส่งแต่ละ task ไปประมวลผลยัง work node แต่จะเห็นได้ว่า worker node นั้นสามารถ run task ได้พร้อมกันสูงสุดอยู่ที่ 2 task ต่อ 1 worker node ทำให้อีก 4 tasks ที่เหลือต้องรอการประมวลต่อทีหลังซึ่งการจัดคิวการประมวลผลนี้เป็นหน้าที่ของ spark context เมื่อมี task ใดเสร็จก็จะส่ง status กลับไปยัง spark context จากนั้น spark context ก็จะส่ง task ที่รอคิวอยู่ไปประมวลผลยัง worker node ทำแบบนี้จนกว่าทุก task จะเสร็จเรียบร้อย

รูปแบบการเขียนโปรแกรมใน spark โดยส่วนตัวผมจะแบ่งออกเป็น 4 step คือ

4 steps หลักในการเขียน spark

Load: ทำการดึงข้อมูลที่ต้องการประมวลผลเข้ามายังใน spark environment

Transform: เป็นการแปลงข้อมูลจาก format แบบนึงไปเป็นอีก format นึงโดยหลักการทำ transform นี้เพื่อจัดเตรียม format ให้พร้อมที่จะเข้าสู่กระบวนการ aggregate

Aggregate: กระบวนการในคำนวนข้อมูลในรูปแบบต่างๆ สมมุติว่าเรามีข้อมูลอยู่ 3 ตัว a#1, b#1, a#1 หลังจากเข้ากระบวนการ sum aggregate แล้วลผลลัพธ์จะได้ a#2, b#1 โดยผลลัพที่ได้จาก aggregate สามารถนำเข้าสู่ Transform อีกรอบเพื่อคำนวนในมิติอื่นต่อไป

Output: เมื่อได้ผลลัพธ์ของการ aggregate ที่ต้องการแล้วก็เข้าสู่กระบวนการสุดท้ายนั้นก็คือ output เพื่อสั่งงานให้ spark รู้ว่า output ของระบบนี้เราต้องการให้ออกมาในรูปแบบใดเช่น เขียนข้อมูลลง hadoop หรือ database เป็นต้น

ทีนี้เรามาลงมือเขียนโปรแกรมบน spark กันนะครับ โดยเริ่มจาก start hadoop container ใน docker ที่เราสร้างไว้เมื่อบนความที่แล้วกันก่อน ขั้นแรกใช้คำสั่ง docker ps -a เพื่อดูว่าตอนนี้ container ของเราอยู่ใน status ใด ถ้า container ของเราอยู่ใน status Exit ตามรูปด้านล่างก็ให้เราทำการ start มันขึ้นโดยใช้คำสั่ง docker start <container_id> เมื่อ container เรา start แล้วให้ใช้คำสั่ง docker exec -it <container_id> bash เพื่อเข้าใช้งาน container ของเรา

start docker container

หลังจากเข้ามาใน container แล้วให้ทำการเปลี่ยน user เป็น hduser จากนั้นทำการ start hadoop ขึ้นมาเพื่อใช้งานโดยใช้คำสั่ง start-dfs.sh และ start-yarn.sh ตามลำดับ เมื่อ hadoop start เรียบร้อยแล้วให้เราทำการตรวจสอบโดยการเปิด web browser ของ computer เราขึ้นมาแล้วเข้าไปยัง URL: http://localhost:8088/cluster ซึ่งถ้า hadoop เราพร้อมใช้งานจะขึ้นเป็น web ดังรูปด้านล่าง

hadoop monitor

ทีนี้กลับมายัง hadoop container ของเรา โดยในบทความนี้จะเป็นการเขียนโปรแกรม spark ผ่านทาง spark shell ซึ่ง core มันจะบน Scala REPL อีกทีนึงทำให้ interface มันจะเหมือนกับตอนเราเรียนใช้งาน Scala REPL

spark shell interface

ดังที่กล่าวไปแล้วว่า step แรกของการเขียนโปรแกรมบน spark นั้นจะเริ่มจากขั้นตอนการ load ซึ่งโดยทั่วไปใน step นี้เราจะทำการ load ข้อมูลจาก file เข้ามายัง spark environment แต่ในบทความนี้เราจะสร้างข้อมูลเล็กๆขึ้นมาจากนั้น load เข้าสู่ spark spark environment เริ่มจากสร้าง list ของ string เพื่อจำลองข้อมูลที่เราจะทำการประมวลผล ซึ่งในที่นี้เราจะสมมุติว่าเรากำลังจะประมวลผล log การเข้าใช้งาน website โดย string 1 จะแทน 1 transaction ของ user ที่ใช้งาน website มี format ดังนี้ <user_id>, <page_location>

//step 1.1: init data in scala string list form
val data_log = List(
"1,/product/id_1.html",
"1,/product/id_2.html",
"1,/product/id_3.html",
"2,/product/id_2.html",
"3,/product/id_1.html",
"4,/product/id_1.html")
//step 1.2: convert scala list into rdd
val data_rdd = sc.parallelize(
data_log)

จาก code ด้านบน step แรกจะเป็นการสร้าง data โดยจัดเก็บในรูปแบบของ string list ในภาษา Scala จากนั้นทำการแปลง string list ให้อยู่ในรูปแบบของ rdd partition โดยเรามอง partition ใน spark เป็นเหมือน collection แบบนึงในภาษาโปรแกรมทั่วไปเช่น List หรือ Array ซึ่งเวลา data จะถูกส่งไปประมวลผลที่ executor spark จะส่งข้อมูลแบบเป็น partition ไปยังแต่ละ executor เพื่อประมวลผล โดยใน partition 1 ตัวก็จะจัดเก็บ rdd ได้หลายตัวและ rdd 1 ตัวก็จะจัดเก็บ 1 object ของข้อมูล user ซึ่งตัว rdd นี้เป็น data format พื้นฐานของ spark ไม่ว่าเราจะประมวลผลใดๆใน spark ข้อมูลแต่ละชิ้นต้องถูกจัดเก็บใน rdd object เสมอ ในขั้นตอนนี้ถ้าเราจะดูว่า data_rdd ถูกแบ่งข้อมูลออกเป็นกี่ partition ให้ใช้คำสั่ง data_rdd.partitions.size

ดูจำนวน partitions ของข้อมูล

ใน step ที่ 1.2 นี้จะสังเกตเห็นว่ามีการเรียกใช้งาน object ที่ชื่อว่า sc ซึ่งก็คือ spark context นั้นเอง โดยที่ถ้าเราเขียนโปรแกรม spark ผ่านทาง spark shell ตัว shell จะทำการสร้าง spark context ให้เอาอัตโนมัติ จากตรงนี้เราก็จะพร้อมในการเข้าสู่ step ต่อไปนั้นคือการ Transform

หลักการส่วนตัวของการเขียน spark ในขั้นตอนการ transform ผมจะเริ่มจากการตั้งคำถามก่อนว่าใน spark application นี้เราต้องการทำอะไรแล้วใน step aggregate เราต้องการประมวลผลข้อมูลรูปแบบใด สมมุติในบทความนี้เราต้องการหาว่าแต่ละ page ใน website เรามีคนเข้ากี่คน จากตรงนี้เราก็จะเห็นแล้วว่าสิ่งที่เราสนใจคือข้อมูล page_location ของแต่ละ user โดยเราจะเห็นแล้วว่าในตอนการ aggregate นั้นเราต้องการทำการคำนวน sum ของแต่ละ page_location จากตรงนี้เราก็จะทำการแปลง rdd จากรูปแบบ <user_id>, <page_location> ให้เป็น <page_location>, 1 ซึ่งเราจะจัดเก็บข้อมูลดังกล่าวให้อยู่ในรูปแบบ key value เพื่อใช้ในการประมวลผลโดย key ในที่นี้คือ page_location และ value เป็น 1

//step 2.1: map function
val keyvalue_rdd = data_rdd.map{rdd =>
val data_spilt = rdd.split(",")
val page = data_spilt(1)
(page,1)
}

map function จะรับ function เข้ามาเป็น input ซึ่งจาก code ด้านบนเราทำการเขียน lambda function แล้วนำไปเป็น input ของ map function อีกที ซึ่ง function ต่างๆที่เราเขียนใน Scala ถ้ามีการวางตัวแปรไว้ในบรรทัดสุดท้าย scala จะ return ตัวแปรนั้นเป็น output ของ function ทันที โดยหลังจากที่เราแปลง scala list เป็น rdd แล้ว จะทำให้เราเรียกใช้งาน function ต่างของ spark ที่ใช้ในการประมวลผลข้อมูลได้ และในการ transform ข้อมูล function พื้นฐานที่เราจะใช้คือ map function ซึ่ง map function จะทำหน้าที่คล้าย for loop ใน collection ของภาษาโปรแกรมทั่วไป โดยเราจะทำการ loop อ่าน rdd ทุกตัวที่อยู่ใน partition ถึงตรงนี้ลองจินตนาการว่า map function ที่เราเขียนจะถูกนำไปติดตั้งยัง spark executor และแต่ละ executor ก็จะทำการดึง partition มาประมวลผลทีละตัว

ตัวอย่างโครงสร้างการทำงานของ map function

step ต่อมาคือการ aggregate โดยใน spark นั้น aggregate function ที่สามารถพบได้บ่อยคือตระกูลที่ขึ้นต้นด้วยคำว่า reduce ซึ่งในบทความนี้เราจะเรียกใช้งาน function ที่ชื่อว่า reduceByKey เนื่องจากใน map function เราได้แปลงข้อมูลให้อยู่ในรูปแบบ key value แล้วใน aggregate นี้เราจะทำการ sum value ของแต่ละ key กัน

//step 3.1: reduce function
val aggregate_rdd = keyvalue_rdd.reduceByKey{(a, b) =>
a + b
}

จาก code ด้านบน spark จะทำการ group รวม key ที่เหมือนกันเข้าด้วยกันโดย a และ b จะเป็น value ของแต่ละ rdd ที่มี key เหมือนกัน จากนั้นนำ value มาบวกรวมกันไปเลยๆของแต่ละ key จนครบทุก rdd ที่มี key เดียวกัน โดยแต่ a จะเป็น accumulate value คือตัวแปรที่เป็นค่าสะสมจากการประมวลผลรอบก่อนหน้า และ b จะเป็นค่าของ rdd ใหม่ที่ถูกอ่านเข้ามาประมวลผล

แต่ละรอบของการประมวลผล reduceByKey

เมื่อประมวลผล step aggregate เรียบร้อยแล้วและข้อมูลตอนนี้อยู่ในรูปแบบที่เราต้องการซึ่งสอดคล้องกับจุดประสงค์ของ spark application นี้ที่ว่าเราต้องการดูข้อมูลว่าแต่ละ page มีคนเข้ากี่คนซึ่งผลลัพธ์จะอยู่ในรูปแบบ key value ที่เราจัดไว้ นั้นก็คือ key=page location และ value=total of visit เราก็พร้อมเข้าสู่ step สุดท้ายนั้นก็คือ output โดยในบทความนี้เราจะให้ output ถูกแสดงผลออกมาที่หน้าจอ ซึ่งใน spark function ที่ใช้ในการแสดงผลข้อมูลใน rdd ทั้งหมดคือ collect() ซึ่ง spark จะทำการส่งข้อมูลของแต่ละ rdd กลับมายัง driver program เพื่อแสดงผลออกทางหน้าจอ จากรูปด้านล่างจะเห็นว่า page /product/id_1.html มีการเข้าใช้งานมากที่สุดคือ 3 ครั้ง

แสดงผลข้อมูลใน rdd

หวังว่าผู้อ่านจะพอเห็นภาพเบื้องต้นของการเคลื่ยนย้ายข้อมูลและการประมวลผลข้อมูลภายใน spark standalone mode กันแล้วนะครับ ในบทความหน้าเราจะไปทดลองประมวลผลข้อมูลโดยใช้ spark cluster mode ซึ่งเป็น mode ที่ใช้สำหรับการทำงานจริงของ data engineer กันครับ

สำหรับคนที่สนใจ Data Engineer สามารถเข้ามาแชร์ข้อมูลกันที่ได้ที่ https://www.facebook.com/groups/369157183664760/

สุดท้ายนี้ถ้าท่านใดมีข้อสงสัยหรือคำชี้แนะใดๆสามารถฝากข้อความได้ที่ https://www.facebook.com/coeffest/ นะครับ ขอบคุณมากครับที่ติดตาม

อ้างอิง

https://www.oreilly.com/library/view/learning-spark/9781449359034/

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-SparkContext.html