RxPY - 使用 Observables

Observable 是一个函数,它创建一个观察者并将其附加到期望值的来源,例如来自 dom 元素的点击、鼠标事件等。

本章将详细研究下面提到的主题。

  • 创建可观察对象

  • 订阅并执行一个 Observable


创建可观察对象

要创建一个可观察对象,我们将使用 create() 方法并将函数传递给它,它具有以下项目。

  • on_next() − 当 Observable 发出一个项目时调用此函数。

  • on_completed() − 当 Observable 完成时调用此函数。

  • on_error() − 当 Observable 发生错误时调用此函数。

要使用 create() 方法,首先导入如下所示的方法 −

from rx import create

这是一个工作示例,用于创建一个可观察对象 −

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

订阅并执行一个 Observable

要订阅一个可观察对象,我们需要使用 subscribe() 函数并传递回调函数 on_next、on_error 和 on_completed。

这是一个工作示例 −

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

subscribe() 方法负责执行可观察对象。 回调函数 on_nexton_erroron_completed 必须传递给订阅方法。 调用 subscribe 方法,依次执行 test_observable() 函数。

并非必须将所有三个回调函数都传递给 subscribe() 方法。 您可以根据您的要求传递 on_next()、on_error() 和 on_completed()。

lambda 函数用于 on_next、on_error 和 on_completed。 它将接受参数并执行给定的表达式。

这是创建的可观察对象的输出 −

E:\pyrx>python testrx.py
Got - Hello
Job Done!