Observable Factory

rx.amb(*sources)

Propagates the observable sequence that emits first.

amb

Example

>>> winner = rx.amb(xs, ys, zs)
Parameters

sources (Observable) – Sequence of observables to monitor for first emission.

Return type

Observable

Returns

An observable sequence that mirrors whichever of the given sequences emitted the first element.

rx.case(mapper, sources, default_source=None)

Uses mapper to determine which source in sources to use.

case

Examples

>>> res = rx.case(mapper, { '1': obs1, '2': obs2 })
>>> res = rx.case(mapper, { '1': obs1, '2': obs2 }, obs0)
Parameters
  • mapper (Callable[[], Any]) – The function which extracts the value to test in a case statement.

  • sources (Mapping[~KT, +VT_co]) – A mapping whose keys correspond to the case statement labels.

  • default_source (Union[Observable, Future, None]) – [Optional] The observable sequence or Future that will be run if the sources are not matched. If this is not provided, it defaults to empty().

Return type

Observable

Returns

An observable sequence which is determined by a case statement.

rx.catch(*sources)

Continues observable sequences which are terminated with an exception by switching over to the next observable sequence.

catch

Examples

>>> res = rx.catch(xs, ys, zs)
Parameters

sources (Observable) – Sequence of observables.

Return type

Observable

Returns

An observable sequence containing elements from consecutive observables from the sequence of sources until one of them terminates successfully.

rx.catch_with_iterable(sources)

Continues observable sequences that are terminated with an exception by switching over to the next observable sequence.

catch

Examples

>>> res = rx.catch_with_iterable([xs, ys, zs])
>>> res = rx.catch_with_iterable(src for src in [xs, ys, zs])
Parameters

sources (Iterable[Observable]) – An Iterable of observables; thus, a generator can also be used here.

Return type

Observable

Returns

An observable sequence containing elements from consecutive observables from the sequence of sources until one of them terminates successfully.

rx.create(subscribe)
Creates an observable sequence object from the specified subscription function.

create

Parameters

subscribe (Callable[[Observer[-T_in], Optional[Scheduler]], Disposable]) – Subscription function.

Return type

Observable

Returns

An observable sequence that can be subscribed to via the given subscription function.

rx.combine_latest(*sources)

Merges the specified observable sequences into one observable sequence by creating a tuple whenever any of the observable sequences emits an element.

combine_latest

Examples

>>> obs = rx.combine_latest(obs1, obs2, obs3)
Parameters

sources (Observable) – Sequence of observables.

Return type

Observable

Returns

An observable sequence containing the result of combining elements from each source in given sequence.

rx.concat(*sources)

Concatenates all of the specified observable sequences.

concat

Examples

>>> res = rx.concat(xs, ys, zs)
Parameters

sources (Observable) – Sequence of observables.

Return type

Observable

Returns

An observable sequence that contains the elements of each source in the given sequence, in sequential order.

rx.concat_with_iterable(sources)

Concatenates all of the specified observable sequences.

concat

Examples

>>> res = rx.concat_with_iterable([xs, ys, zs])
>>> res = rx.concat_with_iterable(src for src in [xs, ys, zs])
Parameters

sources (Iterable[Observable]) – An Iterable of observables; thus, a generator can also be used here.

Return type

Observable

Returns

An observable sequence that contains the elements of each given sequence, in sequential order.

rx.defer(factory)

Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.

defer

Example

>>> res = rx.defer(lambda scheduler: rx.of(1, 2, 3))
Parameters

factory (Callable[[Scheduler], Union[Observable, Future]]) – Observable factory function to invoke for each observer which invokes subscribe() on the resulting sequence.

Return type

Observable

Returns

An observable sequence whose observers trigger an invocation of the given factory function.

rx.empty(scheduler=None)

Returns an empty observable sequence.

empty

Example

>>> obs = rx.empty()
Parameters

scheduler (Optional[Scheduler]) – [Optional] Scheduler instance to send the termination call on. By default, this will use an instance of ImmediateScheduler.

Return type

Observable

Returns

An observable sequence with no elements.

rx.for_in(values, mapper)

Concatenates the observable sequences obtained by running the specified result mapper for each element in the specified values.

for_in

Note

This is just a wrapper for rx.concat_with_iterable(map(mapper, values))

Parameters
  • values (Iterable[Any]) – An Iterable of values to turn into an observable source.

  • mapper (Callable[[~T1], ~T2]) – A function to apply to each item in the values list to turn it into an observable sequence; this should return instances of rx.Observable.

Return type

Observable

Returns

An observable sequence from the concatenated observable sequences.

rx.from_callable(supplier, scheduler=None)

Returns an observable sequence that contains a single element generated by the given supplier, using the specified scheduler to send out observer messages.

from_callable

Examples

>>> res = rx.from_callable(lambda: calculate_value())
>>> res = rx.from_callable(lambda: 1 / 0) # emits an error
Parameters
  • supplier (Callable[[], Any]) – Function which is invoked to obtain the single element.

  • scheduler (Optional[Scheduler]) – [Optional] Scheduler instance to schedule the values on. If not specified, the default is to use an instance of CurrentThreadScheduler.

Return type

Observable

Returns

An observable sequence containing the single element obtained by invoking the given supplier function.

rx.from_callback(func, mapper=None)

Converts a callback function to an observable sequence.

Parameters
  • func (Callable) – Function with a callback as the last argument to convert to an Observable sequence.

  • mapper (Optional[Callable[[~T1], ~T2]]) – [Optional] A mapper which takes the arguments from the callback to produce a single item to yield on next.

Return type

Callable[[], Observable]

Returns

A function that, when executed with the required arguments minus the callback, produces an Observable sequence with a single value: the arguments to the callback, as a list.

rx.from_future(future)

Converts a Future to an Observable sequence.

from_future

Parameters

future (Future) – A Python 3 compatible future, for example an asyncio Future (https://docs.python.org/3/library/asyncio-task.html#future) or a Tornado Future (http://www.tornadoweb.org/en/stable/concurrent.html#tornado.concurrent.Future).

Return type

Observable

Returns

An observable sequence which wraps the existing future success and failure.

rx.from_iterable(iterable, scheduler=None)

Converts an iterable to an observable sequence.

from_iterable

Example

>>> rx.from_iterable([1,2,3])
Parameters
  • iterable (Iterable[+T_co]) – An Iterable to change into an observable sequence.

  • scheduler (Optional[Scheduler]) – [Optional] Scheduler instance to schedule the values on. If not specified, the default is to use an instance of CurrentThreadScheduler.

Return type

Observable

Returns

The observable sequence whose elements are pulled from the given iterable sequence.

rx.from_(iterable, scheduler=None)

Alias for rx.from_iterable().

Return type

Observable

rx.from_list(iterable, scheduler=None)

Alias for rx.from_iterable().

Return type

Observable

rx.from_marbles(string, timespan=0.1, scheduler=None, lookup=None, error=None)

Convert a marble diagram string to a cold observable sequence, using an optional scheduler to enumerate the events.

from_marbles

Each character in the string will advance time by timespan (except for space). Characters that are not special (see the table below) will be interpreted as a value to be emitted. Numbers will be cast to int or float.

Special characters:

-

advance time by timespan

#

on_error()

|

on_completed()

(

open a group of marbles sharing the same timestamp

)

close a group of marbles

,

separate elements in a group

<space>

used to align multiple diagrams, does not advance time

In a group of elements, the position of the initial ( determines the timestamp at which grouped elements will be emitted. E.g. --(12,3,4)-- will emit 12, 3, 4 at 2 * timespan and then advance virtual time by 8 * timespan.

Examples

>>> from_marbles('--1--(2,3)-4--|')
>>> from_marbles('a--b--c-', lookup={'a': 1, 'b': 2, 'c': 3})
>>> from_marbles('a--b---#', error=ValueError('foo'))
Parameters
  • string (str) – String with marble diagram

  • timespan (Union[timedelta, float]) – [Optional] Duration of each character in seconds. If not specified, defaults to 0.1.

  • scheduler (Optional[Scheduler]) – [Optional] Scheduler to run the input sequence on. If not specified, defaults to the subscribe scheduler if defined, else to an instance of NewThreadScheduler.

  • lookup (Optional[Mapping[~KT, +VT_co]]) – [Optional] A dict used to convert an element into a specified value. If not specified, defaults to {}.

  • error (Optional[Exception]) – [Optional] Exception that will be used in place of the # symbol. If not specified, defaults to Exception('error').

Return type

Observable

Returns

The observable sequence whose elements are pulled from the given marble diagram string.

rx.cold(string, timespan=0.1, scheduler=None, lookup=None, error=None)

Alias for rx.from_marbles().

Return type

Observable

rx.generate_with_relative_time(initial_state, condition, iterate, time_mapper)

Generates an observable sequence by iterating a state from an initial state until the condition fails.

generate_with_relative_time

Example

>>> res = rx.generate_with_relative_time(0, lambda x: True, lambda x: x + 1, lambda x: 0.5)
Parameters
  • initial_state (Any) – Initial state.

  • condition (Callable[[~T1], bool]) – Condition to terminate generation (upon returning False).

  • iterate (Callable[[~T1], ~T2]) – Iteration step function.

  • time_mapper (Callable[[Any], Union[timedelta, float]]) – Time mapper function to control the speed of values being produced each iteration, returning relative times, i.e. either a float denoting seconds, or an instance of timedelta.

Return type

Observable

Returns

The generated sequence.

rx.generate(initial_state, condition, iterate)

Generates an observable sequence by running a state-driven loop producing the sequence’s elements.

generate

Example

>>> res = rx.generate(0, lambda x: x < 10, lambda x: x + 1)
Parameters
  • initial_state (Any) – Initial state.

  • condition (Callable[[~T1], bool]) – Condition to terminate generation (upon returning False).

  • iterate (Callable[[~T1], ~T2]) – Iteration step function.

Return type

Observable

Returns

The generated sequence.

rx.hot(string, timespan=0.1, duetime=0.0, scheduler=None, lookup=None, error=None)

Convert a marble diagram string to a hot observable sequence, using an optional scheduler to enumerate the events.

hot

Each character in the string will advance time by timespan (except for space). Characters that are not special (see the table below) will be interpreted as a value to be emitted. Numbers will be cast to int or float.

Special characters:

-

advance time by timespan

#

on_error()

|

on_completed()

(

open a group of elements sharing the same timestamp

)

close a group of elements

,

separate elements in a group

<space>

used to align multiple diagrams, does not advance time

In a group of elements, the position of the initial ( determines the timestamp at which grouped elements will be emitted. E.g. --(12,3,4)-- will emit 12, 3, 4 at 2 * timespan and then advance virtual time by 8 * timespan.

Examples

>>> hot("--1--(2,3)-4--|")
>>> hot("a--b--c-", lookup={'a': 1, 'b': 2, 'c': 3})
>>> hot("a--b---#", error=ValueError("foo"))
Parameters
  • string (str) – String with marble diagram

  • timespan (Union[timedelta, float]) – [Optional] Duration of each character in seconds. If not specified, defaults to 0.1.

  • duetime (Union[datetime, timedelta, float]) – [Optional] Absolute datetime or timedelta from now that determines when to start the emission of elements.

  • scheduler (Optional[Scheduler]) – [Optional] Scheduler to run the input sequence on. If not specified, defaults to an instance of NewThreadScheduler.

  • lookup (Optional[Mapping[~KT, +VT_co]]) – [Optional] A dict used to convert an element into a specified value. If not specified, defaults to {}.

  • error (Optional[Exception]) – [Optional] Exception that will be used in place of the # symbol. If not specified, defaults to Exception('error').

Return type

Observable

Returns

The observable sequence whose elements are pulled from the given marble diagram string.

rx.if_then(condition, then_source, else_source=None)

Selects between two sources based on a condition: then_source is run if the condition function returns True, and else_source otherwise.

if_then

Examples

>>> res = rx.if_then(condition, obs1)
>>> res = rx.if_then(condition, obs1, obs2)
Parameters
  • condition (Callable[[], bool]) – The condition which determines if the then_source or else_source will be run.

  • then_source (Union[Observable, Future]) – The observable sequence or Future that will be run if the condition function returns True.

  • else_source (Union[Observable, Future, None]) – [Optional] The observable sequence or Future that will be run if the condition function returns False. If this is not provided, it defaults to empty().

Return type

Observable

Returns

An observable sequence which is either the then_source or else_source.

rx.interval(period, scheduler=None)

Returns an observable sequence that produces a value after each period.

interval

Example

>>> res = rx.interval(1.0)
Parameters
  • period (Union[timedelta, float]) – Period for producing the values in the resulting sequence (specified as a float denoting seconds or an instance of timedelta).

  • scheduler (Optional[Scheduler]) – Scheduler to run the interval on. If not specified, an instance of TimeoutScheduler is used.

Return type

Observable

Returns

An observable sequence that produces a value after each period.

rx.merge(*sources)

Merges all the observable sequences into a single observable sequence.

merge

Example

>>> res = rx.merge(obs1, obs2, obs3)
Parameters

sources (Observable) – Sequence of observables.

Return type

Observable

Returns

The observable sequence that merges the elements of the observable sequences.

rx.never()

Returns a non-terminating observable sequence, which can be used to denote an infinite duration (e.g. when using reactive joins).

never

Return type

Observable

Returns

An observable sequence whose observers will never get called.

rx.of(*args)

This method creates a new observable sequence whose elements are taken from the arguments.

of

Note

This is just a wrapper for rx.from_iterable(args)

Example

>>> res = rx.of(1,2,3)
Parameters

args (Any) – The variable number of elements to emit from the observable.

Return type

Observable

Returns

The observable sequence whose elements are pulled from the given arguments.

rx.on_error_resume_next(*sources)

Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.

on_error_resume_next

Examples

>>> res = rx.on_error_resume_next(xs, ys, zs)
Parameters

sources (Union[Observable, Future]) – Sequence of sources, each of which is expected to be an instance of either Observable or Future.

Return type

Observable

Returns

An observable sequence that concatenates the source sequences, even if a sequence terminates with an exception.

rx.range(start, stop=None, step=None, scheduler=None)

Generates an observable sequence of integral numbers within a specified range, using the specified scheduler to send out observer messages.

range

Examples

>>> res = rx.range(10)
>>> res = rx.range(0, 10)
>>> res = rx.range(0, 10, 1)
Parameters
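  • start (int) – The value of the first integer in the sequence. If stop is omitted, this value is used as the stop value and the sequence starts at 0, as with Python's built-in range().

  • stop (Optional[int]) – [Optional] The end of the range (exclusive).

  • step (Optional[int]) – [Optional] The difference between consecutive integers in the sequence.

  • scheduler (Optional[Scheduler]) – [Optional] The scheduler to schedule the values on. If not specified, the default is to use an instance of CurrentThreadScheduler.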

Return type

Observable

Returns

An observable sequence that contains a range of sequential integral numbers.

rx.return_value(value, scheduler=None)

Returns an observable sequence that contains a single element, using the specified scheduler to send out observer messages. There is an alias called ‘just’.

return_value

Examples

>>> res = rx.return_value(42)
>>> res = rx.return_value(42, timeout_scheduler)
Parameters

  • value (Any) – Single element in the resulting observable sequence.

  • scheduler (Optional[Scheduler]) – [Optional] Scheduler to send the single element on.

Return type

Observable

Returns

An observable sequence containing the single specified element.

rx.just(value, scheduler=None)

Alias for rx.return_value().

Return type

Observable

rx.repeat_value(value=None, repeat_count=None)

Generates an observable sequence that repeats the given element the specified number of times.

repeat_value

Examples

>>> res = rx.repeat_value(42)
>>> res = rx.repeat_value(42, 4)
Parameters
  • value (Optional[Any]) – Element to repeat.

  • repeat_count (Optional[int]) – [Optional] Number of times to repeat the element. If not specified, repeats indefinitely.

Return type

Observable

Returns

An observable sequence that repeats the given element the specified number of times.

rx.start(func, scheduler=None)

Invokes the specified function asynchronously on the specified scheduler, surfacing the result through an observable sequence.

start

Note

The function is called immediately, not during the subscription of the resulting sequence. Multiple subscriptions to the resulting sequence can observe the function’s result.

Example

>>> res = rx.start(lambda: pprint('hello'))
>>> res = rx.start(lambda: pprint('hello'), timeout_scheduler)
Parameters
  • func (Callable) – Function to run asynchronously.

  • scheduler (Optional[Scheduler]) – [Optional] Scheduler to run the function on. If not specified, defaults to an instance of TimeoutScheduler.

Return type

Observable

Returns

An observable sequence exposing the function’s result value, or an exception.

rx.start_async(function_async)

Invokes the asynchronous function, surfacing the result through an observable sequence.

start_async

Parameters

function_async (Callable[[], Future]) – Asynchronous function which returns a Future to run.

Return type

Observable

Returns

An observable sequence exposing the function’s result value, or an exception.

rx.throw(exception, scheduler=None)

Returns an observable sequence that terminates with an exception, using the specified scheduler to send out the single OnError message.

throw

Example

>>> res = rx.throw(Exception('Error'))
Parameters
  • exception (Exception) – An object used for the sequence’s termination.

  • scheduler (Optional[Scheduler]) – [Optional] Scheduler to schedule the error notification on. If not specified, the default is to use an instance of ImmediateScheduler.

Return type

Observable

Returns

The observable sequence that terminates exceptionally with the specified exception object.

rx.timer(duetime, period=None, scheduler=None)

Returns an observable sequence that produces a value after duetime has elapsed and then after each period.

timer

Examples

>>> res = rx.timer(datetime(...))
>>> res = rx.timer(datetime(...), 0.1)
>>> res = rx.timer(5.0)
>>> res = rx.timer(5.0, 1.0)
Parameters
  • duetime (Union[datetime, timedelta, float]) – Absolute (specified as a datetime object) or relative time (specified as a float denoting seconds or an instance of timedelta) at which to produce the first value.

  • period (Union[timedelta, float, None]) – [Optional] Period to produce subsequent values (specified as a float denoting seconds or an instance of timedelta). If not specified, the resulting timer is not recurring.

  • scheduler (Optional[Scheduler]) – [Optional] Scheduler to run the timer on. If not specified, the default is to use an instance of TimeoutScheduler.

Return type

Observable

Returns

An observable sequence that produces a value after duetime has elapsed and then after each period.

rx.to_async(func, scheduler=None)

Converts the function into an asynchronous function. Each invocation of the resulting asynchronous function causes an invocation of the original synchronous function on the specified scheduler.

to_async

Examples

>>> res = rx.to_async(lambda x, y: x + y)(4, 3)
>>> res = rx.to_async(lambda x, y: x + y, timeout_scheduler)(4, 3)
>>> res = rx.to_async(lambda x: log.debug(x), timeout_scheduler)('hello')
Parameters
  • func (Callable) – Function to convert to an asynchronous function.

  • scheduler (Optional[Scheduler]) – [Optional] Scheduler to run the function on. If not specified, defaults to an instance of TimeoutScheduler.

Return type

Callable

Returns

Asynchronous function.

rx.using(resource_factory, observable_factory)

Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence’s lifetime.

Example

>>> res = rx.using(lambda: AsyncSubject(), lambda s: s)
Parameters
  • resource_factory (Callable[[], Disposable]) – Factory function to obtain a resource object.

  • observable_factory (Callable[[Disposable], Observable]) – Factory function to obtain an observable sequence that depends on the obtained resource.

Return type

Observable

Returns

An observable sequence whose lifetime controls the lifetime of the dependent resource object.

rx.with_latest_from(*sources)

Merges the specified observable sequences into one observable sequence by creating a tuple only when the first observable sequence produces an element.

with_latest_from

Examples

>>> obs = rx.with_latest_from(obs1)
>>> obs = rx.with_latest_from(obs1, obs2, obs3)
Parameters

sources (Observable) – Sequence of observables.

Return type

Observable

Returns

An observable sequence containing the result of combining elements of the sources into a tuple.

rx.zip(*args)

Merges the specified observable sequences into one observable sequence by creating a tuple whenever all of the observable sequences have produced an element at a corresponding index.

zip

Example

>>> res = rx.zip(obs1, obs2)
Parameters

args (Observable) – Observable sources to zip.

Return type

Observable

Returns

An observable sequence containing the result of combining elements of the sources as a tuple.