All-In-One Guide to iOS Combine: Journey from Basics to Advanced Implementation, Part 2

Islam Moussa
12 min read · May 9, 2024

The following will be covered:

  • Creating Custom Publishers and Subscribers in iOS Combine
  • Creating Custom Operators in iOS Combine
  • Testing Combine Publishers
  • Backpressure Management in iOS Combine
  • Concurrency in Combine
  • Advanced Error Handling in Combine: Complex Retry Logic

Creating Custom Publishers and Subscribers in iOS Combine

While Combine provides a rich set of publishers and subscribers, there are times when you need a custom solution tailored to your specific requirements. Let’s explore how to create custom publishers and subscribers to handle more advanced use cases.

Custom Publishers

A custom publisher allows you to define a unique way to emit values. To create a custom publisher, you’ll need to:

  1. Conform to the Publisher protocol.
  2. Specify the Output and Failure associated types.
  3. Implement the receive(subscriber:) method.

Example: A Custom Publisher that Emits Fibonacci Numbers

Step 1: Implement the Publisher

import Combine

struct FibonacciPublisher: Publisher {
typealias Output = Int
typealias Failure = Never

let count: Int

func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
let subscription = FibonacciSubscription(subscriber: subscriber, count: count)
subscriber.receive(subscription: subscription)
}
}

Step 2: Create a Subscription

Each subscriber requires a subscription object. We’ll implement a subscription that emits Fibonacci numbers.

private class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int {
private var subscriber: S?
private let count: Int
private var currentIndex = 0
private var fibonacciSequence: [Int] = []

init(subscriber: S, count: Int) {
self.subscriber = subscriber
self.count = count
generateFibonacciSequence()
}

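func request(_ demand: Subscribers.Demand) {
var remainingDemand = demand
while let subscriber = subscriber, remainingDemand > 0, currentIndex < fibonacciSequence.count {
let value = fibonacciSequence[currentIndex]
currentIndex += 1
remainingDemand -= 1
// receive(_:) may return extra demand; add it to what is still outstanding.
remainingDemand += subscriber.receive(value)
}

if currentIndex >= fibonacciSequence.count {
subscriber?.receive(completion: .finished)
subscriber = nil // Send the completion only once, even if request(_:) is called again.
}
}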

func cancel() {
subscriber = nil
}

private func generateFibonacciSequence() {
var sequence = [0, 1]
// Guard the range: 2..<count would trap at runtime when count is less than 2.
if count > 2 {
for i in 2..<count {
sequence.append(sequence[i - 1] + sequence[i - 2])
}
}
fibonacciSequence = Array(sequence.prefix(max(count, 0)))
}
}

Usage Example:

import Combine

let fibonacciPublisher = FibonacciPublisher(count: 10)
let subscription = fibonacciPublisher.sink { value in
print("Fibonacci value: \(value)")
}

Custom Subscribers

A custom subscriber processes the emitted values from a publisher. To create a custom subscriber, you’ll need to:

  1. Conform to the Subscriber protocol.
  2. Specify the Input and Failure associated types.
  3. Implement the receive(subscription:), receive(_:), and receive(completion:) methods.

Example: A Custom Subscriber that Collects Values

Step 1: Implement the Subscriber

import Combine

// The generic parameters Input and Failure satisfy Subscriber's associated types directly,
// so no typealias declarations are needed.
class CollectingSubscriber<Input, Failure: Error>: Subscriber {

private(set) var collectedValues: [Input] = []
private(set) var completion: Subscribers.Completion<Failure>?
private var subscription: Subscription?

func receive(subscription: Subscription) {
self.subscription = subscription
subscription.request(.unlimited) // Request unlimited values
}

func receive(_ input: Input) -> Subscribers.Demand {
collectedValues.append(input)
return .none // .unlimited was already requested in receive(subscription:)
}

func receive(completion: Subscribers.Completion<Failure>) {
self.completion = completion
}

func cancel() {
subscription?.cancel()
subscription = nil
}
}

Usage Example:

let numbersPublisher = [1, 2, 3, 4, 5].publisher
let collectingSubscriber = CollectingSubscriber<Int, Never>()
numbersPublisher.subscribe(collectingSubscriber)

print("Collected values: \(collectingSubscriber.collectedValues)")

Output:

Collected values: [1, 2, 3, 4, 5]

Creating custom publishers and subscribers in Combine allows you to extend the framework’s capabilities and tailor the data flow to meet your unique requirements. With this flexibility, you can implement complex logic that integrates seamlessly with the rest of your Combine pipelines.

Creating Custom Operators in iOS Combine

Operators in Combine provide powerful ways to manipulate data streams. While the framework provides many built-in operators, you may encounter scenarios that require custom transformations or filtering. Let’s look at how to build custom operators.

Creating a Custom Operator

  1. Define an Extension:
  • Add an extension to the Publisher protocol.
  • The new operator function should return an AnyPublisher.
  2. Apply existing operators such as map or flatMap, or implement your own transformation:
  • Inside the custom operator, apply the desired transformations using existing operators or custom logic.
  3. Type-Erase the Publisher:
  • Use eraseToAnyPublisher() to return an AnyPublisher.

Example: Custom negate Operator

A custom operator that negates boolean values.

Step 1: Define the Operator Extension

import Combine

extension Publisher where Output == Bool, Failure == Never {
func negate() -> AnyPublisher<Bool, Never> {
self.map { !$0 }
.eraseToAnyPublisher()
}
}

Usage Example:

let booleanPublisher = Just(true)
let subscription = booleanPublisher
.negate()
.sink { value in
print("Negated value: \(value)")
}

Output:

Negated value: false

Example: Custom replaceNilWithDefault Operator

A custom operator that replaces nil values with a default.

Step 1: Define the Operator Extension

import Combine

extension Publisher where Output == String? {
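// map keeps the upstream Failure type, so the result is AnyPublisher<String, Failure>, not <String, Never>.
func replaceNilWithDefault(_ defaultValue: String) -> AnyPublisher<String, Failure> {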
self.map { $0 ?? defaultValue }
.eraseToAnyPublisher()
}
}

Usage Example:

let optionalStrings: [String?] = ["Hello", nil, "World", nil, "Combine"]
let publisher = optionalStrings.publisher
let subscription = publisher
.replaceNilWithDefault("Default")
.sink { value in
print("Received: \(value)")
}

Output:

Received: Hello
Received: Default
Received: World
Received: Default
Received: Combine
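
For comparison, Combine also ships a built-in replaceNil(with:) operator that covers this particular case. The short sketch below uses it on a sequence publisher; the variable names (maybeNames, replaced) are illustrative only and not part of the original example.

```swift
import Combine

// Illustrative input: optional strings with one nil in the middle.
let maybeNames: [String?] = ["Hello", nil, "World"]
var replaced: [String] = []

// replaceNil(with:) unwraps each optional and substitutes the default for nil.
let replaceNilCancellable = maybeNames.publisher
    .replaceNil(with: "Default")
    .sink { replaced.append($0) }

print(replaced) // ["Hello", "Default", "World"]
```

A hand-written operator is still useful when the replacement logic is more involved than a single constant.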

Testing Combine Publishers

Testing Combine publishers is essential to ensure your data streams behave as expected. This involves verifying both data output and error handling using the XCTest framework. Here's a guide to help you effectively test your Combine pipelines.

Key Considerations for Testing Combine Publishers

  1. Completion Handling:
  • Verify successful or failed completions.
  2. Value Emission:
  • Check that the emitted values are correct.
  3. Timing & Scheduling:
  • Test how values are emitted over time or across threads.

Tools and Frameworks for Testing Combine

  • XCTest: Apple’s standard testing framework.
  • Combine Expectations: A helper library for writing tests with Combine.
  • Combine TestSubscribers: Collect emitted values and completions for validation.
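
The idea of a test subscriber that collects values and completions can be sketched in a few lines. The Recorder class below is a hypothetical helper written for this illustration; it is not the API of any of the libraries listed above, and it assumes the publisher under test emits synchronously.

```swift
import Combine

/// Hypothetical test helper: subscribes on creation and records
/// every value and the completion event it receives.
final class Recorder<Input, Failure: Error> {
    private(set) var values: [Input] = []
    private(set) var completion: Subscribers.Completion<Failure>?
    private var cancellable: AnyCancellable?

    init<P: Publisher>(_ publisher: P) where P.Output == Input, P.Failure == Failure {
        cancellable = publisher.sink(
            receiveCompletion: { [weak self] in self?.completion = $0 },
            receiveValue: { [weak self] in self?.values.append($0) }
        )
    }
}

// Usage with a synchronous publisher: everything is recorded by the time init returns.
let recorder = Recorder([1, 2, 3].publisher.map { $0 * 10 })
print(recorder.values) // [10, 20, 30]
```

For asynchronous publishers, a helper like this would still need an XCTestExpectation (or similar) to wait for the completion before asserting.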

Writing Tests with XCTest

  1. Test Successful Completion:
  • Check if a publisher emits all values successfully and completes.
import XCTest
import Combine

class CombineTests: XCTestCase {
var cancellables: Set<AnyCancellable> = []

func testJustPublisher() {
let expectedValue = "Hello, Combine!"
let publisher = Just(expectedValue)

let expectation = XCTestExpectation(description: "Publisher emits value and finishes successfully")

publisher
.sink(receiveCompletion: { completion in
if case .finished = completion {
expectation.fulfill()
} else {
XCTFail("Expected to finish successfully")
}
}, receiveValue: { value in
XCTAssertEqual(value, expectedValue)
})
.store(in: &cancellables)

wait(for: [expectation], timeout: 1.0)
}
}
  2. Test Emitted Values:
  • Validate if the publisher emits a specific sequence of values.
import XCTest
import Combine

class CombineTests: XCTestCase {
var cancellables: Set<AnyCancellable> = []

func testArrayPublisher() {
let values = [1, 2, 3, 4, 5]
let publisher = values.publisher

let expectation = XCTestExpectation(description: "Publisher emits all values")

var receivedValues: [Int] = []

publisher
.sink(receiveCompletion: { completion in
if case .finished = completion {
XCTAssertEqual(receivedValues, values)
expectation.fulfill()
} else {
XCTFail("Expected to finish successfully")
}
}, receiveValue: { value in
receivedValues.append(value)
})
.store(in: &cancellables)

wait(for: [expectation], timeout: 1.0)
}
}
  3. Test Failure Handling:
  • Check if the publisher correctly handles errors.
import XCTest
import Combine

class CombineTests: XCTestCase {
var cancellables: Set<AnyCancellable> = []

func testFailPublisher() {
enum TestError: Error {
case exampleError
}

let publisher = Fail<Int, TestError>(error: .exampleError)

let expectation = XCTestExpectation(description: "Publisher emits an error")

publisher
.sink(receiveCompletion: { completion in
if case .failure(let error) = completion {
XCTAssertEqual(error, TestError.exampleError)
expectation.fulfill()
} else {
XCTFail("Expected to fail with error")
}
}, receiveValue: { _ in
XCTFail("Expected no value")
})
.store(in: &cancellables)

wait(for: [expectation], timeout: 1.0)
}
}
  4. Test Timing-Based Publishers:
  • Verify behavior of timing-based publishers like Timer and Debounce.
import XCTest
import Combine

class CombineTests: XCTestCase {
var cancellables: Set<AnyCancellable> = []

func testTimerPublisher() {
let expectation = XCTestExpectation(description: "Timer emits 5 values")
let expectedCount = 5

let timerPublisher = Timer.publish(every: 0.1, on: .main, in: .common).autoconnect()

var receivedCount = 0

timerPublisher
.prefix(expectedCount) // Limit to 5 emissions
.sink(receiveCompletion: { _ in
XCTAssertEqual(receivedCount, expectedCount)
expectation.fulfill()
}, receiveValue: { _ in
receivedCount += 1
})
.store(in: &cancellables)

wait(for: [expectation], timeout: 1.0)
}

func testDebouncePublisher() {
let expectation = XCTestExpectation(description: "Debounce emits only the last value")
let values = ["A", "B", "C"]
let publisher = PassthroughSubject<String, Never>()

var receivedValues: [String] = []

publisher
.debounce(for: .seconds(0.5), scheduler: RunLoop.main)
.sink(receiveCompletion: { _ in
XCTAssertEqual(receivedValues, ["C"])
expectation.fulfill()
}, receiveValue: { value in
receivedValues.append(value)
})
.store(in: &cancellables)

// Send values rapidly
publisher.send("A")
DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) { publisher.send("B") }
DispatchQueue.main.asyncAfter(deadline: .now() + 0.2) { publisher.send("C") }
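// Finish only after the 0.5 s debounce window following "C" has elapsed (about 0.7 s in);
// completing earlier could end the stream before "C" is delivered.
DispatchQueue.main.asyncAfter(deadline: .now() + 1.0) { publisher.send(completion: .finished) }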

wait(for: [expectation], timeout: 2.0)
}
}
  5. Testing Complex Pipelines:
  • Combine multiple operators and verify the final output.
import XCTest
import Combine

class CombineTests: XCTestCase {
var cancellables: Set<AnyCancellable> = []

func testComplexPipeline() {
let expectation = XCTestExpectation(description: "Pipeline emits correct results")
let values = [1, 2, 3, 4, 5]
let publisher = values.publisher

let expectedResult = [1, 9, 25] // Only the odd values 1, 3, 5 pass the filter before squaring

var receivedValues: [Int] = []

publisher
.filter { $0 % 2 != 0 }
.map { $0 * $0 }
.collect()
.sink(receiveCompletion: { _ in
XCTAssertEqual(receivedValues, expectedResult)
expectation.fulfill()
}, receiveValue: { values in
receivedValues = values
})
.store(in: &cancellables)

wait(for: [expectation], timeout: 1.0)
}
}

Backpressure Management in iOS Combine

Backpressure management is crucial in reactive programming to handle the flow of data when publishers produce values faster than subscribers can consume them. Without proper management, this can lead to high memory usage or even application crashes.

Understanding Backpressure in Combine

  1. Demand Mechanism:
  • Combine uses a demand mechanism to control how many values a subscriber wants from the publisher.
  • Subscribers request values from the publisher via the request(_:) method.

2. Subscribers.Demand:

  • Represents the number of values requested by a subscriber.
  • The Subscribers.Demand type provides three modes:
  • unlimited: Requests all available values without limit.
  • max(_:): Requests a specific number of values.
  • none: Requests no values.
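
A few lines of arithmetic show how these demand values behave. This is a small sketch using only the Subscribers.Demand operators and its max property; the constant names are illustrative.

```swift
import Combine

// Finite demands add up; .unlimited absorbs any finite demand.
let two = Subscribers.Demand.max(2)
let five = two + .max(3)
let stillUnlimited = Subscribers.Demand.unlimited + .max(1)

print(five == .max(5))                     // true
print(stillUnlimited == .unlimited)        // true
print(Subscribers.Demand.none == .max(0))  // true: .none is a demand of zero
print(five.max as Any)                     // Optional(5)
print(stillUnlimited.max as Any)           // nil, because there is no finite limit
```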

3. Demand Management:

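  • A subscriber can raise its demand while values are flowing by returning an additional Subscribers.Demand from the receive(_:) method. The returned value is added to the demand still outstanding; returning .none leaves it unchanged, and demand cannot be reduced once it has been requested.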
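
The additive nature of demand is easiest to see with a tiny subscriber. The sketch below uses a hypothetical AdditiveDemandSubscriber that requests two values up front and asks for one more when the first value arrives, so it ends up with three values in total.

```swift
import Combine

/// Hypothetical subscriber for illustration: initial demand of 2,
/// plus 1 extra requested while handling the first value (2 + 1 = 3).
final class AdditiveDemandSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []

    func receive(subscription: Subscription) {
        subscription.request(.max(2))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        // The returned demand is added to whatever is still outstanding.
        return received.count == 1 ? .max(1) : .none
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let additiveSubscriber = AdditiveDemandSubscriber()
(1...10).publisher.subscribe(additiveSubscriber)
print(additiveSubscriber.received) // [1, 2, 3]
```

The publisher stops after the third value because no demand remains; it does not send a completion, since the sequence has not been exhausted.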

Backpressure Management with Custom Subscribers

When creating custom subscribers, it’s essential to manage the demand to avoid overloading the subscriber.

Example: Creating a Custom Subscriber with Demand Management

import Combine

class DemandManagingSubscriber<Input, Failure: Error>: Subscriber {
private var subscription: Subscription?
private let maxDemand: Int
private var receivedCount = 0

init(maxDemand: Int) {
self.maxDemand = maxDemand
}

func receive(subscription: Subscription) {
self.subscription = subscription
subscription.request(.max(maxDemand))
}

func receive(_ input: Input) -> Subscribers.Demand {
receivedCount += 1
print("Received value: \(input)")

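if receivedCount >= maxDemand {
subscription?.cancel() // Budget used up: stop the stream
subscription = nil
}

// The initial request already covers the whole budget, so ask for nothing more.
return .none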
}

func receive(completion: Subscribers.Completion<Failure>) {
print("Received completion: \(completion)")
}
}

let numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9].publisher
let customSubscriber = DemandManagingSubscriber<Int, Never>(maxDemand: 5)
numbers.subscribe(customSubscriber)

Backpressure with Built-in Operators

  1. Collect Operator:
  • The collect operator gathers a specific number of values before passing them downstream.
  • This is useful for managing backpressure by controlling how frequently data is emitted.

Example: Using collect to Reduce Emission Frequency

import Combine

let values = (1...10).publisher

let subscription = values
.collect(3) // Collect values in batches of 3
.sink { batch in
print("Batch: \(batch)")
}

Output:

Batch: [1, 2, 3]
Batch: [4, 5, 6]
Batch: [7, 8, 9]
Batch: [10]
  2. Buffer Operator:
  • The buffer operator maintains a buffer of emitted values up to a specified size.
  • This allows subscribers to request data at their own pace.

Example: Using buffer to Handle Burst Emissions

import Combine

let values = (1...20).publisher

let subscription = values
.buffer(size: 5, prefetch: .keepFull, whenFull: .dropOldest)
.sink { value in
print("Received value: \(value)")
}

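Output (sink requests unlimited values, so the buffer should drain as quickly as it fills and the values are expected to arrive in order; dropOldest only matters when the subscriber requests values more slowly than the publisher produces them):

Received value: 1
Received value: 2
Received value: 3
Received value: 4
Received value: 5
...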
  3. Throttle Operator:
  • The throttle operator limits how often values are emitted by setting a minimum interval between emissions.
  • Useful for controlling the flow of rapidly emitted values.

Example: Using throttle to Limit Emission Frequency

import Combine
import Foundation

let subject = PassthroughSubject<String, Never>()

let subscription = subject
.throttle(for: .seconds(1.0), scheduler: RunLoop.main, latest: true)
.sink { value in
print("Throttled value: \(value)")
}

// Send values quickly
subject.send("A")
DispatchQueue.main.asyncAfter(deadline: .now() + 0.2) { subject.send("B") }
DispatchQueue.main.asyncAfter(deadline: .now() + 0.4) { subject.send("C") }
DispatchQueue.main.asyncAfter(deadline: .now() + 1.2) { subject.send("D") }
DispatchQueue.main.asyncAfter(deadline: .now() + 1.4) { subject.send("E") }
DispatchQueue.main.asyncAfter(deadline: .now() + 2.0) { subject.send(completion: .finished) }

Output:

Throttled value: A
Throttled value: C
Throttled value: E

Concurrency in Combine

Concurrency in Combine deals with managing asynchronous tasks, ensuring that publishers and subscribers operate efficiently and safely across different threads. This is essential for maintaining UI responsiveness and offloading heavy work to background threads.

Key Concepts

Schedulers:

  • Schedulers represent the context or thread on which a publisher emits values to subscribers.
  • Combine provides several built-in schedulers to handle threading:
  • Main: The main thread, often used for UI updates.
  • Background (Global DispatchQueue): For computationally expensive tasks.
  • RunLoop: An event-driven loop.
  • OperationQueue: A high-level abstraction for task queues.

receive(on:) vs subscribe(on:):

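  • receive(on:) changes the scheduler on which everything downstream of it (later operators and the subscriber) receives values and completion.
  • subscribe(on:) changes the scheduler on which the upstream subscribe, request, and cancel calls are performed. It does not change where a subject delivers values: a subject emits on whichever thread calls send(_:).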

Examples of Concurrency Handling

  1. Using receive(on:) to Change Processing Thread:
import Combine
import Foundation

let values = [1, 2, 3, 4, 5].publisher

let subscription = values
.map { value -> Int in
print("Mapping value \(value) on thread: \(Thread.current)")
return value * 2
}
.receive(on: DispatchQueue.main) // Switch to main thread for UI processing
.sink { value in
print("Received value \(value) on thread: \(Thread.current)")
}

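Output (map runs on whichever thread creates the subscription, typically the main thread here, because nothing moves the upstream work elsewhere):

Mapping value 1 on thread: <calling thread>
Mapping value 2 on thread: <calling thread>
Mapping value 3 on thread: <calling thread>
Mapping value 4 on thread: <calling thread>
Mapping value 5 on thread: <calling thread>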
Received value 2 on thread: <main thread>
Received value 4 on thread: <main thread>
Received value 6 on thread: <main thread>
Received value 8 on thread: <main thread>
Received value 10 on thread: <main thread>

2. Using subscribe(on:) to Change Subscription Thread:

import Combine
import Foundation

let values = [1, 2, 3].publisher

let subscription = values
.subscribe(on: DispatchQueue.global()) // Subscribe and request values on a background queue
.map { value -> Int in
print("Mapping value \(value) on thread: \(Thread.current)")
return value * 2
}
.sink { value in
print("Received value \(value) on thread: \(Thread.current)")
}

// Note: subscribe(on:) has no effect on where a PassthroughSubject delivers values;
// a subject emits on whichever thread calls send(_:).

Output:

Mapping value 1 on thread: <background thread>
Received value 2 on thread: <background thread>
Mapping value 2 on thread: <background thread>
Received value 4 on thread: <background thread>
Mapping value 3 on thread: <background thread>
Received value 6 on thread: <background thread>

3. Combining receive(on:) and subscribe(on:):

You can use both receive(on:) and subscribe(on:) in tandem to carefully control which threads handle the publisher setup and value reception.

import Combine
import Foundation

let values = [1, 2, 3, 4, 5].publisher

let subscription = values
.subscribe(on: DispatchQueue.global()) // Publisher starts on a background thread
.map { value -> Int in
print("Mapping value \(value) on thread: \(Thread.current)")
return value * 2
}
.receive(on: DispatchQueue.main) // Values received on the main thread
.sink { value in
print("Received value \(value) on thread: \(Thread.current)")
}

Output (the two threads run concurrently, so the exact interleaving of the lines may vary):

Mapping value 1 on thread: <background thread>
Mapping value 2 on thread: <background thread>
Mapping value 3 on thread: <background thread>
Mapping value 4 on thread: <background thread>
Mapping value 5 on thread: <background thread>
Received value 2 on thread: <main thread>
Received value 4 on thread: <main thread>
Received value 6 on thread: <main thread>
Received value 8 on thread: <main thread>
Received value 10 on thread: <main thread>

Concurrency in Combine is a powerful tool that enables efficient data processing and responsive user interfaces. By using schedulers and controlling the subscription and reception threads, you can ensure your Combine pipelines are well-optimized for various processing contexts.

Advanced Error Handling in Combine: Complex Retry Logic

In Combine, handling errors effectively is crucial to building resilient applications. While the retry operator provides basic retry functionality, more complex retry logic might be required for handling different error scenarios or implementing exponential backoff.

Key Concepts for Complex Retry Logic

Retry Operator:

  • Retries a failed publisher up to a specified number of times before emitting an error.

catch Operator:

  • Catches errors and replaces them with a fallback publisher.

Custom Retry Logic:

  • Using the flatMap operator to implement more advanced retry logic, like exponential backoff.
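
The first two concepts can be combined in a few lines. The sketch below is illustrative only: DemoError and the flakyRequest publisher are hypothetical stand-ins for a real request, and Deferred is used so that every retry actually re-runs the work.

```swift
import Combine

// Hypothetical error type used only for this illustration.
enum DemoError: Error { case offline }

var attempts = 0
var received: [String] = []

// Deferred re-runs its closure for every new subscription,
// so each retry counts as a fresh attempt.
let flakyRequest = Deferred { () -> Fail<String, DemoError> in
    attempts += 1
    return Fail(error: .offline)
}

let fallbackCancellable = flakyRequest
    .retry(3)                             // resubscribe up to 3 times after a failure
    .catch { _ in Just("cached value") }  // then replace the failure with a fallback
    .sink { received.append($0) }

print(attempts) // 4: the initial attempt plus 3 retries
print(received) // ["cached value"]
```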

Implementing Complex Retry Logic

  1. Basic Retry with retry:
  • Retries a failed network request up to 3 times before failing.
import Combine

enum NetworkError: Error {
case serverError
}

func fetchData() -> AnyPublisher<String, NetworkError> {
return Fail<String, NetworkError>(error: .serverError)
.eraseToAnyPublisher()
}

let subscription = fetchData()
.retry(3)
.sink(receiveCompletion: { completion in
switch completion {
case .finished:
print("Request succeeded")
case .failure(let error):
print("Request failed with error: \(error)")
}
}, receiveValue: { data in
print("Received data: \(data)")
})

Output:

Request failed with error: serverError

2. Custom Retry Logic with Specific Error Handling:

Use a catch operator to retry based on specific error types and handle retries accordingly.

Example: Retry on Network Errors Only

import Combine
import Foundation

enum NetworkError: Error {
case serverError
case notConnected
case unauthorized
}

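func fetchResource() -> AnyPublisher<String, NetworkError> {
// Deferred creates a fresh Future for every subscription. A bare Future runs
// its closure once and replays that result, so a retry could never succeed.
return Deferred {
Future<String, NetworkError> { promise in
let error: NetworkError? = Bool.random() ? .serverError : nil
if let error = error {
promise(.failure(error))
} else {
promise(.success("Resource fetched successfully"))
}
}
}
.eraseToAnyPublisher()
}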

extension Publisher where Failure == NetworkError {
func retryOnNetworkErrors(retries: Int) -> AnyPublisher<Output, Failure> {
self.catch { error -> AnyPublisher<Output, Failure> in
if retries > 0 && (error == .serverError || error == .notConnected) {
return Just(())
.delay(for: .seconds(2), scheduler: DispatchQueue.main)
.setFailureType(to: Failure.self)
.flatMap { _ in
self.retryOnNetworkErrors(retries: retries - 1)
}
.eraseToAnyPublisher()
} else {
return Fail(outputType: Output.self, failure: error).eraseToAnyPublisher()
}
}
.eraseToAnyPublisher()
}
}

let subscription = fetchResource()
.retryOnNetworkErrors(retries: 2)
.sink(receiveCompletion: { completion in
switch completion {
case .finished:
print("Request succeeded")
case .failure(let error):
print("Request failed with error: \(error)")
}
}, receiveValue: { data in
print("Received data: \(data)")
})

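Output (if a retry succeeds):

Received data: Resource fetched successfully
Request succeeded

Intermediate failures are handled inside retryOnNetworkErrors and never reach the sink. If every attempt fails, the only output is "Request failed with error: serverError".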

Advanced error handling in Combine often calls for custom retry logic tailored to specific error scenarios. By catching specific errors, delaying each new attempt (with a fixed delay as above, or an exponentially growing one), and chaining retries, you can build resilient pipelines that gracefully handle network or transient failures.
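
The fixed two-second delay used in retryOnNetworkErrors can be turned into exponential backoff by doubling the delay on every attempt. The following is a minimal sketch under stated assumptions, not a definitive implementation: retryWithBackoff, BackoffDemoError, and flakyUpload are hypothetical names introduced here, and the helper retries on any error rather than on specific cases.

```swift
import Combine
import Foundation

// Hypothetical error type for the illustration.
enum BackoffDemoError: Error { case transient }

extension Publisher {
    /// Hypothetical helper: after a failure, waits `initialDelay` seconds,
    /// resubscribes, and doubles the delay for the next attempt.
    func retryWithBackoff(
        retries: Int,
        initialDelay: TimeInterval,
        on queue: DispatchQueue = .main
    ) -> AnyPublisher<Output, Failure> {
        self.catch { error -> AnyPublisher<Output, Failure> in
            guard retries > 0 else {
                // Retry budget exhausted: forward the last error.
                return Fail<Output, Failure>(error: error).eraseToAnyPublisher()
            }
            return Just(())
                .delay(for: .seconds(initialDelay), scheduler: queue)
                .setFailureType(to: Failure.self)
                .flatMap { _ -> AnyPublisher<Output, Failure> in
                    self.retryWithBackoff(retries: retries - 1,
                                          initialDelay: initialDelay * 2,
                                          on: queue)
                }
                .eraseToAnyPublisher()
        }
        .eraseToAnyPublisher()
    }
}

// Usage: a publisher that fails twice, then succeeds on the third attempt.
// Deferred makes each resubscription run the work again.
var attemptCount = 0
let flakyUpload = Deferred {
    Future<String, BackoffDemoError> { promise in
        attemptCount += 1
        if attemptCount < 3 {
            promise(.failure(.transient))
        } else {
            promise(.success("Uploaded"))
        }
    }
}

// Waits 0.5 s before the second attempt and 1.0 s before the third.
// The default queue is .main, so this needs a running main run loop (app or playground).
let backoffCancellable = flakyUpload
    .retryWithBackoff(retries: 3, initialDelay: 0.5)
    .sink(receiveCompletion: { print("Completion: \($0)") },
          receiveValue: { print("Value: \($0)") })
```

Doubling the delay keeps early retries quick while backing off from a service that stays unavailable; production code would typically also cap the maximum delay and add some random jitter.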

Islam Moussa

Professional iOS Developer, cross-platform developer and backend developer from Egypt