Merge pull request #901 from aaronlehmann/configurable-health-checks
Add configurable file-existence and HTTP health checksmaster
						commit
						f639a1af8c
					
				| 
						 | 
				
			
			@ -5,8 +5,8 @@ import (
 | 
			
		|||
	"net/http/httptest"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/context"
 | 
			
		||||
	"github.com/docker/distribution/registry/auth"
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestSillyAccessController(t *testing.T) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,9 +15,9 @@ import (
 | 
			
		|||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/context"
 | 
			
		||||
	"github.com/docker/distribution/registry/auth"
 | 
			
		||||
	"github.com/docker/libtrust"
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func makeRootKeys(numKeys int) ([]libtrust.PrivateKey, error) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -19,6 +19,7 @@ import (
 | 
			
		|||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/configuration"
 | 
			
		||||
	"github.com/docker/distribution/context"
 | 
			
		||||
	"github.com/docker/distribution/digest"
 | 
			
		||||
	"github.com/docker/distribution/manifest"
 | 
			
		||||
	"github.com/docker/distribution/registry/api/errcode"
 | 
			
		||||
| 
						 | 
				
			
			@ -27,7 +28,6 @@ import (
 | 
			
		|||
	"github.com/docker/distribution/testutil"
 | 
			
		||||
	"github.com/docker/libtrust"
 | 
			
		||||
	"github.com/gorilla/handlers"
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var headerConfig = http.Header{
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,6 +15,7 @@ import (
 | 
			
		|||
	"github.com/docker/distribution/configuration"
 | 
			
		||||
	ctxu "github.com/docker/distribution/context"
 | 
			
		||||
	"github.com/docker/distribution/health"
 | 
			
		||||
	"github.com/docker/distribution/health/checks"
 | 
			
		||||
	"github.com/docker/distribution/notifications"
 | 
			
		||||
	"github.com/docker/distribution/registry/api/errcode"
 | 
			
		||||
	"github.com/docker/distribution/registry/api/v2"
 | 
			
		||||
| 
						 | 
				
			
			@ -37,6 +38,9 @@ import (
 | 
			
		|||
// was specified.
 | 
			
		||||
const randomSecretSize = 32
 | 
			
		||||
 | 
			
		||||
// defaultCheckInterval is the default time in between health checks
 | 
			
		||||
const defaultCheckInterval = 10 * time.Second
 | 
			
		||||
 | 
			
		||||
// App is a global registry application object. Shared resources can be placed
 | 
			
		||||
// on this object that will be accessible from all requests. Any writable
 | 
			
		||||
// fields should be protected.
 | 
			
		||||
| 
						 | 
				
			
			@ -230,11 +234,80 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
 | 
			
		|||
// process. Because the configuration and app are tightly coupled,
 | 
			
		||||
// implementing this properly will require a refactor. This method may panic
 | 
			
		||||
// if called twice in the same process.
 | 
			
		||||
func (app *App) RegisterHealthChecks() {
 | 
			
		||||
	health.RegisterPeriodicThresholdFunc("storagedriver_"+app.Config.Storage.Type(), 10*time.Second, 3, func() error {
 | 
			
		||||
		_, err := app.driver.List(app, "/") // "/" should always exist
 | 
			
		||||
		return err                          // any error will be treated as failure
 | 
			
		||||
	})
 | 
			
		||||
func (app *App) RegisterHealthChecks(healthRegistries ...*health.Registry) {
 | 
			
		||||
	if len(healthRegistries) > 1 {
 | 
			
		||||
		panic("RegisterHealthChecks called with more than one registry")
 | 
			
		||||
	}
 | 
			
		||||
	healthRegistry := health.DefaultRegistry
 | 
			
		||||
	if len(healthRegistries) == 1 {
 | 
			
		||||
		healthRegistry = healthRegistries[0]
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if app.Config.Health.StorageDriver.Enabled {
 | 
			
		||||
		interval := app.Config.Health.StorageDriver.Interval
 | 
			
		||||
		if interval == 0 {
 | 
			
		||||
			interval = defaultCheckInterval
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		storageDriverCheck := func() error {
 | 
			
		||||
			_, err := app.driver.List(app, "/") // "/" should always exist
 | 
			
		||||
			return err                          // any error will be treated as failure
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if app.Config.Health.StorageDriver.Threshold != 0 {
 | 
			
		||||
			healthRegistry.RegisterPeriodicThresholdFunc("storagedriver_"+app.Config.Storage.Type(), interval, app.Config.Health.StorageDriver.Threshold, storageDriverCheck)
 | 
			
		||||
		} else {
 | 
			
		||||
			healthRegistry.RegisterPeriodicFunc("storagedriver_"+app.Config.Storage.Type(), interval, storageDriverCheck)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, fileChecker := range app.Config.Health.FileCheckers {
 | 
			
		||||
		interval := fileChecker.Interval
 | 
			
		||||
		if interval == 0 {
 | 
			
		||||
			interval = defaultCheckInterval
 | 
			
		||||
		}
 | 
			
		||||
		ctxu.GetLogger(app).Infof("configuring file health check path=%s, interval=%d", fileChecker.File, interval/time.Second)
 | 
			
		||||
		healthRegistry.Register(fileChecker.File, health.PeriodicChecker(checks.FileChecker(fileChecker.File), interval))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, httpChecker := range app.Config.Health.HTTPCheckers {
 | 
			
		||||
		interval := httpChecker.Interval
 | 
			
		||||
		if interval == 0 {
 | 
			
		||||
			interval = defaultCheckInterval
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		statusCode := httpChecker.StatusCode
 | 
			
		||||
		if statusCode == 0 {
 | 
			
		||||
			statusCode = 200
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		checker := checks.HTTPChecker(httpChecker.URI, statusCode, httpChecker.Timeout, httpChecker.Headers)
 | 
			
		||||
 | 
			
		||||
		if httpChecker.Threshold != 0 {
 | 
			
		||||
			ctxu.GetLogger(app).Infof("configuring HTTP health check uri=%s, interval=%d, threshold=%d", httpChecker.URI, interval/time.Second, httpChecker.Threshold)
 | 
			
		||||
			healthRegistry.Register(httpChecker.URI, health.PeriodicThresholdChecker(checker, interval, httpChecker.Threshold))
 | 
			
		||||
		} else {
 | 
			
		||||
			ctxu.GetLogger(app).Infof("configuring HTTP health check uri=%s, interval=%d", httpChecker.URI, interval/time.Second)
 | 
			
		||||
			healthRegistry.Register(httpChecker.URI, health.PeriodicChecker(checker, interval))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, tcpChecker := range app.Config.Health.TCPCheckers {
 | 
			
		||||
		interval := tcpChecker.Interval
 | 
			
		||||
		if interval == 0 {
 | 
			
		||||
			interval = defaultCheckInterval
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		checker := checks.TCPChecker(tcpChecker.Addr, tcpChecker.Timeout)
 | 
			
		||||
 | 
			
		||||
		if tcpChecker.Threshold != 0 {
 | 
			
		||||
			ctxu.GetLogger(app).Infof("configuring TCP health check addr=%s, interval=%d, threshold=%d", tcpChecker.Addr, interval/time.Second, tcpChecker.Threshold)
 | 
			
		||||
			healthRegistry.Register(tcpChecker.Addr, health.PeriodicThresholdChecker(checker, interval, tcpChecker.Threshold))
 | 
			
		||||
		} else {
 | 
			
		||||
			ctxu.GetLogger(app).Infof("configuring TCP health check addr=%s, interval=%d", tcpChecker.Addr, interval/time.Second)
 | 
			
		||||
			healthRegistry.Register(tcpChecker.Addr, health.PeriodicChecker(checker, interval))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// register a handler with the application, by route name. The handler will be
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -9,6 +9,7 @@ import (
 | 
			
		|||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/configuration"
 | 
			
		||||
	"github.com/docker/distribution/context"
 | 
			
		||||
	"github.com/docker/distribution/registry/api/errcode"
 | 
			
		||||
	"github.com/docker/distribution/registry/api/v2"
 | 
			
		||||
	"github.com/docker/distribution/registry/auth"
 | 
			
		||||
| 
						 | 
				
			
			@ -16,7 +17,6 @@ import (
 | 
			
		|||
	"github.com/docker/distribution/registry/storage"
 | 
			
		||||
	memorycache "github.com/docker/distribution/registry/storage/cache/memory"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver/inmemory"
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// TestAppDispatcher builds an application with a test dispatcher and ensures
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,201 @@
 | 
			
		|||
package handlers
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"net"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"net/http/httptest"
 | 
			
		||||
	"os"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/configuration"
 | 
			
		||||
	"github.com/docker/distribution/context"
 | 
			
		||||
	"github.com/docker/distribution/health"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestFileHealthCheck(t *testing.T) {
 | 
			
		||||
	interval := time.Second
 | 
			
		||||
 | 
			
		||||
	tmpfile, err := ioutil.TempFile(os.TempDir(), "healthcheck")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("could not create temporary file: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer tmpfile.Close()
 | 
			
		||||
 | 
			
		||||
	config := configuration.Configuration{
 | 
			
		||||
		Storage: configuration.Storage{
 | 
			
		||||
			"inmemory": configuration.Parameters{},
 | 
			
		||||
		},
 | 
			
		||||
		Health: configuration.Health{
 | 
			
		||||
			FileCheckers: []configuration.FileChecker{
 | 
			
		||||
				{
 | 
			
		||||
					Interval: interval,
 | 
			
		||||
					File:     tmpfile.Name(),
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
 | 
			
		||||
	app := NewApp(ctx, config)
 | 
			
		||||
	healthRegistry := health.NewRegistry()
 | 
			
		||||
	app.RegisterHealthChecks(healthRegistry)
 | 
			
		||||
 | 
			
		||||
	// Wait for health check to happen
 | 
			
		||||
	<-time.After(2 * interval)
 | 
			
		||||
 | 
			
		||||
	status := healthRegistry.CheckStatus()
 | 
			
		||||
	if len(status) != 1 {
 | 
			
		||||
		t.Fatal("expected 1 item in health check results")
 | 
			
		||||
	}
 | 
			
		||||
	if status[tmpfile.Name()] != "file exists" {
 | 
			
		||||
		t.Fatal(`did not get "file exists" result for health check`)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	os.Remove(tmpfile.Name())
 | 
			
		||||
 | 
			
		||||
	<-time.After(2 * interval)
 | 
			
		||||
	if len(healthRegistry.CheckStatus()) != 0 {
 | 
			
		||||
		t.Fatal("expected 0 items in health check results")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestTCPHealthCheck(t *testing.T) {
 | 
			
		||||
	interval := time.Second
 | 
			
		||||
 | 
			
		||||
	ln, err := net.Listen("tcp", "127.0.0.1:0")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("could not create listener: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	addrStr := ln.Addr().String()
 | 
			
		||||
 | 
			
		||||
	// Start accepting
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			conn, err := ln.Accept()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				// listener was closed
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			defer conn.Close()
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	config := configuration.Configuration{
 | 
			
		||||
		Storage: configuration.Storage{
 | 
			
		||||
			"inmemory": configuration.Parameters{},
 | 
			
		||||
		},
 | 
			
		||||
		Health: configuration.Health{
 | 
			
		||||
			TCPCheckers: []configuration.TCPChecker{
 | 
			
		||||
				{
 | 
			
		||||
					Interval: interval,
 | 
			
		||||
					Addr:     addrStr,
 | 
			
		||||
					Timeout:  500 * time.Millisecond,
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
 | 
			
		||||
	app := NewApp(ctx, config)
 | 
			
		||||
	healthRegistry := health.NewRegistry()
 | 
			
		||||
	app.RegisterHealthChecks(healthRegistry)
 | 
			
		||||
 | 
			
		||||
	// Wait for health check to happen
 | 
			
		||||
	<-time.After(2 * interval)
 | 
			
		||||
 | 
			
		||||
	if len(healthRegistry.CheckStatus()) != 0 {
 | 
			
		||||
		t.Fatal("expected 0 items in health check results")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ln.Close()
 | 
			
		||||
	<-time.After(2 * interval)
 | 
			
		||||
 | 
			
		||||
	// Health check should now fail
 | 
			
		||||
	status := healthRegistry.CheckStatus()
 | 
			
		||||
	if len(status) != 1 {
 | 
			
		||||
		t.Fatal("expected 1 item in health check results")
 | 
			
		||||
	}
 | 
			
		||||
	if status[addrStr] != "connection to "+addrStr+" failed" {
 | 
			
		||||
		t.Fatal(`did not get "connection failed" result for health check`)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestHTTPHealthCheck(t *testing.T) {
 | 
			
		||||
	interval := time.Second
 | 
			
		||||
	threshold := 3
 | 
			
		||||
 | 
			
		||||
	stopFailing := make(chan struct{})
 | 
			
		||||
 | 
			
		||||
	checkedServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
		if r.Method != "HEAD" {
 | 
			
		||||
			t.Fatalf("expected HEAD request, got %s", r.Method)
 | 
			
		||||
		}
 | 
			
		||||
		select {
 | 
			
		||||
		case <-stopFailing:
 | 
			
		||||
			w.WriteHeader(http.StatusOK)
 | 
			
		||||
		default:
 | 
			
		||||
			w.WriteHeader(http.StatusInternalServerError)
 | 
			
		||||
		}
 | 
			
		||||
	}))
 | 
			
		||||
 | 
			
		||||
	config := configuration.Configuration{
 | 
			
		||||
		Storage: configuration.Storage{
 | 
			
		||||
			"inmemory": configuration.Parameters{},
 | 
			
		||||
		},
 | 
			
		||||
		Health: configuration.Health{
 | 
			
		||||
			HTTPCheckers: []configuration.HTTPChecker{
 | 
			
		||||
				{
 | 
			
		||||
					Interval:  interval,
 | 
			
		||||
					URI:       checkedServer.URL,
 | 
			
		||||
					Threshold: threshold,
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
 | 
			
		||||
	app := NewApp(ctx, config)
 | 
			
		||||
	healthRegistry := health.NewRegistry()
 | 
			
		||||
	app.RegisterHealthChecks(healthRegistry)
 | 
			
		||||
 | 
			
		||||
	for i := 0; ; i++ {
 | 
			
		||||
		<-time.After(interval)
 | 
			
		||||
 | 
			
		||||
		status := healthRegistry.CheckStatus()
 | 
			
		||||
 | 
			
		||||
		if i < threshold-1 {
 | 
			
		||||
			// definitely shouldn't have hit the threshold yet
 | 
			
		||||
			if len(status) != 0 {
 | 
			
		||||
				t.Fatal("expected 1 item in health check results")
 | 
			
		||||
			}
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		if i < threshold+1 {
 | 
			
		||||
			// right on the threshold - don't expect a failure yet
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if len(status) != 1 {
 | 
			
		||||
			t.Fatal("expected 1 item in health check results")
 | 
			
		||||
		}
 | 
			
		||||
		if status[checkedServer.URL] != "downstream service returned unexpected status: 500" {
 | 
			
		||||
			t.Fatal("did not get expected result for health check")
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		break
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Signal HTTP handler to start returning 200
 | 
			
		||||
	close(stopFailing)
 | 
			
		||||
 | 
			
		||||
	<-time.After(2 * interval)
 | 
			
		||||
 | 
			
		||||
	if len(healthRegistry.CheckStatus()) != 0 {
 | 
			
		||||
		t.Fatal("expected 0 items in health check results")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
		Reference in New Issue