Propagates the observable sequence that emits first.
Example
>>> winner = reactivex.amb(xs, ys, zs)
sources (Observable[_T]) – Sequence of observables to monitor for first emission.
Observable[_T]
An observable sequence that surfaces any of the given sequences, whichever emitted the first element.
Uses mapper to determine which source in sources to use.
Examples
>>> res = reactivex.case(mapper, { '1': obs1, '2': obs2 })
>>> res = reactivex.case(mapper, { '1': obs1, '2': obs2 }, obs0)
mapper – The function which extracts the value to test in a case statement.
sources – An object whose keys correspond to the case statement labels.
default_source – [Optional] The observable sequence or Future that will be run if the sources are not matched. If this is not provided, it defaults to empty().
An observable sequence which is determined by a case statement.
Continues observable sequences which are terminated with an exception by switching over to the next observable sequence.
Examples
>>> res = reactivex.catch(xs, ys, zs)
sources (Observable[_T]) – Sequence of observables.
Observable[_T]
An observable sequence containing elements from consecutive observables from the sequence of sources until one of them terminates successfully.
Continues observable sequences that are terminated with an exception by switching over to the next observable sequence.
Examples
>>> res = reactivex.catch_with_iterable([xs, ys, zs])
>>> res = reactivex.catch_with_iterable(src for src in [xs, ys, zs])
sources (Iterable[Observable[_T]]) – An Iterable of observables; thus, a generator can also be used here.
Observable[_T]
An observable sequence containing elements from consecutive observables from the sequence of sources until one of them terminates successfully.
Creates an observable sequence object from the specified subscription function.
subscribe (Callable[[ObserverBase[_T], Optional[SchedulerBase]], DisposableBase]) – Subscription function.
Observable[_T]
An observable sequence that can be subscribed to via the given subscription function.
Merges the specified observable sequences into one observable sequence by creating a tuple whenever any of the observable sequences emits an element.
Examples
>>> obs = reactivex.combine_latest(obs1, obs2, obs3)
sources – Sequence of observables.
Observable[Any]
An observable sequence containing the result of combining elements from each source in given sequence.
Compose multiple operators left to right.
Composes zero or more operators into a functional composition. The operators are composed from left to right. A composition of zero operators gives back the source.
Examples
>>> compose()(source) == source
>>> compose(f)(source) == f(source)
>>> compose(f, g)(source) == g(f(source))
>>> compose(f, g, h)(source) == h(g(f(source)))
...
Callable[[Any], Any]
The composed observable.
Concatenates all of the specified observable sequences.
Examples
>>> res = reactivex.concat(xs, ys, zs)
sources (Observable[_T]) – Sequence of observables.
Observable[_T]
An observable sequence that contains the elements of each source in the given sequence, in sequential order.
Concatenates all of the specified observable sequences.
Examples
>>> res = reactivex.concat_with_iterable([xs, ys, zs])
>>> res = reactivex.concat_with_iterable(src for src in [xs, ys, zs])
sources (Iterable[Observable[_T]]) – An Iterable of observables; thus, a generator can also be used here.
Observable[_T]
An observable sequence that contains the elements of each given sequence, in sequential order.
Represents an observable that can be connected and disconnected.
Creates an observable sequence object from the specified subscription function.
subscribe – [Optional] Subscription function
Connects the observable.
Optional[DisposableBase]
Returns an observable sequence that stays connected to the source indefinitely. Providing a subscriber_count will cause it to connect() after that many subscriptions occur. A subscriber_count of 0 will result in emissions firing immediately without waiting for subscribers.
Observable[_T]
Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.
Example
>>> res = reactivex.defer(lambda scheduler: of(1, 2, 3))
factory – Observable factory function to invoke for each observer which invokes subscribe() on the resulting sequence. The factory takes a single argument, the scheduler used.
An observable sequence whose observers trigger an invocation of the given factory function.
Returns an empty observable sequence.
Example
>>> obs = reactivex.empty()
scheduler (Optional[SchedulerBase]) – [Optional] Scheduler instance to send the termination call on. By default, this will use an instance of ImmediateScheduler.
Observable[Any]
An observable sequence with no elements.
Waits for the observables to complete and then combines the last values they emitted into a tuple. If any of the observables completes without emitting a value, the result sequence completes at that moment as well.
Examples
>>> obs = reactivex.fork_join(obs1, obs2, obs3)
sources (Observable[Any]) – Sequence of observables.
Observable[Any]
An observable sequence containing the result of combining the last element from each source in the given sequence.
Returns an observable sequence that contains a single element generated by the given supplier, using the specified scheduler to send out observer messages.
Examples
>>> res = reactivex.from_callable(lambda: calculate_value())
>>> res = reactivex.from_callable(lambda: 1 / 0) # emits an error
supplier (Callable[[], _T]) – Function which is invoked to obtain the single element.
scheduler (Optional[SchedulerBase]) – [Optional] Scheduler instance to schedule the values on. If not specified, the default is to use an instance of CurrentThreadScheduler.
Observable[_T]
An observable sequence containing the single element obtained by invoking the given supplier function.
Converts a callback function to an observable sequence.
func (Callable[..., Callable[..., None]]) – Function with a callback as the last argument to convert to an Observable sequence.
mapper (Optional[Callable[[Any], Any]]) – [Optional] A mapper which takes the arguments from the callback to produce a single item to yield on next.
Callable[[], Observable[Any]]
A function that, when executed with the required arguments minus the callback, produces an Observable sequence with a single value of the arguments to the callback as a list.
Converts a Future to an Observable sequence.
future – A Python 3 compatible future. https://docs.python.org/3/library/asyncio-task.html#future
An observable sequence which wraps the existing future success and failure.
Converts an iterable to an observable sequence.
Example
>>> reactivex.from_iterable([1,2,3])
iterable (Iterable[_T]) – An Iterable to change into an observable sequence.
scheduler (Optional[SchedulerBase]) – [Optional] Scheduler instance to schedule the values on. If not specified, the default is to use an instance of CurrentThreadScheduler.
Observable[_T]
The observable sequence whose elements are pulled from the given iterable sequence.
Creates an observable sequence object from the specified subscription function.
subscribe – [Optional] Subscription function
Returns a non-terminating observable sequence, which can be used to denote an infinite duration (e.g. when using reactive joins).
Observable[Any]
An observable sequence whose observers will never get called.
Represents a notification to an observer.
Default constructor used by derived types.
Invokes the delegate corresponding to the notification or an observer and returns the produced result.
Examples
>>> notification.accept(observer)
>>> notification.accept(on_next, on_error, on_completed)
on_next (Union[Callable[[_T], None], ObserverBase[_T]]) – Delegate to invoke for an OnNext notification.
on_error (Optional[Callable[[Exception], None]]) – [Optional] Delegate to invoke for an OnError notification.
on_completed (Optional[Callable[[], None]]) – [Optional] Delegate to invoke for an OnCompleted notification.
None
Result produced by the observation.
Returns an observable sequence with a single notification, using the specified scheduler, else the immediate scheduler.
scheduler (Optional[SchedulerBase]) – [Optional] Scheduler to send out the notification calls on.
ObservableBase[_T]
An observable sequence that surfaces the behavior of the notification upon subscription.
Indicates whether this instance and a specified object are equal.
bool
Return self==value.
bool
Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.
Examples
>>> res = reactivex.on_error_resume_next(xs, ys, zs)
sources – Sequence of sources, each of which is expected to be an instance of either Observable or Future.
An observable sequence that concatenates the source sequences, even if a sequence terminates with an exception.
This method creates a new observable sequence whose elements are taken from the arguments.
Note
This is just a wrapper for reactivex.from_iterable(args).
Example
>>> res = reactivex.of(1,2,3)
args (_T) – The variable number of elements to emit from the observable.
Observable[_T]
The observable sequence whose elements are pulled from the given arguments.
Observable base class.
Represents a push-style collection, which you can pipe into operators.
Creates an observable sequence object from the specified subscription function.
subscribe (Optional[Callable[[ObserverBase[_T_out], Optional[SchedulerBase]], DisposableBase]]) – [Optional] Subscription function
Subscribe an observer to the observable sequence.
You may subscribe using an observer or callbacks, not both; if the first argument is an instance of Observer or if it has a (callable) attribute named on_next, then any callback arguments will be ignored.
Examples
>>> source.subscribe()
>>> source.subscribe(observer)
>>> source.subscribe(observer, scheduler=scheduler)
>>> source.subscribe(on_next)
>>> source.subscribe(on_next, on_error)
>>> source.subscribe(on_next, on_error, on_completed)
>>> source.subscribe(on_next, on_error, on_completed, scheduler=scheduler)
observer – [Optional] The object that is to receive notifications.
on_error (Optional[Callable[[Exception], None]]) – [Optional] Action to invoke upon exceptional termination of the observable sequence.
on_completed (Optional[Callable[[], None]]) – [Optional] Action to invoke upon graceful termination of the observable sequence.
on_next (Union[ObserverBase[_T_out], Callable[[_T_out], None], None]) – [Optional] Action to invoke for each element in the observable sequence.
scheduler (Optional[SchedulerBase]) – [Optional] The default scheduler to use for this subscription.
DisposableBase
Disposable object representing an observer’s subscription to the observable sequence.
Compose multiple operators left to right.
Composes zero or more operators into a functional composition. The operators are composed from left to right. A composition of zero operators gives back the original source.
Examples
>>> source.pipe() == source
>>> source.pipe(f) == f(source)
>>> source.pipe(g, f) == f(g(source))
>>> source.pipe(h, g, f) == f(g(h(source)))
operators (Callable[[Any], Any]) – Sequence of operators.
Any
The composed observable.
Run source synchronously.
Subscribes to the observable source, then blocks and waits for the source to either complete or error. Returns the last value emitted, or raises an exception if an error occurred.
Examples
>>> result = source.run()
SequenceContainsNoElementsError – if observable completes (on_completed) without any values being emitted.
Exception – raises exception if any error (on_error) occurred.
Any
The last element emitted from the observable.
Awaits the given observable.
Generator[Any, None, _T_out]
The last item of the observable sequence.
Pythonic version of concat.
Example
>>> zs = xs + ys
other (Observable[_T_out]) – The second observable sequence in the concatenation.
Observable[_T_out]
Concatenated observable sequence.
Pythonic use of concat.
Example
>>> xs += ys
other (Observable[_T_out]) – The second observable sequence in the concatenation.
Observable[_T_out]
Concatenated observable sequence.
Pythonic version of slice.
Slices the given observable using Python slice notation. The arguments to slice are start, stop and step given within brackets [] and separated by the colons :.
It is basically a wrapper around the operators skip, skip_last, take, take_last and filter.
The following diagram helps you remember how slices work with streams. Positive numbers are relative to the start of the events, while negative numbers are relative to the end (close) of the stream.
 r---e---a---c---t---i---v---e---!
 0   1   2   3   4   5   6   7   8
-8  -7  -6  -5  -4  -3  -2  -1   0
Examples
>>> result = source[1:10]
>>> result = source[1:-2]
>>> result = source[1:-1:2]
key (Union[slice, int]) – Slice object
Observable[_T_out]
Sliced observable sequence.
TypeError – If key is not of type int or slice
Base class for implementations of the Observer class. This base class enforces the grammar of observers where OnError and OnCompleted are terminal messages.
Notify the observer of a new element in the sequence.
None
Notify the observer that an exception has occurred.
error (Exception) – The error that occurred.
None
Notifies the observer of the end of the sequence.
None
Disposes the observer, causing it to transition to the stopped state.
None
Creates a notification callback from an observer.
Returns the action that forwards its input notification to the underlying observer.
Callable[[Notification[_T_in]], None]
Hides the identity of an observer.
Returns an observer that hides the identity of the specified observer.
ObserverBase[_T_in]
Returns an observable sequence that contains a single element, using the specified scheduler to send out observer messages. There is an alias called ‘just’.
Examples
>>> res = reactivex.return_value(42)
>>> res = reactivex.return_value(42, timeout_scheduler)
value (_T) – Single element in the resulting observable sequence.
Observable[_T]
An observable sequence containing the single specified element.
Functional pipe (|>)
Allows the use of function argument on the left side of the function.
Any
Example
>>> pipe(x, fn) == fn(x) # Same as x |> fn
>>> pipe(x, fn, gn) == gn(fn(x)) # Same as x |> fn |> gn
...
Generates an observable sequence of integral numbers within a specified range, using the specified scheduler to send out observer messages.
Examples
>>> res = reactivex.range(10)
>>> res = reactivex.range(0, 10)
>>> res = reactivex.range(0, 10, 1)
start (int) – The value of the first integer in the sequence.
stop (Optional[int]) – [Optional] Generate numbers up to (exclusive) the stop value. Default is sys.maxsize.
step (Optional[int]) – [Optional] The step to be used (default is 1).
scheduler (Optional[SchedulerBase]) – [Optional] The scheduler to schedule the values on. If not specified, the default is to use an instance of CurrentThreadScheduler.
Observable[int]
An observable sequence that contains a range of sequential integral numbers.
Generates an observable sequence that repeats the given element the specified number of times.
Examples
>>> res = reactivex.repeat_value(42)
>>> res = reactivex.repeat_value(42, 4)
value (_T) – Element to repeat.
repeat_count (Optional[int]) – [Optional] Number of times to repeat the element. If not specified, repeats indefinitely.
Observable[_T]
An observable sequence that repeats the given element the specified number of times.
Represents an object that is both an observable sequence and an observer. Each notification is broadcast to all subscribed observers.
Creates an observable sequence object from the specified subscription function.
subscribe – [Optional] Subscription function
Notifies all subscribed observers with the value.
value (_T) – The value to send to all subscribed observers.
None
Notifies all subscribed observers with the exception.
error (Exception) – The exception to send to all subscribed observers.
None
Notifies all subscribed observers of the end of the sequence.
None
Unsubscribe all observers and release resources.
None
Invokes the specified function asynchronously on the specified scheduler, surfacing the result through an observable sequence.
Note
The function is called immediately, not during the subscription of the resulting sequence. Multiple subscriptions to the resulting sequence can observe the function’s result.
Example
>>> res = reactivex.start(lambda: pprint('hello'))
>>> res = reactivex.start(lambda: pprint('hello'), timeout_scheduler)
func (Callable[[], _T]) – Function to run asynchronously.
scheduler (Optional[SchedulerBase]) – [Optional] Scheduler to run the function on. If not specified, defaults to an instance of TimeoutScheduler.
Observable[_T]
An observable sequence exposing the function’s result value, or an exception.
Invokes the asynchronous function, surfacing the result through an observable sequence.
function_async – Asynchronous function which returns a Future to run.
An observable sequence exposing the function’s result value, or an exception.
Returns an observable sequence that terminates with an exception, using the specified scheduler to send out the single OnError message.
Example
>>> res = reactivex.throw(Exception('Error'))
exception (Union[str, Exception]) – An object used for the sequence’s termination.
scheduler (Optional[SchedulerBase]) – [Optional] Scheduler to schedule the error notification on. If not specified, the default is to use an instance of ImmediateScheduler.
Observable[Any]
The observable sequence that terminates exceptionally with the specified exception object.
Returns an observable sequence that produces a value after duetime has elapsed and then after each period.
Examples
>>> res = reactivex.timer(datetime(...))
>>> res = reactivex.timer(datetime(...), 0.1)
>>> res = reactivex.timer(5.0)
>>> res = reactivex.timer(5.0, 1.0)
duetime (Union[datetime, timedelta, float]) – Absolute (specified as a datetime object) or relative time (specified as a float denoting seconds or an instance of timedelta) at which to produce the first value.
period (Union[timedelta, float, None]) – [Optional] Period to produce subsequent values (specified as a float denoting seconds or an instance of timedelta). If not specified, the resulting timer is not recurring.
scheduler (Optional[SchedulerBase]) – [Optional] Scheduler to run the timer on. If not specified, the default is to use an instance of TimeoutScheduler.
Observable[int]
An observable sequence that produces a value after due time has elapsed and then each period.
Converts the function into an asynchronous function. Each invocation of the resulting asynchronous function causes an invocation of the original synchronous function on the specified scheduler.
Examples
>>> res = reactivex.to_async(lambda x, y: x + y)(4, 3)
>>> res = reactivex.to_async(lambda x, y: x + y, timeout_scheduler)(4, 3)
>>> res = reactivex.to_async(lambda x: log.debug(x), timeout_scheduler)('hello')
func (Callable[..., _T]) – Function to convert to an asynchronous function.
scheduler (Optional[SchedulerBase]) – [Optional] Scheduler to run the function on. If not specified, defaults to an instance of TimeoutScheduler.
Callable[..., Observable[_T]]
Asynchronous function.
Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence’s lifetime.
Example
>>> res = reactivex.using(lambda: AsyncSubject(), lambda s: s)
resource_factory (Callable[[], DisposableBase]) – Factory function to obtain a resource object.
observable_factory (Callable[[DisposableBase], Observable[_T]]) – Factory function to obtain an observable sequence that depends on the obtained resource.
Observable[_T]
An observable sequence whose lifetime controls the lifetime of the dependent resource object.
Merges the specified observable sequences into one observable sequence by creating a tuple only when the first observable sequence produces an element.
Examples
>>> obs = reactivex.with_latest_from(obs1)
>>> obs = reactivex.with_latest_from(obs1, obs2, obs3)
sources (Observable[Any]) – Sequence of observables.
Observable[Tuple[Any, ...]]
An observable sequence containing the result of combining elements of the sources into a tuple.
Merges the specified observable sequences into one observable sequence by creating a tuple whenever all of the observable sequences have produced an element at a corresponding index.
Example
>>> res = reactivex.zip(obs1, obs2)
args (Observable[Any]) – Observable sources to zip.
Observable[Tuple[Any, ...]]
An observable sequence containing the result of combining elements of the sources as a tuple.