Implementasi Sederhana Framework MapReduce

Imre Nagi
Pujangga Teknologi
Published in
7 min readApr 24, 2019

MapReduce mungkin bukan terminologi yang asing di telinga para penggiat teknologi, terutama di bidang data dan analitik. Istilah MapReduce dipopulerkan pertama kali melalui publikasi ilmiah yang ditulis oleh Google pada tahun 2004. Kemudian pada tahun 2006, Doug Cutting mencoba untuk membuat implementasi dari tulisan ilmiah tersebut dan pada akhirnya mempublikasikan projeknya yang kemudian dikenal dengan nama Apache Hadoop.

Silakan tonton video ini jika berminat untuk mendengarkan penjelasan lisan dari tulisan ini.

MapReduce

Secara definisi, MapReduce adalah sebuah model pemograman yang didesain untuk dapat melakukan pemrosesan data dengan jumlah yang sangat besar dengan cara membagi pemrosesan tersebut ke beberapa tugas yang indipenden satu sama lain.

Gambar 1: Alur MapReduce Untuk Kasus WordCount. Sumber: https://www.todaysoftmag.com/images/articles/tsm33/large/a11.png

Berdasarkan definisi diatas dan Gambar 1, MapReduce dapat dibagi menjadi beberapa tahap:

  1. Pemecahan data masukan (Splitting). Pada proses ini data masukan yang diberikan oleh pengguna MapReduce (klien) akan dipecah menjadi bagian-bagian yang lebih kecil. Pada kasus Hadoop MapReduce, dalam kondisi ideal, data akan dipecah menjadi beberapa bongkahan berukuran maksimal 128MB.
  2. Mapping. Mapping adalah salah satu tahap terpenting dari MapReduce. Pada fase Mapping, bongkahan data yang telah dipecah akan di proses untuk menghasilkan intermediary key-value pairs. Pada contoh wordcount (Gambar 1) diatas, data yang mengandung “Dear Bear River” akan diproses sehingga menghasilkan pasangan key-value Dear:1, Bear:1, dan River:1. Dalam fase Mapping, bisa jadi ada satu atau lebih mesin pekerja (worker) yang melakukan proses terhadap beberapa bongkahan data yang berbeda. Semakin banyak jumlah mesin atau tingkatan parallelisme yang digunakan, maka durasi pemrosesan seluruh data dapat berjalan jauh lebih cepat.
  3. Pengacakan atau Shuffling. Fase mapping bisa berjalan di satu atau banyak mesin. Akibatnya, pasangan key-value yang dihasilkan oleh sebuah mapper bisa tersebar di berbagai mesin. Namun, jika pengolahan yang ingin dilakukan adalah perhitungan dengan menggunakan key yang sama, maka data dengan key yang sama harus berada pada mesin yang sama pada fase reduce. Oleh karena itu, sebelum fase reduce, fase shuffling bertugas untuk mengumpulkan satu atau lebih key yang berbeda disebuah mesin tertentu agar aggregasi dapat dilakukan dengan mudah. Pada contoh diatas, seluruh kata Bear yang dihasilkan fase mapping akan berada dalam sebuah mesin yang sama. Begitu juga dengan kata-kata lain.
  4. Reducing. Fase reducing bertugas untuk melakukan aggregasi terhadap seluruh pasangan intermediary key-value dengan key yang sama. Pada gambar diatas, pasangan key-value Bear:1 dan Bear:1 akan diaggregasi oleh reducer sehingga pada akhirnya reducer akan menghasilkan keluaran Bear:2 seperti pada contoh kasus wordcount.

Penggunaan MapReduce

Pada umumnya, pengembang aplikasi MapReduce akan membutuhkan beberapa hal berikut dalam melakukan analisis data.

  1. Berkas masukan. Berkas masukan ini dapat berupa berkas-berkas teks yang tersimpan di dalam sebuah media penyimpanan terdistribusi seperti Google File System (GFS), Hadoop File System(HDFS), AWS S3, Google Cloud Storage, dan lain-lain.
  2. Fungsi Map & Reduce. Untuk membuat sebuah aplikasi MapReduce yang dapat dieksekusi secara paralel (misalkan dengan Hadoop MapReduce), pengembang aplikasi menyediakan fungsi khusus yang digunakan untuk melakukan pemrosesan pada fase map dan reduce. Seluruh hal yang berkaitan dengan penjadwalan, mekanisme penanganan eror, dll. akan dilakukan oleh MapReduce framework yang digunakan.

Berikut beberapa contoh fungsi MapReduce yang dapat dibuat oleh pengembang piranti lunak:

  • Word Count
  • Inverted Index, dll.

Membuat fungsi map dan reduce bukanlah hal yang sulit. Namun, bagaimana jika kita diminta untuk membuat framework MapReduce yang digunakan untuk mengendalikan seluruh proses yang terjadi setelah klien memberikan data masukan serta fungsi map dan reduce?

Implementasi Framework MapReduce Sederhana

Sebelum memulai implementasi, untuk membatasi cakupan dari implementasi ini, maka requirement dari framework MapReduce ini adalah sebagai berikut:

  1. Implementasi dilakukan dengan menggunakan Golang.
  2. Mesin MapReduce dapat menerima lokasi berkas masukan di sebuah file system serta definisi fungsi map dan reduce yang diberikan oleh klien.
  3. Mesin pekerja dapat melakukan registrasi kepada mesin master agar dapat digunakan ketika klien memulai sebuah tugas MapReduce.
  4. Mesin master dapat menugaskan eksekusi dari fungsi map dan reduce kepada mesin pekerja yang telah terdaftar. Apabila terjadi kesalahan dalam pemrosesan karena eror tertentu (seperti error pada komputasi, atau mesin pekerja tiba-tiba mati), maka mesin master harus dapat mengalokasikan kembali pekerjaan tersebut kepada mesin pekerja lain yang tersedia.
  5. Dapat melakukan perhitungan jumlah kata (word count) dengan menggunakan API framework MapReduce.

Diluar cakupan:

  1. Penggunaan file system terdistribusi.
  2. Jumlah maksimal pengulangan ketika sebuah tugas gagal dijalankan oleh mesin pekerja. Dan lain-lain. 🙏
Gambar 2: Diagram Sistem Framework MapReduce Sederhana. Oleh: Imre Nagi

Berdasarkan gambar 2, maka ada beberapa komponen utama yang akan diimplementasikan:

Scheduler

Scheduler akan menugaskan mesin pekerja untuk melakukan operasi map atau reduce. Apabila seluruh mesin pekerja sedang melakukan operasi map atau reduce, maka scheduler akan menunggu hingga ada mesin pekerja yang idle. Jika terjadi kegagalan pada operasi map atau reduce, scheduler akan berusaha untuk menugaskan mesin pekerja lain untuk melakukan komputasi tersebut.

Berikut adalah antarmuka dari fungsi scheduler.

func schedule(
jobName string,
mapFiles []string, //slice of input files
nReduce int, //number of reduce task
phase jobPhase, //job phase: map or reduce
registerChan chan string) //channel of registered workers

Scheduler menerima beberapa parameter masukan sebagai berikut:

  1. jobName, merupakan nama dari aplikasi MapReduce yang akan dieksekusi.
  2. mapFiles, adalah kumpulan berkas-berkas yang akan di proses oleh aplikasi MapReduce.
  3. nReduce, adalah jumlah reducer yang dimiliki dalam sebuah task MapReduce.
  4. phase, menentukan tipe tugas yang akan dieksekusi oleh mesin pekerja. Masukan untuk parameter ini hanyalah mapPhase atau reducePhase
  5. registerChan, merupakan sebuah go channel yang menyimpan alamat RPC dari mesin pekerja yang dapat digunakan untuk mengeksekusi sebuah task map atau reduce.
scheduler.go

Map Controller — doMap()

Map Controller / fungsi doMap() bertugas untuk mengelola satu tugas map. Fungsi ini akan menggunakan fungsi mapper yg didefinisikan oleh klien ( mapF ) untuk melakukan pemrosesan data dan mempartisi keluaran dari fungsi mapF menjadi beberapa berkasi terpartisi yang selanjutnya akan digunakan pada fase reduce.

Berikut adalah definisi antarmuka dari fungsi doMap() :

func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string, // a input file which will be processed
nReduce int, // num of reduce task for hashing
mapF func(filename string, contents string) []KeyValue)

doMap() memiliki parameter masukan sebagai berikut:

  1. jobName, merupakan nama dari aplikasi MapReduce yang akan dieksekusi.
  2. mapTask, menyimpan nomor urut dari tugas map yang sedang dieksekusi oleh fungsidoMap() .
  3. inFile, merupakan lokasi berkas yang akan diproses dengan menggunakan mapF yang didefinisikan oleh klien.
  4. nReduce, merupakan jumlah reducer yang akan digunakan untuk menentukan berapa banyak partisi yang akan dihasilkan ketika doMap() telah dieksekusi.
  5. mapF, merupakan sebuah fungsi yang didefinisikan oleh klien. Fungsi ini menerima nama berkas dan konten dari berkas dan menghasilkan keluaran berupa kumpulan key-value pair yang selanjutnya akan diproses oleh fungsidoReduce() .

Berikut implementasi dari doMap() :

Implementasi fungsi doMap()

Pada implementasi diatas, doMap() akan mengekstraksi konten dari setiap berkas yang diproses untuk digunakan oleh fungsi mapF . Lalu, hasil pemrosesan akan ditulis kembali ke beberapa berkas keluaran yang selanjutnya akan digunakan oleh fungsi doReduce() . Perhatikan juga bahwa pada fase ini, proses pengacakan/shuffling juga dilakukan dengan menempatkan pasangan key-value ke berkas yang telah terpartisi agar proses dapat dilakukan pada satu atau beberapa berkas yang menyimpan key yang sama.

Reduce Controller — doReduce()

Berikut antarmuka dari fungsidoReduce() :

func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run
reduceF func(key string, values []string) string)

Reduce Controller / fungsidoReduce() digunakan untuk mengelola sebuah tugas reduce. Fungsi ini akan membaca seluruh berkas intermediary yang dihasilkan oleh sejumlah nMap mapper dan ditujukan untuk reducer tertentu ( reduceTask ).

Berikut adalah penjelasan parameter masukan dari fungsi doReduce() :

  1. jobName, merupakan nama dari aplikasi MapReduce yang akan dieksekusi.
  2. reduceTask, menyimpan nomor urut dari tugas reduce yang sedang dieksekusi oleh fungsidoReduce()
  3. outFile, merupakan nama berkas yang digunakan untuk menulis keluaran dari reduceF
  4. nMap, merupakan jumlah tugas map yang telah dijalankan pada fase mapping.
  5. reduceF, merupakan fungsi yang didefinisikan oleh klien dan digunakan untuk melakukan aggregasi terhadap seluruh value yang dimiliki oleh sebuah key.
Implementasi fungsi doReduce()

Pada kode diatas, doReduce() akan membaca seluruh berkas intermediary yang dihasilkan oleh doMap() . Untuk setiap berkas, doReduce() mengelompokkan seluruh value yang memiliki key yang sama dalam sebuah data struktur map. Pada akhirnya fungsi reduceF dipanggil agar diperoleh hasil aggregat.

MapF & ReduceF untuk WordCount

Contoh mapF dan reduceF untuk kasus word count.

Kode diatas merupakan satu-satunya kode yang perlu dipersiapkan oleh klien pengguna framework MapReduce yang telah diimplementasikan sebelumnya.

Pada kasus word count, mapF memisahkan setiap kata yang terdapat dalam contents untuk menghasilkan pasangan key-value dengan key berupa kata tertentu dengan value 1. Fungsi reduceF akan menjumlahkan seluruh value yang dimiliki oleh sebuah key. Sehingga pada akhirnya klien bisa mendapatkan jumlah kemunculan setiap kata yang terdapat pada variabel contents .

MapReduce Master dan Pekerja

Implementasi lengkap dari Master dan Pekerja (worker) dapat dilihat pada github repository mr-demo.

Implementasi Master dan Worker yang digunakan pada tulisan ini merupakan basis kode yang digunakan pada tugas laboratorium mata kuliah Distributed System MIT (6.824). Namun, implementasi scheduler, doMap dan doReduce merupakan implementasi penulis sendiri.

Kesimpulan

Framework MapReduce sederhana terdiri dari Master, mesin pekerja, scheduler, map & reduce controller.

P.S. Jika teman-teman menyukai artikel semacam ini, silakan subscribe ke newsletter kita dan dapatkan notifikasi artikel terbaru langsung di inbox kamu!

--

--

Imre Nagi
Pujangga Teknologi

Google Developer Expert, Cloud Platform Engineer @gojek