commit
						581be91482
					
				|  | @ -1,66 +0,0 @@ | |||
| package storage | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"expvar" | ||||
| 	"sync/atomic" | ||||
| 
 | ||||
| 	dcontext "github.com/docker/distribution/context" | ||||
| 	"github.com/docker/distribution/registry/storage/cache" | ||||
| ) | ||||
| 
 | ||||
| type blobStatCollector struct { | ||||
| 	metrics cache.Metrics | ||||
| } | ||||
| 
 | ||||
| func (bsc *blobStatCollector) Hit() { | ||||
| 	atomic.AddUint64(&bsc.metrics.Requests, 1) | ||||
| 	atomic.AddUint64(&bsc.metrics.Hits, 1) | ||||
| } | ||||
| 
 | ||||
| func (bsc *blobStatCollector) Miss() { | ||||
| 	atomic.AddUint64(&bsc.metrics.Requests, 1) | ||||
| 	atomic.AddUint64(&bsc.metrics.Misses, 1) | ||||
| } | ||||
| 
 | ||||
| func (bsc *blobStatCollector) Metrics() cache.Metrics { | ||||
| 	return bsc.metrics | ||||
| } | ||||
| 
 | ||||
| func (bsc *blobStatCollector) Logger(ctx context.Context) cache.Logger { | ||||
| 	return dcontext.GetLogger(ctx) | ||||
| } | ||||
| 
 | ||||
| // blobStatterCacheMetrics keeps track of cache metrics for blob descriptor
 | ||||
| // cache requests. Note this is kept globally and made available via expvar.
 | ||||
| // For more detailed metrics, its recommend to instrument a particular cache
 | ||||
| // implementation.
 | ||||
| var blobStatterCacheMetrics cache.MetricsTracker = &blobStatCollector{} | ||||
| 
 | ||||
| func init() { | ||||
| 	registry := expvar.Get("registry") | ||||
| 	if registry == nil { | ||||
| 		registry = expvar.NewMap("registry") | ||||
| 	} | ||||
| 
 | ||||
| 	cache := registry.(*expvar.Map).Get("cache") | ||||
| 	if cache == nil { | ||||
| 		cache = &expvar.Map{} | ||||
| 		cache.(*expvar.Map).Init() | ||||
| 		registry.(*expvar.Map).Set("cache", cache) | ||||
| 	} | ||||
| 
 | ||||
| 	storage := cache.(*expvar.Map).Get("storage") | ||||
| 	if storage == nil { | ||||
| 		storage = &expvar.Map{} | ||||
| 		storage.(*expvar.Map).Init() | ||||
| 		cache.(*expvar.Map).Set("storage", storage) | ||||
| 	} | ||||
| 
 | ||||
| 	storage.(*expvar.Map).Set("blobdescriptor", expvar.Func(func() interface{} { | ||||
| 		// no need for synchronous access: the increments are atomic and
 | ||||
| 		// during reading, we don't care if the data is up to date. The
 | ||||
| 		// numbers will always *eventually* be reported correctly.
 | ||||
| 		return blobStatterCacheMetrics | ||||
| 	})) | ||||
| } | ||||
|  | @ -0,0 +1,131 @@ | |||
| package cache | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"testing" | ||||
| 
 | ||||
| 	"github.com/docker/distribution" | ||||
| 	digest "github.com/opencontainers/go-digest" | ||||
| ) | ||||
| 
 | ||||
| func TestCacheSet(t *testing.T) { | ||||
| 	cache := newTestStatter() | ||||
| 	backend := newTestStatter() | ||||
| 	st := NewCachedBlobStatter(cache, backend) | ||||
| 	ctx := context.Background() | ||||
| 
 | ||||
| 	dgst := digest.Digest("dontvalidate") | ||||
| 	_, err := st.Stat(ctx, dgst) | ||||
| 	if err != distribution.ErrBlobUnknown { | ||||
| 		t.Fatalf("Unexpected error %v, expected %v", err, distribution.ErrBlobUnknown) | ||||
| 	} | ||||
| 
 | ||||
| 	desc := distribution.Descriptor{ | ||||
| 		Digest: dgst, | ||||
| 	} | ||||
| 	if err := backend.SetDescriptor(ctx, dgst, desc); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	actual, err := st.Stat(ctx, dgst) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	if actual.Digest != desc.Digest { | ||||
| 		t.Fatalf("Unexpected descriptor %v, expected %v", actual, desc) | ||||
| 	} | ||||
| 
 | ||||
| 	if len(cache.sets) != 1 || len(cache.sets[dgst]) == 0 { | ||||
| 		t.Fatalf("Expected cache set") | ||||
| 	} | ||||
| 	if cache.sets[dgst][0].Digest != desc.Digest { | ||||
| 		t.Fatalf("Unexpected descriptor %v, expected %v", cache.sets[dgst][0], desc) | ||||
| 	} | ||||
| 
 | ||||
| 	desc2 := distribution.Descriptor{ | ||||
| 		Digest: digest.Digest("dontvalidate 2"), | ||||
| 	} | ||||
| 	cache.sets[dgst] = append(cache.sets[dgst], desc2) | ||||
| 
 | ||||
| 	actual, err = st.Stat(ctx, dgst) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	if actual.Digest != desc2.Digest { | ||||
| 		t.Fatalf("Unexpected descriptor %v, expected %v", actual, desc) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestCacheError(t *testing.T) { | ||||
| 	cache := newErrTestStatter(errors.New("cache error")) | ||||
| 	backend := newTestStatter() | ||||
| 	st := NewCachedBlobStatter(cache, backend) | ||||
| 	ctx := context.Background() | ||||
| 
 | ||||
| 	dgst := digest.Digest("dontvalidate") | ||||
| 	_, err := st.Stat(ctx, dgst) | ||||
| 	if err != distribution.ErrBlobUnknown { | ||||
| 		t.Fatalf("Unexpected error %v, expected %v", err, distribution.ErrBlobUnknown) | ||||
| 	} | ||||
| 
 | ||||
| 	desc := distribution.Descriptor{ | ||||
| 		Digest: dgst, | ||||
| 	} | ||||
| 	if err := backend.SetDescriptor(ctx, dgst, desc); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	actual, err := st.Stat(ctx, dgst) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	if actual.Digest != desc.Digest { | ||||
| 		t.Fatalf("Unexpected descriptor %v, expected %v", actual, desc) | ||||
| 	} | ||||
| 
 | ||||
| 	if len(cache.sets) > 0 { | ||||
| 		t.Fatalf("Set should not be called after stat error") | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func newTestStatter() *testStatter { | ||||
| 	return &testStatter{ | ||||
| 		stats: []digest.Digest{}, | ||||
| 		sets:  map[digest.Digest][]distribution.Descriptor{}, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func newErrTestStatter(err error) *testStatter { | ||||
| 	return &testStatter{ | ||||
| 		sets: map[digest.Digest][]distribution.Descriptor{}, | ||||
| 		err:  err, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| type testStatter struct { | ||||
| 	stats []digest.Digest | ||||
| 	sets  map[digest.Digest][]distribution.Descriptor | ||||
| 	err   error | ||||
| } | ||||
| 
 | ||||
| func (s *testStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { | ||||
| 	if s.err != nil { | ||||
| 		return distribution.Descriptor{}, s.err | ||||
| 	} | ||||
| 
 | ||||
| 	if set := s.sets[dgst]; len(set) > 0 { | ||||
| 		return set[len(set)-1], nil | ||||
| 	} | ||||
| 
 | ||||
| 	return distribution.Descriptor{}, distribution.ErrBlobUnknown | ||||
| } | ||||
| 
 | ||||
| func (s *testStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { | ||||
| 	s.sets[dgst] = append(s.sets[dgst], desc) | ||||
| 	return s.err | ||||
| } | ||||
| 
 | ||||
| func (s *testStatter) Clear(ctx context.Context, dgst digest.Digest) error { | ||||
| 	return s.err | ||||
| } | ||||
|  | @ -54,6 +54,10 @@ func checkBlobDescriptorCacheEmptyRepository(ctx context.Context, t *testing.T, | |||
| 		t.Fatalf("expected error checking for cache item with empty digest: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if _, err := cache.Stat(ctx, "sha384:cba111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"); err != distribution.ErrBlobUnknown { | ||||
| 		t.Fatalf("expected unknown blob error with uncached repo: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if _, err := cache.Stat(ctx, "sha384:abc111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"); err != distribution.ErrBlobUnknown { | ||||
| 		t.Fatalf("expected unknown blob error with empty repo: %v", err) | ||||
| 	} | ||||
|  |  | |||
|  | @ -4,39 +4,14 @@ import ( | |||
| 	"context" | ||||
| 
 | ||||
| 	"github.com/docker/distribution" | ||||
| 	dcontext "github.com/docker/distribution/context" | ||||
| 	prometheus "github.com/docker/distribution/metrics" | ||||
| 	"github.com/opencontainers/go-digest" | ||||
| ) | ||||
| 
 | ||||
| // Metrics is used to hold metric counters
 | ||||
| // related to the number of times a cache was
 | ||||
| // hit or missed.
 | ||||
| type Metrics struct { | ||||
| 	Requests uint64 | ||||
| 	Hits     uint64 | ||||
| 	Misses   uint64 | ||||
| } | ||||
| 
 | ||||
| // Logger can be provided on the MetricsTracker to log errors.
 | ||||
| //
 | ||||
| // Usually, this is just a proxy to dcontext.GetLogger.
 | ||||
| type Logger interface { | ||||
| 	Errorf(format string, args ...interface{}) | ||||
| } | ||||
| 
 | ||||
| // MetricsTracker represents a metric tracker
 | ||||
| // which simply counts the number of hits and misses.
 | ||||
| type MetricsTracker interface { | ||||
| 	Hit() | ||||
| 	Miss() | ||||
| 	Metrics() Metrics | ||||
| 	Logger(context.Context) Logger | ||||
| } | ||||
| 
 | ||||
| type cachedBlobStatter struct { | ||||
| 	cache   distribution.BlobDescriptorService | ||||
| 	backend distribution.BlobDescriptorService | ||||
| 	tracker MetricsTracker | ||||
| } | ||||
| 
 | ||||
| var ( | ||||
|  | @ -53,47 +28,36 @@ func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend dist | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| // NewCachedBlobStatterWithMetrics creates a new statter which prefers a cache and
 | ||||
| // falls back to a backend. Hits and misses will send to the tracker.
 | ||||
| func NewCachedBlobStatterWithMetrics(cache distribution.BlobDescriptorService, backend distribution.BlobDescriptorService, tracker MetricsTracker) distribution.BlobStatter { | ||||
| 	return &cachedBlobStatter{ | ||||
| 		cache:   cache, | ||||
| 		backend: backend, | ||||
| 		tracker: tracker, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (cbds *cachedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { | ||||
| 	cacheCount.WithValues("Request").Inc(1) | ||||
| 	desc, err := cbds.cache.Stat(ctx, dgst) | ||||
| 	if err != nil { | ||||
| 		if err != distribution.ErrBlobUnknown { | ||||
| 			logErrorf(ctx, cbds.tracker, "error retrieving descriptor from cache: %v", err) | ||||
| 		} | ||||
| 
 | ||||
| 		goto fallback | ||||
| 	// try getting from cache
 | ||||
| 	desc, cacheErr := cbds.cache.Stat(ctx, dgst) | ||||
| 	if cacheErr == nil { | ||||
| 		cacheCount.WithValues("Hit").Inc(1) | ||||
| 		return desc, nil | ||||
| 	} | ||||
| 	cacheCount.WithValues("Hit").Inc(1) | ||||
| 	if cbds.tracker != nil { | ||||
| 		cbds.tracker.Hit() | ||||
| 	} | ||||
| 	return desc, nil | ||||
| fallback: | ||||
| 	cacheCount.WithValues("Miss").Inc(1) | ||||
| 	if cbds.tracker != nil { | ||||
| 		cbds.tracker.Miss() | ||||
| 	} | ||||
| 	desc, err = cbds.backend.Stat(ctx, dgst) | ||||
| 
 | ||||
| 	// couldn't get from cache; get from backend
 | ||||
| 	desc, err := cbds.backend.Stat(ctx, dgst) | ||||
| 	if err != nil { | ||||
| 		return desc, err | ||||
| 	} | ||||
| 
 | ||||
| 	if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil { | ||||
| 		logErrorf(ctx, cbds.tracker, "error adding descriptor %v to cache: %v", desc.Digest, err) | ||||
| 	if cacheErr == distribution.ErrBlobUnknown { | ||||
| 		// cache doesn't have info. update it with info got from backend
 | ||||
| 		cacheCount.WithValues("Miss").Inc(1) | ||||
| 		if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil { | ||||
| 			dcontext.GetLoggerWithField(ctx, "blob", dgst).WithError(err).Error("error from cache setting desc") | ||||
| 		} | ||||
| 		// we don't need to return cache error upstream if any. continue returning value from backend
 | ||||
| 	} else { | ||||
| 		// unknown error from cache. just log and error. do not store cache as it may be trigger many set calls
 | ||||
| 		dcontext.GetLoggerWithField(ctx, "blob", dgst).WithError(cacheErr).Error("error from cache stat(ing) blob") | ||||
| 		cacheCount.WithValues("Error").Inc(1) | ||||
| 	} | ||||
| 
 | ||||
| 	return desc, err | ||||
| 
 | ||||
| 	return desc, nil | ||||
| } | ||||
| 
 | ||||
| func (cbds *cachedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) error { | ||||
|  | @ -111,19 +75,7 @@ func (cbds *cachedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) er | |||
| 
 | ||||
| func (cbds *cachedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { | ||||
| 	if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil { | ||||
| 		logErrorf(ctx, cbds.tracker, "error adding descriptor %v to cache: %v", desc.Digest, err) | ||||
| 		dcontext.GetLoggerWithField(ctx, "blob", dgst).WithError(err).Error("error from cache setting desc") | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func logErrorf(ctx context.Context, tracker MetricsTracker, format string, args ...interface{}) { | ||||
| 	if tracker == nil { | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	logger := tracker.Logger(ctx) | ||||
| 	if logger == nil { | ||||
| 		return | ||||
| 	} | ||||
| 	logger.Errorf(format, args...) | ||||
| } | ||||
|  |  | |||
|  | @ -0,0 +1,69 @@ | |||
| package metrics | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/docker/distribution" | ||||
| 	prometheus "github.com/docker/distribution/metrics" | ||||
| 	"github.com/docker/distribution/registry/storage/cache" | ||||
| 	"github.com/docker/go-metrics" | ||||
| 	"github.com/opencontainers/go-digest" | ||||
| ) | ||||
| 
 | ||||
| type prometheusCacheProvider struct { | ||||
| 	cache.BlobDescriptorCacheProvider | ||||
| 	latencyTimer metrics.LabeledTimer | ||||
| } | ||||
| 
 | ||||
| func NewPrometheusCacheProvider(wrap cache.BlobDescriptorCacheProvider, name, help string) cache.BlobDescriptorCacheProvider { | ||||
| 	return &prometheusCacheProvider{ | ||||
| 		wrap, | ||||
| 		// TODO: May want to have fine grained buckets since redis calls are generally <1ms and the default minimum bucket is 5ms.
 | ||||
| 		prometheus.StorageNamespace.NewLabeledTimer(name, help, "operation"), | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (p *prometheusCacheProvider) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { | ||||
| 	start := time.Now() | ||||
| 	d, e := p.BlobDescriptorCacheProvider.Stat(ctx, dgst) | ||||
| 	p.latencyTimer.WithValues("Stat").UpdateSince(start) | ||||
| 	return d, e | ||||
| } | ||||
| 
 | ||||
| func (p *prometheusCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { | ||||
| 	start := time.Now() | ||||
| 	e := p.BlobDescriptorCacheProvider.SetDescriptor(ctx, dgst, desc) | ||||
| 	p.latencyTimer.WithValues("SetDescriptor").UpdateSince(start) | ||||
| 	return e | ||||
| } | ||||
| 
 | ||||
| type prometheusRepoCacheProvider struct { | ||||
| 	distribution.BlobDescriptorService | ||||
| 	latencyTimer metrics.LabeledTimer | ||||
| } | ||||
| 
 | ||||
| func (p *prometheusRepoCacheProvider) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { | ||||
| 	start := time.Now() | ||||
| 	d, e := p.BlobDescriptorService.Stat(ctx, dgst) | ||||
| 	p.latencyTimer.WithValues("RepoStat").UpdateSince(start) | ||||
| 	return d, e | ||||
| } | ||||
| 
 | ||||
| func (p *prometheusRepoCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { | ||||
| 	start := time.Now() | ||||
| 	e := p.BlobDescriptorService.SetDescriptor(ctx, dgst, desc) | ||||
| 	p.latencyTimer.WithValues("RepoSetDescriptor").UpdateSince(start) | ||||
| 	return e | ||||
| } | ||||
| 
 | ||||
| func (p *prometheusCacheProvider) RepositoryScoped(repo string) (distribution.BlobDescriptorService, error) { | ||||
| 	s, err := p.BlobDescriptorCacheProvider.RepositoryScoped(repo) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &prometheusRepoCacheProvider{ | ||||
| 		s, | ||||
| 		p.latencyTimer, | ||||
| 	}, nil | ||||
| } | ||||
|  | @ -7,6 +7,7 @@ import ( | |||
| 	"github.com/docker/distribution" | ||||
| 	"github.com/docker/distribution/reference" | ||||
| 	"github.com/docker/distribution/registry/storage/cache" | ||||
| 	"github.com/docker/distribution/registry/storage/cache/metrics" | ||||
| 	"github.com/garyburd/redigo/redis" | ||||
| 	"github.com/opencontainers/go-digest" | ||||
| ) | ||||
|  | @ -34,9 +35,13 @@ type redisBlobDescriptorService struct { | |||
| // NewRedisBlobDescriptorCacheProvider returns a new redis-based
 | ||||
| // BlobDescriptorCacheProvider using the provided redis connection pool.
 | ||||
| func NewRedisBlobDescriptorCacheProvider(pool *redis.Pool) cache.BlobDescriptorCacheProvider { | ||||
| 	return &redisBlobDescriptorService{ | ||||
| 		pool: pool, | ||||
| 	} | ||||
| 	return metrics.NewPrometheusCacheProvider( | ||||
| 		&redisBlobDescriptorService{ | ||||
| 			pool: pool, | ||||
| 		}, | ||||
| 		"cache_redis", | ||||
| 		"Number of seconds taken by redis", | ||||
| 	) | ||||
| } | ||||
| 
 | ||||
| // RepositoryScoped returns the scoped cache.
 | ||||
|  | @ -181,6 +186,10 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) Stat(ctx context.Conte | |||
| 	// We allow a per repository mediatype, let's look it up here.
 | ||||
| 	mediatype, err := redis.String(conn.Do("HGET", rsrbds.blobDescriptorHashKey(dgst), "mediatype")) | ||||
| 	if err != nil { | ||||
| 		if err == redis.ErrNil { | ||||
| 			return distribution.Descriptor{}, distribution.ErrBlobUnknown | ||||
| 		} | ||||
| 
 | ||||
| 		return distribution.Descriptor{}, err | ||||
| 	} | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue