有些数据推送需要用观察者模式(也称作订阅者模式),看看docker是如何用golang实现这个的
//过一遍数据结构 type Events struct { mu sync.Mutex //锁 events []*jsonmessage.JSONMessage //数据 pub *pubsub.Publisher //发布者 } type Publisher struct { m sync.RWMutex buffer int //缓冲 timeout time.Duration //超时 subscribers map[subscriber]struct{} //订阅者map结构,方便以后遍历map }
作为一个订阅者,订阅一次
func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}) { e.mu.Lock() current := make([]*jsonmessage.JSONMessage, len(e.events)) //e.events发布时候都得通知订阅者 copy(current, e.events) l := e.pub.Subscribe() //订阅,记录到Publisher的subscribers里面去 e.mu.Unlock() return current, l } func (p *Publisher) Subscribe() chan interface{} { ch := make(chan interface{}, p.buffer) p.m.Lock() p.subscribers[ch] = struct{}{} //记录 p.m.Unlock() return ch //返回一个管道 }
产生一次publisher,当然得广播给所有的订阅人
func (e *Events) Log(action, id, from string) { go func() { e.mu.Lock() jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: time.Now().UTC().Unix()} if len(e.events) == cap(e.events) { // 抛弃最老的一条记录,因为大小超过分配的咯 copy(e.events, e.events[1:]) e.events[len(e.events)-1] = jm } else { e.events = append(e.events, jm) } e.mu.Unlock() e.pub.Publish(jm) //发布广播 }() }
最后再看看广播的代码
func (p *Publisher) Publish(v interface{}) { p.m.RLock() for sub := range p.subscribers { // send under a select as to not block if the receiver is unavailable //如果设置超时,默认设置是100*time.Millisecond,则发送数据,如果发送不了则阻塞直到收到超时信息 if p.timeout > 0 { select { case sub <- v: case <-time.After(p.timeout): } continue } //不设置超时,没有发送到管道数据成功,则默认继续执行 select { case sub <- v: default: } } p.m.RUnlock() }
看看我们哪里取了这些数据:
//............省略 for { select { case ev := <-l: //取到订阅者的发往管道的数据 jev, ok := ev.(*jsonmessage.JSONMessage) if !ok { continue } if err := sendEvent(jev); err != nil { return err } case <-timer.C://取到超时信息 return nil case <-closeNotify://取到关闭通知信息 logrus.Debug("Client disconnected, stop sending events") return nil } }
时间: 2024-10-11 00:00:40