stream_ext — version 0.3.0 is out

Yan Cui
theburningmonk.com
Sep 26, 2013

I have just published version 0.3.0 of stream_ext, my attempt to port the Rx APIs to Dart. This version adds the following methods to the existing set.

amb

StreamExt.amb has the following signature:

Stream amb(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })

This method propagates values from whichever stream produces a value first.

Errors from either stream are ignored until the first value is received. From then on, the stream that produced that first value has its values and errors propagated through the output stream.

The output stream will complete if:

  • neither stream produced a value before completing
  • the propagated stream has completed
  • the closeOnError flag is set to true and an error is received in the propagated stream
Marble Diagram

Live demo here.

More information on the Observable.amb extension method from the Rx API here.
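To make the rules above concrete, here is a minimal sketch of the same behaviour written only against dart:async. It is not the stream_ext implementation: ambSketch is a hypothetical name, and the closeOnError and sync options are left out for brevity.

```dart
import 'dart:async';

// Hypothetical helper (not part of stream_ext): forwards events from
// whichever of the two streams produces a value first.
Stream<T> ambSketch<T>(Stream<T> first, Stream<T> second) {
  final controller = StreamController<T>();
  final subs = <StreamSubscription<T>>[];
  int? winner; // index of the stream that produced the first value
  var doneCount = 0;

  void listenTo(int index, Stream<T> source) {
    subs.add(source.listen(
      (value) {
        if (winner == null) {
          winner = index;
          subs[1 - index].cancel(); // stop listening to the other stream
        }
        if (winner == index) controller.add(value);
      },
      onError: (Object error, StackTrace stack) {
        // Errors are ignored until one stream has produced a value.
        if (winner == index) controller.addError(error, stack);
      },
      onDone: () {
        doneCount++;
        // Close when the propagated stream completes, or when both
        // streams completed without producing any value.
        if (winner == index || (winner == null && doneCount == 2)) {
          controller.close();
        }
      },
    ));
  }

  controller.onListen = () {
    listenTo(0, first);
    listenTo(1, second);
  };
  controller.onCancel = () async {
    for (final sub in subs) {
      await sub.cancel();
    }
  };
  return controller.stream;
}

Future<void> main() async {
  final slow = Stream.fromFuture(
      Future.delayed(Duration(milliseconds: 50), () => 'slow'));
  final fast = Stream.fromFuture(
      Future.delayed(Duration(milliseconds: 10), () => 'fast'));
  print(await ambSketch(slow, fast).toList()); // [fast]
}
```

The slower stream is cancelled as soon as the faster one produces its first value, so nothing from it ever reaches the output.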

log

StreamExt.log has the following signature:

void log(Stream input, [ String prefix, void log(Object msg) ])

This helper method provides an easy way to log new values and errors as they are received, and to log when the stream is done.

You can see example usage here.
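For a rough idea of what such a helper does, here is a small stand-in built on dart:async. The name logSketch and the message format are my assumptions for illustration; the real StreamExt.log may format its output differently.

```dart
import 'dart:async';

// Hypothetical stand-in for a logging helper (not the stream_ext code).
// Writes every value, every error and the done event to `log`,
// which defaults to print.
void logSketch<T>(Stream<T> input,
    [String prefix = '', void Function(Object msg)? log]) {
  final void Function(Object) out = log ?? print;
  input.listen(
    (value) => out('$prefix$value'),
    onError: (Object error) => out('${prefix}error: $error'),
    onDone: () => out('${prefix}done'),
  );
}

void main() {
  // Prints "numbers: 1", "numbers: 2", then "numbers: done".
  logSketch(Stream.fromIterable([1, 2]), 'numbers: ');
}
```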

onErrorResumeNext

StreamExt.onErrorResumeNext has the following signature:

Stream onErrorResumeNext(Stream stream1, Stream stream2, { bool closeOnError : false, bool sync : false })

This method continues one stream with another, regardless of whether the first stream completes gracefully or terminates with an error.

The output stream will complete if:

  • both input streams have completed (if stream2 completes before stream1, the output stream completes when stream1 completes)
  • the closeOnError flag is set to true and an error is received in the continuation stream
Marble Diagram

Live demo here.

More information on the Observable.OnErrorResumeNext extension method from the Rx API here.
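The continuation idea can be sketched with an async generator using only dart:async. This is an illustration under assumptions, not the library's code: resumeNextSketch is a hypothetical name, the error that ends the first stream is swallowed, and the second stream is only listened to once the first has ended, which may differ from what stream_ext does.

```dart
import 'dart:async';

// Hypothetical sketch (not the stream_ext implementation): emits the
// values of `first`, then continues with `second` whether `first`
// completed normally or failed.
Stream<T> resumeNextSketch<T>(Stream<T> first, Stream<T> second) async* {
  try {
    // `await for` rethrows a stream error here, which ends the loop.
    await for (final value in first) {
      yield value;
    }
  } catch (_) {
    // Swallow the error and fall through to the continuation stream.
  }
  yield* second;
}

// A stream that emits one value and then fails.
Stream<int> failing() async* {
  yield 1;
  throw StateError('boom');
}

Future<void> main() async {
  final values =
      await resumeNextSketch(failing(), Stream.fromIterable([2, 3])).toList();
  print(values); // [1, 2, 3]
}
```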

switchFrom

StreamExt.switchFrom has the following signature:

Stream switchFrom(Stream<Stream> inputs, { bool closeOnError : false, bool sync : false })

This method transforms a stream of streams into a stream producing values only from the most recent stream.

The output stream will complete if:

  • the input stream has completed and the last stream it produced has completed
  • the closeOnError flag is set to true and an error is received in the active stream
Marble Diagram

Live demo here.

More information on the Observable.Switch extension method from the Rx API here.
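Here is a minimal sketch of the switching behaviour described above, using only dart:async. It is not the stream_ext implementation; switchSketch is a hypothetical name and the closeOnError and sync options are omitted.

```dart
import 'dart:async';

// Hypothetical sketch (not the stream_ext implementation): forwards
// events only from the most recently received inner stream.
Stream<T> switchSketch<T>(Stream<Stream<T>> inputs) {
  final controller = StreamController<T>();
  StreamSubscription<Stream<T>>? outer;
  StreamSubscription<T>? inner;
  var outerDone = false;

  controller.onListen = () {
    outer = inputs.listen(
      (stream) {
        inner?.cancel(); // drop the previously active stream
        inner = stream.listen(
          controller.add,
          onError: controller.addError,
          onDone: () {
            inner = null;
            if (outerDone) controller.close();
          },
        );
      },
      onError: controller.addError,
      onDone: () {
        outerDone = true;
        if (inner == null) controller.close();
      },
    );
  };
  controller.onCancel = () async {
    await inner?.cancel();
    await outer?.cancel();
  };
  return controller.stream;
}

Future<void> main() async {
  final outer = StreamController<Stream<String>>();
  final a = StreamController<String>();
  final b = StreamController<String>();
  final result = switchSketch(outer.stream).toList();

  // Gives pending events a chance to be delivered before the next step.
  Future<void> tick() => Future<void>.delayed(Duration.zero);

  outer.add(a.stream);
  await tick();
  a.add('a1');
  await tick();
  outer.add(b.stream); // from here on only b is forwarded
  await tick();
  a.add('a2'); // dropped: a is no longer the active stream
  b.add('b1');
  await tick();
  outer.close();
  b.close();
  print(await result); // [a1, b1]
}
```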

timeOut

StreamExt.timeOut has the following signature:

Stream timeOut(Stream input, Duration duration, { bool closeOnError : false, bool sync : false })

This method terminates a stream with a TimeoutError if the specified duration elapses between values.

The output stream will complete if:

  • the input stream has completed
  • the specified duration elapses without a new input value
  • the closeOnError flag is set to true and an error is received
Marble Diagram

Live demo here.

More information on the Observable.Timeout extension method from the Rx API here.
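A similar effect can be approximated with the Stream.timeout method that ships in dart:async. The sketch below is an illustration rather than the stream_ext code: timeOutSketch is a hypothetical name, and it raises dart:async's TimeoutException instead of the library's TimeoutError.

```dart
import 'dart:async';

// Hypothetical sketch built on the built-in Stream.timeout: when no
// value arrives within `limit`, emit an error and close the stream.
Stream<T> timeOutSketch<T>(Stream<T> input, Duration limit) {
  return input.timeout(limit, onTimeout: (sink) {
    sink.addError(TimeoutException('No value within $limit', limit));
    sink.close(); // terminate instead of waiting for more values
  });
}

Future<void> main() async {
  // Emits 1, then stays silent for longer than the limit.
  final controller = StreamController<int>();
  controller.add(1);

  final seen = <Object>[];
  try {
    await for (final value
        in timeOutSketch(controller.stream, Duration(milliseconds: 20))) {
      seen.add(value);
    }
  } on TimeoutException {
    seen.add('timeout');
  }
  print(seen); // [1, timeout]
}
```

Without the sink.close() call, the built-in timeout would report the error and keep listening, so the explicit close is what gives the terminate-on-timeout behaviour.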

timeOutAt

StreamExt.timeOutAt has the following signature:

Stream timeOutAt(Stream input, DateTime dueTime, { bool closeOnError : false, bool sync : false })

This method terminates a stream with a TimeoutError at the specified dueTime.

The output stream will complete if:

  • the input stream has completed
  • the specified dueTime has passed
  • the closeOnError flag is set to true and an error is received
Marble Diagram

Live demo here.
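The absolute-deadline variant can be sketched with a single Timer from dart:async. As with the other examples, this is a hedged illustration and not the stream_ext implementation: timeOutAtSketch is a hypothetical name, the closeOnError and sync options are omitted, and it raises dart:async's TimeoutException rather than the library's TimeoutError.

```dart
import 'dart:async';

// Hypothetical sketch (not the stream_ext implementation): forwards
// `input` until `dueTime`, then emits an error and closes.
Stream<T> timeOutAtSketch<T>(Stream<T> input, DateTime dueTime) {
  final controller = StreamController<T>();
  StreamSubscription<T>? sub;
  Timer? timer;

  controller.onListen = () {
    sub = input.listen(
      controller.add,
      onError: controller.addError,
      onDone: () {
        timer?.cancel();
        controller.close();
      },
    );
    // A deadline that is already in the past fires as soon as possible.
    var remaining = dueTime.difference(DateTime.now());
    if (remaining.isNegative) remaining = Duration.zero;
    timer = Timer(remaining, () {
      sub?.cancel();
      controller.addError(TimeoutException('Deadline $dueTime passed'));
      controller.close();
    });
  };
  controller.onCancel = () async {
    timer?.cancel();
    await sub?.cancel();
  };
  return controller.stream;
}

Future<void> main() async {
  // Emits 1 and then never completes on its own.
  final source = StreamController<int>();
  source.add(1);

  final due = DateTime.now().add(Duration(milliseconds: 20));
  final seen = <Object>[];
  try {
    await for (final value in timeOutAtSketch(source.stream, due)) {
      seen.add(value);
    }
  } on TimeoutException {
    seen.add('timeout');
  }
  print(seen); // [1, timeout]
}
```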

Links

Source Code

API Reference

Getting Started guide

Wiki (with live demo for each method)

Intro to Rx
