Watchers - 重新发明 EventBus

Frank Xu
微信读书
Published in
8 min readSep 26, 2016

基于消息传递的事件

有别于回调(Callback),观察者模型常用来处理一些全局性的通知。Android 下常见的实现就是几个 EventBusGuava EventBusottoEventBus by GreenRobot

他们的使用方法非常相似:

  1. 定义消息(一个 Event 对象)
  2. 定义回调(通过 annotation 或者实现 onEvent 函数)
  3. 触发消息(把定义的消息构造好传给 Bus,Bus 负责分发)
// Define the Event
public class AnswerAvailableEvent {
public final long lastAnswer;
public AnswerAvailableEvent(long answer) {
this.lastAnswer = answer;
}
}
// Prepare a bus
Bus bus = new Bus();
bus.post(new AnswerAvailableEvent(42));
// Subscribe to an event
@Subscribe
public void answerAvailable(AnswerAvailableEvent event) {
// TODO: React to the event somehow!
}
bus.register(this);// Publish an event
@Produce
public AnswerAvailableEvent produceAnswer() {
// Assuming 'lastAnswer' exists.
return new AnswerAvailableEvent(this.lastAnswer);
}

观察者模型可以很好地做松散解耦,使得订阅方(Subscriber)和发布方(Publisher)只通过一个协议约束来沟通。在以上的 EventBus 下,这个协议约束就是消息体(Event 对象)。

一般情况下这样的用法其实没有太大问题,不过随着代码越来越大,EventBus 这种通过消息传递来实现观察者模型的弊端也逐步暴露出来

我们发现在日常开发过程中,以下几个问题特别常见:

消息载体经常会被修改

需要修改 Event 对象的时候,我们需要这样做:

  1. 修改 Event 对象本身,比如增加一个字段以及其 Getter/Setter
  2. 查找这个 Event 对象的所有引用
  3. 对象构造的地方 set 入特定的值(或者修改构造器传入需要的值)
  4. 在用到对象的地方 get 出来对应的值

而删除的时候也类似,查找一个对象的所有引用,做对应的重构工作。这样的事情在协议修改阶段可能非常频繁。

这样的方式使用起来也不是特别直观,发布者无非就是想丢多一个字段,订阅者无非就是想多拿一点数据,因为协议是在 Event 上约束的,所以只能通过改消息体及消息体的使用者来达到。

发布方和订阅方经常需要被定位和修改

对于以上几类框架,定位特定一个 Event 的订阅方还是从 Event 对象的引用查找开始,再搜索找到对应的 Annotation 标示的方法进行修改。

这样的定位过程相当原始,几乎等同于全文搜索。而在清理无用代码时也非常容易遗漏一些标识了 Annotation 但是却从来没有使用的方法。

需要更好的管理消息传递的周期,频率,队列控制

既然是 EventBus ,那么只能是管理一条 Bus,这个 Bus 的配置决定了所有经历过这个 Bus 的消息是如何传递的。如果要多种类型的传递方式,那么就构造几条 Bus,这看起来还是挺合理的。

但是从业务功能的角度出发,我们往往会有非常“丰富”的事件类型配置,比如回调线程是在主线程还是子线程,需要针对是 UI 业务还是非 UI 业务来区分,如果因此构造两条 Bus,发布方就需要知道订阅方的使用过程(以决定选择哪一条 Bus 来 Publish),依赖就反过来了。

基于接口定义和实现的事件

有些人提出了 RxJava 作为替代,但是实际上 RxJava 本身如果要实现一个业务无关的解耦,那么还是需要管理一个 Subject,慢慢业务发展下去这个 Subject 就变成了一个 Bus。

我们可以发现,其实观察者模型本身没有什么问题,Bus 本身可以通过修改满足更丰富的线程、频率、队列控制。但最关键的问题在于,用什么方式定义发布方和订阅方的协议才能易于修改。

语言层面的协议定义,最好的方式就是接口(Interface),那么我们怎么利用接口来做发布方和订阅方的协议?答案是:Java Proxy

几点优势

  1. 可直接通过 IDE 的方法实现和调用定位,一目了然
  2. Bus 中传递的消息其实就是方法的参数,用重构参数的方式进行修改
// Define an Event
public interface PushWatcher extents Watchers.Watcher {
void notify(Message message);
}
// Publishing an Event
Watchers.of(PushWatcher.class).notify(message);
// Subscribing to an Event
public class MyFragment extends Fragment implement PushWatcher {
public void notify(Message message) { }
}

类型安全

这样的消息定义方式是类型安全的,接口定义了什么方法,发布方就能用什么,监听方就必须监听什么方法,配合 IDE 强大的重构,很容易修改发布者和订阅者的协议。

一个类型安全的发布者能保证事件上下文跟订阅者最终拿到的上下文是同一份数据,而不需要经过 Bus 封装成 Message 来传递。

类型安全还能保证所有代码可以被混淆,而不像其他通过 Annotation 定义的(可能)需要特殊配制 proguard。

多事件接收

一个类能够接收多个事件,对于 UI 逻辑而言是非常方便的,不用为了接收一个事件就去写一个类。Java 的接口本来就可以多重实现,这里直接简单的实现多个接口即可接收多类事件。

// Subscribing to multiple Event
public class MyFragment extends Fragment
implement PushWatcher,NetworkWatcher {
public void notify(Message message) { }
public void onNetworkChanged() { }
}

实现上,我们通过查找该类的所有接口,看看这个接口是否继承自 Watcher 来判定是否需要接收消息。

do {
Class<?>[] interfaces = clazz.getInterfaces();
if (interfaces.length > 0) {
for (Class<?> interfaze : interfaces) {
if (!ret.contains(interfaze)) {
ret.addAll(getAllInterfaces(interfaze));
}
}
ret.addAll(Arrays.asList(interfaces));
}
Class<?> superClass = clazz.getSuperclass();
if (superClass == null) break;
clazz = superClass;
} while (clazz != Object.class);

自动解绑(内存安全)

很容易由于订阅者持有 Fragment 或者 View 的对象引用而导致内存泄漏,我们可以通过把订阅者封装为 WeakReference 再进行管理,这样就解开了和具体业务的强依赖避免内存泄漏,使用的时候只需要 bind 即可,无需显式解绑。

线程管理/事件配置及其他

整个 Bus 部分的逻辑基于 RxJava 的 Subject,不同类型的 Subject 可用于配置不同的订阅者,比如需要一个下载进度的通知,那么发布方只需要发布进度,不用考虑其他。

@Watchers.Config(
sample = 200, // 200ms 一次采样
backpressureDrop = true, // 如果订阅者处理不来,丢弃
subject = Watchers.Subjects.BEHAVIOR
)
public interface LoadingWatcher extends Watchers.Watcher {
void chapterProgress(LoadingProgress progress);
}

最后的 BehaviorSubject 用于实现订阅者一订阅,即可收到最近的一个状态。

如此配置之后,发布者和订阅者无须修改任何代码处理采样率(避免发布过快),不用修改任何代码即可实现任何一个订阅方接收到的即是最新的进度

写在最后

横向比较各种主流 EventBus

是 EventBus 的定义限制了 Event 和 Bus 的关系,其实无须上升到反模式的地步(参考 Reddit 讨论),也不应该某些实现而放弃(Avoid)某个模式。

这个 Watcher 的实现可在这里找到,只有300多行代码。

--

--

Frank Xu
微信读书

WeRead at WeChat. Growth Analyst, Data & Infra Engineer.