Merge pull request #312 from stevvooe/add-layer-info-cache
registry: integrate layer info cache with registry and storagemaster
						commit
						e56124d343
					
				| 
						 | 
				
			
			@ -1,3 +1,3 @@
 | 
			
		|||
// Package registry is a placeholder package for registry interface
 | 
			
		||||
// destinations and utilities.
 | 
			
		||||
// definitions and utilities.
 | 
			
		||||
package registry
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,10 +1,12 @@
 | 
			
		|||
package handlers
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"expvar"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"os"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"code.google.com/p/go-uuid/uuid"
 | 
			
		||||
	"github.com/docker/distribution"
 | 
			
		||||
| 
						 | 
				
			
			@ -16,9 +18,11 @@ import (
 | 
			
		|||
	registrymiddleware "github.com/docker/distribution/registry/middleware/registry"
 | 
			
		||||
	repositorymiddleware "github.com/docker/distribution/registry/middleware/repository"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/cache"
 | 
			
		||||
	storagedriver "github.com/docker/distribution/registry/storage/driver"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver/factory"
 | 
			
		||||
	storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
 | 
			
		||||
	"github.com/garyburd/redigo/redis"
 | 
			
		||||
	"github.com/gorilla/mux"
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -44,6 +48,8 @@ type App struct {
 | 
			
		|||
		sink   notifications.Sink
 | 
			
		||||
		source notifications.SourceRecord
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	redis *redis.Pool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Value intercepts calls context.Context.Value, returning the current app id,
 | 
			
		||||
| 
						 | 
				
			
			@ -95,8 +101,32 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	app.configureEvents(&configuration)
 | 
			
		||||
	app.configureRedis(&configuration)
 | 
			
		||||
 | 
			
		||||
	// configure storage caches
 | 
			
		||||
	if cc, ok := configuration.Storage["cache"]; ok {
 | 
			
		||||
		switch cc["layerinfo"] {
 | 
			
		||||
		case "redis":
 | 
			
		||||
			if app.redis == nil {
 | 
			
		||||
				panic("redis configuration required to use for layerinfo cache")
 | 
			
		||||
			}
 | 
			
		||||
			app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewRedisLayerInfoCache(app.redis))
 | 
			
		||||
			ctxu.GetLogger(app).Infof("using redis layerinfo cache")
 | 
			
		||||
		case "inmemory":
 | 
			
		||||
			app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewInMemoryLayerInfoCache())
 | 
			
		||||
			ctxu.GetLogger(app).Infof("using inmemory layerinfo cache")
 | 
			
		||||
		default:
 | 
			
		||||
			if cc["layerinfo"] != "" {
 | 
			
		||||
				ctxu.GetLogger(app).Warnf("unkown cache type %q, caching disabled", configuration.Storage["cache"])
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if app.registry == nil {
 | 
			
		||||
		// configure the registry if no cache section is available.
 | 
			
		||||
		app.registry = storage.NewRegistryWithDriver(app.driver, nil)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	app.registry = storage.NewRegistryWithDriver(app.driver)
 | 
			
		||||
	app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"])
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
| 
						 | 
				
			
			@ -174,6 +204,88 @@ func (app *App) configureEvents(configuration *configuration.Configuration) {
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (app *App) configureRedis(configuration *configuration.Configuration) {
 | 
			
		||||
	if configuration.Redis.Addr == "" {
 | 
			
		||||
		ctxu.GetLogger(app).Infof("redis not configured")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pool := &redis.Pool{
 | 
			
		||||
		Dial: func() (redis.Conn, error) {
 | 
			
		||||
			// TODO(stevvooe): Yet another use case for contextual timing.
 | 
			
		||||
			ctx := context.WithValue(app, "redis.connect.startedat", time.Now())
 | 
			
		||||
 | 
			
		||||
			done := func(err error) {
 | 
			
		||||
				logger := ctxu.GetLoggerWithField(ctx, "redis.connect.duration",
 | 
			
		||||
					ctxu.Since(ctx, "redis.connect.startedat"))
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					logger.Errorf("redis: error connecting: %v", err)
 | 
			
		||||
				} else {
 | 
			
		||||
					logger.Infof("redis: connect %v", configuration.Redis.Addr)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			conn, err := redis.DialTimeout("tcp",
 | 
			
		||||
				configuration.Redis.Addr,
 | 
			
		||||
				configuration.Redis.DialTimeout,
 | 
			
		||||
				configuration.Redis.ReadTimeout,
 | 
			
		||||
				configuration.Redis.WriteTimeout)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				ctxu.GetLogger(app).Errorf("error connecting to redis instance %s: %v",
 | 
			
		||||
					configuration.Redis.Addr, err)
 | 
			
		||||
				done(err)
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// authorize the connection
 | 
			
		||||
			if configuration.Redis.Password != "" {
 | 
			
		||||
				if _, err = conn.Do("AUTH", configuration.Redis.Password); err != nil {
 | 
			
		||||
					defer conn.Close()
 | 
			
		||||
					done(err)
 | 
			
		||||
					return nil, err
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// select the database to use
 | 
			
		||||
			if configuration.Redis.DB != 0 {
 | 
			
		||||
				if _, err = conn.Do("SELECT", configuration.Redis.DB); err != nil {
 | 
			
		||||
					defer conn.Close()
 | 
			
		||||
					done(err)
 | 
			
		||||
					return nil, err
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			done(nil)
 | 
			
		||||
			return conn, nil
 | 
			
		||||
		},
 | 
			
		||||
		MaxIdle:     configuration.Redis.Pool.MaxIdle,
 | 
			
		||||
		MaxActive:   configuration.Redis.Pool.MaxActive,
 | 
			
		||||
		IdleTimeout: configuration.Redis.Pool.IdleTimeout,
 | 
			
		||||
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
 | 
			
		||||
			// TODO(stevvooe): We can probably do something more interesting
 | 
			
		||||
			// here with the health package.
 | 
			
		||||
			_, err := c.Do("PING")
 | 
			
		||||
			return err
 | 
			
		||||
		},
 | 
			
		||||
		Wait: false, // if a connection is not avialable, proceed without cache.
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	app.redis = pool
 | 
			
		||||
 | 
			
		||||
	// setup expvar
 | 
			
		||||
	registry := expvar.Get("registry")
 | 
			
		||||
	if registry == nil {
 | 
			
		||||
		registry = expvar.NewMap("registry")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	registry.(*expvar.Map).Set("redis", expvar.Func(func() interface{} {
 | 
			
		||||
		return map[string]interface{}{
 | 
			
		||||
			"Config": configuration.Redis,
 | 
			
		||||
			"Active": app.redis.ActiveCount(),
 | 
			
		||||
		}
 | 
			
		||||
	}))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
	defer r.Body.Close() // ensure that request body is always closed.
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -13,6 +13,7 @@ import (
 | 
			
		|||
	"github.com/docker/distribution/registry/auth"
 | 
			
		||||
	_ "github.com/docker/distribution/registry/auth/silly"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/cache"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver/inmemory"
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -28,7 +29,7 @@ func TestAppDispatcher(t *testing.T) {
 | 
			
		|||
		Context:  context.Background(),
 | 
			
		||||
		router:   v2.Router(),
 | 
			
		||||
		driver:   driver,
 | 
			
		||||
		registry: storage.NewRegistryWithDriver(driver),
 | 
			
		||||
		registry: storage.NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()),
 | 
			
		||||
	}
 | 
			
		||||
	server := httptest.NewServer(app)
 | 
			
		||||
	router := v2.Router()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -18,8 +18,9 @@ import (
 | 
			
		|||
// abstraction, providing utility methods that support creating and traversing
 | 
			
		||||
// backend links.
 | 
			
		||||
type blobStore struct {
 | 
			
		||||
	*registry
 | 
			
		||||
	ctx context.Context
 | 
			
		||||
	driver storagedriver.StorageDriver
 | 
			
		||||
	pm     *pathMapper
 | 
			
		||||
	ctx    context.Context
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// exists reports whether or not the path exists. If the driver returns error
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,98 @@
 | 
			
		|||
// Package cache provides facilities to speed up access to the storage
 | 
			
		||||
// backend. Typically cache implementations deal with internal implementation
 | 
			
		||||
// details at the backend level, rather than generalized caches for
 | 
			
		||||
// distribution related interfaces. In other words, unless the cache is
 | 
			
		||||
// specific to the storage package, it belongs in another package.
 | 
			
		||||
package cache
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/digest"
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// ErrNotFound is returned when a meta item is not found.
 | 
			
		||||
var ErrNotFound = fmt.Errorf("not found")
 | 
			
		||||
 | 
			
		||||
// LayerMeta describes the backend location and length of layer data.
 | 
			
		||||
type LayerMeta struct {
 | 
			
		||||
	Path   string
 | 
			
		||||
	Length int64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// LayerInfoCache is a driver-aware cache of layer metadata. Basically, it
 | 
			
		||||
// provides a fast cache for checks against repository metadata, avoiding
 | 
			
		||||
// round trips to backend storage. Note that this is different from a pure
 | 
			
		||||
// layer cache, which would also provide access to backing data, as well. Such
 | 
			
		||||
// a cache should be implemented as a middleware, rather than integrated with
 | 
			
		||||
// the storage backend.
 | 
			
		||||
//
 | 
			
		||||
// Note that most implementations rely on the caller to do strict checks on on
 | 
			
		||||
// repo and dgst arguments, since these are mostly used behind existing
 | 
			
		||||
// implementations.
 | 
			
		||||
type LayerInfoCache interface {
 | 
			
		||||
	// Contains returns true if the repository with name contains the layer.
 | 
			
		||||
	Contains(ctx context.Context, repo string, dgst digest.Digest) (bool, error)
 | 
			
		||||
 | 
			
		||||
	// Add includes the layer in the given repository cache.
 | 
			
		||||
	Add(ctx context.Context, repo string, dgst digest.Digest) error
 | 
			
		||||
 | 
			
		||||
	// Meta provides the location of the layer on the backend and its size. Membership of a
 | 
			
		||||
	// repository should be tested before using the result, if required.
 | 
			
		||||
	Meta(ctx context.Context, dgst digest.Digest) (LayerMeta, error)
 | 
			
		||||
 | 
			
		||||
	// SetMeta sets the meta data for the given layer.
 | 
			
		||||
	SetMeta(ctx context.Context, dgst digest.Digest, meta LayerMeta) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// base implements common checks between cache implementations. Note that
 | 
			
		||||
// these are not full checks of input, since that should be done by the
 | 
			
		||||
// caller.
 | 
			
		||||
type base struct {
 | 
			
		||||
	LayerInfoCache
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *base) Contains(ctx context.Context, repo string, dgst digest.Digest) (bool, error) {
 | 
			
		||||
	if repo == "" {
 | 
			
		||||
		return false, fmt.Errorf("cache: cannot check for empty repository name")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if dgst == "" {
 | 
			
		||||
		return false, fmt.Errorf("cache: cannot check for empty digests")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return b.LayerInfoCache.Contains(ctx, repo, dgst)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *base) Add(ctx context.Context, repo string, dgst digest.Digest) error {
 | 
			
		||||
	if repo == "" {
 | 
			
		||||
		return fmt.Errorf("cache: cannot add empty repository name")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if dgst == "" {
 | 
			
		||||
		return fmt.Errorf("cache: cannot add empty digest")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return b.LayerInfoCache.Add(ctx, repo, dgst)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *base) Meta(ctx context.Context, dgst digest.Digest) (LayerMeta, error) {
 | 
			
		||||
	if dgst == "" {
 | 
			
		||||
		return LayerMeta{}, fmt.Errorf("cache: cannot get meta for empty digest")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return b.LayerInfoCache.Meta(ctx, dgst)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *base) SetMeta(ctx context.Context, dgst digest.Digest, meta LayerMeta) error {
 | 
			
		||||
	if dgst == "" {
 | 
			
		||||
		return fmt.Errorf("cache: cannot set meta for empty digest")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if meta.Path == "" {
 | 
			
		||||
		return fmt.Errorf("cache: cannot set empty path for meta")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return b.LayerInfoCache.SetMeta(ctx, dgst, meta)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,86 @@
 | 
			
		|||
package cache
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// checkLayerInfoCache takes a cache implementation through a common set of
 | 
			
		||||
// operations. If adding new tests, please add them here so new
 | 
			
		||||
// implementations get the benefit.
 | 
			
		||||
func checkLayerInfoCache(t *testing.T, lic LayerInfoCache) {
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
 | 
			
		||||
	exists, err := lic.Contains(ctx, "", "fake:abc")
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		t.Fatalf("expected error checking for cache item with empty repo")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	exists, err = lic.Contains(ctx, "foo/bar", "")
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		t.Fatalf("expected error checking for cache item with empty digest")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	exists, err = lic.Contains(ctx, "foo/bar", "fake:abc")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error checking for cache item: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if exists {
 | 
			
		||||
		t.Fatalf("item should not exist")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := lic.Add(ctx, "", "fake:abc"); err == nil {
 | 
			
		||||
		t.Fatalf("expected error adding cache item with empty name")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := lic.Add(ctx, "foo/bar", ""); err == nil {
 | 
			
		||||
		t.Fatalf("expected error adding cache item with empty digest")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := lic.Add(ctx, "foo/bar", "fake:abc"); err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error adding item: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	exists, err = lic.Contains(ctx, "foo/bar", "fake:abc")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error checking for cache item: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if !exists {
 | 
			
		||||
		t.Fatalf("item should exist")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = lic.Meta(ctx, "")
 | 
			
		||||
	if err == nil || err == ErrNotFound {
 | 
			
		||||
		t.Fatalf("expected error getting meta for cache item with empty digest")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = lic.Meta(ctx, "fake:abc")
 | 
			
		||||
	if err != ErrNotFound {
 | 
			
		||||
		t.Fatalf("expected unknown layer error getting meta for cache item with empty digest")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err = lic.SetMeta(ctx, "", LayerMeta{}); err == nil {
 | 
			
		||||
		t.Fatalf("expected error setting meta for cache item with empty digest")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err = lic.SetMeta(ctx, "foo/bar", LayerMeta{}); err == nil {
 | 
			
		||||
		t.Fatalf("expected error setting meta for cache item with empty meta")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	expected := LayerMeta{Path: "/foo/bar", Length: 20}
 | 
			
		||||
	if err := lic.SetMeta(ctx, "foo/bar", expected); err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error setting meta: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	meta, err := lic.Meta(ctx, "foo/bar")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error getting meta: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if meta != expected {
 | 
			
		||||
		t.Fatalf("retrieved meta data did not match: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,63 @@
 | 
			
		|||
package cache
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/docker/distribution/digest"
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// inmemoryLayerInfoCache is a map-based implementation of LayerInfoCache.
 | 
			
		||||
type inmemoryLayerInfoCache struct {
 | 
			
		||||
	membership map[string]map[digest.Digest]struct{}
 | 
			
		||||
	meta       map[digest.Digest]LayerMeta
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewInMemoryLayerInfoCache provides an implementation of LayerInfoCache that
 | 
			
		||||
// stores results in memory.
 | 
			
		||||
func NewInMemoryLayerInfoCache() LayerInfoCache {
 | 
			
		||||
	return &base{&inmemoryLayerInfoCache{
 | 
			
		||||
		membership: make(map[string]map[digest.Digest]struct{}),
 | 
			
		||||
		meta:       make(map[digest.Digest]LayerMeta),
 | 
			
		||||
	}}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ilic *inmemoryLayerInfoCache) Contains(ctx context.Context, repo string, dgst digest.Digest) (bool, error) {
 | 
			
		||||
	members, ok := ilic.membership[repo]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return false, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, ok = members[dgst]
 | 
			
		||||
	return ok, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Add adds the layer to the redis repository blob set.
 | 
			
		||||
func (ilic *inmemoryLayerInfoCache) Add(ctx context.Context, repo string, dgst digest.Digest) error {
 | 
			
		||||
	members, ok := ilic.membership[repo]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		members = make(map[digest.Digest]struct{})
 | 
			
		||||
		ilic.membership[repo] = members
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	members[dgst] = struct{}{}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Meta retrieves the layer meta data from the redis hash, returning
 | 
			
		||||
// ErrUnknownLayer if not found.
 | 
			
		||||
func (ilic *inmemoryLayerInfoCache) Meta(ctx context.Context, dgst digest.Digest) (LayerMeta, error) {
 | 
			
		||||
	meta, ok := ilic.meta[dgst]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return LayerMeta{}, ErrNotFound
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return meta, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetMeta sets the meta data for the given digest using a redis hash. A hash
 | 
			
		||||
// is used here since we may store unrelated fields about a layer in the
 | 
			
		||||
// future.
 | 
			
		||||
func (ilic *inmemoryLayerInfoCache) SetMeta(ctx context.Context, dgst digest.Digest, meta LayerMeta) error {
 | 
			
		||||
	ilic.meta[dgst] = meta
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,9 @@
 | 
			
		|||
package cache
 | 
			
		||||
 | 
			
		||||
import "testing"
 | 
			
		||||
 | 
			
		||||
// TestInMemoryLayerInfoCache checks the in memory implementation is working
 | 
			
		||||
// correctly.
 | 
			
		||||
func TestInMemoryLayerInfoCache(t *testing.T) {
 | 
			
		||||
	checkLayerInfoCache(t, NewInMemoryLayerInfoCache())
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,98 @@
 | 
			
		|||
package cache
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	ctxu "github.com/docker/distribution/context"
 | 
			
		||||
	"github.com/docker/distribution/digest"
 | 
			
		||||
	"github.com/garyburd/redigo/redis"
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// redisLayerInfoCache provides an implementation of storage.LayerInfoCache
 | 
			
		||||
// based on redis. Layer info is stored in two parts. The first provide fast
 | 
			
		||||
// access to repository membership through a redis set for each repo. The
 | 
			
		||||
// second is a redis hash keyed by the digest of the layer, providing path and
 | 
			
		||||
// length information. Note that there is no implied relationship between
 | 
			
		||||
// these two caches. The layer may exist in one, both or none and the code
 | 
			
		||||
// must be written this way.
 | 
			
		||||
type redisLayerInfoCache struct {
 | 
			
		||||
	pool *redis.Pool
 | 
			
		||||
 | 
			
		||||
	// TODO(stevvooe): We use a pool because we don't have great control over
 | 
			
		||||
	// the cache lifecycle to manage connections. A new connection if fetched
 | 
			
		||||
	// for each operation. Once we have better lifecycle management of the
 | 
			
		||||
	// request objects, we can change this to a connection.
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewRedisLayerInfoCache returns a new redis-based LayerInfoCache using the
 | 
			
		||||
// provided redis connection pool.
 | 
			
		||||
func NewRedisLayerInfoCache(pool *redis.Pool) LayerInfoCache {
 | 
			
		||||
	return &base{&redisLayerInfoCache{
 | 
			
		||||
		pool: pool,
 | 
			
		||||
	}}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Contains does a membership check on the repository blob set in redis. This
 | 
			
		||||
// is used as an access check before looking up global path information. If
 | 
			
		||||
// false is returned, the caller should still check the backend to if it
 | 
			
		||||
// exists elsewhere.
 | 
			
		||||
func (rlic *redisLayerInfoCache) Contains(ctx context.Context, repo string, dgst digest.Digest) (bool, error) {
 | 
			
		||||
	conn := rlic.pool.Get()
 | 
			
		||||
	defer conn.Close()
 | 
			
		||||
 | 
			
		||||
	ctxu.GetLogger(ctx).Debugf("(*redisLayerInfoCache).Contains(%q, %q)", repo, dgst)
 | 
			
		||||
	return redis.Bool(conn.Do("SISMEMBER", rlic.repositoryBlobSetKey(repo), dgst))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Add adds the layer to the redis repository blob set.
 | 
			
		||||
func (rlic *redisLayerInfoCache) Add(ctx context.Context, repo string, dgst digest.Digest) error {
 | 
			
		||||
	conn := rlic.pool.Get()
 | 
			
		||||
	defer conn.Close()
 | 
			
		||||
 | 
			
		||||
	ctxu.GetLogger(ctx).Debugf("(*redisLayerInfoCache).Add(%q, %q)", repo, dgst)
 | 
			
		||||
	_, err := conn.Do("SADD", rlic.repositoryBlobSetKey(repo), dgst)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Meta retrieves the layer meta data from the redis hash, returning
 | 
			
		||||
// ErrUnknownLayer if not found.
 | 
			
		||||
func (rlic *redisLayerInfoCache) Meta(ctx context.Context, dgst digest.Digest) (LayerMeta, error) {
 | 
			
		||||
	conn := rlic.pool.Get()
 | 
			
		||||
	defer conn.Close()
 | 
			
		||||
 | 
			
		||||
	reply, err := redis.Values(conn.Do("HMGET", rlic.blobMetaHashKey(dgst), "path", "length"))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return LayerMeta{}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(reply) < 2 || reply[0] == nil || reply[1] == nil {
 | 
			
		||||
		return LayerMeta{}, ErrNotFound
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var meta LayerMeta
 | 
			
		||||
	if _, err := redis.Scan(reply, &meta.Path, &meta.Length); err != nil {
 | 
			
		||||
		return LayerMeta{}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return meta, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetMeta sets the meta data for the given digest using a redis hash. A hash
 | 
			
		||||
// is used here since we may store unrelated fields about a layer in the
 | 
			
		||||
// future.
 | 
			
		||||
func (rlic *redisLayerInfoCache) SetMeta(ctx context.Context, dgst digest.Digest, meta LayerMeta) error {
 | 
			
		||||
	conn := rlic.pool.Get()
 | 
			
		||||
	defer conn.Close()
 | 
			
		||||
 | 
			
		||||
	_, err := conn.Do("HMSET", rlic.blobMetaHashKey(dgst), "path", meta.Path, "length", meta.Length)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// repositoryBlobSetKey returns the key for the blob set in the cache.
 | 
			
		||||
func (rlic *redisLayerInfoCache) repositoryBlobSetKey(repo string) string {
 | 
			
		||||
	return "repository::" + repo + "::blobs"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// blobMetaHashKey returns the cache key for immutable blob meta data.
 | 
			
		||||
func (rlic *redisLayerInfoCache) blobMetaHashKey(dgst digest.Digest) string {
 | 
			
		||||
	return "blobs::" + dgst.String()
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,50 @@
 | 
			
		|||
package cache
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"flag"
 | 
			
		||||
	"os"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/garyburd/redigo/redis"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var redisAddr string
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	flag.StringVar(&redisAddr, "test.registry.storage.cache.redis.addr", "", "configure the address of a test instance of redis")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TestRedisLayerInfoCache exercises a live redis instance using the cache
 | 
			
		||||
// implementation.
 | 
			
		||||
func TestRedisLayerInfoCache(t *testing.T) {
 | 
			
		||||
	if redisAddr == "" {
 | 
			
		||||
		// fallback to an environement variable
 | 
			
		||||
		redisAddr = os.Getenv("TEST_REGISTRY_STORAGE_CACHE_REDIS_ADDR")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if redisAddr == "" {
 | 
			
		||||
		// skip if still not set
 | 
			
		||||
		t.Skip("please set -registry.storage.cache.redis to test layer info cache against redis")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pool := &redis.Pool{
 | 
			
		||||
		Dial: func() (redis.Conn, error) {
 | 
			
		||||
			return redis.Dial("tcp", redisAddr)
 | 
			
		||||
		},
 | 
			
		||||
		MaxIdle:   1,
 | 
			
		||||
		MaxActive: 2,
 | 
			
		||||
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
 | 
			
		||||
			_, err := c.Do("PING")
 | 
			
		||||
			return err
 | 
			
		||||
		},
 | 
			
		||||
		Wait: false, // if a connection is not avialable, proceed without cache.
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Clear the database
 | 
			
		||||
	if _, err := pool.Get().Do("FLUSHDB"); err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error flushing redis db: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	checkLayerInfoCache(t, NewRedisLayerInfoCache(pool))
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -27,8 +27,8 @@ type fileReader struct {
 | 
			
		|||
 | 
			
		||||
	// identifying fields
 | 
			
		||||
	path    string
 | 
			
		||||
	size    int64 // size is the total layer size, must be set.
 | 
			
		||||
	modtime time.Time
 | 
			
		||||
	size    int64     // size is the total size, must be set.
 | 
			
		||||
	modtime time.Time // TODO(stevvooe): This is not needed anymore.
 | 
			
		||||
 | 
			
		||||
	// mutable fields
 | 
			
		||||
	rc     io.ReadCloser // remote read closer
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,6 +11,7 @@ import (
 | 
			
		|||
 | 
			
		||||
	"github.com/docker/distribution"
 | 
			
		||||
	"github.com/docker/distribution/digest"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/cache"
 | 
			
		||||
	storagedriver "github.com/docker/distribution/registry/storage/driver"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver/inmemory"
 | 
			
		||||
	"github.com/docker/distribution/testutil"
 | 
			
		||||
| 
						 | 
				
			
			@ -35,7 +36,7 @@ func TestSimpleLayerUpload(t *testing.T) {
 | 
			
		|||
	ctx := context.Background()
 | 
			
		||||
	imageName := "foo/bar"
 | 
			
		||||
	driver := inmemory.New()
 | 
			
		||||
	registry := NewRegistryWithDriver(driver)
 | 
			
		||||
	registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
 | 
			
		||||
	repository, err := registry.Repository(ctx, imageName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error getting repo: %v", err)
 | 
			
		||||
| 
						 | 
				
			
			@ -143,7 +144,7 @@ func TestSimpleLayerRead(t *testing.T) {
 | 
			
		|||
	ctx := context.Background()
 | 
			
		||||
	imageName := "foo/bar"
 | 
			
		||||
	driver := inmemory.New()
 | 
			
		||||
	registry := NewRegistryWithDriver(driver)
 | 
			
		||||
	registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
 | 
			
		||||
	repository, err := registry.Repository(ctx, imageName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error getting repo: %v", err)
 | 
			
		||||
| 
						 | 
				
			
			@ -180,7 +181,7 @@ func TestSimpleLayerRead(t *testing.T) {
 | 
			
		|||
		t.Fatalf("unexpected error fetching non-existent layer: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	randomLayerDigest, err := writeTestLayer(driver, ls.(*layerStore).repository.pm, imageName, dgst, randomLayerReader)
 | 
			
		||||
	randomLayerDigest, err := writeTestLayer(driver, defaultPathMapper, imageName, dgst, randomLayerReader)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error writing test layer: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -252,7 +253,7 @@ func TestLayerUploadZeroLength(t *testing.T) {
 | 
			
		|||
	ctx := context.Background()
 | 
			
		||||
	imageName := "foo/bar"
 | 
			
		||||
	driver := inmemory.New()
 | 
			
		||||
	registry := NewRegistryWithDriver(driver)
 | 
			
		||||
	registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
 | 
			
		||||
	repository, err := registry.Repository(ctx, imageName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error getting repo: %v", err)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,202 @@
 | 
			
		|||
package storage
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"expvar"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution"
 | 
			
		||||
	ctxu "github.com/docker/distribution/context"
 | 
			
		||||
	"github.com/docker/distribution/digest"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/cache"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver"
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// cachedLayerService implements the layer service with path-aware caching,
 | 
			
		||||
// using a LayerInfoCache interface.
 | 
			
		||||
type cachedLayerService struct {
 | 
			
		||||
	distribution.LayerService // upstream layer service
 | 
			
		||||
	repository                distribution.Repository
 | 
			
		||||
	ctx                       context.Context
 | 
			
		||||
	driver                    driver.StorageDriver
 | 
			
		||||
	*blobStore                // global blob store
 | 
			
		||||
	cache                     cache.LayerInfoCache
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Exists checks for existence of the digest in the cache, immediately
 | 
			
		||||
// returning if it exists for the repository. If not, the upstream is checked.
 | 
			
		||||
// When a positive result is found, it is written into the cache.
 | 
			
		||||
func (lc *cachedLayerService) Exists(dgst digest.Digest) (bool, error) {
 | 
			
		||||
	ctxu.GetLogger(lc.ctx).Debugf("(*cachedLayerService).Exists(%q)", dgst)
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
	defer func() {
 | 
			
		||||
		// TODO(stevvooe): Replace this with a decent context-based metrics solution
 | 
			
		||||
		ctxu.GetLoggerWithField(lc.ctx, "blob.exists.duration", time.Since(now)).
 | 
			
		||||
			Infof("(*cachedLayerService).Exists(%q)", dgst)
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	atomic.AddUint64(&layerInfoCacheMetrics.Exists.Requests, 1)
 | 
			
		||||
	available, err := lc.cache.Contains(lc.ctx, lc.repository.Name(), dgst)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		ctxu.GetLogger(lc.ctx).Errorf("error checking availability of %v@%v: %v", lc.repository.Name(), dgst, err)
 | 
			
		||||
		goto fallback
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if available {
 | 
			
		||||
		atomic.AddUint64(&layerInfoCacheMetrics.Exists.Hits, 1)
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
fallback:
 | 
			
		||||
	atomic.AddUint64(&layerInfoCacheMetrics.Exists.Misses, 1)
 | 
			
		||||
	exists, err := lc.LayerService.Exists(dgst)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return exists, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if exists {
 | 
			
		||||
		// we can only cache this if the existence is positive.
 | 
			
		||||
		if err := lc.cache.Add(lc.ctx, lc.repository.Name(), dgst); err != nil {
 | 
			
		||||
			ctxu.GetLogger(lc.ctx).Errorf("error adding %v@%v to cache: %v", lc.repository.Name(), dgst, err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return exists, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Fetch checks for the availability of the layer in the repository via the
 | 
			
		||||
// cache. If present, the metadata is resolved and the layer is returned. If
 | 
			
		||||
// any operation fails, the layer is read directly from the upstream. The
 | 
			
		||||
// results are cached, if possible.
 | 
			
		||||
func (lc *cachedLayerService) Fetch(dgst digest.Digest) (distribution.Layer, error) {
 | 
			
		||||
	ctxu.GetLogger(lc.ctx).Debugf("(*layerInfoCache).Fetch(%q)", dgst)
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
	defer func() {
 | 
			
		||||
		ctxu.GetLoggerWithField(lc.ctx, "blob.fetch.duration", time.Since(now)).
 | 
			
		||||
			Infof("(*layerInfoCache).Fetch(%q)", dgst)
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Requests, 1)
 | 
			
		||||
	available, err := lc.cache.Contains(lc.ctx, lc.repository.Name(), dgst)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		ctxu.GetLogger(lc.ctx).Errorf("error checking availability of %v@%v: %v", lc.repository.Name(), dgst, err)
 | 
			
		||||
		goto fallback
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if available {
 | 
			
		||||
		// fast path: get the layer info and return
 | 
			
		||||
		meta, err := lc.cache.Meta(lc.ctx, dgst)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			ctxu.GetLogger(lc.ctx).Errorf("error fetching %v@%v from cache: %v", lc.repository.Name(), dgst, err)
 | 
			
		||||
			goto fallback
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Hits, 1)
 | 
			
		||||
		return newLayerReader(lc.driver, dgst, meta.Path, meta.Length)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// NOTE(stevvooe): Unfortunately, the cache here only makes checks for
 | 
			
		||||
	// existing layers faster. We'd have to provide more careful
 | 
			
		||||
	// synchronization with the backend to make the missing case as fast.
 | 
			
		||||
 | 
			
		||||
fallback:
 | 
			
		||||
	atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Misses, 1)
 | 
			
		||||
	layer, err := lc.LayerService.Fetch(dgst)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// add the layer to the repository
 | 
			
		||||
	if err := lc.cache.Add(lc.ctx, lc.repository.Name(), dgst); err != nil {
 | 
			
		||||
		ctxu.GetLogger(lc.ctx).
 | 
			
		||||
			Errorf("error caching repository relationship for %v@%v: %v", lc.repository.Name(), dgst, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// lookup layer path and add it to the cache, if it succeds. Note that we
 | 
			
		||||
	// still return the layer even if we have trouble caching it.
 | 
			
		||||
	if path, err := lc.resolveLayerPath(layer); err != nil {
 | 
			
		||||
		ctxu.GetLogger(lc.ctx).
 | 
			
		||||
			Errorf("error resolving path while caching %v@%v: %v", lc.repository.Name(), dgst, err)
 | 
			
		||||
	} else {
 | 
			
		||||
		// add the layer to the cache once we've resolved the path.
 | 
			
		||||
		if err := lc.cache.SetMeta(lc.ctx, dgst, cache.LayerMeta{Path: path, Length: layer.Length()}); err != nil {
 | 
			
		||||
			ctxu.GetLogger(lc.ctx).Errorf("error adding meta for %v@%v to cache: %v", lc.repository.Name(), dgst, err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return layer, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// extractLayerInfo pulls the layerInfo from the layer, attempting to get the
 | 
			
		||||
// path information from either the concrete object or by resolving the
 | 
			
		||||
// primary blob store path.
 | 
			
		||||
func (lc *cachedLayerService) resolveLayerPath(layer distribution.Layer) (path string, err error) {
 | 
			
		||||
	// try and resolve the type and driver, so we don't have to traverse links
 | 
			
		||||
	switch v := layer.(type) {
 | 
			
		||||
	case *layerReader:
 | 
			
		||||
		// only set path if we have same driver instance.
 | 
			
		||||
		if v.driver == lc.driver {
 | 
			
		||||
			return v.path, nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctxu.GetLogger(lc.ctx).Warnf("resolving layer path during cache lookup (%v@%v)", lc.repository.Name(), layer.Digest())
 | 
			
		||||
	// we have to do an expensive stat to resolve the layer location but no
 | 
			
		||||
	// need to check the link, since we already have layer instance for this
 | 
			
		||||
	// repository.
 | 
			
		||||
	bp, err := lc.blobStore.path(layer.Digest())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return bp, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// layerInfoCacheMetrics keeps track of cache metrics for layer info cache
 | 
			
		||||
// requests. Note this is kept globally and made available via expvar. For
 | 
			
		||||
// more detailed metrics, its recommend to instrument a particular cache
 | 
			
		||||
// implementation.
 | 
			
		||||
var layerInfoCacheMetrics struct {
 | 
			
		||||
	// Exists tracks calls to the Exists caches.
 | 
			
		||||
	Exists struct {
 | 
			
		||||
		Requests uint64
 | 
			
		||||
		Hits     uint64
 | 
			
		||||
		Misses   uint64
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Fetch tracks calls to the fetch caches.
 | 
			
		||||
	Fetch struct {
 | 
			
		||||
		Requests uint64
 | 
			
		||||
		Hits     uint64
 | 
			
		||||
		Misses   uint64
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	registry := expvar.Get("registry")
 | 
			
		||||
	if registry == nil {
 | 
			
		||||
		registry = expvar.NewMap("registry")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	cache := registry.(*expvar.Map).Get("cache")
 | 
			
		||||
	if cache == nil {
 | 
			
		||||
		cache = &expvar.Map{}
 | 
			
		||||
		cache.(*expvar.Map).Init()
 | 
			
		||||
		registry.(*expvar.Map).Set("cache", cache)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	storage := cache.(*expvar.Map).Get("storage")
 | 
			
		||||
	if storage == nil {
 | 
			
		||||
		storage = &expvar.Map{}
 | 
			
		||||
		storage.(*expvar.Map).Init()
 | 
			
		||||
		cache.(*expvar.Map).Set("storage", storage)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	storage.(*expvar.Map).Set("layerinfo", expvar.Func(func() interface{} {
 | 
			
		||||
		// no need for synchronous access: the increments are atomic and
 | 
			
		||||
		// during reading, we don't care if the data is up to date. The
 | 
			
		||||
		// numbers will always *eventually* be reported correctly.
 | 
			
		||||
		return layerInfoCacheMetrics
 | 
			
		||||
	}))
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -17,6 +17,21 @@ type layerReader struct {
 | 
			
		|||
	digest digest.Digest
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// newLayerReader returns a new layerReader with the digest, path and length,
 | 
			
		||||
// eliding round trips to the storage backend.
 | 
			
		||||
func newLayerReader(driver driver.StorageDriver, dgst digest.Digest, path string, length int64) (*layerReader, error) {
 | 
			
		||||
	fr := &fileReader{
 | 
			
		||||
		driver: driver,
 | 
			
		||||
		path:   path,
 | 
			
		||||
		size:   length,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &layerReader{
 | 
			
		||||
		fileReader: *fr,
 | 
			
		||||
		digest:     dgst,
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ distribution.Layer = &layerReader{}
 | 
			
		||||
 | 
			
		||||
func (lr *layerReader) Digest() digest.Digest {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -6,6 +6,8 @@ import (
 | 
			
		|||
	"reflect"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/cache"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution"
 | 
			
		||||
	"github.com/docker/distribution/digest"
 | 
			
		||||
	"github.com/docker/distribution/manifest"
 | 
			
		||||
| 
						 | 
				
			
			@ -28,7 +30,7 @@ type manifestStoreTestEnv struct {
 | 
			
		|||
func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	driver := inmemory.New()
 | 
			
		||||
	registry := NewRegistryWithDriver(driver)
 | 
			
		||||
	registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
 | 
			
		||||
 | 
			
		||||
	repo, err := registry.Repository(ctx, name)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,6 +3,7 @@ package storage
 | 
			
		|||
import (
 | 
			
		||||
	"github.com/docker/distribution"
 | 
			
		||||
	"github.com/docker/distribution/registry/api/v2"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/cache"
 | 
			
		||||
	storagedriver "github.com/docker/distribution/registry/storage/driver"
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -10,28 +11,29 @@ import (
 | 
			
		|||
// registry is the top-level implementation of Registry for use in the storage
 | 
			
		||||
// package. All instances should descend from this object.
 | 
			
		||||
type registry struct {
 | 
			
		||||
	driver    storagedriver.StorageDriver
 | 
			
		||||
	pm        *pathMapper
 | 
			
		||||
	blobStore *blobStore
 | 
			
		||||
	driver         storagedriver.StorageDriver
 | 
			
		||||
	pm             *pathMapper
 | 
			
		||||
	blobStore      *blobStore
 | 
			
		||||
	layerInfoCache cache.LayerInfoCache
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewRegistryWithDriver creates a new registry instance from the provided
 | 
			
		||||
// driver. The resulting registry may be shared by multiple goroutines but is
 | 
			
		||||
// cheap to allocate.
 | 
			
		||||
func NewRegistryWithDriver(driver storagedriver.StorageDriver) distribution.Registry {
 | 
			
		||||
	bs := &blobStore{}
 | 
			
		||||
func NewRegistryWithDriver(driver storagedriver.StorageDriver, layerInfoCache cache.LayerInfoCache) distribution.Registry {
 | 
			
		||||
	bs := &blobStore{
 | 
			
		||||
		driver: driver,
 | 
			
		||||
		pm:     defaultPathMapper,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	reg := ®istry{
 | 
			
		||||
	return ®istry{
 | 
			
		||||
		driver:    driver,
 | 
			
		||||
		blobStore: bs,
 | 
			
		||||
 | 
			
		||||
		// TODO(sday): This should be configurable.
 | 
			
		||||
		pm: defaultPathMapper,
 | 
			
		||||
		pm:             defaultPathMapper,
 | 
			
		||||
		layerInfoCache: layerInfoCache,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	reg.blobStore.registry = reg
 | 
			
		||||
 | 
			
		||||
	return reg
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Repository returns an instance of the repository tied to the registry.
 | 
			
		||||
| 
						 | 
				
			
			@ -83,9 +85,29 @@ func (repo *repository) Manifests() distribution.ManifestService {
 | 
			
		|||
// may be context sensitive in the future. The instance should be used similar
 | 
			
		||||
// to a request local.
 | 
			
		||||
func (repo *repository) Layers() distribution.LayerService {
 | 
			
		||||
	return &layerStore{
 | 
			
		||||
	ls := &layerStore{
 | 
			
		||||
		repository: repo,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if repo.registry.layerInfoCache != nil {
 | 
			
		||||
		// TODO(stevvooe): This is not the best place to setup a cache. We would
 | 
			
		||||
		// really like to decouple the cache from the backend but also have the
 | 
			
		||||
		// manifeset service use the layer service cache. For now, we can simply
 | 
			
		||||
		// integrate the cache directly. The main issue is that we have layer
 | 
			
		||||
		// access and layer data coupled in a single object. Work is already under
 | 
			
		||||
		// way to decouple this.
 | 
			
		||||
 | 
			
		||||
		return &cachedLayerService{
 | 
			
		||||
			LayerService: ls,
 | 
			
		||||
			repository:   repo,
 | 
			
		||||
			ctx:          repo.ctx,
 | 
			
		||||
			driver:       repo.driver,
 | 
			
		||||
			blobStore:    repo.blobStore,
 | 
			
		||||
			cache:        repo.registry.layerInfoCache,
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return ls
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (repo *repository) Signatures() distribution.SignatureService {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue