Hadoop Map Reduce จัดการอย่างไรกับ Record ที่ขาดครึ่งระหว่าง Block

เมื่อวันเสาร์ที่ผ่านมาพอดีผมได้มีโอกาสไปนั่งเรียน Hadoop กับ อ.กิตติรักษ์ ม่วงมิ่งสุข ที่ สถาบันบัณฑิตพัฒนบริหารศาสตร์ หรือนิด้า ซึ่งก่อนหน้านี้พอมีพื้นฐานเกี่ยวกับ Hadoop อยู่บ้างเล็กน้อย ก็เลยนั่งฟังไปแบบสบาย ๆ แต่พอกลับมาถึงบ้านเท่านั้นแหละ! เกิดข้อสงสัยขึ้นมาทันที…

ถ้าหาก HDFS มันหั่นไฟล์ดื้อ ๆ แบบนั้น กระจายไปยัง Block ใน Data node ต่าง ๆ แล้วพวกไฟล์ CSV ก็มีโอกาสสูงมาก ๆ เลยสิ ที่จะโดนหั่นครึ่งกลาง Record เวลาจะทำ Map Reduce หรือใช้ hive query เจ้าตัว Mapper ของแต่ละ Block มันจะจัดการประมวลผลได้อย่างถูกต้องยังไงล่ะทีนี้

อย่างที่ทราบกันดีว่า Hadoop เป็น distributed file system (เหมือนทำ Raid กันระหว่างแต่ละ Node ผ่าน Network มองเป็น Disk ผืนเดียว โดยจะมี Name Node เป็น Manager ที่ Handle Node ทั้งหมด) เกิดขึ้นมาเพื่อเก็บ และประมวลผลข้อมูลที่มีขนาดใหญ่มาก ๆ ที่ compute node เครื่องเดียวไม่สามารถประมวลผลข้อมูลได้อีกต่อไปแล้ว

รูปภาพจาก http://saphanatutorial.com/hadoop-online-training-hadoop-basics-5-2/

หลักการทำงานของมันก็ตามรูปด้านบนเลย ไฟล์ data ขนาดใหญ่ จะถูกส่งเข้ามาทาง Name Node และ Name Node ก็จะทำการหั่นไฟล์ออกเป็นชิ้น ๆ กระจายไปยัง Block ต่าง ๆ ใน Data Node หลาย ๆ เครื่อง โดยวิธีการหั่นไฟล์ ตามที่ อ.กิตติรักษ์ บอกก็คือ มันหั่นตรง ๆ ตาม Block Size ที่ config เอาไว้… (ตอนอาจารย์แกสอน มีการโชว์เอา jdk ขนาดที่เกินจาก Block Size ยัดเข้าไปที่ HDFS แล้วเข้าไปที่ data node เช็ค size ของชิ้นส่วนไฟล์ที่ถูกหั่นว่าตรงตาม Block Size จริง ๆ และนำมันมาประกอบร่างกัน (cat) และสามารถนำไปใช้ได้ปกติ)

ด้วยความที่ยังไม่มั่นใจว่า HDFS จะหั่นไฟล์ตรง ๆ ในกรณีที่เป็น Text file ด้วยหรือไม่ เลยไปหาข้อมูลเพิ่มเติมและไปเจอ blog ของผู้เขียนหนังสือ Hadoop For Dumies ซึ่งสรุปได้ว่า

เมื่อมีการเก็บไฟล์เข้าไปใน HDFS ตัว HDFS จะทำการแบ่งไฟล์ตามขนาด Block size ที่ได้กำหนดเอาไว้ และ กระจายไปที่ Data node โดยไม่สนใจว่าจะเป็นไฟล์ประเภทไหน ในกรณีที่เป็น Text file ก็ไม่สนด้วยว่า Record เริ่มที่ตำแหน่งไหน และสิ้นสุดที่ตำแหน่งใด
รูปภาพจาก http://www.dummies.com/programming/big-data/hadoop/data-blocks-in-the-hadoop-distributed-file-system-hdfs/

จากรูปด้านบน data แต่ละ block (a, b, c, d และ e) เป็นไปได้สูงมากที่จะถูกหั่นระหว่าง Record ดังนั้น มันต้องมีการจัดการตอนก่อนการ Map แน่ ๆ เพื่อที่จะทำให้ข้อมูลไม่มั่ว ไม่ตกหล่นตอนประมวลผล

จากข้อสันนิษฐานข้างต้น ถ้าหากสมมุติว่ามีข้อมูลทั้งหมด 6 Record แต่ละ Record ขนาด 50MB, Record ทั้งหมด จะถูกเก็บใน HDFS ดังในรูปข้างล่าง

ข้อสงสัยที่มีตอนนี้ก็คือ

  1. Hadoop มันมั่นใจได้อย่างไร ว่ามันสามารถอ่านและประมวลผลข้อมูลได้อย่างถูกต้องทุก Record
  2. ใน Record ที่ 3 (สีแดง) Record อยู่ระหว่าง Block ที่ 1 และ Block ที่ 2 เจ้า Hadoop มันมีวิธีจัดการประมวลผลยังไง

เพื่อไม่ให้เสียเวลา มาหาคำตอบกันต่อเลยดีกว่าครับ

Hadoop มันรู้ได้อย่างไรว่า จุดสิ้นสุดของ Record อยู่ตรงไหน

ก่อนจะไปต่อ เข้าใจตรงกันก่อนว่า ข้อมูลทุกอย่างที่เก็บใน Disk นั้นล้วนเป็น Binary ทั้งสิ้น เวลาที่คอมพิวเตอร์อ่านข้อมูลจากไฟล์ใน Disk มาประมวลผล นั้นจะเป็น Binary ล้วน ๆ ดังนั้น การที่คอมพิวเตอร์จะรู้ได้ว่า Binary แต่ละชุด หมายถึงอะไร เช่น ตัวอักษร, ช่องว่าง, ย่อหน้า รวมไปถึงการ เว้นบันทัด จึงจำเป็นต้องมีมาตรฐานกำหนด หรือที่เรียกกันว่า encoding

เนื่องด้วยระบบคอมพิวเตอร์ในปัจจุบันจะ based on ASCII แทบทั้งหมดแล้ว ดังนั้น สัญลักษณ์ที่ทำให้คอมพิวเตอร์รู้ได้ว่าคือการเว้นบันทัดนั้น จะเหลืออยู่ 3 ตัวหลัก ๆ ดังนี้ (อ้างอิง)

  1. 0A(hex) = \n = LF (Line Feed): ใช้ใน Unix, Mac OS X, Linux
  2. 0D(hex) = \r = CR (Carriage Return): ใช้ในตระกูล Mac ก่อน Mac OS X
  3. 0D0A(hex) = \r\n = CR + LF: ใช้ใน MS-DOS และ Windows

เพื่อให้แน่ใจว่า Hadoop ใช้สัญลักษณ์พวกนี้ในการแยกแยะบันทัดจริงรึป่าว เราคงต้องเปิด source ดูครับ

ตามมาที่ Class LineReader.java ครับ

เมื่อเข้ามาจะเห็น source code ส่วนนี้ครับ

source code บางส่วนของ Class LineReader.java

แค่ comment ด้านบนก็อธิบายไว้อย่างชัดเจนครับ สัญลักษณ์ที่บอกถึงจุดสิ้นสุดของบันทัดก็คือ \n \r หรือ \r\n และตรงส่วนนี้เรายังสามารถปรับแต่งไปใช้ สัญลักษณ์ตัวอื่นได้ ในกรณีที่ไฟล์นั้นมาจาก system อื่น ที่ไม่ได้ based on ASCII

สังเกตุว่าในบันทัดที่ 53 และ 54 มีการประกาศ byte ของ \r และ \n ไว้สำหรับเอาไปใช้ในแต่ละ method อีกด้วย

สำหรับคนที่อยากรู้อยากเห็น ว่าเขา implement ยังไง เข้าไปดูต่อได้ที่ method readDefaultLine นะครับ

วิธีการทำงานของ Class Map

ถ้าเคยเขียน Map reduce มาบ้าง จะทราบว่าโดย default Mapper จะส่งข้อมูลมาให้เรา Map ทีละ Record (สามารถ config ให้ Mapper ส่งมาให้เราทีละกี่ Record ก็ได้ ในกรณีที่ต้องประมวลผลหลาย Record ร่วมกัน โดยใช้ NLineInputFormat)

โดยจะส่งเข้ามาเป็น key / value pair โดย key จะเป็นตัวเลข unique ส่วน value จะเป็น Record นั่นเอง

รูปภาพจาก https://wiki.apache.org/hadoop/WordCount

จากตัวอย่าง code ด้านบน บันทัดที่ 22 เป็นการรับ Record มาจาก Mapper เพื่อนำมาประมวลผล

นั่นแปลว่าก่อนที่จะได้ข้อมูลมาประมวลผลทีละ Record แบบนี้ Mapper จะต้อง เข้าไปอ่านไฟล์ และ Split ออกมาทีละ Record โดยจะต้อง handle ในกรณีที่ Record อยู่คนละ Block ด้วย แล้ว Hadoop จะมีวิธีจัดการตรงนี้ยังไงล่ะ?

Input Splits, Concept สำคัญ ที่ใช้จัดการกับปัญหานี้

เมื่อ Map reduce job เริ่มต้นขึ้น Resource Manager จะสร้าง daemon process ชื่อ Application Master เพื่อคอยดูแลการทำงานของ job นั้น ๆ

หนึ่งในสิ่งแรกที่ Application Master ทำก็คือ การกำหนดว่ามี Block ไหนบ้างที่จะต้องประมวลผล โดยตัว Application Master จะขอรายละเอียดจาก Name node ว่ามี Data node ตัวไหนบ้าง ที่เก็บ Block พวกนั้นอยู่ จากนั้น Application Master ก็จะส่ง request ไปให้ Resource Manager เพื่อที่จะส่ง map tasks process ไปยัง Block ต่าง ๆ บน Data node นั้น ๆ

กุญแจสำคัญที่จะทำให้ Map reduce job ประมวลผลได้อย่างรวดเร็ว ก็คือ Block ที่ต้องการจะประมวลผลอยู่ที่ Data node ไหน ก็จะประมวลผลที่ Data node นั้น ๆ (การ communicate ระหว่าง Data node อื่น ๆ จะมี overhead พอสมควร)

ถ้าย้อนกลับขึ้นไปดูวิธีการเก็บข้อมูลแต่ละ Block ของ HDFS จะเห็นว่ามีการหั่นไฟล์ตรง ๆ ตาม Block size ที่กำหนดเอาไว้ โดยไม่สนใจเรื่องจุดเริ่มต้นและสิ้นสุดของแต่ละ Record แต่เวลา Mapper ประมวลผลแต่ละ task นั้น จะมีการประมวลผลทีละ Record

ปัญหาที่เกิดขึ้นก็คือ ถ้าหาก Map task ประมวลผลทุก Record ที่มีใน Data block นั้น ๆ จะเกิดอะไรขึ้นกับ Record ที่ขาดครึ่ง (ครึ่งแรกของ Record อยู่ใน Block นึง และครึ่งหลัง Record ถูกเก็บอยู่ใน Block ถัดไป)

วิธีที่ Hadoop ใช้ในการจัดการกับปัญหานี้ก็คือ Concept ของ Logical Block หรือที่เรียกว่า Input splits นั่นเอง

รูปภาพจาก http://www.dummies.com/programming/big-data/hadoop/input-splits-in-hadoops-mapreduce/

ก่อนที่ Map reduce job task จะเริ่มต้นประมวลผลได้ Application Master จะเรียกใช้งาน Class InputFormat เพื่อสร้าง Input splits ขึ้นมาก่อน โดยจะคำนวณหาว่า จุดเริ่มต้นของ Record แรก และจุดสิ้นสุดของ Record สุดท้าย ที่จะประมวลผลอยู่ตำแหน่งไหน

source code บางส่วนจาก github

รูปด้านบนคือ source code ของ method getSplits ของ Class NLineInputFormat ที่ใช้ในการสร้าง Input splits

ในกรณีที่ Record สุดท้ายไม่สมบูรณ์ Input split จะเพิ่มข้อมูลเกี่ยวกับตำแหน่งของ Block ถัดไป พร้อมกับจำนวน byte ที่จะต้องไปอ่านเพิ่ม เพื่อให้ Record นั้นสมบูรณ์

รูปภาพจาก http://www.polarsparc.com/xhtml/Hadoop-6.html
Input split ไม่ได้เก็บข้อมูลจริง ๆ แต่จะเก็บเพียงแค่ตำแหน่งที่อยู่ของข้อมูลและขนาดของ Byte เท่านั้น เพื่อให้ RecordReader ใช้อ่านทีละบันทัด ส่งต่อไปให้ Mapperนำไปประมวลผล ดังรูปด้านบน
source code บางส่วนจาก github

source code ด้านบนเป็นส่วนของ interface ของ InputSplit ซึ่งแสดงให้เห็นว่าเก็บเพียงแค่ตำแหน่งที่อยู่ของข้อมูล (location) และขนาดของ Byte (length) เท่านั้น

อ้างอิงบางส่วนจาก http://www.dummies.com/programming/big-data/hadoop/input-splits-in-hadoops-mapreduce/

สรุปการทำงานของ InputFormat, InputSplit, RecordReader

  1. InputFormat: จะเรียกใช้โดย Application Master เพื่อตรวจสอบ input directory บน HDFS และใช้สร้าง Input Split object list โดยวิธีการและเงื่อนไขในการคำนวณและสร้าง Input Splits จะขึ้นอยู่กับประเภทของ input file ที่จะประมวลผล
  2. InputSplit: เป็นที่เก็บตำแหน่ง และขนาดของ Logical Block ของทุก ๆ Data node ใน cluster ที่จะมีการประมวลผล โดยจะถูกเรียกใช้งานโดย RecordReader
  3. RecordReader: จะถูกใช้งานโดย Mapper task โดย RecordReader จะใช้ข้อมูลตำแหน่ง และขนาดข้อมูลจาก InputSplit เพื่ออ่านข้อมูล และส่งข้อมูลไปให้ map() function ของ Mapper โดย default implementation ของ RecordReader ก็คือ LineRecordReader โดยที่ LineRecordReader จะส่งข้อมูลทีละ Record ไปให้กับ Mapper นั่นเอง
สำหรับใครที่อยากรู้วิธีการจัดการข้อมูลใน HDFS ที่ถูก Split ใน กรณีต่าง ๆ เพิ่มเติม เช่น กรณีที่ จุดสิ้นสุดของ Block ถูกแบ่งตรงกลางของ Record delimiter (CR, LF)ว่า LineReader จะจัดการยังไง สามารถเข้าไปดูคำอธิบายใน CompressedSplitLineReader.java ได้ครับ อธิบายเอาไว้ค่อนข้างละเอียดทีเดียว