package notifications

import (
	"container/list"
	"fmt"
	"sync"
	"time"

	"github.com/Sirupsen/logrus"
)

// NOTE(stevvooe): This file contains definitions for several utility sinks.
// Typically, the broadcaster is the only sink that should be required
// externally, but others are suitable for export if the need arises. That
// said, the tight integration with endpoint metrics should be removed.

// Broadcaster sends events to multiple, reliable Sinks. The goal of this
// component is to dispatch events to configured endpoints. Reliability can be
// provided by wrapping incoming sinks.
type Broadcaster struct {
	sinks  []Sink
	events chan []Event
	closed chan chan struct{}
}

// NewBroadcaster returns a broadcaster dispatching to the provided sinks.
// The broadcaster's behavior will be affected by the properties of each
// sink: generally, a sink should accept all messages and deal with
// reliability on its own, so wrapping sinks in eventQueue and retryingSink
// is appropriate here.
func NewBroadcaster(sinks ...Sink) *Broadcaster {
	b := Broadcaster{
		sinks:  sinks,
		events: make(chan []Event),
		closed: make(chan chan struct{}),
	}

	// Start the broadcast loop in its own goroutine.
	go b.run()

	return &b
}

// Write accepts a block of events to be dispatched to all sinks. It blocks
// until the run loop accepts the events and fails only if the broadcaster
// has been closed, returning ErrSinkClosed. The caller cedes the slice
// memory to the broadcaster and must not modify it after calling Write.
func (b *Broadcaster) Write(events ...Event) error {
	select {
	case b.events <- events:
	case <-b.closed:
		return ErrSinkClosed
	}
	return nil
}

// Close the broadcaster, ensuring that all messages are flushed to the
// underlying sinks before returning.
func (b *Broadcaster) Close() error {
	logrus.Infof("broadcaster: closing")
	select {
	case <-b.closed:
		// already closed
		return fmt.Errorf("broadcaster: already closed")
	default:
		// Do a little chan handoff dance to synchronize closing: hand run a
		// channel to signal on, close b.closed so later calls observe the
		// shutdown, then wait for run to confirm the sinks are closed.
		closed := make(chan struct{})
		b.closed <- closed
		close(b.closed)
		<-closed
		return nil
	}
}

// run is the main broadcast loop, started when the broadcaster is created.
// Under normal conditions, it waits for events on the event channel. After
// Close is called, this goroutine will exit.
func (b *Broadcaster) run() {
	for {
		select {
		case block := <-b.events:
			for _, sink := range b.sinks {
				if err := sink.Write(block...); err != nil {
					logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err)
				}
			}
		case closing := <-b.closed:

			// close all the underlying sinks
			for _, sink := range b.sinks {
				if err := sink.Close(); err != nil {
					logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err)
				}
			}
			closing <- struct{}{}

			logrus.Debugf("broadcaster: closed")
			return
		}
	}
}

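// The declarations below are an illustrative sketch, not part of the
// original file. discardSink is a hypothetical Sink implementation showing
// the minimal contract this file relies on (Write and Close, as inferred
// from the calls above); it simply accepts and drops all events.
type discardSink struct{}

// Write accepts and discards all events.
func (discardSink) Write(events ...Event) error { return nil }

// Close is a no-op.
func (discardSink) Close() error { return nil }

// Assumed usage: a broadcaster fanning out to a queued, retrying sink, the
// wrapping arrangement suggested by the comments in this file.
//
//	var e Event // Event is defined elsewhere in this package
//	b := NewBroadcaster(newEventQueue(newRetryingSink(discardSink{}, 3, time.Second)))
//	_ = b.Write(e)
//	_ = b.Close()
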
// eventQueue accepts all messages into a queue for asynchronous consumption
// by a sink. It is unbounded and thread safe but the sink must be reliable or
// events will be dropped.
type eventQueue struct {
	sink      Sink
	events    *list.List
	listeners []eventQueueListener
	cond      *sync.Cond
	mu        sync.Mutex
	closed    bool
}

// eventQueueListener is called when various events happen on the queue.
type eventQueueListener interface {
	ingress(events ...Event)
	egress(events ...Event)
}

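// pendingCounter is an illustrative eventQueueListener, not part of the
// original file: it tracks the queue backlog, roughly what an endpoint
// metrics hook would do with the ingress/egress callbacks.
type pendingCounter struct {
	mu      sync.Mutex
	pending int
}

// ingress records events entering the queue.
func (c *pendingCounter) ingress(events ...Event) {
	c.mu.Lock()
	c.pending += len(events)
	c.mu.Unlock()
}

// egress records events leaving the queue.
func (c *pendingCounter) egress(events ...Event) {
	c.mu.Lock()
	c.pending -= len(events)
	c.mu.Unlock()
}
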
// newEventQueue returns a queue to the provided sink. If listeners are
// provided, they will be called on the ingress and egress of events, e.g.
// to update pending metrics.
func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
	eq := eventQueue{
		sink:      sink,
		events:    list.New(),
		listeners: listeners,
	}

	eq.cond = sync.NewCond(&eq.mu)
	go eq.run()
	return &eq
}

// Write accepts the events into the queue, only failing if the queue has
// been closed.
func (eq *eventQueue) Write(events ...Event) error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return ErrSinkClosed
	}

	for _, listener := range eq.listeners {
		listener.ingress(events...)
	}
	eq.events.PushBack(events)
	eq.cond.Signal() // signal waiters

	return nil
}

// Close shuts down the event queue, flushing any remaining events to the
// sink before closing it.
func (eq *eventQueue) Close() error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return fmt.Errorf("eventqueue: already closed")
	}

	// set closed flag
	eq.closed = true
	eq.cond.Signal() // signal flushes queue
	eq.cond.Wait()   // wait for signal from last flush

	return eq.sink.Close()
}

// run is the main goroutine to flush events to the target sink.
func (eq *eventQueue) run() {
	for {
		block := eq.next()

		if block == nil {
			return // nil block means event queue is closed.
		}

		if err := eq.sink.Write(block...); err != nil {
			logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
		}

		for _, listener := range eq.listeners {
			listener.egress(block...)
		}
	}
}

// next encompasses the critical section of the run loop. When the queue is
// empty, it will block on the condition. If new data arrives, it will wake
// and return a block. When closed, a nil slice will be returned.
func (eq *eventQueue) next() []Event {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	for eq.events.Len() < 1 {
		if eq.closed {
			eq.cond.Broadcast()
			return nil
		}

		eq.cond.Wait()
	}

	front := eq.events.Front()
	block := front.Value.([]Event)
	eq.events.Remove(front)

	return block
}

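// Assumed usage of the queue with a listener, building on the sketch types
// above (discardSink, pendingCounter); illustrative only:
//
//	counter := &pendingCounter{}
//	q := newEventQueue(discardSink{}, counter)
//	var e Event
//	_ = q.Write(e)
//	_ = q.Close() // blocks until the queue drains, then closes the sink
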
// retryingSink retries the write until success or an ErrSinkClosed is
// returned. The underlying sink must have a nonzero probability of
// succeeding or writes will block forever. Internally, it uses a circuit
// breaker to manage backoff and reset. Concurrent calls to a retrying sink
// are serialized through the sink, meaning that if one is in-flight,
// another will not proceed.
type retryingSink struct {
	mu     sync.Mutex
	sink   Sink
	closed bool

	// circuit breaker heuristics
	failures struct {
		threshold int
		recent    int
		last      time.Time
		backoff   time.Duration // time after which we retry after failure.
	}
}

// retryingSinkListener is called when various events happen on the retrying
// sink.
type retryingSinkListener interface {
	active(events ...Event)
	retry(events ...Event)
}

// TODO(stevvooe): We are using a circuit breaker here, which actually
// doesn't make a whole lot of sense for this use case, since we always
// retry. Move this to use bounded exponential backoff.

// newRetryingSink returns a sink that will retry writes to a sink, backing
// off on failure. Parameters threshold and backoff adjust the behavior of
// the circuit breaker.
func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
	rs := &retryingSink{
		sink: sink,
	}
	rs.failures.threshold = threshold
	rs.failures.backoff = backoff

	return rs
}

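// The TODO above suggests replacing the circuit breaker with bounded
// exponential backoff. A minimal sketch of that calculation (backoffFor is
// a hypothetical helper, not part of the original file): the delay doubles
// with each consecutive failure and is capped at limit.
func backoffFor(base time.Duration, failures int, limit time.Duration) time.Duration {
	d := base << uint(failures) // base * 2^failures
	if d <= 0 || d > limit {    // d <= 0 guards against shift overflow
		d = limit
	}
	return d
}
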
// Write attempts to flush the events to the downstream sink until it succeeds
// or the sink is closed.
func (rs *retryingSink) Write(events ...Event) error {
	rs.mu.Lock()
	defer rs.mu.Unlock()

retry:

	if rs.closed {
		return ErrSinkClosed
	}

	if !rs.proceed() {
		logrus.Warnf("%v encountered too many errors, backing off", rs.sink)
		rs.wait(rs.failures.backoff)
		goto retry
	}

	if err := rs.write(events...); err != nil {
		if err == ErrSinkClosed {
			// terminal!
			return err
		}

		logrus.Errorf("retryingsink: error writing events: %v, retrying", err)
		goto retry
	}

	return nil
}

// Close marks the sink closed and closes the underlying sink.
func (rs *retryingSink) Close() error {
	rs.mu.Lock()
	defer rs.mu.Unlock()

	if rs.closed {
		return fmt.Errorf("retryingsink: already closed")
	}

	rs.closed = true
	return rs.sink.Close()
}

// write provides a helper that dispatches failure and success properly. Used
// by Write as the single-flight write call.
func (rs *retryingSink) write(events ...Event) error {
	if err := rs.sink.Write(events...); err != nil {
		rs.failure()
		return err
	}

	rs.reset()
	return nil
}

// wait sleeps for the backoff duration, unlocking the mutex so others can
// proceed. It should only be called by methods that currently hold the
// mutex.
func (rs *retryingSink) wait(backoff time.Duration) {
	rs.mu.Unlock()
	defer rs.mu.Lock()

	// backoff here
	time.Sleep(backoff)
}

// reset marks a successful call.
func (rs *retryingSink) reset() {
	rs.failures.recent = 0
	rs.failures.last = time.Time{}
}

// failure records a failure.
func (rs *retryingSink) failure() {
	rs.failures.recent++
	rs.failures.last = time.Now().UTC()
}

// proceed returns true if the call should proceed based on circuit breaker
// heuristics.
func (rs *retryingSink) proceed() bool {
	return rs.failures.recent < rs.failures.threshold ||
		time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))
}