Managing Backpressure in Reactive Streams with Combine:
Strategies and Operators for Efficient Data Flow
1. Throttling:
- throttle: Limits delivery to at most one element per time window, emitting either the most recent or the first value received in that window.
- debounce: Waits for a specified interval of inactivity before emitting the latest value.
throttle(for:) Operator Example:
The throttle operator publishes at most one element per specified interval. With latest: true it emits the most recent value received during the interval; with latest: false it emits the first one.
import Combine
import Foundation

// A publisher that emits values every 0.5 seconds
let publisher = Timer.publish(every: 0.5, on: .main, in: .default)
    .autoconnect()
    .prefix(10) // Limiting emissions for this example

let throttleExample = publisher
    .throttle(for: .seconds(1), scheduler: DispatchQueue.main, latest: true)
    .sink { value in
        print("Throttled Value:", value) // Prints the most recent value emitted after 1 second
    }
// Output (timestamps will vary):
// Throttled Value: 2024-01-01 06:29:52 +0000
// Throttled Value: 2024-01-01 06:29:53 +0000
// Throttled Value: 2024-01-01 06:29:54 +0000
// Throttled Value: 2024-01-01 06:29:55 +0000
// Throttled Value: 2024-01-01 06:29:56 +0000
// Throttled Value: 2024-01-01 06:29:56 +0000
In this example:
- Timer.publish creates a publisher emitting values every 0.5 seconds.
- .throttle lets through at most one value per 1-second interval (for: .seconds(1)).
- The latest: true parameter indicates that the most recent value received during each interval is the one emitted.
debounce(for:) Operator Example:
The debounce operator waits for a specified interval of inactivity before emitting the latest value. It delays the emission of values until a certain duration of silence is observed in the stream.
import Combine
import Foundation

// A publisher that emits values every 0.3 seconds
let publisher = Timer.publish(every: 0.3, on: .main, in: .default)
    .autoconnect()
    .prefix(10) // Limiting emissions for this example

let debounceExample = publisher
    .debounce(for: .seconds(1), scheduler: DispatchQueue.main)
    .sink { value in
        print("Debounced Value:", value) // Prints the latest value after 1 second of inactivity
    }
In this example:
- Timer.publish creates a publisher emitting values every 0.3 seconds.
- .debounce waits for 1 second of inactivity (for: .seconds(1)) before emitting the latest value.
- The latest value is emitted only when a gap of at least 1 second follows it. Because this timer fires every 0.3 seconds, that gap never occurs while values are still arriving, so nothing is printed during that period.
Both throttle and debounce are useful for controlling the flow of data: throttle limits the rate of emissions, while debounce handles rapid changes where you are only interested in the most recent value after a period of inactivity.
2. Buffering:
- buffer: Holds elements received from the upstream publisher, up to a specified size, until the downstream subscriber requests them.
- collect: Gathers incoming elements into arrays of a defined size before emitting them downstream.
buffer Operator Example:
The buffer operator stores elements from the upstream publisher until the subscriber is ready for them. It delivers elements one at a time, not as arrays, and its whenFull strategy decides what happens once more than size elements are waiting.
import Combine

// A publisher emitting a sequence of integers
let publisher = (1...10).publisher

let bufferExample = publisher
    // Holds at most 4 undelivered elements; the oldest is dropped when the buffer is full
    .buffer(size: 4, prefetch: .byRequest, whenFull: .dropOldest)
    .sink { value in
        print("Buffered Values:", value) // Prints each element that was still in the buffer
    }
// Output:
// Buffered Values: 7
// Buffered Values: 8
// Buffered Values: 9
// Buffered Values: 10
In this example:
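- The buffer operator is applied to the publisher created from the sequence of integers.
- size: 4 specifies that the buffer holds at most 4 elements, and whenFull: .dropOldest discards the oldest element to make room for a new one.
- As the output shows, the sequence publisher delivers all ten values before the subscriber starts draining the buffer, so only the last four (7 through 10) remain. They are received as individual values, not arrays.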
collect Operator Example:
The collect operator gathers incoming elements into arrays of a defined size before emitting them downstream.
import Combine

// A publisher emitting a sequence of characters
let publisher = ["A", "B", "C", "D", "E", "F", "G"].publisher

let collectExample = publisher
    .collect(3) // Gathers elements into arrays of up to 3 elements
    .sink { value in
        print("Collected Values:", value) // Prints arrays containing collected elements
    }
// Output:
// Collected Values: ["A", "B", "C"]
// Collected Values: ["D", "E", "F"]
// Collected Values: ["G"]
In this example:
- The collect operator is applied to the publisher created from the sequence of characters.
- 3 specifies that elements will be gathered into arrays of size 3.
- The subscriber receives arrays of 3 elements each. When the publisher finishes, any leftover elements are delivered in a final, shorter array (here ["G"]).
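To make the chunking concrete, here is a small sketch (the variable names are illustrative and not part of the example above) that records what collect(3) and the no-argument collect() deliver for the same seven-element sequence. Both publishers are synchronous, so the arrays are filled by the time sink returns.

```swift
import Combine

// Illustrative names; not part of the original example.
var chunks: [[Int]] = []
var everything: [[Int]] = []

// collect(3) emits an array each time three elements have arrived,
// plus a final shorter array when the upstream finishes.
let chunked = (1...7).publisher
    .collect(3)
    .sink { chunks.append($0) }

// collect() with no argument waits for completion and emits a single array.
let all = (1...7).publisher
    .collect()
    .sink { everything.append($0) }

print(chunks)      // [[1, 2, 3], [4, 5, 6], [7]]
print(everything)  // [[1, 2, 3, 4, 5, 6, 7]]
```

Note that the no-argument form holds every element in memory until the upstream completes, so it is best suited to publishers known to be short.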
Both buffer and collect help manage backpressure. buffer absorbs bursts when the publisher produces values faster than the subscriber requests them, and collect batches elements into arrays so that the downstream subscriber can process data in larger chunks.
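Underneath these operators, backpressure in Combine is expressed as demand: a subscriber tells the publisher how many values it is willing to receive. The sketch below is one way to observe this, using AnySubscriber with illustrative variable names. It requests only two values from a ten-element sequence.

```swift
import Combine

// Illustrative names; not part of the original examples.
var received: [Int] = []
var completed = false

let limited = AnySubscriber<Int, Never>(
    receiveSubscription: { subscription in
        subscription.request(.max(2)) // Ask for two values and no more
    },
    receiveValue: { value in
        received.append(value)
        return .none // Do not raise the demand after each value
    },
    receiveCompletion: { _ in completed = true }
)

(1...10).publisher.subscribe(limited)

print(received)   // [1, 2]
print(completed)  // false
```

The sequence publisher stops after two values because no further demand exists, and it never sends a completion because it still has elements left to deliver.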
3. Dropping or Ignoring Values:
- dropFirst and drop(while:): Discard the first few elements, or discard elements until a condition stops being true.
- ignoreOutput: Ignores all incoming values and only forwards completion or failure events.
dropFirst Operator Example:
The dropFirst operator discards the first few elements emitted by the publisher.
import Combine

// A publisher emitting a sequence of integers
let publisher = (1...5).publisher

let dropFirstExample = publisher
    .dropFirst(2) // Discards the first 2 elements
    .sink { value in
        print("Dropped Value:", value) // Prints the values after dropping the specified number of elements
    }
// Output:
// Dropped Value: 3
// Dropped Value: 4
// Dropped Value: 5
In this example:
- The dropFirst operator is applied to the publisher emitting a sequence of integers.
- .dropFirst(2) specifies to discard the first 2 elements emitted by the publisher.
- The subscriber receives the values emitted after the specified number of elements has been discarded.
drop(while:) Operator Example:
The drop(while:) operator discards elements for as long as its closure returns true. Once the closure returns false, that element and every later one pass through.
import Combine

// A publisher emitting a sequence of integers
let publisher = (1...10).publisher

let dropWhileExample = publisher
    .drop(while: { $0 < 5 }) // Ignores elements while the condition is true (less than 5)
    .sink { value in
        print("Dropped While Value:", value) // Prints the values after the condition is no longer met
    }
// Output:
// Dropped While Value: 5
// Dropped While Value: 6
// Dropped While Value: 7
// Dropped While Value: 8
// Dropped While Value: 9
// Dropped While Value: 10
In this example:
- The drop(while:) operator is applied to the publisher emitting a sequence of integers.
- { $0 < 5 } specifies the condition to ignore elements while they are less than 5.
- The subscriber receives values emitted by the publisher after the condition is no longer met.
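Because the sequence above is sorted, drop(while:) can look like a filter. A short sketch with an unsorted input (the values and variable names here are illustrative) shows the difference: drop(while:) only skips the leading run of matching elements, while filter tests every element.

```swift
import Combine

// Illustrative input; unsorted on purpose.
let values = [1, 6, 2, 7]
var dropped: [Int] = []
var filtered: [Int] = []

// Skips 1, then passes everything from 6 onward, including the later 2.
let dropExample = values.publisher
    .drop(while: { $0 < 5 })
    .sink { dropped.append($0) }

// Tests each element independently, so 2 is removed as well.
let filterExample = values.publisher
    .filter { $0 >= 5 }
    .sink { filtered.append($0) }

print(dropped)   // [6, 2, 7]
print(filtered)  // [6, 7]
```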
ignoreOutput Operator Example:
The ignoreOutput operator ignores all incoming values and only forwards completion or failure events downstream.
import Combine

// A publisher emitting a sequence of integers
let publisher = (1...5).publisher

let ignoreOutputExample = publisher
    .ignoreOutput() // Ignores all incoming values
    .sink(receiveCompletion: { completion in
        print("Completion:", completion) // Prints the completion event (finished or failure)
    }, receiveValue: { _ in })
// Output:
// Completion: finished
In this example:
- The ignoreOutput operator is applied to the publisher emitting a sequence of integers.
- It ignores all incoming values and only forwards the completion event (finished or failure) downstream.
- The subscriber receives the completion event but never sees individual values.
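The sequence publisher above can only finish successfully. To see the failure path, here is a sketch using a PassthroughSubject with a hypothetical UploadError type (both the type and the variable names are assumptions made for illustration).

```swift
import Combine

// Hypothetical error type, used only for this illustration.
struct UploadError: Error {}

var valueCount = 0
var outcome = "pending"

let subject = PassthroughSubject<Int, UploadError>()
let cancellable = subject
    .ignoreOutput()
    .sink(receiveCompletion: { completion in
        switch completion {
        case .finished: outcome = "finished"
        case .failure: outcome = "failure"
        }
    }, receiveValue: { _ in valueCount += 1 }) // Never called: values are discarded upstream

subject.send(1)
subject.send(2)
subject.send(completion: .failure(UploadError()))

print(valueCount, outcome)  // 0 failure
```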
These operators (dropFirst, drop(while:), and ignoreOutput) are helpful when specific elements need to be discarded based on position or a condition, or when only completion or failure events are relevant.
4. Switching Strategies:
- switchToLatest: Emits elements from the latest publisher while ignoring elements from previous publishers.
- flatMapLatest: Maps each element to a new publisher, emitting elements only from the latest mapped publisher. Combine has no operator with this name; the same behavior comes from map followed by switchToLatest.
switchToLatest Operator Example:
The switchToLatest operator works on a publisher whose elements are themselves publishers. It emits elements from the latest inner publisher and cancels its subscription to the previous one.
import Combine
import Foundation

let subject = PassthroughSubject<Int, Never>()
let cancellable = subject
    .setFailureType(to: URLError.self)
    .map { index -> URLSession.DataTaskPublisher in
        let url = URL(string: "https://example.org/get?index=\(index)")!
        return URLSession.shared.dataTaskPublisher(for: url)
    }
    .switchToLatest()
    .sink(receiveCompletion: { print("Complete: \($0)") },
          receiveValue: { (data, response) in
              guard let url = response.url else { print("Bad response."); return }
              print("URL: \(url)")
          })

for index in 1...5 {
    // Divide as TimeInterval; the integer expression index/10 would always be 0
    DispatchQueue.main.asyncAfter(deadline: .now() + TimeInterval(index) / 10) {
        subject.send(index)
    }
}
// Prints "URL: https://example.org/get?index=5"
The exact behavior of this example depends on the delay passed to asyncAfter and the speed of the network connection. If the delay is longer, or the network connection is fast, the earlier data tasks may complete before switchToLatest() can cancel them. If this happens, the output includes multiple URLs whose tasks complete before cancellation.
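Since the network example is timing dependent, a synchronous sketch may make the switching rule easier to verify. It uses nested PassthroughSubject instances with illustrative names; nothing here comes from the example above.

```swift
import Combine

// Illustrative names; the outer subject publishes inner subjects.
var received: [Int] = []
let outer = PassthroughSubject<PassthroughSubject<Int, Never>, Never>()
let cancellable = outer
    .switchToLatest()
    .sink { received.append($0) }

let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()

outer.send(first)
first.send(1)      // Delivered: first is the current inner publisher
outer.send(second) // Switches to second and cancels the subscription to first
first.send(2)      // Ignored: first is no longer observed
second.send(3)     // Delivered

print(received)  // [1, 3]
```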
flatMapLatest Operator Example:
Combine does not ship an operator named flatMapLatest (the name comes from RxSwift). The built-in flatMap operator maps each element to a new publisher, but it merges the output of every mapped publisher, not only the latest one. To emit elements only from the latest mapped publisher, use map followed by switchToLatest. The example below uses flatMap, so all three mapped values arrive.
import Combine
import Foundation

// A subject used to send a sequence of integers manually
let publisher = PassthroughSubject<Int, Never>()

let flatMapExample = publisher
    .flatMap { value in
        return Just(value * 2).delay(for: .seconds(1), scheduler: RunLoop.main)
    }
    .sink { value in
        print("Mapped Value:", value) // Prints elements from every mapped publisher
    }

publisher.send(1)
publisher.send(2)
publisher.send(3)
// Output:
// Mapped Value: 2
// Mapped Value: 4
// Mapped Value: 6
In this example:
- PassthroughSubject is a manual publisher emitting a sequence of integers.
- The .flatMap operator maps each element to a new publisher (Just(value * 2)) that multiplies the received value by 2.
- .delay is applied to the mapped publisher to simulate a delayed emission.
- The subscriber receives elements from all three mapped publishers. Replacing .flatMap { ... } with .map { ... }.switchToLatest() would cancel the earlier mapped publishers and keep only the latest one.
Both switchToLatest and the map plus switchToLatest pattern are useful for dynamically switching between publishers when only the output of the latest publisher matters, providing flexibility and control over data flow in Combine.
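The difference between merging and switching can be checked without timers. The following sketch, which assumes two illustrative inner subjects selected by index, subscribes to the same source once through flatMap and once through map followed by switchToLatest.

```swift
import Combine

// Illustrative setup: two inner subjects chosen by the index sent on `selector`.
let inner = [PassthroughSubject<String, Never>(), PassthroughSubject<String, Never>()]
let selector = PassthroughSubject<Int, Never>()
var merged: [String] = []
var latestOnly: [String] = []

// flatMap keeps every inner subscription alive and merges their output.
let mergedSubscription = selector
    .flatMap { inner[$0] }
    .sink { merged.append($0) }

// map + switchToLatest keeps only the most recent inner subscription.
let latestSubscription = selector
    .map { inner[$0] }
    .switchToLatest()
    .sink { latestOnly.append($0) }

selector.send(0)
selector.send(1)
inner[0].send("a")
inner[1].send("b")

print(merged)      // ["a", "b"]
print(latestOnly)  // ["b"]
```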
5. Handling Errors and Timeouts
- timeout: Terminates the sequence with an error if a certain time elapses without receiving any elements.
- retry: Resubscribes to the upstream publisher in case of failure a specified number of times.
timeout Operator Example:
The timeout operator terminates the sequence with an error if a certain time elapses without receiving any elements.
import Combine
import Foundation

enum MyError: Error {
    case timeout
    // Add other error cases as needed
}

let subject = PassthroughSubject<Int, Error>()

let timeoutExample = subject
    .timeout(.seconds(2), scheduler: DispatchQueue.main, customError: {
        return MyError.timeout // Replace MyError with your own error type
    })
    .sink(receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("Sequence completed successfully.")
        case .failure(let error):
            print("Sequence terminated with error:", error)
        }
    }, receiveValue: { value in
        print("Received value:", value)
    })

// Sending value 1 immediately
DispatchQueue.main.asyncAfter(deadline: .now()) {
    subject.send(1)
}
// Sending a value after 1 second (within the timeout duration)
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
    subject.send(2)
}
// Sending a value after 4 seconds (3 seconds after the previous value, beyond the timeout duration)
DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
    subject.send(3)
}
// Output:
// Received value: 1
// Received value: 2
// Sequence terminated with error: timeout
In this example:
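- timeout(.seconds(2), scheduler: DispatchQueue.main, customError:) specifies a timeout of 2 seconds and the error to publish when it elapses.
- Values 1 and 2 arrive less than 2 seconds apart, so both are delivered.
- The third value is not sent until the 4-second mark, which is 3 seconds after value 2. The timeout fires first, about 3 seconds into the run, so the sequence terminates with MyError.timeout and value 3 is never delivered.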
retry Operator Example:
The retry operator resubscribes to the upstream publisher in case of a failure a specified number of times.
import Combine
import Foundation

var attempts = 0

// Deferred creates a new Future for each subscription, so every retry reruns the work.
// A bare Future runs its closure once and replays the same result to every subscriber.
let retryExample = Deferred {
    Future<Int, Error> { promise in
        attempts += 1
        if attempts < 3 {
            promise(.failure(NSError(domain: "", code: 0, userInfo: nil))) // Simulating an error
        } else {
            promise(.success(42)) // Emits a success value after retries
        }
    }
}
.retry(3) // Retry up to 3 times
.sink(receiveCompletion: { completion in
    switch completion {
    case .finished:
        print("Sequence completed successfully.")
    case .failure(let error):
        print("Sequence terminated with error after retries:", error)
    }
}, receiveValue: { value in
    print("Received value:", value)
})
// Output:
// Received value: 42
// Sequence completed successfully.
In this example:
- Future is used to simulate an asynchronous task that may fail (NSError is used for simulation purposes). It is wrapped in Deferred because a Future runs its closure only once; without the wrapper, each retry would receive the same stored failure and the sequence would end with an error.
- The retry(3) operator specifies to retry the sequence up to 3 times.
- On the first two attempts, the Future emits a failure (simulated error), and on the third attempt, it emits a success value.
- The sequence completes with a success value within the allowed retries.
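One detail worth checking when combining retry with Future: a Future runs its closure once, when it is created, and replays the stored result to every subscriber, so resubscribing alone does not rerun the work. The sketch below compares a bare Future with one wrapped in Deferred. SampleError and the variable names are assumptions made for illustration.

```swift
import Combine

// Hypothetical error type, used only for this illustration.
struct SampleError: Error {}

var bareAttempts = 0
var bareValues: [Int] = []

// The closure runs once; every retry receives the same stored failure.
let bare = Future<Int, SampleError> { promise in
    bareAttempts += 1
    if bareAttempts < 3 {
        promise(.failure(SampleError()))
    } else {
        promise(.success(42))
    }
}
.retry(3)
.sink(receiveCompletion: { _ in }, receiveValue: { bareValues.append($0) })

var deferredAttempts = 0
var deferredValues: [Int] = []

// Deferred builds a fresh Future per subscription, so each retry reruns the closure.
let deferred = Deferred {
    Future<Int, SampleError> { promise in
        deferredAttempts += 1
        if deferredAttempts < 3 {
            promise(.failure(SampleError()))
        } else {
            promise(.success(42))
        }
    }
}
.retry(3)
.sink(receiveCompletion: { _ in }, receiveValue: { deferredValues.append($0) })

print(bareAttempts, bareValues)          // 1 []
print(deferredAttempts, deferredValues)  // 3 [42]
```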
These examples showcase how timeout can handle sequence termination due to timeout and how retry can manage retries on failure within a specified number of attempts in Combine.
These operators and strategies provided by Combine allow developers to control and manage backpressure scenarios effectively. By using these operators strategically, developers can prevent overwhelming downstream subscribers, manage resource utilization, and ensure the smooth flow of data in reactive streams.
Each operator serves a specific purpose in controlling the rate of data flow, buffering elements, dropping or ignoring values, or applying switching strategies, providing versatile options to handle backpressure situations in Combine’s reactive pipelines.
Thank you for reading…
Happy Coding !!