165 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			165 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Go
		
	
	
package notifications
 | 
						|
 | 
						|
import (
 | 
						|
	"container/list"
 | 
						|
	"fmt"
 | 
						|
	"sync"
 | 
						|
 | 
						|
	events "github.com/docker/go-events"
 | 
						|
	"github.com/sirupsen/logrus"
 | 
						|
)
 | 
						|
 | 
						|
// eventQueue accepts all messages into a queue for asynchronous consumption
 | 
						|
// by a sink. It is unbounded and thread safe but the sink must be reliable or
 | 
						|
// events will be dropped.
 | 
						|
type eventQueue struct {
 | 
						|
	sink      events.Sink
 | 
						|
	events    *list.List
 | 
						|
	listeners []eventQueueListener
 | 
						|
	cond      *sync.Cond
 | 
						|
	mu        sync.Mutex
 | 
						|
	closed    bool
 | 
						|
}
 | 
						|
 | 
						|
// eventQueueListener is called when various events happen on the queue.
 | 
						|
type eventQueueListener interface {
 | 
						|
	ingress(event events.Event)
 | 
						|
	egress(event events.Event)
 | 
						|
}
 | 
						|
 | 
						|
// newEventQueue returns a queue to the provided sink. If the updater is non-
 | 
						|
// nil, it will be called to update pending metrics on ingress and egress.
 | 
						|
func newEventQueue(sink events.Sink, listeners ...eventQueueListener) *eventQueue {
 | 
						|
	eq := eventQueue{
 | 
						|
		sink:      sink,
 | 
						|
		events:    list.New(),
 | 
						|
		listeners: listeners,
 | 
						|
	}
 | 
						|
 | 
						|
	eq.cond = sync.NewCond(&eq.mu)
 | 
						|
	go eq.run()
 | 
						|
	return &eq
 | 
						|
}
 | 
						|
 | 
						|
// Write accepts the events into the queue, only failing if the queue has
 | 
						|
// beend closed.
 | 
						|
func (eq *eventQueue) Write(event events.Event) error {
 | 
						|
	eq.mu.Lock()
 | 
						|
	defer eq.mu.Unlock()
 | 
						|
 | 
						|
	if eq.closed {
 | 
						|
		return ErrSinkClosed
 | 
						|
	}
 | 
						|
 | 
						|
	for _, listener := range eq.listeners {
 | 
						|
		listener.ingress(event)
 | 
						|
	}
 | 
						|
	eq.events.PushBack(event)
 | 
						|
	eq.cond.Signal() // signal waiters
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Close shuts down the event queue, flushing
 | 
						|
func (eq *eventQueue) Close() error {
 | 
						|
	eq.mu.Lock()
 | 
						|
	defer eq.mu.Unlock()
 | 
						|
 | 
						|
	if eq.closed {
 | 
						|
		return fmt.Errorf("eventqueue: already closed")
 | 
						|
	}
 | 
						|
 | 
						|
	// set closed flag
 | 
						|
	eq.closed = true
 | 
						|
	eq.cond.Signal() // signal flushes queue
 | 
						|
	eq.cond.Wait()   // wait for signal from last flush
 | 
						|
 | 
						|
	return eq.sink.Close()
 | 
						|
}
 | 
						|
 | 
						|
// run is the main goroutine to flush events to the target sink.
 | 
						|
func (eq *eventQueue) run() {
 | 
						|
	for {
 | 
						|
		event := eq.next()
 | 
						|
 | 
						|
		if event == nil {
 | 
						|
			return // nil block means event queue is closed.
 | 
						|
		}
 | 
						|
 | 
						|
		if err := eq.sink.Write(event); err != nil {
 | 
						|
			logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
 | 
						|
		}
 | 
						|
 | 
						|
		for _, listener := range eq.listeners {
 | 
						|
			listener.egress(event)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// next encompasses the critical section of the run loop. When the queue is
 | 
						|
// empty, it will block on the condition. If new data arrives, it will wake
 | 
						|
// and return a block. When closed, a nil slice will be returned.
 | 
						|
func (eq *eventQueue) next() events.Event {
 | 
						|
	eq.mu.Lock()
 | 
						|
	defer eq.mu.Unlock()
 | 
						|
 | 
						|
	for eq.events.Len() < 1 {
 | 
						|
		if eq.closed {
 | 
						|
			eq.cond.Broadcast()
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
 | 
						|
		eq.cond.Wait()
 | 
						|
	}
 | 
						|
 | 
						|
	front := eq.events.Front()
 | 
						|
	block := front.Value.(events.Event)
 | 
						|
	eq.events.Remove(front)
 | 
						|
 | 
						|
	return block
 | 
						|
}
 | 
						|
 | 
						|
// ignoredSink discards events with ignored target media types and actions.
 | 
						|
// passes the rest along.
 | 
						|
type ignoredSink struct {
 | 
						|
	events.Sink
 | 
						|
	ignoreMediaTypes map[string]bool
 | 
						|
	ignoreActions    map[string]bool
 | 
						|
}
 | 
						|
 | 
						|
func newIgnoredSink(sink events.Sink, ignored []string, ignoreActions []string) events.Sink {
 | 
						|
	if len(ignored) == 0 {
 | 
						|
		return sink
 | 
						|
	}
 | 
						|
 | 
						|
	ignoredMap := make(map[string]bool)
 | 
						|
	for _, mediaType := range ignored {
 | 
						|
		ignoredMap[mediaType] = true
 | 
						|
	}
 | 
						|
 | 
						|
	ignoredActionsMap := make(map[string]bool)
 | 
						|
	for _, action := range ignoreActions {
 | 
						|
		ignoredActionsMap[action] = true
 | 
						|
	}
 | 
						|
 | 
						|
	return &ignoredSink{
 | 
						|
		Sink:             sink,
 | 
						|
		ignoreMediaTypes: ignoredMap,
 | 
						|
		ignoreActions:    ignoredActionsMap,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Write discards events with ignored target media types and passes the rest
 | 
						|
// along.
 | 
						|
func (imts *ignoredSink) Write(event events.Event) error {
 | 
						|
	if imts.ignoreMediaTypes[event.(Event).Target.MediaType] || imts.ignoreActions[event.(Event).Action] {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	return imts.Sink.Write(event)
 | 
						|
}
 | 
						|
 | 
						|
func (imts *ignoredSink) Close() error {
 | 
						|
	return nil
 | 
						|
}
 |