ลองใช้งาน Asynchronous Flow แบบเบื้องต้น

Theerapong.Kha
te<h @TDG
Published in
4 min readApr 24, 2020
ที่มา What is Flow in Kotlin and how to use it in Android Project? : https://blog.mindorks.com/what-is-flow-in-kotlin-and-how-to-use-it-in-android-project

Kotlin Flow คือ ไลบรารีสำหรับการทำงานแบบ Asynchronous พัฒนาโดย JetBrains, บริษัทที่อยู่เบื้องหลังการพัฒนาภาษา Kotlin นั่นเอง รูปแบบการทำงานถือได้ว่าคล้ายกับการทำงาน แบบ Reactive Stream ที่เราคุ้นเคยอย่าง RxAndroid & RxJava เป็นอย่างมาก, Jetbrains ได้ปล่อย Flow เข้ามาเป็นส่วนหนึ่งใน API ของ Kotlin Coroutine เวอร์ชั่น 1.2.0 alpha release ให้ Developer ได้เริ่มทดลองใช้งาน และในบทความนี้เราจะมาเรียนรู้การใช้งาน Asynchronous Flow แบบเบื้องต้นกันครับ

Flow APIs คืออะไร

Flow API ใน Kotlin คือ การเขียนโค้ดเพื่อจัดการข้อมูลแบบ Asynchronous ที่มีการทำงานแบบเป็นลำดับ (sequentially) ใน RxJava เราจะคุ้นเคยกับการสร้าง Observables ที่ทำหน้าที่ผลิตชุดข้อมูล และเมื่อเราต้องการค่าที่มีใน Observables มาใช้งาน เราจะต้องทำการ subscribe ก่อน เช่นเดียวกันกับ Flow การทำงานจะมีเงื่อนไขเดียวกัน คือ Flow จะไม่ทำงานจนกว่าจะสั่ง collected นั่นเอง

รู้จัก 3 Step ของ Flow

ทำความรู้จัก 3 step การทำงานของ Flow กันก่อนว่ามีส่วนประกอบอะไรบ้าง และทำหน้าที่อะไร ก่อนที่จะเริ่มเข้าสู่การลงมือเขียนโค้ดครับ

  • flow { … } ทำหน้าที่เป็นตัว builder
  • emit(value) ทำหน้าที่ส่งข้อมูล (transmit a value)
  • collect { … } คือ รับค่าข้อมูลที่ได้จากการ emit (receive the values)

เริ่มต้นทดลองใช้งาน Flow API

เปิดโปรแกรม IntelliJ IDEA CE ขึ้นมา ทำการ new project ด้วย Kotlin ให้เรียบร้อย สามารถดูขั้นตอนเบื้องต้นได้ที่ กดปุ่มนี้ จากนั้นทำการเพิ่ม Dependencies สำหรับ coroutine ใน build.gradle เข้าไป

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.4"

และอย่าลืมเพิ่ม jcenter() เข้าไปใน repositories ด้วย จากนั้นทำการ sync ก็พร้อมสำหรับลงมือโค้ดดิ้งกันแล้ว

repositories {
jcenter()
}

1. การสร้าง Flow

การสร้าง Flow สามารถทำได้หลายรูปแบบ ในบทความนี้จะพาสร้าง Flow ในรูปแบบง่าย ๆ 3 แบบ คือ
1. การสร้าง Flow จากการ emit ค่า
2. การใช้งาน asFlow()
3. การใช้งาน flowOf()
มาดูตัวอย่างการใช้งานการสร้าง Flow ในแต่ละแบบตามโค้ดด้านล่าง

  1. การ emit ค่า
fun createFlowEmit() = flow<Int> {
for (i in 1..3) {
delay(300L) //delay for 300ms to emit next number
emit(i) //emit next value
}
}

2. asFlow()

fun createFlowFromAsFlow(): Flow<Int> {
return listOf<Int>(1, 2, 3).asFlow()
.onEach {
delay(300L) //delay for 300ms then emit next number
}
.flowOn(Dispatchers.Default) //change context for CPU-consuming
}

3. flowOf()

fun createFlowFromFlowOf(): Flow<Int> {
return flowOf(1, 2, 3)
}

จากนั้นทำการ สร้าง function main() เพื่อทำการรันโค้ด และดูผลลัพธ์

fun main() {
runBlocking {
createFlowEmit().collect { //collect each value from flow
println("Received way1 : $it")
}
println() createFlowFromAsFlow().collect {
println("Received way2 : $it")
}
println() createFlowFromFlowOf().collect {
println("Received way3 : $it")
}
}
}

หลังจากนั้นทดสอบรันจะได้ผลลัพธ์ ดังนี้

ผลลัพธ์จากการรันโค้ด

ข้อสังเกต : flow { … } ทำงานใน background thread ในขณะที่ collection เกิดขึ้นใน main thread

  • จะเห็นได้ว่าทั้งสามแบบให้ผลลัพธ์แบบเดียวกัน
  • เราสามารถส่งข้อมูลด้วยวิธีการสั่ง emit ค่า
  • collect เป็น function สั่งให้เริ่มการทำงานในการนำข้อมูลจาก Flow มาใช้งาน
  • สามารถสลับการทำงานของ context ด้วยคำสั่ง flowOn Operator เช่น ให้ทำงานใน IO, Main Thread, Default เป็นต้น
  • ข้อมูลแบบ Collection เช่น LinkedList, ArrayList ฯลฯ สามารถแปลงค่าเป็น Flow ได้โดยตรงด้วยคำสั่ง asFlow() เช่น listOf(1, 2, 3).asFlow()
  • Flow สามารถสร้างขึ้นจากชนิดข้อมูลใดก็ได้ เช่น flowOf(“string”, 1, true, 3.14)
  • ใน dependencies ของ kotlinx-coroutines-android สามารถทำให้เราแปลงข้อมูล ชนิด Flow เป็น LiveData ได้เลย ด้วยคำสั่ง asLiveData()

2. การรับค่าข้อมูลจาก Flow

ตัวอย่างการรับค่าที่สร้างขึ้นจาก Flow มาใช้งาน มีรูปแบบคำสั่งที่สามารถทำได้ ดังตัวอย่างต่อไปนี้

สมมุติว่ามีการสร้าง Flow ที่ทำการผลิตข้อมูลค่าของจำนวนเฉพาะ และมีการดีเลย์ 200ms ก่อนที่จะผลิตข้อมูลถัดไปจนครบ ดังโค้ดนี้

fun sendPrimes(): Flow<Int> = flow {
val primeList = listOf<Int>(2, 3, 5, 7, 11, 13, 17, 19, 23, 29)
primeList.forEach {
delay(200)
emit(it)
}
}

เราสามารถที่จะรับค่าที่ผลิตขึ้นมาจาก Flow ไปใช้งานด้วยคำสั่ง

collect

runBlocking {  sendPrimes()
.collect(object : FlowCollector<Number> {
override suspend fun emit(value: Number) {
println("number way1 = $value")
}
})
//หรือ sendPrimes()
.collect {
println("number way1 = $it")
}
}
ผลลัพธ์การรับค่าจาก collect

collectIndexed

จะเหมือน collect ทุกอย่าง เพียงแต่จะมีตำแหน่ง index ของค่านั้นเพิ่มมาด้วย

runBlocking {

sendPrimes()
.collectIndexed { index, value ->
println("number way2 : index = $index ; value = $value")
}
}
ผลลัพธ์จากการรันด้วย collectIndexed

LunchIn

จะมีความแตกต่างจาก collect ตรงที่คำสั่งนี้จะรับค่าผ่าน onEach แทน และสามารถกำหนด coroutine scope ที่ต้องการได้ และยัง return ค่าออกมาเป็น job ซึ่งสามารถ ทำการยกเลิกการทำงานของ flow ได้ด้วยคำสั่ง job.cancel()

runBlocking {

val job =
sendPrimes()
.onEach {
println("launchIn : $it")
}
.launchIn(this) //<--Launching the flow in a separate coroutine
delay(10000L)
job.cancel()
}
ผลลัพธ์จากการรันด้วย LaunchIn
  • ยังมีคำสั่งอีกอัน คือ collectLatest ที่มีการทำงาน คือ หลังจากได้ค่ามาจะทำการ restart ไปเรื่อย ๆ จนกว่าจะได้ค่าสุดท้าย วิธีการใช้งานสามารถดู ได้ที่นี่

การใช้งาน onStart, onCatch, onCompletion

เราสามารถดักจับ event เหตุการ์ณต่าง ๆ ในการทำงานของ Flow ได้ยังไงบ้าง

  • onStart จะเป็นเหตุการณ์ที่ Flow กำลังจะเริ่มทำงาน
  • onCatch เมื่อมีการ error เกิดขึ้น
  • onCompletion เมื่อ Flow ทำงานสำเร็จ

ตัวอย่างวิธีการใช้งาน Flow สำหรับดักจับเหตุการ์ณต่าง ๆ ซึ่งจะเป็นรูปแบบที่ Developer มักจะนำมาใช้ เมื่อมีการโหลดข้อมูลหรือมีกระบวนการบางอย่างที่ต้องใช้เวลา เช่น กำลังรอข้อมูลจาก API ก็ทำการแสดง loading ที่ onStart และเมื่อ Flow ทำงานสำเร็จ ก็ทำการซ่อน loading ที่ onCompletion รวมถึง update UI เมื่อได้รับข้อมูลมา และจัดการแสดงผลเมื่อมี error เกิดขึ้น

runBlocking {  sendPrimes()
.onStart {
println("onStart")
}
.catch { error ->
println("onCatch : $error")
}
.onCompletion { error ->
if (error == null) {
println("onCompletion success")
} else {
println("onCompletion error : $error")
}
}
.collect {
println("number = " + it)
}
}
ผลลัพธ์จากการรันด้วย onStart, onCache, onCompletion

สรุป

หลังจากการทดลองใช้งาน Asynchronous Flow แบบเบื้องต้นน่าจะช่วยให้ผู้อ่านได้มองเห็นภาพว่าการจะสร้างหรือผลิต Flow และนำค่าที่ได้ไปใช้งานได้อย่างไร ที่เหลือก็คือการนำไปประยุกต์เพื่อใช้งานในโปรเจกของเรา ข้อดีอีกอย่าง คือ การที่ Flow ให้เราแปลงค่าจาก Collection มาเป็น Flow ได้เลยที่อำนวยความสะดวกมาก ๆ แต่ยังไม่หมด Flow ยังมี operatorsให้ได้ลองใช้งานอีกมากมาย เช่น การ filter, map, transform, take, reduce, onEach, Buffer ฯลฯ รวมถึงยังสามารถทำการ composing flow ได้ เช่น การนำ flow มากกว่า 1 อันมาทำงานพร้อมกันแบบ zip, combine เป็นต้น

ตัวอย่าง Source Code

--

--