Managing Backpressure in Reactive Streams with Combine:

Shashidhar Jagatap
9 min read · Jan 1, 2024


Strategies and Operators for Efficient Data Flow

1. Throttling:

  • throttle: Emits at most one element per specified time window, either the most recent or the first element received in that window.
  • debounce: Waits for a specified interval of inactivity before emitting the latest value.

throttle(for:) Operator Example:

The throttle operator limits how often values reach the subscriber: within each time window it publishes at most one element, either the most recent one (latest: true) or the first one (latest: false). Elements that arrive in between are discarded, not queued.

import Combine
import Foundation

// A publisher that emits the current date every 0.5 seconds
let publisher = Timer.publish(every: 0.5, on: .main, in: .default)
    .autoconnect()
    .prefix(10) // Limiting emissions for this example

let throttleExample = publisher
    .throttle(for: .seconds(1), scheduler: DispatchQueue.main, latest: true)
    .sink { value in
        print("Throttled Value:", value) // At most one value per 1-second window
    }

// Output (timestamps depend on when you run it):
// Throttled Value: 2024-01-01 06:29:52 +0000
// Throttled Value: 2024-01-01 06:29:53 +0000
// Throttled Value: 2024-01-01 06:29:54 +0000
// Throttled Value: 2024-01-01 06:29:55 +0000
// Throttled Value: 2024-01-01 06:29:56 +0000
// Throttled Value: 2024-01-01 06:29:56 +0000

In this example:

  • Timer.publish creates a publisher emitting values every 0.5 seconds.
  • .throttle(for: .seconds(1), ...) lets at most one value through per 1-second window; the values in between are dropped.
  • latest: true makes the operator publish the most recent value received in each window; latest: false would publish the first one instead.
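The Timer-based example takes several seconds and prints wall-clock dates. Below is a minimal sketch that drives a PassthroughSubject by hand instead, assuming a command-line Swift script on macOS. The names ticks and throttled and the runMainLoop(for:) helper are hypothetical; the helper only spins the main run loop so that timers scheduled on RunLoop.main can fire. Which values get through depends on timing, so the sketch prints them rather than promising a specific list.

```swift
import Combine
import Foundation

// Hypothetical helper: spins the main run loop so timers scheduled
// on RunLoop.main (used by throttle below) get a chance to fire.
func runMainLoop(for seconds: TimeInterval) {
    let deadline = Date().addingTimeInterval(seconds)
    while Date() < deadline {
        RunLoop.main.run(until: min(deadline, Date().addingTimeInterval(0.01)))
    }
}

let ticks = PassthroughSubject<Int, Never>()
var throttled: [Int] = []

// At most one value per 300 ms window; `latest: true` keeps the newest
// value seen in each window (set it to false to keep the first instead).
let cancellable = ticks
    .throttle(for: .milliseconds(300), scheduler: RunLoop.main, latest: true)
    .sink { throttled.append($0) }

// Send 20 values roughly 50 ms apart (about one second in total).
for value in 0..<20 {
    ticks.send(value)
    runMainLoop(for: 0.05)
}
runMainLoop(for: 0.5) // let the last window close

print("Sent 20 values, received \(throttled.count):", throttled)
```

With a 300 ms window over roughly one second of input, only a handful of the 20 values reach the subscriber, and they arrive in their original order.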

debounce(for:) Operator Example:

The debounce operator waits for a specified interval of inactivity before emitting the latest value. Every new upstream value restarts that timer, so a value is delivered only after the stream has been silent for the full interval.

import Combine
import Foundation

// A publisher that emits values every 0.3 seconds
let publisher = Timer.publish(every: 0.3, on: .main, in: .default)
    .autoconnect()
    .prefix(10) // Limiting emissions for this example

let debounceExample = publisher
    .debounce(for: .seconds(1), scheduler: DispatchQueue.main)
    .sink { value in
        print("Debounced Value:", value) // Prints the latest value after 1 second of inactivity
    }

In this example:

  • Timer.publish creates a publisher emitting values every 0.3 seconds.
  • .debounce delays the emission of values, waiting for 1 second of inactivity (for: .seconds(1)) before emitting the latest value.
  • A value is emitted only once 1 second passes without a new upstream value. Because this timer ticks every 0.3 seconds, it never goes quiet for a full second while it is running: every tick restarts the debounce timer, so at most the final value can get through.
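A steady timer is not the pattern debounce is designed for. The sketch below instead simulates a burst of keystrokes followed by a pause, assuming a command-line Swift script on macOS. The searchText subject, the searches array and the runMainLoop(for:) helper are hypothetical; the helper just spins the main run loop so the debounce timer scheduled on RunLoop.main can fire.

```swift
import Combine
import Foundation

// Hypothetical stand-in for text typed into a search field.
let searchText = PassthroughSubject<String, Never>()
var searches: [String] = []

let cancellable = searchText
    .debounce(for: .milliseconds(300), scheduler: RunLoop.main)
    .sink { searches.append($0) }

// Hypothetical helper: spins the main run loop so scheduled timers can fire.
func runMainLoop(for seconds: TimeInterval) {
    let deadline = Date().addingTimeInterval(seconds)
    while Date() < deadline {
        RunLoop.main.run(until: min(deadline, Date().addingTimeInterval(0.01)))
    }
}

// A quick burst: each value restarts the 300 ms quiet-period timer.
searchText.send("s")
searchText.send("sw")
searchText.send("swi")
runMainLoop(for: 0.6)   // the quiet period elapses, so "swi" is emitted

searchText.send("swift")
runMainLoop(for: 0.6)   // another quiet period, so "swift" is emitted

print(searches) // ["swi", "swift"]
```

"s" and "sw" are replaced before their 300 ms quiet period runs out, so only the last value of each burst reaches the subscriber.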

Both throttle and debounce control how often values reach the subscriber. throttle keeps delivering values at a capped rate while a stream stays busy, whereas debounce waits until the stream settles and then delivers only the most recent value, which suits cases such as reacting to user input only after typing stops.

2. Buffering:

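  • buffer: Holds up to a specified number of elements when the upstream produces them faster than the downstream requests them, and applies a strategy (drop the oldest, drop the newest, or fail with an error) when the buffer is full.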
  • collect: Gathers incoming elements into arrays of a defined size before emitting them downstream.

buffer Operator Example:

The buffer operator holds elements that the downstream subscriber has not requested yet, up to a specified size. It does not group them into arrays: each element is still delivered individually.

import Combine

// A publisher emitting a sequence of integers
let publisher = (1...10).publisher

let bufferExample = publisher
    // Holds up to 4 pending elements; when full, discards the oldest one
    .buffer(size: 4, prefetch: .byRequest, whenFull: .dropOldest)
    .sink { value in
        print("Buffered Values:", value) // Receives individual Int values, not arrays
    }

// Output:
// Buffered Values: 7
// Buffered Values: 8
// Buffered Values: 9
// Buffered Values: 10

In this example:

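  • The buffer operator is applied to a publisher created from a sequence of integers.
  • size: 4 lets the operator hold up to 4 elements the subscriber has not received yet. prefetch: .byRequest requests elements from upstream on demand instead of prefetching, and whenFull: .dropOldest discards the oldest held element to make room when the buffer is full.
  • The subscriber still receives individual Int values, not arrays. Only 7 through 10 appear in the output above, which suggests the synchronous sequence publisher pushed elements into the buffer faster than they were drained, so .dropOldest discarded 1 through 6.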

collect Operator Example:

The collect operator gathers incoming elements into arrays of a defined size before emitting them downstream.

import Combine

// A publisher emitting a sequence of characters
let publisher = ["A", "B", "C", "D", "E", "F", "G"].publisher

let collectExample = publisher
    .collect(3) // Gathers elements into arrays of size 3
    .sink { value in
        print("Collected Values:", value) // Prints arrays containing collected elements
    }

// Output:
// Collected Values: ["A", "B", "C"]
// Collected Values: ["D", "E", "F"]
// Collected Values: ["G"]

In this example:

  • The collect operator is applied to a publisher created from an array of strings.
  • collect(3) gathers elements into arrays of up to 3 elements.
  • The last array contains only "G" because collect emits whatever it has gathered when the upstream finishes, even if the array is not full.
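collect also has a variant without a count. The sketch below (the variable names are hypothetical) compares collect(2) with collect(), which gathers every element into a single array and emits it only when the upstream finishes.

```swift
import Combine

var batches: [[Int]] = []
var everything: [[Int]] = []

// collect(2): arrays of up to two elements; the remainder is flushed on completion.
let batchCancellable = (1...5).publisher
    .collect(2)
    .sink { batches.append($0) }

// collect() with no count: one array, emitted when the upstream finishes.
let allCancellable = (1...5).publisher
    .collect()
    .sink { everything.append($0) }

print(batches)    // [[1, 2], [3, 4], [5]]
print(everything) // [[1, 2, 3, 4, 5]]
```

Because collect() keeps every element in memory until completion, it suits finite publishers; on a publisher that never finishes it would never emit.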

Both operators help with backpressure, but in different ways: buffer absorbs bursts by holding individual elements until the subscriber asks for them, while collect batches elements into arrays so the downstream subscriber can process data in larger chunks.

3. Dropping or Ignoring Values:

  • dropFirst and drop(while:): Discard a fixed number of leading elements, or discard leading elements for as long as a condition holds.
  • ignoreOutput: Ignores all incoming values and only forwards completion or failure events.

dropFirst Operator Example:

The dropFirst operator discards the first few elements emitted by the publisher.

import Combine

// A publisher emitting a sequence of integers
let publisher = (1...5).publisher

let dropFirstExample = publisher
    .dropFirst(2) // Discards the first 2 elements
    .sink { value in
        print("Dropped Value:", value) // Prints the values after dropping the specified number of elements
    }

// Output:
// Dropped Value: 3
// Dropped Value: 4
// Dropped Value: 5

In this example:

  • The dropFirst operator is applied to the publisher emitting a sequence of integers.
  • .dropFirst(2) specifies to discard the first 2 elements emitted by the publisher.
  • The subscriber receives values emitted by the publisher after discarding the specified number of elements.
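A common use of dropFirst in practice is skipping the current value that an @Published property sends as soon as you subscribe. The PlayerSettings model below is hypothetical; the sketch shows that only later assignments reach the subscriber.

```swift
import Combine

// Hypothetical model: @Published sends its current value to each new subscriber.
final class PlayerSettings {
    @Published var volume = 0
}

let settings = PlayerSettings()
var changes: [Int] = []

// dropFirst() skips the initial value (0) delivered on subscription,
// so only real changes reach the sink.
let cancellable = settings.$volume
    .dropFirst()
    .sink { changes.append($0) }

settings.volume = 3
settings.volume = 7
print(changes) // [3, 7]
```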

drop(while:) Operator Example:

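The drop(while:) operator discards elements as long as a predicate returns true. Once the predicate returns false for the first time, that element and every element after it pass through without being checked again.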

import Combine

// A publisher emitting a sequence of integers
let publisher = (1...10).publisher

let dropWhileExample = publisher
    .drop(while: { $0 < 5 }) // Ignores elements while the condition is true (less than 5)
    .sink { value in
        print("Dropped While Value:", value) // Prints the values once the condition is no longer met
    }

// Output:
// Dropped While Value: 5
// Dropped While Value: 6
// Dropped While Value: 7
// Dropped While Value: 8
// Dropped While Value: 9
// Dropped While Value: 10

In this example:

  • The drop(while:) operator is applied to the publisher emitting a sequence of integers.
  • { $0 < 5 } specifies the condition to ignore elements while they are less than 5.
  • The subscriber receives every value from the first one that fails the condition (5) onward.
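With a sorted input like 1...10, drop(while:) looks the same as filter. This sketch (the readings array is hypothetical) feeds both operators the same unsorted values to show the difference.

```swift
import Combine

let readings = [1, 6, 2, 7, 3]

var dropped: [Int] = []
var filtered: [Int] = []

// drop(while:) stops checking once the predicate first returns false (at 6).
let dropCancellable = readings.publisher
    .drop(while: { $0 < 5 })
    .sink { dropped.append($0) }

// filter checks every element independently.
let filterCancellable = readings.publisher
    .filter { $0 >= 5 }
    .sink { filtered.append($0) }

print(dropped)  // [6, 2, 7, 3]
print(filtered) // [6, 7]
```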

ignoreOutput Operator Example:

The ignoreOutput operator ignores all incoming values and only forwards completion or failure events downstream.

import Combine

// A publisher emitting a sequence of integers
let publisher = (1...5).publisher

let ignoreOutputExample = publisher
    .ignoreOutput() // Ignores all incoming values
    .sink(receiveCompletion: { completion in
        print("Completion:", completion) // Prints the completion event (finished or failed)
    }, receiveValue: { _ in })

// Output:
// Completion: finished

In this example:

  • The ignoreOutput operator is applied to the publisher emitting a sequence of integers.
  • It ignores all incoming values and only forwards the completion event (finished or failed) downstream.
  • The subscriber receives the completion event but does not handle individual values.
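ignoreOutput forwards failures as well as normal completion. In this minimal sketch the ImportError type and the rule that negative numbers are corrupt records are hypothetical; tryMap throws on -1, and the subscriber sees only the failure.

```swift
import Combine

// Hypothetical error type for this sketch.
enum ImportError: Error {
    case badRecord
}

var finished = false
var failure: Error?

let cancellable = [1, 2, -1, 4].publisher
    .tryMap { record -> Int in
        // Treat negative numbers as corrupt records.
        guard record >= 0 else { throw ImportError.badRecord }
        return record
    }
    .ignoreOutput() // values are discarded; only completion or failure passes
    .sink(receiveCompletion: { completion in
        switch completion {
        case .finished:
            finished = true
        case .failure(let error):
            failure = error
        }
    }, receiveValue: { _ in })

if let failure = failure {
    print("Failed with:", failure) // Failed with: badRecord
}
```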

These operators (dropFirst, drop(while:), and ignoreOutput) help when specific elements should be discarded, either a fixed number of leading values or values that match a condition, or when only the completion or failure event matters.

4. Switching Strategies:

  • switchToLatest: Emits elements from the latest publisher while ignoring elements from previous publishers.
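  • flatMapLatest: Maps each element to a new publisher and emits elements only from the latest one. Combine has no operator with this name (it comes from libraries such as RxSwift); the same behavior comes from map followed by switchToLatest.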

switchToLatest Operator Example:

The switchToLatest operator emits elements from the latest publisher while ignoring elements from previous publishers.

import Combine
import Foundation

let subject = PassthroughSubject<Int, Never>()
let cancellable = subject
    .setFailureType(to: URLError.self)
    .map { index -> URLSession.DataTaskPublisher in
        let url = URL(string: "https://example.org/get?index=\(index)")!
        return URLSession.shared.dataTaskPublisher(for: url)
    }
    .switchToLatest()
    .sink(receiveCompletion: { print("Complete: \($0)") },
          receiveValue: { (data, response) in
              guard let url = response.url else { print("Bad response."); return }
              print("URL: \(url)")
          })

for index in 1...5 {
    // Send 1 through 5 roughly 0.1 seconds apart
    // (Double division; Int division index/10 would always be 0)
    DispatchQueue.main.asyncAfter(deadline: .now() + Double(index) / 10) {
        subject.send(index)
    }
}

// Prints "URL: https://example.org/get?index=5"

The exact behavior of this example depends on the delay passed to asyncAfter and the speed of the network connection. If the delay is longer, or the network connection is fast, the earlier data tasks may complete before switchToLatest() can cancel them. If this happens, the output includes multiple URLs whose tasks completed before cancellation.
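Because the network example depends on timing, here is a deterministic sketch of the same operator using subjects. The first and second subjects are hypothetical stand-ins for two requests; sending second to the outer subject cancels the subscription to first, so its later value is dropped.

```swift
import Combine

// Each inner subject stands in for one request's stream of results.
let first = PassthroughSubject<String, Never>()
let second = PassthroughSubject<String, Never>()
let outer = PassthroughSubject<PassthroughSubject<String, Never>, Never>()
var received: [String] = []

let cancellable = outer
    .switchToLatest()
    .sink { received.append($0) }

outer.send(first)
first.send("first: 1")   // delivered: `first` is the latest inner publisher
outer.send(second)       // switching cancels the subscription to `first`
first.send("first: 2")   // ignored
second.send("second: 1") // delivered

print(received) // ["first: 1", "second: 1"]
```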

flatMap vs. flatMapLatest Example:

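Combine does not provide a flatMapLatest operator. The example below uses flatMap, which subscribes to every publisher returned by its closure and forwards values from all of them; it never cancels earlier inner publishers.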

import Combine
import Foundation

// A subject we send integers into by hand
let publisher = PassthroughSubject<Int, Never>()

let flatMapExample = publisher
    .flatMap { value in
        // Each value becomes its own publisher that emits value * 2 after 1 second
        return Just(value * 2).delay(for: .seconds(1), scheduler: RunLoop.main)
    }
    .sink { value in
        print("Mapped Value:", value) // flatMap forwards values from every inner publisher
    }

publisher.send(1)
publisher.send(2)
publisher.send(3)

// Output:
// Mapped Value: 2
// Mapped Value: 4
// Mapped Value: 6

In this example:

  • PassthroughSubject is a manual publisher emitting a sequence of integers.
  • .flatMap operator is used to map each element to a new publisher (Just(value * 2)) that multiplies the received value by 2.
  • .delay is applied to the mapped publisher to simulate a delayed emission.
  • Because flatMap keeps all three inner publishers alive, the subscriber receives 2, 4 and 6, one value from each. To keep only the output of the latest mapped publisher, replace flatMap with map followed by switchToLatest().
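Since Combine has no flatMapLatest, a small extension can compose map and switchToLatest() under that name. The flatMapLatest helper below is hypothetical (it is not part of Combine), and the responses dictionary simulates two requests whose results arrive out of order.

```swift
import Combine

extension Publisher {
    /// Hypothetical helper: map each value to a publisher, then keep only
    /// the most recent one by cancelling the previous inner subscription.
    func flatMapLatest<P: Publisher>(
        _ transform: @escaping (Output) -> P
    ) -> AnyPublisher<P.Output, Failure> where P.Failure == Failure {
        map(transform).switchToLatest().eraseToAnyPublisher()
    }
}

let requests = PassthroughSubject<Int, Never>()
var responses: [Int: PassthroughSubject<String, Never>] = [:]
var results: [String] = []

let cancellable = requests
    .flatMapLatest { id -> PassthroughSubject<String, Never> in
        let response = PassthroughSubject<String, Never>()
        responses[id] = response // keep a handle so we can "answer" later
        return response
    }
    .sink { results.append($0) }

requests.send(1)
requests.send(2)                 // cancels the subscription to response 1
responses[1]?.send("response 1") // dropped: no longer the latest
responses[2]?.send("response 2")

print(results) // ["response 2"]
```

With plain flatMap in place of flatMapLatest, both responses would reach the subscriber.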

Use switchToLatest (or map followed by switchToLatest, Combine's equivalent of flatMapLatest) when only the newest inner publisher matters and earlier work can be cancelled; use flatMap when the output of every inner publisher is needed.

5. Handling Errors and Timeouts

  • timeout: Terminates the sequence if a specified time elapses without receiving any elements, either by failing with a custom error or, if none is given, by finishing normally.
  • retry: Resubscribes to the upstream publisher in case of failure a specified number of times.

timeout Operator Example:

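The timeout operator terminates the sequence if the upstream publisher goes longer than the given interval without producing an element. When you pass a customError closure, the sequence fails with the error it returns; otherwise it simply finishes.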

import Combine
import Foundation

enum MyError: Error {
    case timeout
    // Add other error cases as needed
}

let subject = PassthroughSubject<Int, Error>()

let timeoutExample = subject
    .timeout(.seconds(2), scheduler: DispatchQueue.main, customError: {
        return MyError.timeout // The error sent downstream when the timeout fires
    })
    .sink(receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("Sequence completed successfully.")
        case .failure(let error):
            print("Sequence terminated with error:", error)
        }
    }, receiveValue: { value in
        print("Received value:", value)
    })

// Sending value 1 immediately
DispatchQueue.main.asyncAfter(deadline: .now()) {
    subject.send(1)
}

// Sending value 2 after 1 second (within the 2-second timeout)
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
    subject.send(2)
}

// Sending value 3 after 4 seconds: the timeout has already fired at about
// 3 seconds (2 seconds after value 2), so this value is never delivered
DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
    subject.send(3)
}

// Output:
// Received value: 1
// Received value: 2
// Sequence terminated with error: timeout

In this example:

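  • timeout(.seconds(2), scheduler: DispatchQueue.main, customError:) allows at most 2 seconds between elements.
  • Value 1 (at 0 seconds) and value 2 (at 1 second) arrive in time, but the next value is not sent until 4 seconds, a 3-second gap.
  • The timeout therefore fires about 2 seconds after value 2, at roughly the 3-second mark, and the sequence fails with MyError.timeout; value 3 is never delivered.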
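If you do not pass customError, timeout ends the stream by finishing normally instead of failing, which is the only option when the Failure type is Never. This sketch assumes a command-line Swift script on macOS that spins RunLoop.main by hand; the events, received and finished names are hypothetical.

```swift
import Combine
import Foundation

let events = PassthroughSubject<Int, Never>()
var received: [Int] = []
var finished = false

// No customError: when 100 ms pass without a value, the publisher
// simply finishes instead of failing (Failure stays Never).
let cancellable = events
    .timeout(.milliseconds(100), scheduler: RunLoop.main)
    .sink(receiveCompletion: { _ in finished = true },
          receiveValue: { received.append($0) })

events.send(1)

// Spin the main run loop (for up to 1 second) until the timeout fires.
let deadline = Date().addingTimeInterval(1)
while !finished && Date() < deadline {
    RunLoop.main.run(until: Date().addingTimeInterval(0.01))
}

print(received, finished) // [1] true
```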

retry Operator Example:

The retry operator resubscribes to the upstream publisher in case of a failure a specified number of times.

import Combine
import Foundation

var attempts = 0

let retryExample = Future<Int, Error> { promise in
    attempts += 1
    if attempts < 3 {
        promise(.failure(NSError(domain: "", code: 0, userInfo: nil))) // Simulating an error
    } else {
        promise(.success(42)) // Intended to succeed on the third attempt
    }
}
.retry(3) // Resubscribe up to 3 times after a failure
.sink(receiveCompletion: { completion in
    switch completion {
    case .finished:
        print("Sequence completed successfully.")
    case .failure(let error):
        print("Sequence terminated with error after retries:", error)
    }
}, receiveValue: { value in
    print("Received value:", value)
})

// Output:
// Sequence terminated with error after retries: Error Domain= Code=0 "(null)"

In this example:

  • Future is used to simulate an asynchronous task that may fail (NSError is used for simulation purposes).
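  • The retry(3) operator resubscribes up to 3 times after a failure, so the upstream can be subscribed to at most 4 times in total.
  • However, a Future runs its closure only once, when it is created, and replays that result to every later subscriber. Each retry therefore receives the same cached failure, attempts never goes above 1, and the sequence terminates with the error shown in the output instead of emitting 42.
  • To run the work again on every attempt, wrap the Future in Deferred, which creates a fresh Future for each subscription.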
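Wrapping the Future in Deferred makes each subscription create a new Future, so retry really repeats the work. A minimal sketch that keeps the attempts counter idea; the error domain and the received and completedNormally names are arbitrary choices for illustration.

```swift
import Combine
import Foundation

var attempts = 0
var received: [Int] = []
var completedNormally = false

// Deferred creates a brand-new Future for every subscription,
// so each retry actually runs the closure again.
let cancellable = Deferred {
    Future<Int, Error> { promise in
        attempts += 1
        if attempts < 3 {
            promise(.failure(NSError(domain: "Example", code: attempts, userInfo: nil)))
        } else {
            promise(.success(42))
        }
    }
}
.retry(3) // up to 3 resubscriptions after a failure
.sink(receiveCompletion: { completion in
    if case .finished = completion { completedNormally = true }
}, receiveValue: { value in
    received.append(value)
})

print(attempts, received, completedNormally) // 3 [42] true
```

The first two attempts fail, the third succeeds, and the fourth subscription that retry(3) would allow is never needed.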

These examples showcase how timeout can handle sequence termination due to timeout and how retry can manage retries on failure within a specified number of attempts in Combine.

These operators and strategies provided by Combine allow developers to control and manage backpressure scenarios effectively. By using these operators strategically, developers can prevent overwhelming downstream subscribers, manage resource utilization, and ensure the smooth flow of data in reactive streams.

Each operator serves a specific purpose in controlling the rate of data flow, buffering elements, dropping or ignoring values, or applying switching strategies, providing versatile options to handle backpressure situations in Combine’s reactive pipelines.

Thank you for reading…

Happy Coding !!
