94 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			94 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Go
		
	
	
package notifications
 | 
						|
 | 
						|
import (
 | 
						|
	"net/http"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
// EndpointConfig covers the optional configuration parameters for an active
// endpoint. Fields left at their zero value are filled in by defaults.
type EndpointConfig struct {
	// Headers is passed to the HTTP sink when the endpoint is created.
	Headers http.Header
	// Timeout is passed to the HTTP sink. Values <= 0 default to one second.
	Timeout time.Duration
	// Threshold is passed to the retrying sink. Values <= 0 default to 10.
	Threshold int
	// Backoff is passed to the retrying sink. Values <= 0 default to one
	// second.
	Backoff time.Duration
	// IgnoredMediaTypes is passed to the ignored-media-types sink.
	IgnoredMediaTypes []string
	// Transport is passed to the HTTP sink. When nil, http.DefaultTransport
	// is used if it is an *http.Transport, otherwise a new transport.
	Transport *http.Transport
}

// defaults sets any zero-valued (or negative) fields to a reasonable default.
func (ec *EndpointConfig) defaults() {
	if ec.Timeout <= 0 {
		ec.Timeout = time.Second
	}

	if ec.Threshold <= 0 {
		ec.Threshold = 10
	}

	if ec.Backoff <= 0 {
		ec.Backoff = time.Second
	}

	if ec.Transport == nil {
		// http.DefaultTransport is an http.RoundTripper variable that a
		// program may replace (for example with an instrumenting wrapper),
		// so an unchecked assertion to *http.Transport could panic here.
		transport, ok := http.DefaultTransport.(*http.Transport)
		if !ok {
			transport = &http.Transport{Proxy: http.ProxyFromEnvironment}
		}
		ec.Transport = transport
	}
}
 | 
						|
 | 
						|
// Endpoint is a reliable, queued, thread-safe sink that notify external http
 | 
						|
// services when events are written. Writes are non-blocking and always
 | 
						|
// succeed for callers but events may be queued internally.
 | 
						|
type Endpoint struct {
 | 
						|
	Sink
 | 
						|
	url  string
 | 
						|
	name string
 | 
						|
 | 
						|
	EndpointConfig
 | 
						|
 | 
						|
	metrics *safeMetrics
 | 
						|
}
 | 
						|
 | 
						|
// NewEndpoint returns a running endpoint, ready to receive events.
 | 
						|
func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
 | 
						|
	var endpoint Endpoint
 | 
						|
	endpoint.name = name
 | 
						|
	endpoint.url = url
 | 
						|
	endpoint.EndpointConfig = config
 | 
						|
	endpoint.defaults()
 | 
						|
	endpoint.metrics = newSafeMetrics()
 | 
						|
 | 
						|
	// Configures the inmemory queue, retry, http pipeline.
 | 
						|
	endpoint.Sink = newHTTPSink(
 | 
						|
		endpoint.url, endpoint.Timeout, endpoint.Headers,
 | 
						|
		endpoint.Transport, endpoint.metrics.httpStatusListener())
 | 
						|
	endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
 | 
						|
	endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
 | 
						|
	endpoint.Sink = newIgnoredMediaTypesSink(endpoint.Sink, config.IgnoredMediaTypes)
 | 
						|
 | 
						|
	register(&endpoint)
 | 
						|
	return &endpoint
 | 
						|
}
 | 
						|
 | 
						|
// Name returns the name of the endpoint, generally used for debugging.
 | 
						|
func (e *Endpoint) Name() string {
 | 
						|
	return e.name
 | 
						|
}
 | 
						|
 | 
						|
// URL returns the url of the endpoint.
 | 
						|
func (e *Endpoint) URL() string {
 | 
						|
	return e.url
 | 
						|
}
 | 
						|
 | 
						|
// ReadMetrics populates em with metrics from the endpoint.
 | 
						|
func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
 | 
						|
	e.metrics.Lock()
 | 
						|
	defer e.metrics.Unlock()
 | 
						|
 | 
						|
	*em = e.metrics.EndpointMetrics
 | 
						|
	// Map still need to copied in a threadsafe manner.
 | 
						|
	em.Statuses = make(map[string]int)
 | 
						|
	for k, v := range e.metrics.Statuses {
 | 
						|
		em.Statuses[k] = v
 | 
						|
	}
 | 
						|
}
 |