Move notifications package to distribution
Since the notifications package is now decoupled from storage, we are moving it to the root package.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
parent 09bf752234
commit ed8827c3c2
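For consumers, the move is an import-path change. A hedged sketch of the expected before/after (the new path assumes the package lands directly under the repository root, as the commit message states):

import (
	// before: "github.com/docker/distribution/registry/storage/notifications"
	// after (assumed root-level location):
	"github.com/docker/distribution/notifications"
)

The first hunk below removes the old import from a consumer; the `+0,0` hunks that follow delete each file from the old registry/storage/notifications location.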
					
@@ -16,7 +16,6 @@ import (
 	"github.com/docker/distribution/registry/storage"
 	storagedriver "github.com/docker/distribution/registry/storage/driver"
 	"github.com/docker/distribution/registry/storage/driver/factory"
-	"github.com/docker/distribution/registry/storage/notifications"
 	"github.com/gorilla/mux"
 	"golang.org/x/net/context"
 )
@@ -1,155 +0,0 @@
package notifications

import (
	"net/http"
	"time"

	"code.google.com/p/go-uuid/uuid"
	"github.com/docker/distribution"
	"github.com/docker/distribution/digest"
	"github.com/docker/distribution/manifest"
)

type bridge struct {
	ub      URLBuilder
	actor   ActorRecord
	source  SourceRecord
	request RequestRecord
	sink    Sink
}

var _ Listener = &bridge{}

// URLBuilder defines a subset of url builder to be used by the event listener.
type URLBuilder interface {
	BuildManifestURL(name, tag string) (string, error)
	BuildBlobURL(name string, dgst digest.Digest) (string, error)
}

// NewBridge returns a notification listener that writes records to sink,
// using the actor and source. Any urls populated in the events created by
// this bridge will be created using the URLBuilder.
// TODO(stevvooe): Update this to simply take a context.Context object.
func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, request RequestRecord, sink Sink) Listener {
	return &bridge{
		ub:      ub,
		actor:   actor,
		source:  source,
		request: request,
		sink:    sink,
	}
}

// NewRequestRecord builds a RequestRecord for use in NewBridge from an
// http.Request, associating it with a request id.
func NewRequestRecord(id string, r *http.Request) RequestRecord {
	return RequestRecord{
		ID:        id,
		Addr:      r.RemoteAddr,
		Host:      r.Host,
		Method:    r.Method,
		UserAgent: r.UserAgent(),
	}
}

func (b *bridge) ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error {
	return b.createManifestEventAndWrite(EventActionPush, repo, sm)
}

func (b *bridge) ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error {
	return b.createManifestEventAndWrite(EventActionPull, repo, sm)
}

func (b *bridge) ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error {
	return b.createManifestEventAndWrite(EventActionDelete, repo, sm)
}

func (b *bridge) LayerPushed(repo distribution.Repository, layer distribution.Layer) error {
	return b.createLayerEventAndWrite(EventActionPush, repo, layer.Digest())
}

func (b *bridge) LayerPulled(repo distribution.Repository, layer distribution.Layer) error {
	return b.createLayerEventAndWrite(EventActionPull, repo, layer.Digest())
}

func (b *bridge) LayerDeleted(repo distribution.Repository, layer distribution.Layer) error {
	return b.createLayerEventAndWrite(EventActionDelete, repo, layer.Digest())
}

func (b *bridge) createManifestEventAndWrite(action string, repo distribution.Repository, sm *manifest.SignedManifest) error {
	event, err := b.createManifestEvent(action, repo, sm)
	if err != nil {
		return err
	}

	return b.sink.Write(*event)
}

func (b *bridge) createManifestEvent(action string, repo distribution.Repository, sm *manifest.SignedManifest) (*Event, error) {
	event := b.createEvent(action)
	event.Target.Type = EventTargetTypeManifest
	event.Target.Name = repo.Name()
	event.Target.Tag = sm.Tag

	p, err := sm.Payload()
	if err != nil {
		return nil, err
	}

	event.Target.Digest, err = digest.FromBytes(p)
	if err != nil {
		return nil, err
	}

	// TODO(stevvooe): Currently, this is the "tag" url: once the digest url is
	// implemented, this should be replaced.
	event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, sm.Tag)
	if err != nil {
		return nil, err
	}

	return event, nil
}

func (b *bridge) createLayerEventAndWrite(action string, repo distribution.Repository, dgst digest.Digest) error {
	event, err := b.createLayerEvent(action, repo, dgst)
	if err != nil {
		return err
	}

	return b.sink.Write(*event)
}

func (b *bridge) createLayerEvent(action string, repo distribution.Repository, dgst digest.Digest) (*Event, error) {
	event := b.createEvent(action)
	event.Target.Type = EventTargetTypeBlob
	event.Target.Name = repo.Name()
	event.Target.Digest = dgst

	var err error
	event.Target.URL, err = b.ub.BuildBlobURL(repo.Name(), dgst)
	if err != nil {
		return nil, err
	}

	return event, nil
}

// createEvent creates an event with actor and source populated.
func (b *bridge) createEvent(action string) *Event {
	event := createEvent(action)
	event.Source = b.source
	event.Actor = b.actor
	event.Request = b.request

	return event
}

// createEvent returns a new event, timestamped, with the specified action.
func createEvent(action string) *Event {
	return &Event{
		ID:        uuid.New(),
		Timestamp: time.Now(),
		Action:    action,
	}
}
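The bridge ties repository events to a Sink. A minimal wiring sketch, not part of this commit; the import path and all values are illustrative, and Listen comes from the listener file later in this diff:

package main

import (
	"net/http"

	"github.com/docker/distribution"
	"github.com/docker/distribution/notifications" // assumed post-move path
)

// setupListener decorates repo so pushes and pulls emit events to sink.
func setupListener(repo distribution.Repository, ub notifications.URLBuilder,
	sink notifications.Sink, r *http.Request) distribution.Repository {

	source := notifications.SourceRecord{Addr: "registry.local:5000"}
	actor := notifications.ActorRecord{Name: "admin"}
	request := notifications.NewRequestRecord("req-1", r)

	listener := notifications.NewBridge(ub, source, actor, request, sink)
	return notifications.Listen(repo, listener)
}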
@@ -1,86 +0,0 @@
package notifications

import (
	"net/http"
	"time"
)

// EndpointConfig covers the optional configuration parameters for an active
// endpoint.
type EndpointConfig struct {
	Headers   http.Header
	Timeout   time.Duration
	Threshold int
	Backoff   time.Duration
}

// defaults set any zero-valued fields to a reasonable default.
func (ec *EndpointConfig) defaults() {
	if ec.Timeout <= 0 {
		ec.Timeout = time.Second
	}

	if ec.Threshold <= 0 {
		ec.Threshold = 10
	}

	if ec.Backoff <= 0 {
		ec.Backoff = time.Second
	}
}

// Endpoint is a reliable, queued, thread-safe sink that notifies external http
// services when events are written. Writes are non-blocking and always
// succeed for callers but events may be queued internally.
type Endpoint struct {
	Sink
	url  string
	name string

	EndpointConfig

	metrics *safeMetrics
}

// NewEndpoint returns a running endpoint, ready to receive events.
func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
	var endpoint Endpoint
	endpoint.name = name
	endpoint.url = url
	endpoint.EndpointConfig = config
	endpoint.defaults()
	endpoint.metrics = newSafeMetrics()

	// Configures the inmemory queue, retry, http pipeline.
	endpoint.Sink = newHTTPSink(
		endpoint.url, endpoint.Timeout, endpoint.Headers,
		endpoint.metrics.httpStatusListener())
	endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
	endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())

	register(&endpoint)
	return &endpoint
}

// Name returns the name of the endpoint, generally used for debugging.
func (e *Endpoint) Name() string {
	return e.name
}

// URL returns the url of the endpoint.
func (e *Endpoint) URL() string {
	return e.url
}

// ReadMetrics populates em with metrics from the endpoint.
func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
	e.metrics.Lock()
	defer e.metrics.Unlock()

	*em = e.metrics.EndpointMetrics
	// Maps still need to be copied in a threadsafe manner.
	em.Statuses = make(map[string]int)
	for k, v := range e.metrics.Statuses {
		em.Statuses[k] = v
	}
}
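NewEndpoint stacks queue, retry and http sinks so callers get non-blocking writes with retry and backoff. A hypothetical construction (URL, header and tuning values are placeholders; assumes net/http and time are imported):

config := notifications.EndpointConfig{
	Headers:   http.Header{"Authorization": []string{"Bearer <token>"}},
	Timeout:   500 * time.Millisecond,
	Threshold: 5,
	Backoff:   time.Second,
}
endpoint := notifications.NewEndpoint("webhook", "https://hooks.example.com/registry", config)
// *Endpoint embeds Sink, so it can be handed straight to NewBridge or a Broadcaster.
defer endpoint.Close()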
@@ -1,154 +0,0 @@
package notifications

import (
	"fmt"
	"time"

	"github.com/docker/distribution/digest"
)

// EventAction constants used in action field of Event.
const (
	EventActionPull   = "pull"
	EventActionPush   = "push"
	EventActionDelete = "delete"
)

// EventTargetType constants used in Target section of Event.
const (
	EventTargetTypeManifest = "manifest"
	EventTargetTypeBlob     = "blob"
)

// EventsMediaType is the mediatype for the json event envelope. If the Event,
// ActorRecord, SourceRecord or Envelope structs change, the version number
// should be incremented.
const EventsMediaType = "application/vnd.docker.distribution.events.v1+json"

// Envelope defines the fields of a json event envelope message that can hold
// one or more events.
type Envelope struct {
	// Events make up the contents of the envelope. Events present in a single
	// envelope are not necessarily related.
	Events []Event `json:"events,omitempty"`
}

// TODO(stevvooe): The event type should be separate from the json format. It
// should be defined as an interface. Leaving as is for now since we don't
// need that at this time. If we make this change, the struct below would be
// called "EventRecord".

// Event provides the fields required to describe a registry event.
type Event struct {
	// ID provides a unique identifier for the event.
	ID string `json:"id,omitempty"`

	// Timestamp is the time at which the event occurred.
	Timestamp time.Time `json:"timestamp,omitempty"`

	// Action indicates what action encompasses the provided event.
	Action string `json:"action,omitempty"`

	// Target uniquely describes the target of the event.
	Target struct {
		// Type should be "manifest" or "blob"
		Type string `json:"type,omitempty"`

		// Name identifies the named repository.
		Name string `json:"name,omitempty"`

		// Digest should identify the object in the repository.
		Digest digest.Digest `json:"digest,omitempty"`

		// Tag is present if the operation involved a tagged manifest.
		Tag string `json:"tag,omitempty"`

		// URL provides a link to the content on the relevant repository instance.
		URL string `json:"url,omitempty"`
	} `json:"target,omitempty"`

	// Request covers the request that generated the event.
	Request RequestRecord `json:"request,omitempty"`

	// Actor specifies the agent that initiated the event. For most
	// situations, this could be from the authorization context of the request.
	Actor ActorRecord `json:"actor,omitempty"`

	// Source identifies the registry node that generated the event. Put
	// differently, while the actor "initiates" the event, the source
	// "generates" it.
	Source SourceRecord `json:"source,omitempty"`
}

// ActorRecord specifies the agent that initiated the event. For most
// situations, this could be from the authorization context of the request.
// Data in this record can refer to both the initiating client and the
// generating request.
type ActorRecord struct {
	// Name corresponds to the subject or username associated with the
	// request context that generated the event.
	Name string `json:"name,omitempty"`

	// TODO(stevvooe): Look into setting a session cookie to get this
	// without docker daemon.
	//    SessionID

	// TODO(stevvooe): Push the "Docker-Command" header to replace cookie and
	// get the actual command.
	//    Command
}

// RequestRecord covers the request that generated the event.
type RequestRecord struct {
	// ID uniquely identifies the request that initiated the event.
	ID string `json:"id"`

	// Addr contains the ip or hostname and possibly port of the client
	// connection that initiated the event. This is the RemoteAddr from
	// the standard http request.
	Addr string `json:"addr,omitempty"`

	// Host is the externally accessible host name of the registry instance,
	// as specified by the http host header on incoming requests.
	Host string `json:"host,omitempty"`

	// Method has the request method that generated the event.
	Method string `json:"method"`

	// UserAgent contains the user agent header of the request.
	UserAgent string `json:"useragent"`
}

// SourceRecord identifies the registry node that generated the event. Put
// differently, while the actor "initiates" the event, the source "generates"
// it.
type SourceRecord struct {
	// Addr contains the ip or hostname and the port of the registry node
	// that generated the event. Generally, this will be resolved by
	// os.Hostname() along with the running port.
	Addr string `json:"addr,omitempty"`

	// InstanceID identifies a running instance of an application. Changes
	// after each restart.
	InstanceID string `json:"instanceID,omitempty"`
}

var (
	// ErrSinkClosed is returned if a write is issued to a sink that has been
	// closed. If encountered, the error should be considered terminal and
	// retries will not be successful.
	ErrSinkClosed = fmt.Errorf("sink: closed")
)

// Sink accepts and sends events.
type Sink interface {
	// Write writes one or more events to the sink. If no error is returned,
	// the caller will assume that all events have been committed and will not
	// try to send them again. If an error is received, the caller may retry
	// sending the event. The caller should cede the slice of memory to the
	// sink and not modify it after calling this method.
	Write(events ...Event) error

	// Close the sink, possibly waiting for pending events to flush.
	Close() error
}
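Sink is the package's extension point: anything with Write and Close can terminate the pipeline. A toy in-process sink for illustration only (not in the diff; assumes the sync package and the post-move import path):

// logSink counts events it receives. Real deployments compose the http sink
// with the retry and queue wrappers defined in the files below.
type logSink struct {
	mu    sync.Mutex
	count int
}

func (ls *logSink) Write(events ...notifications.Event) error {
	ls.mu.Lock()
	defer ls.mu.Unlock()
	ls.count += len(events)
	return nil
}

// Close implements Sink; nothing is buffered, so there is nothing to flush.
func (ls *logSink) Close() error { return nil }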
@@ -1,145 +0,0 @@
package notifications

import (
	"encoding/json"
	"strings"
	"testing"
	"time"
)

// TestEventEnvelopeJSONFormat provides a silly test to detect if the event
// format or envelope has changed. If this code fails, the revision of the
// protocol may need to be incremented.
func TestEventEnvelopeJSONFormat(t *testing.T) {
	var expected = strings.TrimSpace(`
{
   "events": [
      {
         "id": "asdf-asdf-asdf-asdf-0",
         "timestamp": "2006-01-02T15:04:05Z",
         "action": "push",
         "target": {
            "type": "manifest",
            "name": "library/test",
            "digest": "sha256:0123456789abcdef0",
            "tag": "latest",
            "url": "http://example.com/v2/library/test/manifests/latest"
         },
         "request": {
            "id": "asdfasdf",
            "addr": "client.local",
            "host": "registrycluster.local",
            "method": "PUT",
            "useragent": "test/0.1"
         },
         "actor": {
            "name": "test-actor"
         },
         "source": {
            "addr": "hostname.local:port"
         }
      },
      {
         "id": "asdf-asdf-asdf-asdf-1",
         "timestamp": "2006-01-02T15:04:05Z",
         "action": "push",
         "target": {
            "type": "blob",
            "name": "library/test",
            "digest": "tarsum.v2+sha256:0123456789abcdef1",
            "url": "http://example.com/v2/library/test/manifests/latest"
         },
         "request": {
            "id": "asdfasdf",
            "addr": "client.local",
            "host": "registrycluster.local",
            "method": "PUT",
            "useragent": "test/0.1"
         },
         "actor": {
            "name": "test-actor"
         },
         "source": {
            "addr": "hostname.local:port"
         }
      },
      {
         "id": "asdf-asdf-asdf-asdf-2",
         "timestamp": "2006-01-02T15:04:05Z",
         "action": "push",
         "target": {
            "type": "blob",
            "name": "library/test",
            "digest": "tarsum.v2+sha256:0123456789abcdef2",
            "url": "http://example.com/v2/library/test/manifests/latest"
         },
         "request": {
            "id": "asdfasdf",
            "addr": "client.local",
            "host": "registrycluster.local",
            "method": "PUT",
            "useragent": "test/0.1"
         },
         "actor": {
            "name": "test-actor"
         },
         "source": {
            "addr": "hostname.local:port"
         }
      }
   ]
}
	`)

	tm, err := time.Parse(time.RFC3339, time.RFC3339[:len(time.RFC3339)-5])
	if err != nil {
		t.Fatalf("error creating time: %v", err)
	}

	var prototype Event
	prototype.Action = "push"
	prototype.Timestamp = tm
	prototype.Actor.Name = "test-actor"
	prototype.Request.ID = "asdfasdf"
	prototype.Request.Addr = "client.local"
	prototype.Request.Host = "registrycluster.local"
	prototype.Request.Method = "PUT"
	prototype.Request.UserAgent = "test/0.1"
	prototype.Source.Addr = "hostname.local:port"

	var manifestPush Event
	manifestPush = prototype
	manifestPush.ID = "asdf-asdf-asdf-asdf-0"
	manifestPush.Target.Digest = "sha256:0123456789abcdef0"
	manifestPush.Target.Type = EventTargetTypeManifest
	manifestPush.Target.Name = "library/test"
	manifestPush.Target.Tag = "latest"
	manifestPush.Target.URL = "http://example.com/v2/library/test/manifests/latest"

	var layerPush0 Event
	layerPush0 = prototype
	layerPush0.ID = "asdf-asdf-asdf-asdf-1"
	layerPush0.Target.Digest = "tarsum.v2+sha256:0123456789abcdef1"
	layerPush0.Target.Type = EventTargetTypeBlob
	layerPush0.Target.Name = "library/test"
	layerPush0.Target.URL = "http://example.com/v2/library/test/manifests/latest"

	var layerPush1 Event
	layerPush1 = prototype
	layerPush1.ID = "asdf-asdf-asdf-asdf-2"
	layerPush1.Target.Digest = "tarsum.v2+sha256:0123456789abcdef2"
	layerPush1.Target.Type = EventTargetTypeBlob
	layerPush1.Target.Name = "library/test"
	layerPush1.Target.URL = "http://example.com/v2/library/test/manifests/latest"

	var envelope Envelope
	envelope.Events = append(envelope.Events, manifestPush, layerPush0, layerPush1)

	p, err := json.MarshalIndent(envelope, "", "   ")
	if err != nil {
		t.Fatalf("unexpected error marshaling envelope: %v", err)
	}
	if string(p) != expected {
		t.Fatalf("format has changed\n%s\n != \n%s", string(p), expected)
	}
}
@@ -1,145 +0,0 @@
package notifications

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"
)

// httpSink implements a single-flight, http notification endpoint. This is
// very lightweight in that it only makes an attempt at an http request.
// Reliability should be provided by the caller.
type httpSink struct {
	url string

	mu        sync.Mutex
	closed    bool
	client    *http.Client
	listeners []httpStatusListener

	// TODO(stevvooe): Allow one to configure the media type accepted by this
	// sink and choose the serialization based on that.
}

// newHTTPSink returns an unreliable, single-flight http sink. Wrap in other
// sinks for increased reliability.
func newHTTPSink(u string, timeout time.Duration, headers http.Header, listeners ...httpStatusListener) *httpSink {
	return &httpSink{
		url:       u,
		listeners: listeners,
		client: &http.Client{
			Transport: &headerRoundTripper{
				Transport: http.DefaultTransport.(*http.Transport),
				headers:   headers,
			},
			Timeout: timeout,
		},
	}
}

// httpStatusListener is called on various outcomes of sending notifications.
type httpStatusListener interface {
	success(status int, events ...Event)
	failure(status int, events ...Event)
	err(err error, events ...Event)
}

// Write makes an attempt to notify the endpoint, returning an error if it
// fails. It is the caller's responsibility to retry on error. The events are
// accepted or rejected as a group.
func (hs *httpSink) Write(events ...Event) error {
	hs.mu.Lock()
	defer hs.mu.Unlock()

	if hs.closed {
		return ErrSinkClosed
	}

	envelope := Envelope{
		Events: events,
	}

	// TODO(stevvooe): It is not ideal to keep re-encoding the request body on
	// retry but we are going to do it to keep the code simple. It is likely
	// we could change the event struct to manage its own buffer.

	p, err := json.MarshalIndent(envelope, "", "   ")
	if err != nil {
		for _, listener := range hs.listeners {
			listener.err(err, events...)
		}
		return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err)
	}

	body := bytes.NewReader(p)
	resp, err := hs.client.Post(hs.url, EventsMediaType, body)
	if err != nil {
		for _, listener := range hs.listeners {
			listener.err(err, events...)
		}

		return fmt.Errorf("%v: error posting: %v", hs, err)
	}

	// The notifier will treat any 2xx or 3xx response as accepted by the
	// endpoint.
	switch {
	case resp.StatusCode >= 200 && resp.StatusCode < 400:
		for _, listener := range hs.listeners {
			listener.success(resp.StatusCode, events...)
		}

		// TODO(stevvooe): This is a little accepting: we may want to support
		// unsupported media type responses with retries using the correct
		// media type. There may also be cases that will never work.

		return nil
	default:
		for _, listener := range hs.listeners {
			listener.failure(resp.StatusCode, events...)
		}
		return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status)
	}
}

// Close the endpoint
func (hs *httpSink) Close() error {
	hs.mu.Lock()
	defer hs.mu.Unlock()

	if hs.closed {
		return fmt.Errorf("httpsink: already closed")
	}

	hs.closed = true
	return nil
}

func (hs *httpSink) String() string {
	return fmt.Sprintf("httpSink{%s}", hs.url)
}

type headerRoundTripper struct {
	*http.Transport // must be transport to support CancelRequest
	headers         http.Header
}

func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	var nreq http.Request
	nreq = *req
	nreq.Header = make(http.Header)

	merge := func(headers http.Header) {
		for k, v := range headers {
			nreq.Header[k] = append(nreq.Header[k], v...)
		}
	}

	merge(req.Header)
	merge(hrt.headers)

	return hrt.Transport.RoundTrip(&nreq)
}
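As the constructor comment says, httpSink is deliberately unreliable on its own. A sketch of the layering NewEndpoint performs (all three constructors appear in this commit but are unexported, so this only applies inside the package; url is a placeholder endpoint address):

m := newSafeMetrics()
var sink Sink
sink = newHTTPSink(url, time.Second, nil, m.httpStatusListener()) // actual delivery
sink = newRetryingSink(sink, 10, time.Second)                     // retry with backoff on failure
sink = newEventQueue(sink, m.eventQueueListener())                // non-blocking writes for callers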
@@ -1,155 +0,0 @@
package notifications

import (
	"encoding/json"
	"fmt"
	"mime"
	"net/http"
	"net/http/httptest"
	"reflect"
	"strconv"
	"testing"
)

// TestHTTPSink mocks out an http endpoint and notifies it under a couple of
// conditions, ensuring correct behavior.
func TestHTTPSink(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()
		if r.Method != "POST" {
			w.WriteHeader(http.StatusMethodNotAllowed)
			t.Fatalf("unexpected request method: %v", r.Method)
			return
		}

		// Extract the content type and make sure it matches
		contentType := r.Header.Get("Content-Type")
		mediaType, _, err := mime.ParseMediaType(contentType)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			t.Fatalf("error parsing media type: %v, contenttype=%q", err, contentType)
			return
		}

		if mediaType != EventsMediaType {
			w.WriteHeader(http.StatusUnsupportedMediaType)
			t.Fatalf("incorrect media type: %q != %q", mediaType, EventsMediaType)
			return
		}

		var envelope Envelope
		dec := json.NewDecoder(r.Body)
		if err := dec.Decode(&envelope); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			t.Fatalf("error decoding request body: %v", err)
			return
		}

		// Let caller choose the status
		status, err := strconv.Atoi(r.FormValue("status"))
		if err != nil {
			t.Logf("error parsing status: %v", err)

			// May just be empty, set status to 200
			status = http.StatusOK
		}

		w.WriteHeader(status)
	}))

	metrics := newSafeMetrics()
	sink := newHTTPSink(server.URL, 0, nil,
		&endpointMetricsHTTPStatusListener{safeMetrics: metrics})

	var expectedMetrics EndpointMetrics
	expectedMetrics.Statuses = make(map[string]int)

	for _, tc := range []struct {
		events     []Event // events to send
		url        string
		failure    bool // true if there should be a failure.
		statusCode int  // if not set, no status code should be incremented.
	}{
		{
			statusCode: http.StatusOK,
			events: []Event{
				createTestEvent("push", "library/test", "manifest")},
		},
		{
			statusCode: http.StatusOK,
			events: []Event{
				createTestEvent("push", "library/test", "manifest"),
				createTestEvent("push", "library/test", "layer"),
				createTestEvent("push", "library/test", "layer"),
			},
		},
		{
			statusCode: http.StatusTemporaryRedirect,
		},
		{
			statusCode: http.StatusBadRequest,
			failure:    true,
		},
		{
			// Case where connection never goes through.
			url:     "http://shoudlntresolve/",
			failure: true,
		},
	} {

		if tc.failure {
			expectedMetrics.Failures += len(tc.events)
		} else {
			expectedMetrics.Successes += len(tc.events)
		}

		if tc.statusCode > 0 {
			expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))] += len(tc.events)
		}

		url := tc.url
		if url == "" {
			url = server.URL + "/"
		}
		// setup endpoint to respond with expected status code.
		url += fmt.Sprintf("?status=%v", tc.statusCode)
		sink.url = url

		t.Logf("testcase: %v, fail=%v", url, tc.failure)
		// Try a simple event emission.
		err := sink.Write(tc.events...)

		if !tc.failure {
			if err != nil {
				t.Fatalf("unexpected error send event: %v", err)
			}
		} else {
			if err == nil {
				t.Fatalf("the endpoint should have rejected the request")
			}
		}

		if !reflect.DeepEqual(metrics.EndpointMetrics, expectedMetrics) {
			t.Fatalf("metrics not as expected: %#v != %#v", metrics.EndpointMetrics, expectedMetrics)
		}
	}

	if err := sink.Close(); err != nil {
		t.Fatalf("unexpected error closing http sink: %v", err)
	}

	// double close returns error
	if err := sink.Close(); err == nil {
		t.Fatalf("second close should have returned error: %v", err)
	}

}

func createTestEvent(action, repo, typ string) Event {
	event := createEvent(action)

	event.Target.Type = typ
	event.Target.Name = repo

	return *event
}
@@ -1,140 +0,0 @@
package notifications

import (
	"github.com/Sirupsen/logrus"
	"github.com/docker/distribution"
	"github.com/docker/distribution/digest"
	"github.com/docker/distribution/manifest"
)

// ManifestListener describes a set of methods for listening to events related to manifests.
type ManifestListener interface {
	ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error
	ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error

	// TODO(stevvooe): Please note that delete support is still a little shaky
	// and we'll need to propagate these in the future.

	ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error
}

// LayerListener describes a listener that can respond to layer related events.
type LayerListener interface {
	LayerPushed(repo distribution.Repository, layer distribution.Layer) error
	LayerPulled(repo distribution.Repository, layer distribution.Layer) error

	// TODO(stevvooe): Please note that delete support is still a little shaky
	// and we'll need to propagate these in the future.

	LayerDeleted(repo distribution.Repository, layer distribution.Layer) error
}

// Listener combines all repository events into a single interface.
type Listener interface {
	ManifestListener
	LayerListener
}

type repositoryListener struct {
	distribution.Repository
	listener Listener
}

// Listen dispatches events on the repository to the listener.
func Listen(repo distribution.Repository, listener Listener) distribution.Repository {
	return &repositoryListener{
		Repository: repo,
		listener:   listener,
	}
}

func (rl *repositoryListener) Manifests() distribution.ManifestService {
	return &manifestServiceListener{
		ManifestService: rl.Repository.Manifests(),
		parent:          rl,
	}
}

func (rl *repositoryListener) Layers() distribution.LayerService {
	return &layerServiceListener{
		LayerService: rl.Repository.Layers(),
		parent:       rl,
	}
}

type manifestServiceListener struct {
	distribution.ManifestService
	parent *repositoryListener
}

func (msl *manifestServiceListener) Get(tag string) (*manifest.SignedManifest, error) {
	sm, err := msl.ManifestService.Get(tag)
	if err == nil {
		if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil {
			logrus.Errorf("error dispatching manifest pull to listener: %v", err)
		}
	}

	return sm, err
}

func (msl *manifestServiceListener) Put(tag string, sm *manifest.SignedManifest) error {
	err := msl.ManifestService.Put(tag, sm)

	if err == nil {
		if err := msl.parent.listener.ManifestPushed(msl.parent.Repository, sm); err != nil {
			logrus.Errorf("error dispatching manifest push to listener: %v", err)
		}
	}

	return err
}

type layerServiceListener struct {
	distribution.LayerService
	parent *repositoryListener
}

func (lsl *layerServiceListener) Fetch(dgst digest.Digest) (distribution.Layer, error) {
	layer, err := lsl.LayerService.Fetch(dgst)
	if err == nil {
		if err := lsl.parent.listener.LayerPulled(lsl.parent.Repository, layer); err != nil {
			logrus.Errorf("error dispatching layer pull to listener: %v", err)
		}
	}

	return layer, err
}

func (lsl *layerServiceListener) Upload() (distribution.LayerUpload, error) {
	lu, err := lsl.LayerService.Upload()
	return lsl.decorateUpload(lu), err
}

func (lsl *layerServiceListener) Resume(uuid string) (distribution.LayerUpload, error) {
	lu, err := lsl.LayerService.Resume(uuid)
	return lsl.decorateUpload(lu), err
}

func (lsl *layerServiceListener) decorateUpload(lu distribution.LayerUpload) distribution.LayerUpload {
	return &layerUploadListener{
		LayerUpload: lu,
		parent:      lsl,
	}
}

type layerUploadListener struct {
	distribution.LayerUpload
	parent *layerServiceListener
}

func (lul *layerUploadListener) Finish(dgst digest.Digest) (distribution.Layer, error) {
	layer, err := lul.LayerUpload.Finish(dgst)
	if err == nil {
		if err := lul.parent.parent.listener.LayerPushed(lul.parent.parent.Repository, layer); err != nil {
			logrus.Errorf("error dispatching layer push to listener: %v", err)
		}
	}

	return layer, err
}
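Because the decorators embed the services they wrap, instrumentation is transparent to callers. For illustration (repo is a repository returned by Listen; signedManifest is a placeholder):

manifests := repo.Manifests()
if err := manifests.Put("latest", signedManifest); err == nil {
	// The listener's ManifestPushed has already fired; with the bridge in
	// place, a "push" event is on its way to the configured sink.
}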
@@ -1,154 +0,0 @@
package notifications

import (
	"io"
	"reflect"
	"testing"

	"github.com/docker/distribution"
	"github.com/docker/distribution/digest"
	"github.com/docker/distribution/manifest"
	"github.com/docker/distribution/registry/storage"
	"github.com/docker/distribution/registry/storage/driver/inmemory"
	"github.com/docker/distribution/testutil"
	"github.com/docker/libtrust"
	"golang.org/x/net/context"
)

func TestListener(t *testing.T) {
	registry := storage.NewRegistryWithDriver(inmemory.New())
	tl := &testListener{
		ops: make(map[string]int),
	}
	ctx := context.Background()
	repository := Listen(registry.Repository(ctx, "foo/bar"), tl)

	// Now take the registry through a number of operations
	checkExerciseRepository(t, repository)

	expectedOps := map[string]int{
		"manifest:push": 1,
		"manifest:pull": 1,
		// "manifest:delete": 0, // deletes not supported for now
		"layer:push": 2,
		"layer:pull": 2,
		// "layer:delete":    0, // deletes not supported for now
	}

	if !reflect.DeepEqual(tl.ops, expectedOps) {
		t.Fatalf("counts do not match:\n%v\n !=\n%v", tl.ops, expectedOps)
	}

}

type testListener struct {
	ops map[string]int
}

func (tl *testListener) ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error {
	tl.ops["manifest:push"]++

	return nil
}

func (tl *testListener) ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error {
	tl.ops["manifest:pull"]++
	return nil
}

func (tl *testListener) ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error {
	tl.ops["manifest:delete"]++
	return nil
}

func (tl *testListener) LayerPushed(repo distribution.Repository, layer distribution.Layer) error {
	tl.ops["layer:push"]++
	return nil
}

func (tl *testListener) LayerPulled(repo distribution.Repository, layer distribution.Layer) error {
	tl.ops["layer:pull"]++
	return nil
}

func (tl *testListener) LayerDeleted(repo distribution.Repository, layer distribution.Layer) error {
	tl.ops["layer:delete"]++
	return nil
}

// checkExerciseRepository takes the registry through all of its operations,
// carrying out generic checks.
func checkExerciseRepository(t *testing.T, repository distribution.Repository) {
	// TODO(stevvooe): This would be a nice testutil function. Basically, it
	// takes the registry through a common set of operations. This could be
	// used to make cross-cutting updates by changing internals that affect
	// update counts. Basically, it would make writing tests a lot easier.

	tag := "thetag"
	m := manifest.Manifest{
		Versioned: manifest.Versioned{
			SchemaVersion: 1,
		},
		Name: repository.Name(),
		Tag:  tag,
	}

	layers := repository.Layers()
	for i := 0; i < 2; i++ {
		rs, ds, err := testutil.CreateRandomTarFile()
		if err != nil {
			t.Fatalf("error creating test layer: %v", err)
		}
		dgst := digest.Digest(ds)
		upload, err := layers.Upload()
		if err != nil {
			t.Fatalf("error creating layer upload: %v", err)
		}

		// Use the resumes, as well!
		upload, err = layers.Resume(upload.UUID())
		if err != nil {
			t.Fatalf("error resuming layer upload: %v", err)
		}

		io.Copy(upload, rs)

		if _, err := upload.Finish(dgst); err != nil {
			t.Fatalf("unexpected error finishing upload: %v", err)
		}

		m.FSLayers = append(m.FSLayers, manifest.FSLayer{
			BlobSum: dgst,
		})

		// Then fetch the layers
		if _, err := layers.Fetch(dgst); err != nil {
			t.Fatalf("error fetching layer: %v", err)
		}
	}

	pk, err := libtrust.GenerateECP256PrivateKey()
	if err != nil {
		t.Fatalf("unexpected error generating key: %v", err)
	}

	sm, err := manifest.Sign(&m, pk)
	if err != nil {
		t.Fatalf("unexpected error signing manifest: %v", err)
	}

	manifests := repository.Manifests()

	if err := manifests.Put(tag, sm); err != nil {
		t.Fatalf("unexpected error putting the manifest: %v", err)
	}

	fetched, err := manifests.Get(tag)
	if err != nil {
		t.Fatalf("unexpected error fetching manifest: %v", err)
	}

	if fetched.Tag != sm.Tag {
		t.Fatalf("retrieved unexpected manifest: %v", err)
	}
}
@@ -1,152 +0,0 @@
package notifications

import (
	"expvar"
	"fmt"
	"net/http"
	"sync"
)

// EndpointMetrics track various actions taken by the endpoint, typically by
// number of events. The goal of this is to export it via expvar but we may
// find some other future solution to be better.
type EndpointMetrics struct {
	Pending   int            // events pending in queue
	Events    int            // total events incoming
	Successes int            // total events written successfully
	Failures  int            // total events failed
	Errors    int            // total events errored
	Statuses  map[string]int // status code histogram, per call event
}

// safeMetrics guards the metrics implementation with a lock and provides a
// safe update function.
type safeMetrics struct {
	EndpointMetrics
	sync.Mutex // protects statuses map
}

// newSafeMetrics returns safeMetrics with map allocated.
func newSafeMetrics() *safeMetrics {
	var sm safeMetrics
	sm.Statuses = make(map[string]int)
	return &sm
}

// httpStatusListener returns the listener for the http sink that updates the
// relevant counters.
func (sm *safeMetrics) httpStatusListener() httpStatusListener {
	return &endpointMetricsHTTPStatusListener{
		safeMetrics: sm,
	}
}

// eventQueueListener returns a listener that maintains queue related counters.
func (sm *safeMetrics) eventQueueListener() eventQueueListener {
	return &endpointMetricsEventQueueListener{
		safeMetrics: sm,
	}
}

// endpointMetricsHTTPStatusListener increments counters related to http sinks
// for the relevant events.
type endpointMetricsHTTPStatusListener struct {
	*safeMetrics
}

var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}

func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
	emsl.safeMetrics.Lock()
	defer emsl.safeMetrics.Unlock()
	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
	emsl.Successes += len(events)
}

func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
	emsl.safeMetrics.Lock()
	defer emsl.safeMetrics.Unlock()
	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
	emsl.Failures += len(events)
}

func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
	emsl.safeMetrics.Lock()
	defer emsl.safeMetrics.Unlock()
	emsl.Errors += len(events)
}

// endpointMetricsEventQueueListener maintains the incoming events counter and
// the queues pending count.
type endpointMetricsEventQueueListener struct {
	*safeMetrics
}

func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
	eqc.Lock()
	defer eqc.Unlock()
	eqc.Events += len(events)
	eqc.Pending += len(events)
}

func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
	eqc.Lock()
	defer eqc.Unlock()
	eqc.Pending -= len(events)
}

// endpoints is a global registry of endpoints used to report metrics to expvar
var endpoints struct {
	registered []*Endpoint
	mu         sync.Mutex
}

// register places the endpoint into expvar so that stats are tracked.
func register(e *Endpoint) {
	endpoints.mu.Lock()
	defer endpoints.mu.Unlock()

	endpoints.registered = append(endpoints.registered, e)
}

func init() {
	// NOTE(stevvooe): Setup registry metrics structure to report to expvar.
	// Ideally, we do more metrics through logging but we need some nice
	// realtime metrics for queue state for now.

	registry := expvar.Get("registry")

	if registry == nil {
		registry = expvar.NewMap("registry")
	}

	var notifications expvar.Map
	notifications.Init()
	notifications.Set("endpoints", expvar.Func(func() interface{} {
		endpoints.mu.Lock()
		defer endpoints.mu.Unlock()

		var names []interface{}
		for _, v := range endpoints.registered {
			var epjson struct {
				Name string `json:"name"`
				URL  string `json:"url"`
				EndpointConfig

				Metrics EndpointMetrics
			}

			epjson.Name = v.Name()
			epjson.URL = v.URL()
			epjson.EndpointConfig = v.EndpointConfig

			v.ReadMetrics(&epjson.Metrics)

			names = append(names, epjson)
		}

		return names
	}))

	registry.(*expvar.Map).Set("notifications", &notifications)
}
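Since init publishes the structure through expvar, any process serving the default mux exposes these counters on /debug/vars. A hypothetical way to inspect them (port and jq filter are illustrative):

// expvar registers its handler on http.DefaultServeMux when imported.
go http.ListenAndServe("localhost:6060", nil)
// then: curl http://localhost:6060/debug/vars | jq .registry.notifications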
			@ -1,337 +0,0 @@
 | 
			
		|||
package notifications

import (
	"container/list"
	"fmt"
	"sync"
	"time"

	"github.com/Sirupsen/logrus"
)

// NOTE(stevvooe): This file contains definitions for several utility sinks.
// Typically, the broadcaster is the only sink that should be required
// externally, but others are suitable for export if the need arises. That
// said, the tight integration with endpoint metrics should be removed.

// Broadcaster sends events to multiple, reliable Sinks. The goal of this
// component is to dispatch events to configured endpoints. Reliability can be
// provided by wrapping incoming sinks.
type Broadcaster struct {
	sinks  []Sink
	events chan []Event
	closed chan chan struct{}
}

// NewBroadcaster returns a broadcaster dispatching to the given sinks. The
// broadcaster behavior will be affected by the properties of each sink.
// Generally, a sink should accept all messages and deal with reliability on
// its own; EventQueue and RetryingSink are appropriate wrappers here.
func NewBroadcaster(sinks ...Sink) *Broadcaster {
	b := Broadcaster{
		sinks:  sinks,
		events: make(chan []Event),
		closed: make(chan chan struct{}),
	}

	// Start the broadcaster
	go b.run()

	return &b
}

// Write accepts a block of events to be dispatched to all sinks. This method
// will never fail and should never block (hopefully!). The caller cedes the
// slice memory to the broadcaster and should not modify it after calling
// Write.
func (b *Broadcaster) Write(events ...Event) error {
	select {
	case b.events <- events:
	case <-b.closed:
		return ErrSinkClosed
	}
	return nil
}

// Close the broadcaster, ensuring that all messages are flushed to the
// underlying sink before returning.
func (b *Broadcaster) Close() error {
	logrus.Infof("broadcaster: closing")
	select {
	case <-b.closed:
		// already closed
		return fmt.Errorf("broadcaster: already closed")
	default:
		// do a little chan handoff dance to synchronize closing
		closed := make(chan struct{})
		b.closed <- closed
		close(b.closed)
		<-closed
		return nil
	}
}

// run is the main broadcast loop, started when the broadcaster is created.
// Under normal conditions, it waits for events on the event channel. After
// Close is called, this goroutine will exit.
func (b *Broadcaster) run() {
	for {
		select {
		case block := <-b.events:
			for _, sink := range b.sinks {
				if err := sink.Write(block...); err != nil {
					logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err)
				}
			}
		case closing := <-b.closed:

			// close all the underlying sinks
			for _, sink := range b.sinks {
				if err := sink.Close(); err != nil {
					logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err)
				}
			}
			closing <- struct{}{}

			logrus.Debugf("broadcaster: closed")
			return
		}
	}
}

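A usage sketch for the broadcaster, not from the original source: logSink is a hypothetical Sink implementation, and the import path assumes this package's new location after the move.

package main

import (
	"log"

	// Assumed new import path after this change; previously under
	// registry/storage/notifications.
	"github.com/docker/distribution/notifications"
)

// logSink is a hypothetical Sink that just logs what it receives.
type logSink struct{ name string }

func (s *logSink) Write(events ...notifications.Event) error {
	log.Printf("%s: received %d events", s.name, len(events))
	return nil
}

func (s *logSink) Close() error { return nil }

func main() {
	b := notifications.NewBroadcaster(&logSink{"audit"}, &logSink{"metrics"})
	if err := b.Write(notifications.Event{Action: notifications.EventActionPush}); err != nil {
		log.Fatal(err)
	}
	b.Close() // flushes to and closes both sinks
}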
// eventQueue accepts all messages into a queue for asynchronous consumption
// by a sink. It is unbounded and thread safe but the sink must be reliable or
// events will be dropped.
type eventQueue struct {
	sink      Sink
	events    *list.List
	listeners []eventQueueListener
	cond      *sync.Cond
	mu        sync.Mutex
	closed    bool
}

// eventQueueListener is called when various events happen on the queue.
type eventQueueListener interface {
	ingress(events ...Event)
	egress(events ...Event)
}

// newEventQueue returns a queue to the provided sink. If listeners are
// provided, they will be called to update pending metrics on ingress and
// egress.
func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
	eq := eventQueue{
		sink:      sink,
		events:    list.New(),
		listeners: listeners,
	}

	eq.cond = sync.NewCond(&eq.mu)
	go eq.run()
	return &eq
}

// Write accepts the events into the queue, only failing if the queue has
// been closed.
func (eq *eventQueue) Write(events ...Event) error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return ErrSinkClosed
	}

	for _, listener := range eq.listeners {
		listener.ingress(events...)
	}
	eq.events.PushBack(events)
	eq.cond.Signal() // signal waiters

	return nil
}

// Close shuts down the event queue, flushing any remaining events to the
// underlying sink before returning.
func (eq *eventQueue) Close() error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return fmt.Errorf("eventqueue: already closed")
	}

	// set closed flag
	eq.closed = true
	eq.cond.Signal() // signal flushes queue
	eq.cond.Wait()   // wait for signal from last flush

	return eq.sink.Close()
}

// run is the main goroutine to flush events to the target sink.
func (eq *eventQueue) run() {
	for {
		block := eq.next()

		if block == nil {
			return // nil block means event queue is closed.
		}

		if err := eq.sink.Write(block...); err != nil {
			logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
		}

		for _, listener := range eq.listeners {
			listener.egress(block...)
		}
	}
}

// next encompasses the critical section of the run loop. When the queue is
// empty, it will block on the condition. If new data arrives, it will wake
// and return a block. When closed, a nil slice will be returned.
func (eq *eventQueue) next() []Event {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	for eq.events.Len() < 1 {
		if eq.closed {
			eq.cond.Broadcast()
			return nil
		}

		eq.cond.Wait()
	}

	front := eq.events.Front()
	block := front.Value.([]Event)
	eq.events.Remove(front)

	return block
}

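Taken together, newEventQueue and the listener hooks compose naturally with the metrics in metrics.go. A hypothetical in-package helper, not part of the source, using only names defined in this package:

// newMeteredQueue wraps sink in an eventQueue that reports ingress and
// egress counts through a safeMetrics instance (see metrics.go above).
// This helper is a sketch; the endpoint plumbing does its own wiring.
func newMeteredQueue(sink Sink) (*eventQueue, *safeMetrics) {
	metrics := newSafeMetrics()
	eq := newEventQueue(sink, metrics.eventQueueListener())
	return eq, metrics
}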
// retryingSink retries the write until success or an ErrSinkClosed is
// returned. The underlying sink must succeed with probability p > 0 or the
// sink will block. Internally, it is a circuit breaker that uses retries to
// manage resets. Concurrent calls to a retrying sink are serialized through
// the sink, meaning that if one is in-flight, another will not proceed.
type retryingSink struct {
	mu     sync.Mutex
	sink   Sink
	closed bool

	// circuit breaker heuristics
	failures struct {
		threshold int
		recent    int
		last      time.Time
		backoff   time.Duration // time after which we retry after failure.
	}
}

type retryingSinkListener interface {
	active(events ...Event)
	retry(events ...Event)
}

// TODO(stevvooe): We are using circuit breaking here, which actually doesn't
// make a whole lot of sense for this use case, since we always retry. Move
// this to use bounded exponential backoff.

// newRetryingSink returns a sink that will retry writes to a sink, backing
// off on failure. Parameters threshold and backoff adjust the behavior of the
// circuit breaker.
func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
	rs := &retryingSink{
		sink: sink,
	}
	rs.failures.threshold = threshold
	rs.failures.backoff = backoff

	return rs
}

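These utility sinks are designed to nest. A hypothetical in-package sketch of the full pipeline; the endpoint plumbing assembles something similar, but this exact helper is not in the source:

// newReliableBroadcaster stacks the sinks defined in this file: writes fan
// out through a Broadcaster into an eventQueue, which drains into a
// retryingSink guarding the terminal sink.
func newReliableBroadcaster(terminal Sink, threshold int, backoff time.Duration) *Broadcaster {
	return NewBroadcaster(newEventQueue(newRetryingSink(terminal, threshold, backoff)))
}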
// Write attempts to flush the events to the downstream sink until it succeeds
// or the sink is closed.
func (rs *retryingSink) Write(events ...Event) error {
	rs.mu.Lock()
	defer rs.mu.Unlock()

retry:

	if rs.closed {
		return ErrSinkClosed
	}

	if !rs.proceed() {
		logrus.Warnf("%v encountered too many errors, backing off", rs.sink)
		rs.wait(rs.failures.backoff)
		goto retry
	}

	if err := rs.write(events...); err != nil {
		if err == ErrSinkClosed {
			// terminal!
			return err
		}

		logrus.Errorf("retryingsink: error writing events: %v, retrying", err)
		goto retry
	}

	return nil
}

// Close closes the sink and the underlying sink.
func (rs *retryingSink) Close() error {
	rs.mu.Lock()
	defer rs.mu.Unlock()

	if rs.closed {
		return fmt.Errorf("retryingsink: already closed")
	}

	rs.closed = true
	return rs.sink.Close()
}

// write provides a helper that dispatches failure and success properly. Used
// by Write as the single-flight write call.
func (rs *retryingSink) write(events ...Event) error {
	if err := rs.sink.Write(events...); err != nil {
		rs.failure()
		return err
	}

	rs.reset()
	return nil
}

// wait sleeps for the backoff period, unlocking so others can proceed. Should
// only be called by methods that currently hold the mutex.
func (rs *retryingSink) wait(backoff time.Duration) {
	rs.mu.Unlock()
	defer rs.mu.Lock()

	// backoff here
	time.Sleep(backoff)
}

// reset marks a successful call.
func (rs *retryingSink) reset() {
	rs.failures.recent = 0
	rs.failures.last = time.Time{}
}

// failure records a failure.
func (rs *retryingSink) failure() {
	rs.failures.recent++
	rs.failures.last = time.Now().UTC()
}

// proceed returns true if the call should proceed based on circuit breaker
// heuristics.
func (rs *retryingSink) proceed() bool {
	return rs.failures.recent < rs.failures.threshold ||
		time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))
}
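For the bounded exponential backoff suggested in the TODO above, the delay schedule might look like the following sketch; backoffFor is hypothetical and not part of this change:

// backoffFor computes a bounded exponential delay for the nth consecutive
// failure: base, 2*base, 4*base, ... capped at max.
func backoffFor(failures int, base, max time.Duration) time.Duration {
	d := base << uint(failures) // base * 2^failures
	if d <= 0 || d > max {      // d <= 0 guards against shift overflow
		return max
	}
	return d
}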
@ -1,223 +0,0 @@
package notifications

import (
	"fmt"
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/Sirupsen/logrus"
)

func TestBroadcaster(t *testing.T) {
	const nEvents = 1000
	var sinks []Sink

	for i := 0; i < 10; i++ {
		sinks = append(sinks, &testSink{})
	}

	b := NewBroadcaster(sinks...)

	var block []Event
	var wg sync.WaitGroup
	for i := 1; i <= nEvents; i++ {
		block = append(block, createTestEvent("push", "library/test", "blob"))

		if i%10 == 0 && i > 0 {
			wg.Add(1)
			go func(block ...Event) {
				// Errorf rather than Fatalf: FailNow must not be called
				// from a goroutine other than the test's own.
				if err := b.Write(block...); err != nil {
					t.Errorf("error writing block of length %d: %v", len(block), err)
				}
				wg.Done()
			}(block...)

			block = nil
		}
	}

	wg.Wait() // Wait until writes complete
	checkClose(t, b)

	// Iterate through the sinks and check that they all have the expected length.
	for _, sink := range sinks {
		ts := sink.(*testSink)
		ts.mu.Lock()
		defer ts.mu.Unlock()

		if len(ts.events) != nEvents {
			t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
		}

		if !ts.closed {
			t.Fatalf("sink should have been closed")
		}
	}
}

func TestEventQueue(t *testing.T) {
	const nevents = 1000
	var ts testSink
	metrics := newSafeMetrics()
	eq := newEventQueue(
		// delayed sink simulates a destination slower than channel comms
		&delayedSink{
			Sink:  &ts,
			delay: time.Millisecond * 1,
		}, metrics.eventQueueListener())

	var wg sync.WaitGroup
	var block []Event
	for i := 1; i <= nevents; i++ {
		block = append(block, createTestEvent("push", "library/test", "blob"))
		if i%10 == 0 && i > 0 {
			wg.Add(1)
			go func(block ...Event) {
				if err := eq.Write(block...); err != nil {
					t.Errorf("error writing event block: %v", err)
				}
				wg.Done()
			}(block...)

			block = nil
		}
	}

	wg.Wait()
	checkClose(t, eq)

	ts.mu.Lock()
	defer ts.mu.Unlock()
	metrics.Lock()
	defer metrics.Unlock()

	if len(ts.events) != nevents {
		t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), nevents)
	}

	if !ts.closed {
		t.Fatalf("sink should have been closed")
	}

	if metrics.Events != nevents {
		t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents)
	}

	if metrics.Pending != 0 {
		t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0)
	}
}

func TestRetryingSink(t *testing.T) {

	// Make a sink that fails most of the time, ensuring that all the events
	// make it through.
	var ts testSink
	flaky := &flakySink{
		rate: 1.0, // start out always failing.
		Sink: &ts,
	}
	s := newRetryingSink(flaky, 3, 10*time.Millisecond)

	var wg sync.WaitGroup
	var block []Event
	for i := 1; i <= 100; i++ {
		block = append(block, createTestEvent("push", "library/test", "blob"))

		// Above 50, set the failure rate lower
		if i > 50 {
			s.mu.Lock()
			flaky.rate = 0.90
			s.mu.Unlock()
		}

		if i%10 == 0 && i > 0 {
			wg.Add(1)
			go func(block ...Event) {
				defer wg.Done()
				if err := s.Write(block...); err != nil {
					t.Errorf("error writing event block: %v", err)
				}
			}(block...)

			block = nil
		}
	}

	wg.Wait()
	checkClose(t, s)

	ts.mu.Lock()
	defer ts.mu.Unlock()

	if len(ts.events) != 100 {
		t.Fatalf("events not propagated: %d != %d", len(ts.events), 100)
	}
}

type testSink struct {
	events []Event
	mu     sync.Mutex
	closed bool
}

func (ts *testSink) Write(events ...Event) error {
	ts.mu.Lock()
	defer ts.mu.Unlock()
	ts.events = append(ts.events, events...)
	return nil
}

func (ts *testSink) Close() error {
	ts.mu.Lock()
	defer ts.mu.Unlock()
	ts.closed = true

	logrus.Infof("closing testSink")
	return nil
}

type delayedSink struct {
	Sink
	delay time.Duration
}

func (ds *delayedSink) Write(events ...Event) error {
	time.Sleep(ds.delay)
	return ds.Sink.Write(events...)
}

type flakySink struct {
	Sink
	rate float64
}

func (fs *flakySink) Write(events ...Event) error {
	if rand.Float64() < fs.rate {
		return fmt.Errorf("error writing %d events", len(events))
	}

	return fs.Sink.Write(events...)
}

func checkClose(t *testing.T, sink Sink) {
	if err := sink.Close(); err != nil {
		t.Fatalf("unexpected error closing: %v", err)
	}

	// second close should not crash but should return an error.
	if err := sink.Close(); err == nil {
		t.Fatalf("no error on double close")
	}

	// Write after closed should be an error
	if err := sink.Write([]Event{}...); err == nil {
		t.Fatalf("write after closed did not have an error")
	} else if err != ErrSinkClosed {
		t.Fatalf("error should be ErrSinkClosed")
	}
}