RxPY - 使用主题

主题是一个可观察的序列,也是一个可以多播的观察者,即与许多已订阅的观察者交谈。

我们将讨论以下主题 −

  • 创建主题
  • 订阅主题
  • 将数据传递给主题
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

创建主题

要使用主题,我们需要导入主题,如下所示 −

from rx.subject import Subject

可以如下创建主客体 −

subject_test = Subject()

该对象是一个具有三个方法的观察者 −

  • on_next(value)
  • on_error(error) and
  • on_completed()

订阅主题

您可以在主题上创建多个订阅,如下所示 −

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

向主题传递数据

您可以将数据传递给使用 on_next(value) 方法创建的主题,如下所示 −

subject_test.on_next("A")
subject_test.on_next("B")

数据将传递给所有订阅,添加到主题上。

这是该主题的一个工作示例。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

subject_test 对象是通过调用 Subject() 创建的。 subject_test 对象引用了 on_next(value)、on_error(error) 和 on_completed() 方法。 上面示例的输出如下所示 −

输出

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

我们可以使用 on_completed() 方法来停止主题执行,如下所示。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

一旦我们调用完成,后面调用的下一个方法就不会被调用。

输出

E:\pyrx>python testrx.py
The value is A
The value is A

现在让我们看看如何调用 on_error(error) 方法。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

输出

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject 将在调用时为您提供最新值。 您可以创建行为主题,如下所示 −

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

这是一个使用 Behavior Subject 的工作示例

示例

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

输出

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

重播主题

replaysubject 类似于behavior subject,其中,它可以缓冲值并将其重播给新订阅者。 这里是重放主题的一个工作示例。

示例

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

重播主题使用的缓冲区值为 2。 因此,最后两个值将被缓冲并用于调用的新订阅者。

输出

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

异步主题

在AsyncSubject 的情况下,调用的最后一个值被传递给订阅者,并且只有在调用complete() 方法之后才会完成。

示例

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

输出

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2