RxPY - 转换运算符

buffer

此运算符将从源可观察对象中收集所有值,并在满足给定边界条件后定期发出它们。

语法

buffer(boundaries)

参数

boundaries: 输入是可观察的,它将决定何时停止以便发出收集的值。

返回值

返回值是 observable 的,所有的值都是从源 observable 收集的,持续时间由输入的 observable 决定。

示例

from rx import of, interval, operators as op
from datetime import date
test = of(1, 2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.buffer(interval(1.0))
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

输出

E:\pyrx>python test1.py
The elements are [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

ground_by

此运算符将根据给定的 key_mapper 函数对来自源可观察对象的值进行分组。

语法

group_by(key_mapper)

参数

key_mapper: 此函数将负责从源可观察对象中提取密钥。

返回值

它返回一个可观察值,其值根据 key_mapper 函数分组。

示例

from rx import from_, interval, operators as op
test = from_(["A", "B", "C", "D"])
sub1 = test.pipe(
   op.group_by(lambda v: v[0])
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

输出

E:\pyrx>python testrx.py
The element is <rx.core.observable.groupedobservable.GroupedObservable object
at
 0x000000C99A2E6550>
The element is <rx.core.observable.groupedobservable.GroupedObservable object at
 0x000000C99A2E65C0>
The element is <rx.core.observable.groupedobservable.GroupedObservable object at
 0x000000C99A2E6588>
The element is <rx.core.observable.groupedobservable.GroupedObservable object at
 0x000000C99A2E6550>

map

此运算符将根据给定的 mapper_func 的输出将源可观察对象中的每个值更改为新值。

语法

map(mapper_func:None)

参数

mapper_func: (可选)它将根据来自此函数的输出更改源可观察值的值。

示例

from rx import of, interval, operators as op
test = of(1, 2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.map(lambda x :x*x)
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

输出

E:\pyrx>python testrx.py
The element is 1
The element is 4
The element is 9
The element is 16
The element is 25
The element is 36
The element is 49
The element is 64
The element is 81
The element is 100

scan

此运算符将对来自源 observable 的值应用累加器函数,并返回具有新值的 observable。

语法

scan(accumulator_func, seed=NotSet)

参数

accumulator_func: 此函数应用于源可观察对象的所有值。

seed:(可选)要在 accumular_func 中使用的初始值。

返回值

此运算符将返回一个具有新值的可观察对象,该新值基于应用于源可观察对象的每个值的累加器函数。

示例

from rx import of, interval, operators as op
test = of(1, 2,3,4,5,6,7,8,9,10)
sub1 = test.pipe(
   op.scan(lambda acc, a: acc + a, 0))
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

输出

E:\pyrx>python testrx.py
The element is 1
The element is 3
The element is 6
The element is 10
The element is 15
The element is 21
The element is 28
The element is 36
The element is 45
The element is 55