เริ่มต้น Reactive Programming ด้วย Combine Part4: Publisher, Subscriber, Subject and Cancellable

Cocodev
Lotus’s IT
Published in
4 min readDec 18, 2023
UML diagram:

Publisher กับ Subscriber ใน Combine เปรียบคือ Observable และ Observer ใน Reactive programming ใน framework อื่นๆ

Publisher

คือประกาศหรือผู้แถลง สามารถปล่อยค่าออกมาได้แบบตลอดเวลา และหยุดการทำงานจนกว่าจะมี error หรือ ไม่มี subscriber ส่วนสิ่งที่มันประกาศออกมันก็คือ stream นั้นเอง

Built-in Publishers

Just - เป็นสร้าง publisher ด้วยการ emit ค่าเพียงแค่ครั้งเดียวและจบการทำงาน

let publisher = Just("Hi Just")
publisher.sink { value in
print(value)
}

// Output
Hi Just

Future - เป็นสร้าง publisher ด้วย promise และ emit ค่าเพียงแค่ครั้งเดียวแล้วจบการทำงาน

let publisher = Future<String, Never> { promise in
let val = "Hi Future"
promise(.success(val))
}
publisher.sink { value in
print(value)
}
// Output
Hi Future

Sequence - เป็นเปลี่ยนให้ list เป็น Sequence ของ publisher

let publisher = ["Combine", "SwiftUI", "Machine Learning"].publisher
publisher.sink { _ in
print("done")
} receiveValue: { value in
print(value)
}
// Output
Combine
SwiftUI
Machine Learning
done

Fail - สร้าง publisher ประเภท Error โดยมันเกิดการ emit เพียงครั้งเดียว

let devTechiePublisher = Fail<String, Error>(error: NSError(domain: "com.lotus", code: 400, userInfo: nil))
devTechiePublisher .sink { completion in
switch completion {
case .finished:
print("finished")
case .failure(let err as NSError):
print(err.localizedDescription, "Reason: \(err.userInfo)")
}
} receiveValue: { value in
print("Value")
}
// Output
The operation couldn’t be completed. (com.lotus error 400.) Reason: [:]

และยังมีวิธีการสร้างที่ไม่ได้ยกตัวอย่างขึ้นอีกเช่น Record, Share, Multicast, Deferred และ Empty

Subscriber

คือผู้รับสารจาก Publisher หรือ รับ stream จาก publisher ถ้า publisher ไม่มี subscriber ก็จากไม่เกิดการทำงานใดๆ นั้นหมายความว่า การทำงานของที่สมบูรณ์ จะต้องประกอบไปด้วย publisher และ subscriber

built-in subscribers

Sink - เป็น closure สำหรับรับค่าที่ emit มาจาก publisher เป็นวิธีที่เรียบง่ายของการส่งค่าจาก subscriber และ sink คืนค่าเป็น cancellable

enum MyError: Error {
case someError
}

let publisher = PassthroughSubject<String, MyError>()
let cancellable = publisher.sink { complate in
print(complate)
} receiveValue: { value in
print(value)
}

publisher.send("Banana")
publisher.send(completion: .failure(MyError.someError))

cancellable.cancel()

// Output
Data
failure(__lldb_expr_53.MyError.someError)

Assign - เป็นการ assigns ค่าจาก publisher สู่ property ของ object ที่เราต้องการ

class ShowRoom {
var car: String = ""
}

let showRoom = ShowRoom()
let coursePublisher = PassthroughSubject<String, Never>()
let cancellable = coursePublisher.assign(to: \.car, on: showRoom)

coursePublisher.send("Honda")
print(showRoom.car)

coursePublisher.send("BMW")
print(showRoom.car)

cancellable.cancel()

// Output
Honda
BMW

Cancellable

เมื่อ subscriber ทำงานเสร็จ หรือ ไม่จากรับข้อมูลจาก publisher อีกต่อไป จะเป็นการดีกว่าถ้าเราให้ยกเลิกการ subscription ต่างๆ เพื่อคืน memory และ อยุดการทำงานที่ไม่ต้องการเช่น network calls

class MyFetcher {
var cancellable = Set<AnyCancellable>()
var googleCancel: AnyCancellable?

deinit {
googleCancel?.cancel()
print("deinit")
}

func fetchYoutube() {
let publisher = URLSession.shared.dataTaskPublisher(for: URL(string: "https://youtube.com")!)

publisher.sink(receiveCompletion: { _ in }) { output in
print(output)
}.store(in: &cancellable)
}

func fetchGoogle() {
let publisher = URLSession.shared.dataTaskPublisher(for: URL(string: "https://www.google.com/")!)

googleCancel = publisher.sink(receiveCompletion: { _ in }) { output in
print(output)
}
}
}

จากตัวอย่าง การยกเลิกการ subscription ทำได้โดยใช้ subscription.cancel() โดยเรียกใช้ในตำแหน่งที่ต้องการให้ยกเลิก หรือ ใช้ cancellable แล้ว .store(in: &cancellable) ท่านี้เป็นที่นิยมเพราะเนื่องจาก subscription จะยกเลิกอัตโนมัติเมื่อ object ที่ถือ subscription อยู่ทำการ deinit

Subjects

Subject สามารถเป็นได้ทั้ง publisher และ subject ได้ในเวลาเดียวกัน มันถูกใช้เพื่อเป็นสะพานระหว่างแนวคิดของการเขียนคำสั่งแบบ imperative และ reactive programming ทำให้เราสามารถสร้าง object ที่สามารถรับและส่งค่าไปยังหลายๆ subscriber และตัว subject มี 2 type หลักได้แก้ PassthroughSubject และ CurrentValueSubject

PassthroughSubject

passthroughSubject เป็น subject ที่ตรงไปตรงมาที่สุด หน้าที่มันเพียงแค่ส่งผ่านค่าที่ได้รับไปยัง subscriber ของมัน และ เมื่อมี subscriber ใหม่ถูกเพิ่มเข้าใน passthroughSubject เริ่มรับค่าตั้งแต่จุดนั้นเป็นต้นไป passthroughSubject จะไม่มีการเก็บค่าใดๆ ดังนั้น subscriber ที่เพิ่มหลังจากค่าถูกส่งไปแล้วจะไม่ได้รับค่านั้นๆ

let subject = PassthroughSubject<String, Never>()

let subscription = subject.sink { value in
print("Received value on sub1: \(value)")
}

subject.send("1")
subject.send("2")
subject.send("3")

let subscription2 = subject.sink { value in
print("Received value on sub2: \(value)")
}

subject.send("4")

// Output
Received value on sub1: 1
Received value on sub1: 2
Received value on sub1: 3
Received value on sub2: 4
Received value on sub1: 4

CurrentValueSubject

CurrentValueSubject จะมีความคล้ายกับ PassthroughSubject แต่ว่ามันจะเก็บค่าล่าสุดที่มีการส่งค่าผ่านตัวมัน และเมื่อมี subscriber ใหม่เพิ่มเข้ามาที่ตัวมัน subscriber ใหม่นี้จะได้รับค่าล่าสุดในทันที

let subject = CurrentValueSubject<String, Never>("Start")

let subscription = subject.sink { value in
print("Sub 1: \(value)")
}

subject.send("Run")
subject.send("Sleep")

let subscription2 = subject.sink { value in
print("Sub 2: \(value)")
}

subject.send("End")

// Output
Sub 1: Start
Sub 1: Run
Sub 1: Sleep
Sub 2: Sleep
Sub 1: End
Sub 2: End

Real World

สมมุติว่าเรามีแอปพลิเคชันที่ให้บริการข้อมูลสภาพอากาศแบบ real-time

Publisher (ผู้เผยแพร่): กรมอุตุนิยมวิทยา เป็น Publisher ทุกครั้งที่มีข้อมูลเกี่ยวกับสภาพอากาศในพื้นที่ต่าง ๆ ของประเทศ เช่น ข้อมูลการพยากรณ์อากาศ แจ้งเตือนภัยพิบัติทางอากาศ หรือสถานการณ์ทางอากาศที่สำคัญ กรมนี้จะเป็นผู้ส่งข้อมูลไปยังทุก Subscriber ที่สนใจเรื่องสภาพอากาศในพื้นที่นั้น ๆ

Subscriber (ผู้รับ): นักเดินทาง A เป็น Subscriber โดยเขาใช้แอปพลิเคชันเพื่อติดตามข้อมูลสภาพอากาศในพื้นที่ที่เขากำลังเดินทาง ทุกครั้งที่มีข้อมูลเกี่ยวกับสภาพอากาศที่มีผลต่อการเดินทาง เช่น ฝนตกหนัก หรือสถานการณ์ภัยพิบัติทางอากาศ เขาจะได้รับการแจ้งเตือนทันทีทางแอปพลิเคชัน

ดังนั้น เหตุการณ์นี้แสดงถึงการทำงาน Publisher-Subscribe ที่ Publisher เป็นกรมอุตุนิยมวิทยาที่เผยแพร่ข้อมูลสภาพอากาศ และ Subscriber เป็นนักเดินทางที่รับข้อมูลแจ้งเตือนเมื่อมีข้อมูลสภาพอากาศที่สำคัญในพื้นที่ที่เขาสนใจ

Code Examples

มาดูกันว่าจากตัวอย่างในโลกความเป็นจริง เราสามารถนำมาแก้ปัญหานี้ด้วย Publisher และ Subscriber ได้อย่างไร

// Step 1
struct WeatherData {
let location: String
let temperature: Double
let conditions: String
}

// Step 2
// Create a publisher and stream of WeatherData
let weatherPublisher = PassthroughSubject<WeatherData, Never>()

// Step 3
// Create cancellable
var cancellable = Set<AnyCancellable>()

// Step 4
// Create a subscriber that prints each data from stream
let userASubscriber = Subscribers.Sink<WeatherData, Never> { value in
print("complate: \(value)")
} receiveValue: { value in
print("\(value.location), \(value.temperature) \(value.conditions)")
}.store(in: &cancellable)

// Step 5
// Subscribe the subscriber to the publisher
weatherPublisher.subscribe(userASubscriber)

//Tests Weater
//Time: 09:00
weatherPublisher.send(WeatherData(location: "Bangkok", temperature: 32.5, conditions: "Clear sky"))
//Time: 12:00
weatherPublisher.send(WeatherData(location: "Chiang Mai", temperature: 28.0, conditions: "Partly cloudy"))
//Time: 15:00
weatherPublisher.send(WeatherData(location: "Phuket", temperature: 30.2, conditions: "Scattered showers"))

ระบบ Weather Implement อย่างไร

  1. ขั้นตอนแรกออกแบบ data structure ชื่อ WeatherData เป็น model โดยมี 3 properties ได้แก่ location , temperature และ conditions
  2. สร้าง publisher ให้ระบบ Weather เผื่อเป็นตัวประกาศข้อมูล และข้อมูลที่ประกาศเป็น stream ของ WeatherDat data type
  3. สร้าง cancellable สำหรับ handle การหยุดทำงานของ publisher
  4. ให้นักเดินทาง A เป็น subscriber ให้สามารถรับข้อมูลจากการประกาศได้ และ ให้ cancellable assgin ค่าไปใน store(in:_)
  5. ให้นักเดินทาง A ซึ่งเป็น subscriber แล้วทำการ subscribe กับ publisher ของระบบ Weather

ทดสอบระบบกัน

  1. เวลา 9:00 น รายงานสภาพอากาศ ให้นักท่องเที่ยว A ตอนอยู่ที่ Bangkok
  2. เวลา 12:00 น รายงานสภาพอากาศ ให้นักท่องเที่ยว A ตอนอยู่ที่ Chiang Mai
  3. เวลา 15:00 น รายงานสภาพอากาศ ให้นักท่องเที่ยว A ตอนอยู่ที่ Phuket
// Output
//Time: 09:00
Bangkok, 32.5 Clear sky
//Time: 12:00
Chiang Mai, 28.0 Partly cloudy
//Time: 15:00
Phuket, 30.2 Scattered showers

Summay

มาถึงตรงนี้เราใด้มาสำรวจถึง Publisher และ Subscriber ในเรื่อง ความหมาย, หลักการทำงาน และ code ที่อิงมาจากตัวอย่างในโลกของความเป็นจริง ความตั้งใจคืออยากให้มองเห็นภาพได้ง่ายมาขึ้น

Next part5: Reactive Programming ด้วย Combine Part5: Combining Operator

https://medium.com/@ittipongkeawmahing/a1d268ccb20e

--

--