跳到主要内容

观察者模式与发布订阅模式

问题

观察者模式和发布订阅模式是什么?它们有什么区别?如何实现?

答案

观察者模式(Observer)和发布订阅模式(Publish-Subscribe)都是用于实现对象间一对多依赖关系的设计模式,当一个对象状态改变时,所有依赖它的对象都会被通知。

观察者模式(Observer Pattern)

核心概念

观察者模式中,Subject(被观察者) 直接维护 Observer(观察者) 列表,状态变化时直接通知观察者。

// 观察者接口
interface Observer {
update(data: any): void;
}

// 被观察者(主题)
class Subject {
private observers: Observer[] = [];

// 添加观察者
addObserver(observer: Observer): void {
if (!this.observers.includes(observer)) {
this.observers.push(observer);
}
}

// 移除观察者
removeObserver(observer: Observer): void {
const index = this.observers.indexOf(observer);
if (index > -1) {
this.observers.splice(index, 1);
}
}

// 通知所有观察者
notify(data: any): void {
this.observers.forEach(observer => observer.update(data));
}
}

// 具体观察者
class ConcreteObserver implements Observer {
constructor(private name: string) {}

update(data: any): void {
console.log(`${this.name} 收到更新: ${data}`);
}
}

// 使用示例
const subject = new Subject();
const observer1 = new ConcreteObserver('观察者1');
const observer2 = new ConcreteObserver('观察者2');

subject.addObserver(observer1);
subject.addObserver(observer2);
subject.notify('Hello World');
// 输出:
// 观察者1 收到更新: Hello World
// 观察者2 收到更新: Hello World

发布订阅模式(Publish-Subscribe Pattern)

核心概念

发布订阅模式通过一个事件中心(Event Channel) 来解耦发布者和订阅者,发布者和订阅者互不知道对方的存在。

type EventHandler = (...args: any[]) => void;

class EventEmitter {
private events: Map<string, EventHandler[]> = new Map();

// 订阅事件
on(event: string, handler: EventHandler): void {
if (!this.events.has(event)) {
this.events.set(event, []);
}
this.events.get(event)!.push(handler);
}

// 取消订阅
off(event: string, handler: EventHandler): void {
const handlers = this.events.get(event);
if (handlers) {
const index = handlers.indexOf(handler);
if (index > -1) {
handlers.splice(index, 1);
}
}
}

// 发布事件
emit(event: string, ...args: any[]): void {
const handlers = this.events.get(event);
if (handlers) {
handlers.forEach(handler => handler(...args));
}
}

// 只订阅一次
once(event: string, handler: EventHandler): void {
const onceHandler: EventHandler = (...args) => {
handler(...args);
this.off(event, onceHandler);
};
this.on(event, onceHandler);
}

// 移除某个事件的所有订阅
removeAllListeners(event?: string): void {
if (event) {
this.events.delete(event);
} else {
this.events.clear();
}
}

// 获取某个事件的订阅者数量
listenerCount(event: string): number {
return this.events.get(event)?.length ?? 0;
}
}

// 使用示例
const emitter = new EventEmitter();

// 订阅
emitter.on('message', (data) => {
console.log('收到消息:', data);
});

emitter.once('login', (user) => {
console.log('用户登录:', user);
});

// 发布
emitter.emit('message', 'Hello!'); // 收到消息: Hello!
emitter.emit('login', 'Alice'); // 用户登录: Alice
emitter.emit('login', 'Bob'); // 无输出(once 只触发一次)

两者的区别

特性观察者模式发布订阅模式
耦合度Subject 和 Observer 直接关联通过事件中心解耦
通信方式同步通信可同步可异步
关系一对多多对多
灵活性较低较高
实现复杂度简单稍复杂
典型应用Vue 响应式、MobXEventEmitter、Redux、Vue 事件总线

完整的 EventEmitter 实现(支持命名空间)

type EventHandler = (...args: any[]) => void;

interface EventOptions {
once?: boolean;
priority?: number;
}

interface HandlerWrapper {
handler: EventHandler;
once: boolean;
priority: number;
}

class AdvancedEventEmitter {
private events: Map<string, HandlerWrapper[]> = new Map();
private maxListeners: number = 10;

setMaxListeners(n: number): this {
this.maxListeners = n;
return this;
}

on(event: string, handler: EventHandler, options: EventOptions = {}): this {
const { once = false, priority = 0 } = options;

if (!this.events.has(event)) {
this.events.set(event, []);
}

const handlers = this.events.get(event)!;

// 检查最大监听器数量
if (handlers.length >= this.maxListeners) {
console.warn(`Warning: Event "${event}" has more than ${this.maxListeners} listeners`);
}

handlers.push({ handler, once, priority });

// 按优先级排序(优先级高的先执行)
handlers.sort((a, b) => b.priority - a.priority);

return this;
}

off(event: string, handler?: EventHandler): this {
if (!handler) {
this.events.delete(event);
return this;
}

const handlers = this.events.get(event);
if (handlers) {
const index = handlers.findIndex(h => h.handler === handler);
if (index > -1) {
handlers.splice(index, 1);
}
}
return this;
}

emit(event: string, ...args: any[]): boolean {
const handlers = this.events.get(event);
if (!handlers || handlers.length === 0) {
return false;
}

// 复制数组,防止在遍历时修改
const handlersToCall = [...handlers];

handlersToCall.forEach(({ handler, once }) => {
handler(...args);
if (once) {
this.off(event, handler);
}
});

return true;
}

once(event: string, handler: EventHandler): this {
return this.on(event, handler, { once: true });
}

// 异步发布
async emitAsync(event: string, ...args: any[]): Promise<void> {
const handlers = this.events.get(event);
if (!handlers) return;

for (const { handler, once } of [...handlers]) {
await handler(...args);
if (once) {
this.off(event, handler);
}
}
}

// 获取所有事件名
eventNames(): string[] {
return Array.from(this.events.keys());
}

// 获取某事件的所有监听器
listeners(event: string): EventHandler[] {
return this.events.get(event)?.map(h => h.handler) ?? [];
}
}

实际应用场景

模式应用场景
观察者模式Vue/React 响应式系统、DOM 事件监听、数据绑定
发布订阅跨组件通信、微前端通信、WebSocket 消息处理、日志系统
注意事项
  • 及时取消订阅,避免内存泄漏
  • 注意事件命名规范,避免冲突
  • 发布订阅模式过度使用会导致代码难以追踪
  • 考虑错误处理机制

相关链接