ReactiveX for Python v4 is an evolution of RxPY v3 to modernize it to current Python standards:
Project main module renamed from rx
to reactivex
. This is done
to give it a unique name different from the obsolete Reactive Extensions
(RxPY)
Generic type annotations. Code now type checks with pyright / pylance at strict settings. It also mostly type checks with mypy. Mypy should eventually catch up.
The pipe
function has been renamed to compose
. There is now a
new function pipe
that works similar to the pipe
method.
RxPY is now a modern Python project using pyproject.toml
instead
of setup.py
, and using modern tools such as Poetry, Black
formatter and isort.
import reactivex as rx
from reactivex import operators as ops
rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
ops.map(lambda s: len(s)),
ops.filter(lambda i: i >= 5)
).subscribe(lambda value: print("Received {0}".format(value)))
RxPY v3 is a major evolution from RxPY v1. This release brings many improvements, some of the most important ones being:
A better integration in IDEs via autocompletion support.
New operators can be implemented outside of RxPY.
Operator chains are now built via the pipe
operator.
A default scheduler can be provided in an operator chain.
The most fundamental change is the way operators are chained together. On RxPY v1, operators were methods of the Observable class. So they were chained by using the existing Observable methods:
from rx import Observable
Observable.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon") \
.map(lambda s: len(s)) \
.filter(lambda i: i >= 5) \
.subscribe(lambda value: print("Received {0}".format(value)))
Chaining in RxPY v3 is based on the pipe
operator.
This operator is now one of the only methods of the
Observable
class. In RxPY v3, operators are implemented
as functions:
import rx
from rx import operators as ops
rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
ops.map(lambda s: len(s)),
ops.filter(lambda i: i >= 5)
).subscribe(lambda value: print("Received {0}".format(value)))
The fact that operators are functions means that adding new operators is now very easy. Instead of wrapping custom operators with the let operator, they can be directly used in a pipe chain.
The mapper function is removed in operators that combine the values of several
observables. This change applies to the following operators:
combine_latest
,
group_join
,
join
,
with_latest_from
,
zip
, and
zip_with_iterable
.
In RxPY v1, these operators were used the following way:
from rx import Observable
import operator
a = Observable.of(1, 2, 3, 4)
b = Observable.of(2, 2, 4, 4)
a.zip(b, lambda a, b: operator.mul(a, b)) \
.subscribe(print)
Now they return an Observable of tuples, with each item being the combination of the source Observables:
import rx
from rx import operators as ops
import operator
a = rx.of(1, 2, 3, 4)
b = rx.of(2, 2, 4, 4)
a.pipe(
ops.zip(b), # returns a tuple with the items of a and b
ops.map(lambda z: operator.mul(z[0], z[1]))
).subscribe(print)
Dealing with the tuple unpacking is made easier with the starmap operator that unpacks the tuple to args:
import rx
from rx import operators as ops
import operator
a = rx.of(1, 2, 3, 4)
b = rx.of(2, 2, 4, 4)
a.pipe(
ops.zip(b),
ops.starmap(operator.mul)
).subscribe(print)
The subscription function provided to the create
operator
now takes two parameters: An observer and a scheduler. The scheduler parameter
is new: If a scheduler has been set in the call to subscribe, then this
scheduler is passed to the subscription function. Otherwise this parameter is
set to None.
One can use or ignore this parameter. This new scheduler parameter allows the create operator to use the default scheduler provided in the subscribe call. So scheduling item emissions with relative or absolute due-time is now possible.
The support of list of Observables as a parameter has been removed in the
following operators:
merge
,
zip
, and
combine_latest
.
For example in RxPY v1 the merge operator could be called with a list:
from rx import Observable
obs1 = Observable.from_([1, 2, 3, 4])
obs2 = Observable.from_([5, 6, 7, 8])
res = Observable.merge([obs1, obs2])
res.subscribe(print)
This is not possible anymore in RxPY v3. So Observables must be provided explicitly:
import rx, operator as op
obs1 = rx.from_([1, 2, 3, 4])
obs2 = rx.from_([5, 6, 7, 8])
res = rx.merge(obs1, obs2)
res.subscribe(print)
If for any reason the Observables are only available as a list, then they can be unpacked:
import rx
from rx import operators as ops
obs1 = rx.from_([1, 2, 3, 4])
obs2 = rx.from_([5, 6, 7, 8])
obs_list = [obs1, obs2]
res = rx.merge(*obs_list)
res.subscribe(print)
BlockingObservables have been removed from rxPY v3. In RxPY v1, blocking until an Observable completes was done the following way:
from rx import Observable
res = Observable.from_([1, 2, 3, 4]).to_blocking().last()
print(res)
This is now done with the run
operator:
import rx
res = rx.from_([1, 2, 3, 4]).run()
print(res)
The run operator returns only the last value emitted by the source Observable. It is possible to use the previous blocking operators by using the standard operators before run. For example:
Get first item: obs.pipe(ops.first()).run()
Get all items: obs.pipe(ops.to_list()).run()
Support for back-pressure - and so ControllableObservable - has been removed in RxPY v3. Back-pressure can be implemented in several ways, and many strategies can be adopted. So we consider that such features are beyond the scope of RxPY. You are encouraged to provide independent implementations as separate packages so that they can be shared by the community.
List of community projects supporting backpressure can be found in Additional Reading.
Operators that take time values as parameters now use seconds as a unit instead of milliseconds. This RxPY v1 example:
ops.debounce(500)
is now written as:
ops.debounce(0.5)
Some packages were renamed:
Old name |
New name |
rx.concurrency |
reactivex.scheduler |
rx.disposables |
rx.disposable |
rx.subjects |
rx.subject |
Furthermore, the package formerly known as rx.concurrency.mainloopscheduler has been split into two parts, reactivex.scheduler.mainloop and reactivex.scheduler.eventloop.