ลองใช้งาน Asynchronous Flow แบบเบื้องต้น
Kotlin Flow คือ ไลบรารีสำหรับการทำงานแบบ Asynchronous พัฒนาโดย JetBrains, บริษัทที่อยู่เบื้องหลังการพัฒนาภาษา Kotlin นั่นเอง รูปแบบการทำงานถือได้ว่าคล้ายกับการทำงาน แบบ Reactive Stream ที่เราคุ้นเคยอย่าง RxAndroid & RxJava เป็นอย่างมาก, Jetbrains ได้ปล่อย Flow เข้ามาเป็นส่วนหนึ่งใน API ของ Kotlin Coroutine เวอร์ชั่น 1.2.0 alpha release ให้ Developer ได้เริ่มทดลองใช้งาน และในบทความนี้เราจะมาเรียนรู้การใช้งาน Asynchronous Flow แบบเบื้องต้นกันครับ
Flow APIs คืออะไร
Flow API ใน Kotlin คือ การเขียนโค้ดเพื่อจัดการข้อมูลแบบ Asynchronous ที่มีการทำงานแบบเป็นลำดับ (sequentially) ใน RxJava เราจะคุ้นเคยกับการสร้าง Observables ที่ทำหน้าที่ผลิตชุดข้อมูล และเมื่อเราต้องการค่าที่มีใน Observables มาใช้งาน เราจะต้องทำการ subscribe ก่อน เช่นเดียวกันกับ Flow การทำงานจะมีเงื่อนไขเดียวกัน คือ Flow จะไม่ทำงานจนกว่าจะสั่ง collected นั่นเอง
รู้จัก 3 Step ของ Flow
ทำความรู้จัก 3 step การทำงานของ Flow กันก่อนว่ามีส่วนประกอบอะไรบ้าง และทำหน้าที่อะไร ก่อนที่จะเริ่มเข้าสู่การลงมือเขียนโค้ดครับ
- flow { … } ทำหน้าที่เป็นตัว builder
- emit(value) ทำหน้าที่ส่งข้อมูล (transmit a value)
- collect { … } คือ รับค่าข้อมูลที่ได้จากการ emit (receive the values)
เริ่มต้นทดลองใช้งาน Flow API
เปิดโปรแกรม IntelliJ IDEA CE ขึ้นมา ทำการ new project ด้วย Kotlin ให้เรียบร้อย สามารถดูขั้นตอนเบื้องต้นได้ที่ กดปุ่มนี้ จากนั้นทำการเพิ่ม Dependencies สำหรับ coroutine ใน build.gradle เข้าไป
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.4"
และอย่าลืมเพิ่ม jcenter() เข้าไปใน repositories ด้วย จากนั้นทำการ sync ก็พร้อมสำหรับลงมือโค้ดดิ้งกันแล้ว
repositories {
jcenter()
}
1. การสร้าง Flow
การสร้าง Flow สามารถทำได้หลายรูปแบบ ในบทความนี้จะพาสร้าง Flow ในรูปแบบง่าย ๆ 3 แบบ คือ
1. การสร้าง Flow จากการ emit ค่า
2. การใช้งาน asFlow()
3. การใช้งาน flowOf()
มาดูตัวอย่างการใช้งานการสร้าง Flow ในแต่ละแบบตามโค้ดด้านล่าง
- การ emit ค่า
fun createFlowEmit() = flow<Int> {
for (i in 1..3) {
delay(300L) //delay for 300ms to emit next number
emit(i) //emit next value
}
}
2. asFlow()
fun createFlowFromAsFlow(): Flow<Int> {
return listOf<Int>(1, 2, 3).asFlow()
.onEach {
delay(300L) //delay for 300ms then emit next number
}
.flowOn(Dispatchers.Default) //change context for CPU-consuming
}
3. flowOf()
fun createFlowFromFlowOf(): Flow<Int> {
return flowOf(1, 2, 3)
}
จากนั้นทำการ สร้าง function main() เพื่อทำการรันโค้ด และดูผลลัพธ์
fun main() {
runBlocking { createFlowEmit().collect { //collect each value from flow
println("Received way1 : $it")
} println() createFlowFromAsFlow().collect {
println("Received way2 : $it")
} println() createFlowFromFlowOf().collect {
println("Received way3 : $it")
} }
}
หลังจากนั้นทดสอบรันจะได้ผลลัพธ์ ดังนี้
ข้อสังเกต : flow { … } ทำงานใน background thread ในขณะที่ collection เกิดขึ้นใน main thread
- จะเห็นได้ว่าทั้งสามแบบให้ผลลัพธ์แบบเดียวกัน
- เราสามารถส่งข้อมูลด้วยวิธีการสั่ง emit ค่า
- collect เป็น function สั่งให้เริ่มการทำงานในการนำข้อมูลจาก Flow มาใช้งาน
- สามารถสลับการทำงานของ context ด้วยคำสั่ง flowOn Operator เช่น ให้ทำงานใน IO, Main Thread, Default เป็นต้น
- ข้อมูลแบบ Collection เช่น LinkedList, ArrayList ฯลฯ สามารถแปลงค่าเป็น Flow ได้โดยตรงด้วยคำสั่ง asFlow() เช่น listOf(1, 2, 3).asFlow()
- Flow สามารถสร้างขึ้นจากชนิดข้อมูลใดก็ได้ เช่น flowOf(“string”, 1, true, 3.14)
- ใน dependencies ของ kotlinx-coroutines-android สามารถทำให้เราแปลงข้อมูล ชนิด Flow เป็น LiveData ได้เลย ด้วยคำสั่ง asLiveData()
2. การรับค่าข้อมูลจาก Flow
ตัวอย่างการรับค่าที่สร้างขึ้นจาก Flow มาใช้งาน มีรูปแบบคำสั่งที่สามารถทำได้ ดังตัวอย่างต่อไปนี้
สมมุติว่ามีการสร้าง Flow ที่ทำการผลิตข้อมูลค่าของจำนวนเฉพาะ และมีการดีเลย์ 200ms ก่อนที่จะผลิตข้อมูลถัดไปจนครบ ดังโค้ดนี้
fun sendPrimes(): Flow<Int> = flow {
val primeList = listOf<Int>(2, 3, 5, 7, 11, 13, 17, 19, 23, 29)
primeList.forEach {
delay(200)
emit(it)
}
}
เราสามารถที่จะรับค่าที่ผลิตขึ้นมาจาก Flow ไปใช้งานด้วยคำสั่ง
collect
runBlocking { sendPrimes()
.collect(object : FlowCollector<Number> {
override suspend fun emit(value: Number) {
println("number way1 = $value")
}
})//หรือ sendPrimes()
.collect {
println("number way1 = $it")
}}
collectIndexed
จะเหมือน collect ทุกอย่าง เพียงแต่จะมีตำแหน่ง index ของค่านั้นเพิ่มมาด้วย
runBlocking {
sendPrimes()
.collectIndexed { index, value ->
println("number way2 : index = $index ; value = $value")
}}
LunchIn
จะมีความแตกต่างจาก collect ตรงที่คำสั่งนี้จะรับค่าผ่าน onEach แทน และสามารถกำหนด coroutine scope ที่ต้องการได้ และยัง return ค่าออกมาเป็น job ซึ่งสามารถ ทำการยกเลิกการทำงานของ flow ได้ด้วยคำสั่ง job.cancel()
runBlocking {
val job = sendPrimes()
.onEach {
println("launchIn : $it")
}
.launchIn(this) //<--Launching the flow in a separate coroutine delay(10000L)
job.cancel() }
- ยังมีคำสั่งอีกอัน คือ collectLatest ที่มีการทำงาน คือ หลังจากได้ค่ามาจะทำการ restart ไปเรื่อย ๆ จนกว่าจะได้ค่าสุดท้าย วิธีการใช้งานสามารถดู ได้ที่นี่
การใช้งาน onStart, onCatch, onCompletion
เราสามารถดักจับ event เหตุการ์ณต่าง ๆ ในการทำงานของ Flow ได้ยังไงบ้าง
- onStart จะเป็นเหตุการณ์ที่ Flow กำลังจะเริ่มทำงาน
- onCatch เมื่อมีการ error เกิดขึ้น
- onCompletion เมื่อ Flow ทำงานสำเร็จ
ตัวอย่างวิธีการใช้งาน Flow สำหรับดักจับเหตุการ์ณต่าง ๆ ซึ่งจะเป็นรูปแบบที่ Developer มักจะนำมาใช้ เมื่อมีการโหลดข้อมูลหรือมีกระบวนการบางอย่างที่ต้องใช้เวลา เช่น กำลังรอข้อมูลจาก API ก็ทำการแสดง loading ที่ onStart และเมื่อ Flow ทำงานสำเร็จ ก็ทำการซ่อน loading ที่ onCompletion รวมถึง update UI เมื่อได้รับข้อมูลมา และจัดการแสดงผลเมื่อมี error เกิดขึ้น
runBlocking { sendPrimes()
.onStart {
println("onStart")
}
.catch { error ->
println("onCatch : $error")
}
.onCompletion { error ->
if (error == null) {
println("onCompletion success")
} else {
println("onCompletion error : $error")
}
}
.collect {
println("number = " + it)
}}
สรุป
หลังจากการทดลองใช้งาน Asynchronous Flow แบบเบื้องต้นน่าจะช่วยให้ผู้อ่านได้มองเห็นภาพว่าการจะสร้างหรือผลิต Flow และนำค่าที่ได้ไปใช้งานได้อย่างไร ที่เหลือก็คือการนำไปประยุกต์เพื่อใช้งานในโปรเจกของเรา ข้อดีอีกอย่าง คือ การที่ Flow ให้เราแปลงค่าจาก Collection มาเป็น Flow ได้เลยที่อำนวยความสะดวกมาก ๆ แต่ยังไม่หมด Flow ยังมี operatorsให้ได้ลองใช้งานอีกมากมาย เช่น การ filter, map, transform, take, reduce, onEach, Buffer ฯลฯ รวมถึงยังสามารถทำการ composing flow ได้ เช่น การนำ flow มากกว่า 1 อันมาทำงานพร้อมกันแบบ zip, combine เป็นต้น