设计模式-发布订阅模式与观察者模式

发布订阅者模式

发布订阅模式

  1. 消息中心:负责存储消息与订阅者的对应关系,有消息触发时,负责通知订阅者
  2. 订阅者:去消息中心订阅自己感兴趣的消息
  3. 发布者:满足条件时,通过消息中心发布消息

实现

var broastEvent = {
    // 消息监听队列
    clientList: {},
    // 订阅(监听)消息
    listen: function(observer, action) {
        if (clientList[observer]) { // 如果已经存在订阅消息了
            this.clientList[observer].push(action) // 新增监听
        } else {
            this.clientList[observer] = [].concat(action)
        }
    },
    // 发布(触发)消息
    trigger: function(observer, ...args) {
        const actionList = this.clientList[observer] || []
        if (actionList.length > 0) {
            actionList.forEach(fn => {
                fn(...args)
            })
        }
    },
    // 移除消息监听
    remove: function(observer) {
        this.clientList && delete this.clientList[observer]
    }
}


// 测试
broastEvent.listen('some', function (...args) {
    console.log('some: ', args, this)
}.bind(this))

broastEvent.listen('some2', (...args) => {
    console.log('some2: ', args, this)
})
setTimeout(() => {
    broastEvent.trigger('some', 123, 234)
    broastEvent.trigger('some2', 123, 234)
    broastEvent.remove('some2')
}, 2000)
setTimeout(() => {
    broastEvent.trigger('some2', 123, 234)
    broastEvent.trigger('some', 123, 234)
}, 5000)
// some:  [ 123, 234 ] {}
// some2:  [ 123, 234 ] {}
// some:  [ 123, 234 ] {}

优化

增加仅订阅一次的事件消息,响应后删除。

增加唯一性事件监听,如果多个消息监听,仅最后一个事件消息有效。

使用map改写消息列表

var broastEvent = {
    clientList: new Map(),
    _addListen: function(observe, actionObj, isUniq = false) {
        const clientList = this.clientList
        if (!isUniq && clientList.has(observe)) {
            clientList.get(observe).push(actionObj)
        } else {
            clientList.set(observe, [].concat(actionObj))
        }
    },
    listen: function(observe, action, isUniq = false) { // 默认不唯一
        this._addListen(observe, {action}, isUniq)
    },
    once: function(observe, action, isUniq = false) {
        this._addListen(observe, {
            action,
            once: true
        }, isUniq)
    },
    trigger: function(observe, ...args) {
        const actions = this.clientList.get(observe) || []
        if (actions.length > 0) {
            actions = actions.filter(item => {
                try {
                    item.action(...args)
                } catch(e) {
                    console.log(e)
                }
                return !item.once
            })
            actions.length === 0 ? this.remove(observe) : this.clientList.set(observe, actions)
        }
    },
    remove: function(observe, action) {
        if (this.clientList.has(observe) && action) {
            let actions = this.clientList.get(observe)
            const index = actions.findIndex(item => item.action === action)
            actions.splice(index, 1) // 可以使用filter改写
            actions.length === 0 && this.removeAll(observe)
        }
        if (!action) {
            this.removeAll(observe)
        }
    },
    removeAll: function(observe) {
        this.clientList.has(observe) && this.clientList.delete(observe)
    }
}

// 测试
broastEvent.once('some', function (...args) {
    console.log('some: ', args)
}.bind(this))

broastEvent.listen('some2', (...args) => {
    console.log('some2: ', args)
})
broastEvent.listen('some3', (...args) => {
    console.log('some3: ', args)
})

broastEvent.listen('some3', (...args) => {
    console.log('some3 isUniq: ', args)
}, true)
setTimeout(() => {
    broastEvent.trigger('some', 123, 234)
    broastEvent.trigger('some2', 123, 234)
    // broastEvent.remove('some2')
    broastEvent.trigger('some3', 123, 234)
}, 2000)
setTimeout(() => {
    broastEvent.trigger('some2', 123, 234)
    broastEvent.trigger('some', 123, 234)
    broastEvent.trigger('some3', 456)
}, 5000)
var Event = (function() {
    var _default = 'default'

    var Event = (function() {
        var _listen,
            _trigger,
            _remove,
            _slice = Array.prototype.slice,
            _shift = Array.prototype.shift,
            _unshift = Array.prototype.unshift,
            namespaceCache = {},
            _create,
            find
        var each = function(ary, fn) {
            var ret
            for (var i = 0, l = ary.length; i < l; i++) {
                var n = ary[i]
                ret = fn.call(n, i, n)
            }
            return ret
        }
        _listen = function(key, fn, cache) {
            if (!cache[key]) {
                cache[key] = []
            }
            cache[key].push(fn)
        }
        _remove = function(key, cache, fn) {
            if (cache[key]) {
                if (fn) {
                    for (var i = cache[key].length; i >= 0; i--) {
                        if (cache[key][i] === fn) {
                            cache[key].splice(i, 1)
                        }
                    }
                } else {
                    cache[key] = []
                }
            }
        }
        _trigger = function() {
            var cache = _shift.call(arguments)
            var key = _shift.call(arguments)
            var args = arguments
            var _self = this
            var stack = cache[key]
            if (!stack || !stack.length) {
                return
            }
            return each(stack, function() {
                return this.apply(_self, args)
            })
        }
        _create = function(namespace) {
            var namespace = namespace || _default
            var cache = {}
            var offlineStack = [] // 离线事件
            var ret = {
                listen: function(key, fn, last) {
                    _listen(key, fn, cache)
                    if (offlineStack === null) {
                        return
                    }
                    if (last === 'last') {
                        offlineStack.length && offlineStack.pop()
                    } else {
                        each(offlineStack, function() {
                            this()
                        })
                    }
                    offlineStack = null
                },
                one: function(key, fn, last) {
                    _remove(key, cache)
                    this.listen(key, fn, last)
                },
                remove: function(key, fn) {
                    _remove(key, cache, fn)
                },
                trigger: function() {
                    var fn,
                        args,
                        _self = this
                    _unshift.call(arguments, cache)
                    args = arguments
                    fn = function() {
                        return _trigger.apply(_self, args)
                    }
                    if (offlineStack) {
                        return offlineStack.push(fn)
                    }
                    return fn()
                }
            }
            return namespace
                ? namespaceCache[namespace]
                    ? namespaceCache[namespace]
                    : (namespaceCache[namespace] = ret)
                : ret
        }

        return {
            create: _create,
            one: function(key, fn, last) {
                var event = this.create()
                event.one(key, fn, last)
            },
            remove: function(key, fn) {
                var event = this.create()
                event.remove(key, fn)
            },
            listen: function(key, fn, last) {
                var event = this.create()
                event.listen(key, fn, last)
            },
            trigger: function() {
                var event = this.create()
                event.trigger.apply(this, arguments)
            }
        }
    })()
    return Event
})()

应用

[一对多]、[多对多] 的场景

  • vue 中的 event 机制

观察者模式

有很多文章认为观察者模式 = 发布订阅模式。但是有些人又不这么认为。

观察者模式

观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个目标对象(被观察在主体),当这个目标对象的状态发生变化时,会通知所有观察者对象,使它们能够自动更新。

  1. 被观察主体
  2. 观察者

被观察主体<=>观察者,观察者观察被观察主体(监听被观察主体)-> 被观察主体发生变化 -> 被观察主体主动通知观察者(使用回调或者统一的接口)。

class Subject {
  constructor() {
    // 一个数组存放所有的订阅者
    // 每个消息对应一个数组,数组结构如下
    // [
    //   {
    //     observer: obj,
    //     action: () => {}
    //   }
    // ]
    this.observers = [];
  }

  addObserver(observer, action) {
    // 将观察者和回调放入数组
    this.observers.push({observer, action});
  }

  notify(...args) {
    // 执行每个观察者的回调
    this.observers.forEach(item => {
      const {observer, action} = item;
      action.call(observer, ...args);
    })
  }
}

const subject = new Subject();

// 添加一个观察者
subject.addObserver({name: 'John'}, function(msg){
  console.log(this.name, 'got message: ', msg);
})

// 再添加一个观察者
subject.addObserver({name: 'Joe'}, function(msg) {
  console.log(this.name, 'got message: ', msg);
})

// 通知所有观察者
subject.notify('tomorrow is Sunday');

响应式

/**
 * 观察监听一个对象成员的变化
 * @param {Object} obj 观察的对象
 * @param {String} targetVariable 观察的对象成员
 * @param {Function} callback 目标变化触发的回调
 */
function observer(obj, targetVariable, callback) {
  if (!obj.data) {
    obj.data = {}
  }
  Object.defineProperty(obj, targetVariable, {
    get() {
      return this.data[targetVariable]
    },
    set(val) {
      this.data[targetVariable] = val
      // 目标主动通知观察者
      callback && callback(val)
    },
  })
  if (obj.data[targetVariable]) {
    callback && callback(obj.data[targetVariable])
  }
}

应用

[一对多] 的场景

发布订阅模式与观察者模式区别

观察者vs发布订阅

  • 在观察者模式中,观察者知道被观察的主体,被观察的主体也维护观察者的记录。
    在发布订阅模式中,发布者不需要知道有谁订阅了,发布者和订阅者只是在消息队列或代理的帮助下进行通信,更耗性能因为要维护消息队列。
  • 在发布订阅模式中,与观察者模式相反,是松耦合的。
  • 观察者模式大多以同步的方式实现,即当某个事件发生时, Subject 调用其所有观察者的适当方法。而发布订阅大多是异步的方式进行(消息队列)。
  • 观察者模式需要在单个应用程序地址空间中实现。
    发布订阅模式更像是一种跨应用程序模式。

参考文章

http://www.dennisgo.cn/Articles/DesignPatterns/PubSub.html

https://hackernoon.com/observer-vs-pub-sub-pattern-50d3b27f838c

翻译版 - https://zhuanlan.zhihu.com/p/51357583

https://segmentfault.com/a/1190000020169229

https://www.cnblogs.com/onepixel/p/10806891.html