All-In-One Guide to iOS Combine: Journey from Basics to Advanced Implementation part2
The following will be covered:
- Creating Custom Publishers and Subscribers in iOS Combine
- Creating Custom Operators in iOS Combine
- Testing Combine Publishers
- Understanding Backpressure in Combine
- Advanced Error Handling in Combine: Complex Retry Logic
Creating Custom Publishers and Subscribers in iOS Combine
While Combine provides a rich set of publishers and subscribers, there are times when you need a custom solution tailored to your specific requirements. Let’s explore how to create custom publishers and subscribers to handle more advanced use cases.
Custom Publishers
A custom publisher allows you to define a unique way to emit values. To create a custom publisher, you’ll need to:
- Conform to the
Publisher
protocol. - Specify the
Output
andFailure
associated types. - Implement the
receive(subscriber:)
method.
Example: A Custom Publisher that Emits Fibonacci Numbers
Step 1: Implement the Publisher
import Combine
struct FibonacciPublisher: Publisher {
typealias Output = Int
typealias Failure = Never
let count: Int
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
let subscription = FibonacciSubscription(subscriber: subscriber, count: count)
subscriber.receive(subscription: subscription)
}
}
Step 2: Create a Subscription
Each subscriber requires a subscription object. We’ll implement a subscription that emits Fibonacci numbers.
private class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int {
private var subscriber: S?
private let count: Int
private var currentIndex = 0
private var fibonacciSequence: [Int] = []
init(subscriber: S, count: Int) {
self.subscriber = subscriber
self.count = count
generateFibonacciSequence()
}
func request(_ demand: Subscribers.Demand) {
var remainingDemand = demand
while remainingDemand > 0 && currentIndex < fibonacciSequence.count {
_ = subscriber?.receive(fibonacciSequence[currentIndex])
currentIndex += 1
remainingDemand -= 1
}
if currentIndex >= fibonacciSequence.count {
subscriber?.receive(completion: .finished)
}
}
func cancel() {
subscriber = nil
}
private func generateFibonacciSequence() {
fibonacciSequence = [0, 1]
for i in 2..<count {
fibonacciSequence.append(fibonacciSequence[i - 1] + fibonacciSequence[i - 2])
}
fibonacciSequence = Array(fibonacciSequence.prefix(count))
}
}
Usage Example:
import Combine
let fibonacciPublisher = FibonacciPublisher(count: 10)
let subscription = fibonacciPublisher.sink { value in
print("Fibonacci value: \(value)")
}
Custom Subscribers
A custom subscriber processes the emitted values from a publisher. To create a custom subscriber, you’ll need to:
- Conform to the
Subscriber
protocol. - Specify the
Input
andFailure
associated types. - Implement the
receive(subscription:)
,receive(_:)
, andreceive(completion:)
methods.
Example: A Custom Subscriber that Collects Values
Step 1: Implement the Subscriber
import Combine
class CollectingSubscriber<Input, Failure: Error>: Subscriber {
typealias Input = Input
typealias Failure = Failure
private(set) var collectedValues: [Input] = []
private(set) var completion: Subscribers.Completion<Failure>?
private var subscription: Subscription?
func receive(subscription: Subscription) {
self.subscription = subscription
subscription.request(.unlimited) // Request unlimited values
}
func receive(_ input: Input) -> Subscribers.Demand {
collectedValues.append(input)
return .unlimited
}
func receive(completion: Subscribers.Completion<Failure>) {
self.completion = completion
}
func cancel() {
subscription?.cancel()
subscription = nil
}
}
Usage Example:
let numbersPublisher = [1, 2, 3, 4, 5].publisher
let collectingSubscriber = CollectingSubscriber<Int, Never>()
numbersPublisher.subscribe(collectingSubscriber)
print("Collected values: \(collectingSubscriber.collectedValues)")
Output:
Collected values: [1, 2, 3, 4, 5]
Creating custom publishers and subscribers in Combine allows you to extend the framework’s capabilities and tailor the data flow to meet your unique requirements. With this flexibility, you can implement complex logic that integrates seamlessly with the rest of your Combine pipelines.
Creating Custom Operators in iOS Combine
Operators in Combine provide powerful ways to manipulate data streams. While the framework provides many built-in operators, you may encounter scenarios that require custom transformations or filtering. Let’s look at how to build custom operators.
Creating a Custom Operator
- Define an Extension:
- Add an extension to the
Publisher
protocol. - The new operator function should return an
AnyPublisher
.
- Use the
map
,flatMap
, etc., or implement your own custom transformation:
- Inside the custom operator, apply desired transformations using existing operators or by building custom logic.
- Type-Erase the Publisher:
- Use
eraseToAnyPublisher()
to return anAnyPublisher
.
Example: Custom negate
Operator
A custom operator that negates boolean values.
Step 1: Define the Operator Extension
import Combine
extension Publisher where Output == Bool, Failure == Never {
func negate() -> AnyPublisher<Bool, Never> {
self.map { !$0 }
.eraseToAnyPublisher()
}
}
Usage Example:
let booleanPublisher = Just(true)
let subscription = booleanPublisher
.negate()
.sink { value in
print("Negated value: \(value)")
}
Output:
Negated value: false
Example: Custom replaceNilWithDefault
Operator
A custom operator that replaces nil
values with a default.
Step 1: Define the Operator Extension
import Combine
extension Publisher where Output == String? {
func replaceNilWithDefault(_ defaultValue: String) -> AnyPublisher<String, Never> {
self.map { $0 ?? defaultValue }
.eraseToAnyPublisher()
}
}
Usage Example:
let optionalStrings: [String?] = ["Hello", nil, "World", nil, "Combine"]
let publisher = optionalStrings.publisher
let subscription = publisher
.replaceNilWithDefault("Default")
.sink { value in
print("Received: \(value)")
}
Output:
Received: Hello
Received: Default
Received: World
Received: Default
Received: Combine
Testing Combine Publishers
Testing Combine publishers is essential to ensure your data streams behave as expected. This involves verifying both data output and error handling using the XCTest
framework. Here's a guide to help you effectively test your Combine pipelines.
Key Considerations for Testing Combine Publishers
- Completion Handling:
- Verify successful or failed completions.
- Value Emission:
- Check that the emitted values are correct.
- Timing & Scheduling:
- Test how values are emitted over time or across threads.
Tools and Frameworks for Testing Combine
- XCTest: Apple’s standard testing framework.
- Combine Expectations: A helper library for writing tests with Combine.
- Combine TestSubscribers: Collect emitted values and completions for validation.
Writing Tests with XCTest
- Test Successful Completion:
- Check if a publisher emits all values successfully and completes.
import XCTest
import Combine
class CombineTests: XCTestCase {
var cancellables: Set<AnyCancellable> = []
func testJustPublisher() {
let expectedValue = "Hello, Combine!"
let publisher = Just(expectedValue)
let expectation = XCTestExpectation(description: "Publisher emits value and finishes successfully")
publisher
.sink(receiveCompletion: { completion in
if case .finished = completion {
expectation.fulfill()
} else {
XCTFail("Expected to finish successfully")
}
}, receiveValue: { value in
XCTAssertEqual(value, expectedValue)
})
.store(in: &cancellables)
wait(for: [expectation], timeout: 1.0)
}
}
- Test Emitted Values:
- Validate if the publisher emits a specific sequence of values.
import XCTest
import Combine
class CombineTests: XCTestCase {
var cancellables: Set<AnyCancellable> = []
func testArrayPublisher() {
let values = [1, 2, 3, 4, 5]
let publisher = values.publisher
let expectation = XCTestExpectation(description: "Publisher emits all values")
var receivedValues: [Int] = []
publisher
.sink(receiveCompletion: { completion in
if case .finished = completion {
XCTAssertEqual(receivedValues, values)
expectation.fulfill()
} else {
XCTFail("Expected to finish successfully")
}
}, receiveValue: { value in
receivedValues.append(value)
})
.store(in: &cancellables)
wait(for: [expectation], timeout: 1.0)
}
}
- Test Failure Handling:
- Check if the publisher correctly handles errors.
import XCTest
import Combine
class CombineTests: XCTestCase {
var cancellables: Set<AnyCancellable> = []
func testFailPublisher() {
enum TestError: Error {
case exampleError
}
let publisher = Fail<Int, TestError>(error: .exampleError)
let expectation = XCTestExpectation(description: "Publisher emits an error")
publisher
.sink(receiveCompletion: { completion in
if case .failure(let error) = completion {
XCTAssertEqual(error, TestError.exampleError)
expectation.fulfill()
} else {
XCTFail("Expected to fail with error")
}
}, receiveValue: { _ in
XCTFail("Expected no value")
})
.store(in: &cancellables)
wait(for: [expectation], timeout: 1.0)
}
}
- Test Timing-Based Publishers:
- Verify behavior of timing-based publishers like
Timer
andDebounce
.
import XCTest
import Combine
class CombineTests: XCTestCase {
var cancellables: Set<AnyCancellable> = []
func testTimerPublisher() {
let expectation = XCTestExpectation(description: "Timer emits 5 values")
let expectedCount = 5
let timerPublisher = Timer.publish(every: 0.1, on: .main, in: .common).autoconnect()
var receivedCount = 0
timerPublisher
.prefix(expectedCount) // Limit to 5 emissions
.sink(receiveCompletion: { _ in
XCTAssertEqual(receivedCount, expectedCount)
expectation.fulfill()
}, receiveValue: { _ in
receivedCount += 1
})
.store(in: &cancellables)
wait(for: [expectation], timeout: 1.0)
}
func testDebouncePublisher() {
let expectation = XCTestExpectation(description: "Debounce emits only the last value")
let values = ["A", "B", "C"]
let publisher = PassthroughSubject<String, Never>()
var receivedValues: [String] = []
publisher
.debounce(for: .seconds(0.5), scheduler: RunLoop.main)
.sink(receiveCompletion: { _ in
XCTAssertEqual(receivedValues, ["C"])
expectation.fulfill()
}, receiveValue: { value in
receivedValues.append(value)
})
.store(in: &cancellables)
// Send values rapidly
publisher.send("A")
DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) { publisher.send("B") }
DispatchQueue.main.asyncAfter(deadline: .now() + 0.2) { publisher.send("C") }
DispatchQueue.main.asyncAfter(deadline: .now() + 0.6) { publisher.send(completion: .finished) }
wait(for: [expectation], timeout: 2.0)
}
}
- Testing Complex Pipelines:
- Combine multiple operators and verify the final output.
import XCTest
import Combine
class CombineTests: XCTestCase {
var cancellables: Set<AnyCancellable> = []
func testComplexPipeline() {
let expectation = XCTestExpectation(description: "Pipeline emits correct results")
let values = [1, 2, 3, 4, 5]
let publisher = values.publisher
let expectedResult = [1, 4, 9, 16, 25]
var receivedValues: [Int] = []
publisher
.filter { $0 % 2 != 0 }
.map { $0 * $0 }
.collect()
.sink(receiveCompletion: { _ in
XCTAssertEqual(receivedValues, expectedResult)
expectation.fulfill()
}, receiveValue: { values in
receivedValues = values
})
.store(in: &cancellables)
wait(for: [expectation], timeout: 1.0)
}
}
Backpressure Management in iOS Combine
Backpressure management is crucial in reactive programming to handle the flow of data when publishers produce values faster than subscribers can consume them. Without proper management, this can lead to high memory usage or even application crashes.
Understanding Backpressure in Combine
- Demand Mechanism:
- Combine uses a demand mechanism to control how many values a subscriber wants from the publisher.
- Subscribers request values from the publisher via the
request(_:)
method.
2. Subscribers.Demand:
- Represents the number of values requested by a subscriber.
- The
Subscribers.Demand
type provides three modes: unlimited
: Requests all available values without limit.max(_:)
: Requests a specific number of values.none
: Requests no values.
3. Demand Management:
- A subscriber can also adjust its demand during the data processing pipeline by returning a new
Subscribers.Demand
from thereceive(_:)
method.
Backpressure Management with Custom Subscribers
When creating custom subscribers, it’s essential to manage the demand to avoid overloading the subscriber.
Example: Creating a Custom Subscriber with Demand Management
import Combine
class DemandManagingSubscriber<Input, Failure: Error>: Subscriber {
private var subscription: Subscription?
private let maxDemand: Int
private var receivedCount = 0
init(maxDemand: Int) {
self.maxDemand = maxDemand
}
func receive(subscription: Subscription) {
self.subscription = subscription
subscription.request(.max(maxDemand))
}
func receive(_ input: Input) -> Subscribers.Demand {
receivedCount += 1
print("Received value: \(input)")
if receivedCount >= maxDemand {
subscription?.cancel()
return .none
}
return .max(1)
}
func receive(completion: Subscribers.Completion<Failure>) {
print("Received completion: \(completion)")
}
}
let numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9].publisher
let customSubscriber = DemandManagingSubscriber<Int, Never>(maxDemand: 5)
numbers.subscribe(customSubscriber)
Backpressure with Built-in Subscribers
Collect
Operator:
- The
collect
operator gathers a specific number of values before passing them downstream. - This is useful for managing backpressure by controlling how frequently data is emitted.
Example: Using collect
to Reduce Emission Frequency
import Combine
let values = (1...10).publisher
let subscription = values
.collect(3) // Collect values in batches of 3
.sink { batch in
print("Batch: \(batch)")
}
Output:
Batch: [1, 2, 3]
Batch: [4, 5, 6]
Batch: [7, 8, 9]
Batch: [10]
Buffer
Operator:
- The
buffer
operator maintains a buffer of emitted values up to a specified size. - This allows subscribers to request data at their own pace.
Example: Using buffer
to Handle Burst Emissions
import Combine
let values = (1...20).publisher
let subscription = values
.buffer(size: 5, prefetch: .keepFull, whenFull: .dropOldest)
.sink { value in
print("Received value: \(value)")
}
Output:
Received value: 1
Received value: 2
Received value: 3
Received value: 4
Received value: 5
Throttle
Operator:
- The
throttle
operator limits how often values are emitted by setting a minimum interval between emissions. - Useful for controlling the flow of rapidly emitted values.
Example: Using throttle
to Limit Emission Frequency
import Combine
import Foundation
let subject = PassthroughSubject<String, Never>()
let subscription = subject
.throttle(for: .seconds(1.0), scheduler: RunLoop.main, latest: true)
.sink { value in
print("Throttled value: \(value)")
}
// Send values quickly
subject.send("A")
DispatchQueue.main.asyncAfter(deadline: .now() + 0.2) { subject.send("B") }
DispatchQueue.main.asyncAfter(deadline: .now() + 0.4) { subject.send("C") }
DispatchQueue.main.asyncAfter(deadline: .now() + 1.2) { subject.send("D") }
DispatchQueue.main.asyncAfter(deadline: .now() + 1.4) { subject.send("E") }
DispatchQueue.main.asyncAfter(deadline: .now() + 2.0) { subject.send(completion: .finished) }
Output:
Throttled value: A
Throttled value: C
Throttled value: E
Concurrency in Combine
Concurrency in Combine deals with managing asynchronous tasks, ensuring that publishers and subscribers operate efficiently and safely across different threads. This is essential for maintaining UI responsiveness and offloading heavy work to background threads.
Key Concepts
Schedulers:
- Schedulers represent the context or thread on which a publisher emits values to subscribers.
- Combine provides several built-in schedulers to handle threading:
- Main: The main thread, often used for UI updates.
- Background (Global DispatchQueue): For computationally expensive tasks.
- RunLoop: An event-driven loop.
- OperationQueue: A high-level abstraction for task queues.
receive(on:)
vs subscribe(on:)
:
receive(on:)
changes the thread where the subscriber processes values.subscribe(on:)
changes the thread where the publisher does its initial setup.
Examples of Concurrency Handling
- Using
receive(on:)
to Change Processing Thread:
import Combine
import Foundation
let values = [1, 2, 3, 4, 5].publisher
let subscription = values
.map { value -> Int in
print("Mapping value \(value) on thread: \(Thread.current)")
return value * 2
}
.receive(on: DispatchQueue.main) // Switch to main thread for UI processing
.sink { value in
print("Received value \(value) on thread: \(Thread.current)")
}
Output:
Mapping value 1 on thread: <background thread>
Mapping value 2 on thread: <background thread>
Mapping value 3 on thread: <background thread>
Mapping value 4 on thread: <background thread>
Mapping value 5 on thread: <background thread>
Received value 2 on thread: <main thread>
Received value 4 on thread: <main thread>
Received value 6 on thread: <main thread>
Received value 8 on thread: <main thread>
Received value 10 on thread: <main thread>
2. Using subscribe(on:)
to Change Subscription Thread:
import Combine
import Foundation
let subject = PassthroughSubject<Int, Never>()
let subscription = subject
.subscribe(on: DispatchQueue.global()) // Publisher starts on background thread
.map { value -> Int in
print("Mapping value \(value) on thread: \(Thread.current)")
return value * 2
}
.sink { value in
print("Received value \(value) on thread: \(Thread.current)")
}
// Simulate a burst of values
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(completion: .finished)
Output:
Mapping value 1 on thread: <background thread>
Mapping value 2 on thread: <background thread>
Mapping value 3 on thread: <background thread>
Received value 2 on thread: <background thread>
Received value 4 on thread: <background thread>
Received value 6 on thread: <background thread>
3. Combining receive(on:)
and subscribe(on:)
:
You can use both receive(on:)
and subscribe(on:)
in tandem to carefully control which threads handle the publisher setup and value reception.
import Combine
import Foundation
let values = [1, 2, 3, 4, 5].publisher
let subscription = values
.subscribe(on: DispatchQueue.global()) // Publisher starts on a background thread
.map { value -> Int in
print("Mapping value \(value) on thread: \(Thread.current)")
return value * 2
}
.receive(on: DispatchQueue.main) // Values received on the main thread
.sink { value in
print("Received value \(value) on thread: \(Thread.current)")
}
Output:
Mapping value 1 on thread: <background thread>
Mapping value 2 on thread: <background thread>
Mapping value 3 on thread: <background thread>
Mapping value 4 on thread: <background thread>
Mapping value 5 on thread: <background thread>
Received value 2 on thread: <main thread>
Received value 4 on thread: <main thread>
Received value 6 on thread: <main thread>
Received value 8 on thread: <main thread>
Received value 10 on thread: <main thread>
Concurrency in Combine is a powerful tool that enables efficient data processing and responsive user interfaces. By using schedulers and controlling the subscription and reception threads, you can ensure your Combine pipelines are well-optimized for various processing contexts.
Advanced Error Handling in Combine: Complex Retry Logic
In Combine, handling errors effectively is crucial to building resilient applications. While the retry
operator provides basic retry functionality, more complex retry logic might be required for handling different error scenarios or implementing exponential backoff.
Key Concepts for Complex Retry Logic
Retry Operator:
- Retries a failed publisher up to a specified number of times before emitting an error.
catch
Operator:
- Catches errors and replaces them with a fallback publisher.
Custom Retry Logic:
- Using the
flatMap
operator to implement more advanced retry logic, like exponential backoff.
Implementing Complex Retry Logic
- Basic Retry with
retry
:
- Retries a failed network request up to 3 times before failing.
import Combine
enum NetworkError: Error {
case serverError
}
func fetchData() -> AnyPublisher<String, NetworkError> {
return Fail<String, NetworkError>(error: .serverError)
.eraseToAnyPublisher()
}
let subscription = fetchData()
.retry(3)
.sink(receiveCompletion: { completion in
switch completion {
case .finished:
print("Request succeeded")
case .failure(let error):
print("Request failed with error: \(error)")
}
}, receiveValue: { data in
print("Received data: \(data)")
})
Output:
Request failed with error: serverError
2. Custom Retry Logic with Specific Error Handling:
Use a catch
operator to retry based on specific error types and handle retries accordingly.
Example: Retry on Network Errors Only
import Combine
import Foundation
enum NetworkError: Error {
case serverError
case notConnected
case unauthorized
}
func fetchResource() -> AnyPublisher<String, NetworkError> {
return Future<String, NetworkError> { promise in
let error: NetworkError? = Bool.random() ? .serverError : nil
if let error = error {
promise(.failure(error))
} else {
promise(.success("Resource fetched successfully"))
}
}
.eraseToAnyPublisher()
}
extension Publisher where Failure == NetworkError {
func retryOnNetworkErrors(retries: Int) -> AnyPublisher<Output, Failure> {
self.catch { error in
if retries > 0 && (error == .serverError || error == .notConnected) {
return Just(())
.delay(for: .seconds(2), scheduler: DispatchQueue.main)
.setFailureType(to: Failure.self)
.flatMap { _ in
self.retryOnNetworkErrors(retries: retries - 1)
}
.eraseToAnyPublisher()
} else {
return Fail(outputType: Output.self, failure: error).eraseToAnyPublisher()
}
}
.eraseToAnyPublisher()
}
}
let subscription = fetchResource()
.retryOnNetworkErrors(retries: 2)
.sink(receiveCompletion: { completion in
switch completion {
case .finished:
print("Request succeeded")
case .failure(let error):
print("Request failed with error: \(error)")
}
}, receiveValue: { data in
print("Received data: \(data)")
})
Output (if successful after retries):
Request failed with error: serverError
Request failed with error: serverError
Received data: Resource fetched successfully
Advanced error handling in Combine requires implementing custom retry logic to handle different error scenarios effectively. By using exponential backoff, catching specific errors, and chaining retries, you can build resilient pipelines that gracefully handle network or transient failures.