Merge pull request #1957 from nwt/notification-filtering
Add notification filtering by target media typemaster
						commit
						cfad4321c1
					
				|  | @ -527,13 +527,14 @@ type Notifications struct { | |||
| // Endpoint describes the configuration of an http webhook notification
 | ||||
| // endpoint.
 | ||||
| type Endpoint struct { | ||||
| 	Name      string        `yaml:"name"`      // identifies the endpoint in the registry instance.
 | ||||
| 	Disabled  bool          `yaml:"disabled"`  // disables the endpoint
 | ||||
| 	URL       string        `yaml:"url"`       // post url for the endpoint.
 | ||||
| 	Headers   http.Header   `yaml:"headers"`   // static headers that should be added to all requests
 | ||||
| 	Timeout   time.Duration `yaml:"timeout"`   // HTTP timeout
 | ||||
| 	Threshold int           `yaml:"threshold"` // circuit breaker threshold before backing off on failure
 | ||||
| 	Backoff   time.Duration `yaml:"backoff"`   // backoff duration
 | ||||
| 	Name              string        `yaml:"name"`              // identifies the endpoint in the registry instance.
 | ||||
| 	Disabled          bool          `yaml:"disabled"`          // disables the endpoint
 | ||||
| 	URL               string        `yaml:"url"`               // post url for the endpoint.
 | ||||
| 	Headers           http.Header   `yaml:"headers"`           // static headers that should be added to all requests
 | ||||
| 	Timeout           time.Duration `yaml:"timeout"`           // HTTP timeout
 | ||||
| 	Threshold         int           `yaml:"threshold"`         // circuit breaker threshold before backing off on failure
 | ||||
| 	Backoff           time.Duration `yaml:"backoff"`           // backoff duration
 | ||||
| 	IgnoredMediaTypes []string      `yaml:"ignoredmediatypes"` // target media types to ignore
 | ||||
| } | ||||
| 
 | ||||
| // Reporting defines error reporting methods.
 | ||||
|  |  | |||
|  | @ -62,6 +62,7 @@ var configStruct = Configuration{ | |||
| 				Headers: http.Header{ | ||||
| 					"Authorization": []string{"Bearer <example>"}, | ||||
| 				}, | ||||
| 				IgnoredMediaTypes: []string{"application/octet-stream"}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	}, | ||||
|  | @ -139,6 +140,8 @@ notifications: | |||
|       url:  http://example.com
 | ||||
|       headers: | ||||
|         Authorization: [Bearer <example>] | ||||
|       ignoredmediatypes: | ||||
|         - application/octet-stream | ||||
| reporting: | ||||
|   bugsnag: | ||||
|     apikey: BugsnagApiKey | ||||
|  | @ -165,6 +168,8 @@ notifications: | |||
|       url:  http://example.com
 | ||||
|       headers: | ||||
|         Authorization: [Bearer <example>] | ||||
|       ignoredmediatypes: | ||||
|         - application/octet-stream | ||||
| http: | ||||
|   headers: | ||||
|     X-Content-Type-Options: [nosniff] | ||||
|  |  | |||
|  | @ -214,6 +214,8 @@ information about each option that appears later in this page. | |||
|           timeout: 500 | ||||
|           threshold: 5 | ||||
|           backoff: 1000 | ||||
|           ignoredmediatypes: | ||||
|             - application/octet-stream | ||||
|     redis: | ||||
|       addr: localhost:6379 | ||||
|       password: asecret | ||||
|  | @ -1177,6 +1179,8 @@ settings for the registry. | |||
|           timeout: 500 | ||||
|           threshold: 5 | ||||
|           backoff: 1000 | ||||
|           ignoredmediatypes: | ||||
|             - application/octet-stream | ||||
| 
 | ||||
| The notifications option is **optional** and currently may contain a single | ||||
| option, `endpoints`. | ||||
|  | @ -1291,6 +1295,18 @@ The URL to which events should be published. | |||
|     If you omit the suffix, the system interprets the value as nanoseconds. | ||||
|     </td> | ||||
|   </tr> | ||||
|   <tr> | ||||
|     <td> | ||||
|       <code>ignoredmediatypes</code> | ||||
|     </td> | ||||
|     <td> | ||||
|       no | ||||
|     </td> | ||||
|     <td> | ||||
|       List of target media types to ignore. An event whose target media type | ||||
|       is present in this list will not be published to the endpoint. | ||||
|     </td> | ||||
|   </tr> | ||||
| </table> | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -8,11 +8,12 @@ import ( | |||
| // EndpointConfig covers the optional configuration parameters for an active
 | ||||
| // endpoint.
 | ||||
| type EndpointConfig struct { | ||||
| 	Headers   http.Header | ||||
| 	Timeout   time.Duration | ||||
| 	Threshold int | ||||
| 	Backoff   time.Duration | ||||
| 	Transport *http.Transport | ||||
| 	Headers           http.Header | ||||
| 	Timeout           time.Duration | ||||
| 	Threshold         int | ||||
| 	Backoff           time.Duration | ||||
| 	IgnoredMediaTypes []string | ||||
| 	Transport         *http.Transport | ||||
| } | ||||
| 
 | ||||
| // defaults set any zero-valued fields to a reasonable default.
 | ||||
|  | @ -62,6 +63,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint { | |||
| 		endpoint.Transport, endpoint.metrics.httpStatusListener()) | ||||
| 	endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff) | ||||
| 	endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener()) | ||||
| 	endpoint.Sink = newIgnoredMediaTypesSink(endpoint.Sink, config.IgnoredMediaTypes) | ||||
| 
 | ||||
| 	register(&endpoint) | ||||
| 	return &endpoint | ||||
|  |  | |||
|  | @ -210,6 +210,44 @@ func (eq *eventQueue) next() []Event { | |||
| 	return block | ||||
| } | ||||
| 
 | ||||
| // ignoredMediaTypesSink discards events with ignored target media types and
 | ||||
| // passes the rest along.
 | ||||
| type ignoredMediaTypesSink struct { | ||||
| 	Sink | ||||
| 	ignored map[string]bool | ||||
| } | ||||
| 
 | ||||
| func newIgnoredMediaTypesSink(sink Sink, ignored []string) Sink { | ||||
| 	if len(ignored) == 0 { | ||||
| 		return sink | ||||
| 	} | ||||
| 
 | ||||
| 	ignoredMap := make(map[string]bool) | ||||
| 	for _, mediaType := range ignored { | ||||
| 		ignoredMap[mediaType] = true | ||||
| 	} | ||||
| 
 | ||||
| 	return &ignoredMediaTypesSink{ | ||||
| 		Sink:    sink, | ||||
| 		ignored: ignoredMap, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Write discards events with ignored target media types and passes the rest
 | ||||
| // along.
 | ||||
| func (imts *ignoredMediaTypesSink) Write(events ...Event) error { | ||||
| 	var kept []Event | ||||
| 	for _, e := range events { | ||||
| 		if !imts.ignored[e.Target.MediaType] { | ||||
| 			kept = append(kept, e) | ||||
| 		} | ||||
| 	} | ||||
| 	if len(kept) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
| 	return imts.Sink.Write(kept...) | ||||
| } | ||||
| 
 | ||||
| // retryingSink retries the write until success or an ErrSinkClosed is
 | ||||
| // returned. Underlying sink must have p > 0 of succeeding or the sink will
 | ||||
| // block. Internally, it is a circuit breaker retries to manage reset.
 | ||||
|  |  | |||
|  | @ -3,6 +3,7 @@ package notifications | |||
| import ( | ||||
| 	"fmt" | ||||
| 	"math/rand" | ||||
| 	"reflect" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 
 | ||||
|  | @ -112,6 +113,38 @@ func TestEventQueue(t *testing.T) { | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestIgnoredMediaTypesSink(t *testing.T) { | ||||
| 	blob := createTestEvent("push", "library/test", "blob") | ||||
| 	manifest := createTestEvent("push", "library/test", "manifest") | ||||
| 
 | ||||
| 	type testcase struct { | ||||
| 		ignored  []string | ||||
| 		expected []Event | ||||
| 	} | ||||
| 
 | ||||
| 	cases := []testcase{ | ||||
| 		{nil, []Event{blob, manifest}}, | ||||
| 		{[]string{"other"}, []Event{blob, manifest}}, | ||||
| 		{[]string{"blob"}, []Event{manifest}}, | ||||
| 		{[]string{"blob", "manifest"}, nil}, | ||||
| 	} | ||||
| 
 | ||||
| 	for _, c := range cases { | ||||
| 		ts := &testSink{} | ||||
| 		s := newIgnoredMediaTypesSink(ts, c.ignored) | ||||
| 
 | ||||
| 		if err := s.Write(blob, manifest); err != nil { | ||||
| 			t.Fatalf("error writing event: %v", err) | ||||
| 		} | ||||
| 
 | ||||
| 		ts.mu.Lock() | ||||
| 		if !reflect.DeepEqual(ts.events, c.expected) { | ||||
| 			t.Fatalf("unexpected events: %#v != %#v", ts.events, c.expected) | ||||
| 		} | ||||
| 		ts.mu.Unlock() | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestRetryingSink(t *testing.T) { | ||||
| 
 | ||||
| 	// Make a sync that fails most of the time, ensuring that all the events
 | ||||
|  |  | |||
|  | @ -427,10 +427,11 @@ func (app *App) configureEvents(configuration *configuration.Configuration) { | |||
| 
 | ||||
| 		ctxu.GetLogger(app).Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers) | ||||
| 		endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{ | ||||
| 			Timeout:   endpoint.Timeout, | ||||
| 			Threshold: endpoint.Threshold, | ||||
| 			Backoff:   endpoint.Backoff, | ||||
| 			Headers:   endpoint.Headers, | ||||
| 			Timeout:           endpoint.Timeout, | ||||
| 			Threshold:         endpoint.Threshold, | ||||
| 			Backoff:           endpoint.Backoff, | ||||
| 			Headers:           endpoint.Headers, | ||||
| 			IgnoredMediaTypes: endpoint.IgnoredMediaTypes, | ||||
| 		}) | ||||
| 
 | ||||
| 		sinks = append(sinks, endpoint) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue