Refactoring cloudfactory layer handler into a more generic storage
middleware concept. This also breaks the dependency the storage package had on goamz Signed-off-by: David Lawrence <david.lawrence@docker.com> (github: endophage)master
							parent
							
								
									0c130dff5b
								
							
						
					
					
						commit
						4acda57e05
					
				|  | @ -17,6 +17,7 @@ import ( | |||
| 	"github.com/docker/distribution/registry/handlers" | ||||
| 	_ "github.com/docker/distribution/registry/storage/driver/filesystem" | ||||
| 	_ "github.com/docker/distribution/registry/storage/driver/inmemory" | ||||
| 	_ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront" | ||||
| 	_ "github.com/docker/distribution/registry/storage/driver/s3" | ||||
| 	"github.com/docker/distribution/version" | ||||
| 	gorhandlers "github.com/gorilla/handlers" | ||||
|  |  | |||
|  | @ -26,8 +26,8 @@ type Configuration struct { | |||
| 	// used to gate requests.
 | ||||
| 	Auth Auth `yaml:"auth,omitempty"` | ||||
| 
 | ||||
| 	// LayerHandler specifies a middleware for serving image layers.
 | ||||
| 	LayerHandler LayerHandler `yaml:"layerhandler,omitempty"` | ||||
| 	// Middleware lists all middlewares to be used by the registry.
 | ||||
| 	Middleware []Middleware `yaml:"middleware,omitempty"` | ||||
| 
 | ||||
| 	// Reporting is the configuration for error reporting
 | ||||
| 	Reporting Reporting `yaml:"reporting,omitempty"` | ||||
|  | @ -295,60 +295,18 @@ type NewRelicReporting struct { | |||
| 	Name string `yaml:"name,omitempty"` | ||||
| } | ||||
| 
 | ||||
| // LayerHandler defines the configuration for middleware layer serving
 | ||||
| type LayerHandler map[string]Parameters | ||||
| 
 | ||||
| // Type returns the layerhandler type
 | ||||
| func (layerHandler LayerHandler) Type() string { | ||||
| 	// Return only key in this map
 | ||||
| 	for k := range layerHandler { | ||||
| 		return k | ||||
| 	} | ||||
| 	return "" | ||||
| } | ||||
| 
 | ||||
| // Parameters returns the Parameters map for a LayerHandler configuration
 | ||||
| func (layerHandler LayerHandler) Parameters() Parameters { | ||||
| 	return layerHandler[layerHandler.Type()] | ||||
| } | ||||
| 
 | ||||
| // UnmarshalYAML implements the yaml.Unmarshaler interface
 | ||||
| // Unmarshals a single item map into a Storage or a string into a Storage type with no parameters
 | ||||
| func (layerHandler *LayerHandler) UnmarshalYAML(unmarshal func(interface{}) error) error { | ||||
| 	var storageMap map[string]Parameters | ||||
| 	err := unmarshal(&storageMap) | ||||
| 	if err == nil { | ||||
| 		if len(storageMap) > 1 { | ||||
| 			types := make([]string, 0, len(storageMap)) | ||||
| 			for k := range storageMap { | ||||
| 				types = append(types, k) | ||||
| 			} | ||||
| 			return fmt.Errorf("Must provide exactly one layerhandler type. Provided: %v", types) | ||||
| 		} | ||||
| 		*layerHandler = storageMap | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	var storageType string | ||||
| 	err = unmarshal(&storageType) | ||||
| 	if err == nil { | ||||
| 		*layerHandler = LayerHandler{storageType: Parameters{}} | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| // MarshalYAML implements the yaml.Marshaler interface
 | ||||
| func (layerHandler LayerHandler) MarshalYAML() (interface{}, error) { | ||||
| 	if layerHandler.Parameters() == nil { | ||||
| 		t := layerHandler.Type() | ||||
| 		if t == "" { | ||||
| 			return nil, nil | ||||
| 		} | ||||
| 		return t, nil | ||||
| 	} | ||||
| 	return map[string]Parameters(layerHandler), nil | ||||
| // Middleware configures named middlewares to be applied at injection points.
 | ||||
| type Middleware struct { | ||||
| 	// Name the middleware registers itself as
 | ||||
| 	Name string `yaml:"name"` | ||||
| 	// Injection point the middleware should be applied at
 | ||||
| 	// N.B. ensure the middleware is applicable for the named injection point, middlewares
 | ||||
| 	//      for different injection points are not interchangeable.
 | ||||
| 	Inject string `yaml:"inject"` | ||||
| 	// Flag to disable middleware easily
 | ||||
| 	Disabled bool `yaml:"Disabled,omitempty"` | ||||
| 	// Map of parameters that will be passed to the middleware's initialization function
 | ||||
| 	Options Parameters `yaml:"options"` | ||||
| } | ||||
| 
 | ||||
| // Parse parses an input configuration yaml document into a Configuration struct
 | ||||
|  |  | |||
|  | @ -2,6 +2,7 @@ package distribution | |||
| 
 | ||||
| import ( | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/docker/distribution/digest" | ||||
|  | @ -106,6 +107,11 @@ type Layer interface { | |||
| 
 | ||||
| 	// CreatedAt returns the time this layer was created.
 | ||||
| 	CreatedAt() time.Time | ||||
| 
 | ||||
| 	// ServeHTTP allows a layer to serve itself, whether by providing
 | ||||
| 	// a redirect directly to the content, or by serving the content
 | ||||
| 	// itself
 | ||||
| 	ServeHTTP(w http.ResponseWriter, r *http.Request) | ||||
| } | ||||
| 
 | ||||
| // LayerUpload provides a handle for working with in-progress uploads.
 | ||||
|  |  | |||
|  | @ -16,6 +16,7 @@ import ( | |||
| 	"github.com/docker/distribution/registry/storage" | ||||
| 	storagedriver "github.com/docker/distribution/registry/storage/driver" | ||||
| 	"github.com/docker/distribution/registry/storage/driver/factory" | ||||
| 	storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware" | ||||
| 	"github.com/gorilla/mux" | ||||
| 	"golang.org/x/net/context" | ||||
| ) | ||||
|  | @ -41,8 +42,6 @@ type App struct { | |||
| 		sink   notifications.Sink | ||||
| 		source notifications.SourceRecord | ||||
| 	} | ||||
| 
 | ||||
| 	layerHandler storage.LayerHandler // allows dispatch of layer serving to external provider
 | ||||
| } | ||||
| 
 | ||||
| // Value intercepts calls context.Context.Value, returning the current app id,
 | ||||
|  | @ -101,14 +100,22 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App | |||
| 		app.accessController = accessController | ||||
| 	} | ||||
| 
 | ||||
| 	layerHandlerType := configuration.LayerHandler.Type() | ||||
| 	for _, mw := range configuration.Middleware { | ||||
| 		if mw.Inject == "registry" { | ||||
| 			// registry middleware can director wrap app.registry identically to storage middlewares with driver
 | ||||
| 			panic(fmt.Sprintf("unable to configure registry middleware (%s): %v", mw.Name, err)) | ||||
| 		} else if mw.Inject == "repository" { | ||||
| 			// we have to do something more intelligent with repository middleware, It needs to be staged
 | ||||
| 			// for later to be wrapped around the repository at request time.
 | ||||
| 			panic(fmt.Sprintf("unable to configure repository middleware (%s): %v", mw.Name, err)) | ||||
| 		} else if mw.Inject == "storage" { | ||||
| 			smw, err := storagemiddleware.GetStorageMiddleware(mw.Name, mw.Options, app.driver) | ||||
| 
 | ||||
| 	if layerHandlerType != "" { | ||||
| 		lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), app.driver) | ||||
| 			if err != nil { | ||||
| 			panic(fmt.Sprintf("unable to configure layer handler (%s): %v", layerHandlerType, err)) | ||||
| 				panic(fmt.Sprintf("unable to configure storage middleware (%s): %v", mw.Name, err)) | ||||
| 			} | ||||
| 			app.driver = smw | ||||
| 		} | ||||
| 		app.layerHandler = lh | ||||
| 	} | ||||
| 
 | ||||
| 	return app | ||||
|  |  | |||
|  | @ -49,8 +49,8 @@ type layerHandler struct { | |||
| // response.
 | ||||
| func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) { | ||||
| 	ctxu.GetLogger(lh).Debug("GetImageLayer") | ||||
| 	layers := lh.Repository.Layers() | ||||
| 	layer, err := layers.Fetch(lh.Digest) | ||||
| 	layerStore := lh.Repository.Layers() | ||||
| 	layerReader, err := layerStore.Fetch(lh.Digest) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		switch err := err.(type) { | ||||
|  | @ -62,17 +62,6 @@ func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) { | |||
| 		} | ||||
| 		return | ||||
| 	} | ||||
| 	defer layer.Close() | ||||
| 
 | ||||
| 	w.Header().Set("Docker-Content-Digest", lh.Digest.String()) | ||||
| 
 | ||||
| 	if lh.layerHandler != nil { | ||||
| 		handler, _ := lh.layerHandler.Resolve(layer) | ||||
| 		if handler != nil { | ||||
| 			handler.ServeHTTP(w, r) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	http.ServeContent(w, r, layer.Digest().String(), layer.CreatedAt(), layer) | ||||
| 	layerReader.ServeHTTP(w, r) | ||||
| } | ||||
|  |  | |||
|  | @ -1,95 +0,0 @@ | |||
| package storage | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/docker/distribution" | ||||
| 	storagedriver "github.com/docker/distribution/registry/storage/driver" | ||||
| ) | ||||
| 
 | ||||
| // delegateLayerHandler provides a simple implementation of layerHandler that
 | ||||
| // simply issues HTTP Temporary Redirects to the URL provided by the
 | ||||
| // storagedriver for a given Layer.
 | ||||
| type delegateLayerHandler struct { | ||||
| 	storageDriver storagedriver.StorageDriver | ||||
| 	pathMapper    *pathMapper | ||||
| 	duration      time.Duration | ||||
| } | ||||
| 
 | ||||
| var _ LayerHandler = &delegateLayerHandler{} | ||||
| 
 | ||||
| func newDelegateLayerHandler(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) { | ||||
| 	duration := 20 * time.Minute | ||||
| 	d, ok := options["duration"] | ||||
| 	if ok { | ||||
| 		switch d := d.(type) { | ||||
| 		case time.Duration: | ||||
| 			duration = d | ||||
| 		case string: | ||||
| 			dur, err := time.ParseDuration(d) | ||||
| 			if err != nil { | ||||
| 				return nil, fmt.Errorf("Invalid duration: %s", err) | ||||
| 			} | ||||
| 			duration = dur | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return &delegateLayerHandler{storageDriver: storageDriver, pathMapper: defaultPathMapper, duration: duration}, nil | ||||
| } | ||||
| 
 | ||||
| // Resolve returns an http.Handler which can serve the contents of the given
 | ||||
| // Layer, or an error if not supported by the storagedriver.
 | ||||
| func (lh *delegateLayerHandler) Resolve(layer distribution.Layer) (http.Handler, error) { | ||||
| 	// TODO(bbland): This is just a sanity check to ensure that the
 | ||||
| 	// storagedriver supports url generation. It would be nice if we didn't have
 | ||||
| 	// to do this twice for non-GET requests.
 | ||||
| 	layerURL, err := lh.urlFor(layer, map[string]interface{}{"method": "GET"}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		if r.Method != "GET" { | ||||
| 			layerURL, err = lh.urlFor(layer, map[string]interface{}{"method": r.Method}) | ||||
| 			if err != nil { | ||||
| 				http.Error(w, err.Error(), http.StatusInternalServerError) | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		http.Redirect(w, r, layerURL, http.StatusTemporaryRedirect) | ||||
| 	}), nil | ||||
| } | ||||
| 
 | ||||
| // urlFor returns a download URL for the given layer, or the empty string if
 | ||||
| // unsupported.
 | ||||
| func (lh *delegateLayerHandler) urlFor(layer distribution.Layer, options map[string]interface{}) (string, error) { | ||||
| 	// Crack open the layer to get at the layerStore
 | ||||
| 	layerRd, ok := layer.(*layerReader) | ||||
| 	if !ok { | ||||
| 		// TODO(stevvooe): We probably want to find a better way to get at the
 | ||||
| 		// underlying filesystem path for a given layer. Perhaps, the layer
 | ||||
| 		// handler should have its own layer store but right now, it is not
 | ||||
| 		// request scoped.
 | ||||
| 		return "", fmt.Errorf("unsupported layer type: cannot resolve blob path: %v", layer) | ||||
| 	} | ||||
| 
 | ||||
| 	if options == nil { | ||||
| 		options = make(map[string]interface{}) | ||||
| 	} | ||||
| 	options["expiry"] = time.Now().Add(lh.duration) | ||||
| 
 | ||||
| 	layerURL, err := lh.storageDriver.URLFor(layerRd.path, options) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	return layerURL, nil | ||||
| } | ||||
| 
 | ||||
| // init registers the delegate layerHandler backend.
 | ||||
| func init() { | ||||
| 	RegisterLayerHandler("delegate", LayerHandlerInitFunc(newDelegateLayerHandler)) | ||||
| } | ||||
|  | @ -1,34 +1,36 @@ | |||
| package storage | ||||
| // Package middleware - cloudfront wrapper for storage libs
 | ||||
| // N.B. currently only works with S3, not arbitrary sites
 | ||||
| //
 | ||||
| package middleware | ||||
| 
 | ||||
| import ( | ||||
| 	"crypto/x509" | ||||
| 	"encoding/pem" | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
| 	"net/http" | ||||
| 	"net/url" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/AdRoll/goamz/cloudfront" | ||||
| 	"github.com/docker/distribution" | ||||
| 	storagedriver "github.com/docker/distribution/registry/storage/driver" | ||||
| 	storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware" | ||||
| ) | ||||
| 
 | ||||
| // cloudFrontLayerHandler provides an simple implementation of layerHandler that
 | ||||
| // cloudFrontStorageMiddleware provides an simple implementation of layerHandler that
 | ||||
| // constructs temporary signed CloudFront URLs from the storagedriver layer URL,
 | ||||
| // then issues HTTP Temporary Redirects to this CloudFront content URL.
 | ||||
| type cloudFrontLayerHandler struct { | ||||
| type cloudFrontStorageMiddleware struct { | ||||
| 	storagedriver.StorageDriver | ||||
| 	cloudfront *cloudfront.CloudFront | ||||
| 	delegateLayerHandler *delegateLayerHandler | ||||
| 	duration   time.Duration | ||||
| } | ||||
| 
 | ||||
| var _ LayerHandler = &cloudFrontLayerHandler{} | ||||
| var _ storagedriver.StorageDriver = &cloudFrontStorageMiddleware{} | ||||
| 
 | ||||
| // newCloudFrontLayerHandler constructs and returns a new CloudFront
 | ||||
| // LayerHandler implementation.
 | ||||
| // Required options: baseurl, privatekey, keypairid
 | ||||
| func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) { | ||||
| func newCloudFrontStorageMiddleware(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) { | ||||
| 	base, ok := options["baseurl"] | ||||
| 	if !ok { | ||||
| 		return nil, fmt.Errorf("No baseurl provided") | ||||
|  | @ -68,12 +70,6 @@ func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, option | |||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	lh, err := newDelegateLayerHandler(storageDriver, options) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	dlh := lh.(*delegateLayerHandler) | ||||
| 
 | ||||
| 	cf := cloudfront.New(baseURL, privateKey, keypairID) | ||||
| 
 | ||||
| 	duration := 20 * time.Minute | ||||
|  | @ -91,33 +87,33 @@ func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, option | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return &cloudFrontLayerHandler{cloudfront: cf, delegateLayerHandler: dlh, duration: duration}, nil | ||||
| 	return &cloudFrontStorageMiddleware{StorageDriver: storageDriver, cloudfront: cf, duration: duration}, nil | ||||
| } | ||||
| 
 | ||||
| // Resolve returns an http.Handler which can serve the contents of the given
 | ||||
| // Layer, or an error if not supported by the storagedriver.
 | ||||
| func (lh *cloudFrontLayerHandler) Resolve(layer distribution.Layer) (http.Handler, error) { | ||||
| 	layerURLStr, err := lh.delegateLayerHandler.urlFor(layer, nil) | ||||
| func (lh *cloudFrontStorageMiddleware) URLFor(path string, options map[string]interface{}) (string, error) { | ||||
| 	// TODO(endophage): currently only supports S3
 | ||||
| 	options["expiry"] = time.Now().Add(lh.duration) | ||||
| 
 | ||||
| 	layerURLStr, err := lh.StorageDriver.URLFor(path, options) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	layerURL, err := url.Parse(layerURLStr) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	cfURL, err := lh.cloudfront.CannedSignedURL(layerURL.Path, "", time.Now().Add(lh.duration)) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		http.Redirect(w, r, cfURL, http.StatusTemporaryRedirect) | ||||
| 	}), nil | ||||
| 	return cfURL, nil | ||||
| } | ||||
| 
 | ||||
| // init registers the cloudfront layerHandler backend.
 | ||||
| func init() { | ||||
| 	RegisterLayerHandler("cloudfront", LayerHandlerInitFunc(newCloudFrontLayerHandler)) | ||||
| 	storagemiddleware.RegisterStorageMiddleware("cloudfront", storagemiddleware.InitFunc(newCloudFrontStorageMiddleware)) | ||||
| } | ||||
|  | @ -0,0 +1,40 @@ | |||
| package storagemiddleware | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 
 | ||||
| 	storagedriver "github.com/docker/distribution/registry/storage/driver" | ||||
| ) | ||||
| 
 | ||||
| // InitFunc is the type of a StorageMiddleware factory function and is
 | ||||
| // used to register the contsructor for different StorageMiddleware backends.
 | ||||
| type InitFunc func(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) | ||||
| 
 | ||||
| var storageMiddlewares map[string]InitFunc | ||||
| 
 | ||||
| // RegisterStorageMiddleware is used to register an StorageMiddlewareInitFunc for
 | ||||
| // a StorageMiddleware backend with the given name.
 | ||||
| func RegisterStorageMiddleware(name string, initFunc InitFunc) error { | ||||
| 	if storageMiddlewares == nil { | ||||
| 		storageMiddlewares = make(map[string]InitFunc) | ||||
| 	} | ||||
| 	if _, exists := storageMiddlewares[name]; exists { | ||||
| 		return fmt.Errorf("name already registered: %s", name) | ||||
| 	} | ||||
| 
 | ||||
| 	storageMiddlewares[name] = initFunc | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // GetStorageMiddleware constructs a StorageMiddleware
 | ||||
| // with the given options using the named backend.
 | ||||
| func GetStorageMiddleware(name string, options map[string]interface{}, storageDriver storagedriver.StorageDriver) (storagedriver.StorageDriver, error) { | ||||
| 	if storageMiddlewares != nil { | ||||
| 		if initFunc, exists := storageMiddlewares[name]; exists { | ||||
| 			return initFunc(storageDriver, options) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return nil, fmt.Errorf("no storage middleware registered with name: %s", name) | ||||
| } | ||||
|  | @ -1,51 +0,0 @@ | |||
| package storage | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 
 | ||||
| 	"github.com/docker/distribution" | ||||
| 	storagedriver "github.com/docker/distribution/registry/storage/driver" | ||||
| ) | ||||
| 
 | ||||
| // LayerHandler provides middleware for serving the contents of a Layer.
 | ||||
| type LayerHandler interface { | ||||
| 	// Resolve returns an http.Handler which can serve the contents of a given
 | ||||
| 	// Layer if possible, or nil and an error when unsupported. This may
 | ||||
| 	// directly serve the contents of the layer or issue a redirect to another
 | ||||
| 	// URL hosting the content.
 | ||||
| 	Resolve(layer distribution.Layer) (http.Handler, error) | ||||
| } | ||||
| 
 | ||||
| // LayerHandlerInitFunc is the type of a LayerHandler factory function and is
 | ||||
| // used to register the contsructor for different LayerHandler backends.
 | ||||
| type LayerHandlerInitFunc func(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) | ||||
| 
 | ||||
| var layerHandlers map[string]LayerHandlerInitFunc | ||||
| 
 | ||||
| // RegisterLayerHandler is used to register an LayerHandlerInitFunc for
 | ||||
| // a LayerHandler backend with the given name.
 | ||||
| func RegisterLayerHandler(name string, initFunc LayerHandlerInitFunc) error { | ||||
| 	if layerHandlers == nil { | ||||
| 		layerHandlers = make(map[string]LayerHandlerInitFunc) | ||||
| 	} | ||||
| 	if _, exists := layerHandlers[name]; exists { | ||||
| 		return fmt.Errorf("name already registered: %s", name) | ||||
| 	} | ||||
| 
 | ||||
| 	layerHandlers[name] = initFunc | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // GetLayerHandler constructs a LayerHandler
 | ||||
| // with the given options using the named backend.
 | ||||
| func GetLayerHandler(name string, options map[string]interface{}, storageDriver storagedriver.StorageDriver) (LayerHandler, error) { | ||||
| 	if layerHandlers != nil { | ||||
| 		if initFunc, exists := layerHandlers[name]; exists { | ||||
| 			return initFunc(storageDriver, options) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return nil, fmt.Errorf("no layer handler registered with name: %s", name) | ||||
| } | ||||
|  | @ -1,13 +1,14 @@ | |||
| package storage | ||||
| 
 | ||||
| import ( | ||||
| 	"net/http" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/docker/distribution" | ||||
| 	"github.com/docker/distribution/digest" | ||||
| ) | ||||
| 
 | ||||
| // layerReadSeeker implements Layer and provides facilities for reading and
 | ||||
| // LayerRead implements Layer and provides facilities for reading and
 | ||||
| // seeking.
 | ||||
| type layerReader struct { | ||||
| 	fileReader | ||||
|  | @ -17,6 +18,10 @@ type layerReader struct { | |||
| 
 | ||||
| var _ distribution.Layer = &layerReader{} | ||||
| 
 | ||||
| func (lrs *layerReader) Path() string { | ||||
| 	return lrs.path | ||||
| } | ||||
| 
 | ||||
| func (lrs *layerReader) Digest() digest.Digest { | ||||
| 	return lrs.digest | ||||
| } | ||||
|  | @ -33,3 +38,12 @@ func (lrs *layerReader) CreatedAt() time.Time { | |||
| func (lrs *layerReader) Close() error { | ||||
| 	return lrs.closeWithErr(distribution.ErrLayerClosed) | ||||
| } | ||||
| 
 | ||||
| func (lrs *layerReader) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||
| 	w.Header().Set("Docker-Content-Digest", lrs.digest.String()) | ||||
| 
 | ||||
| 	if url, err := lrs.fileReader.driver.URLFor(lrs.Path(), map[string]interface{}{}); err == nil { | ||||
| 		http.Redirect(w, r, url, http.StatusTemporaryRedirect) | ||||
| 	} | ||||
| 	http.ServeContent(w, r, lrs.Digest().String(), lrs.CreatedAt(), lrs) | ||||
| } | ||||
|  |  | |||
|  | @ -138,7 +138,7 @@ func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (di | |||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return &layerUploadController{ | ||||
| 	return &layerWriter{ | ||||
| 		layerStore:         ls, | ||||
| 		uuid:               uuid, | ||||
| 		startedAt:          startedAt, | ||||
|  |  | |||
|  | @ -13,9 +13,11 @@ import ( | |||
| 	storagedriver "github.com/docker/distribution/registry/storage/driver" | ||||
| ) | ||||
| 
 | ||||
| // layerUploadController is used to control the various aspects of resumable
 | ||||
| var _ distribution.LayerUpload = &layerWriter{} | ||||
| 
 | ||||
| // layerWriter is used to control the various aspects of resumable
 | ||||
| // layer upload. It implements the LayerUpload interface.
 | ||||
| type layerUploadController struct { | ||||
| type layerWriter struct { | ||||
| 	layerStore *layerStore | ||||
| 
 | ||||
| 	uuid      string | ||||
|  | @ -26,65 +28,64 @@ type layerUploadController struct { | |||
| 	bufferedFileWriter | ||||
| } | ||||
| 
 | ||||
| var _ distribution.LayerUpload = &layerUploadController{} | ||||
| var _ distribution.LayerUpload = &layerWriter{} | ||||
| 
 | ||||
| // UUID returns the identifier for this upload.
 | ||||
| func (luc *layerUploadController) UUID() string { | ||||
| 	return luc.uuid | ||||
| func (lw *layerWriter) UUID() string { | ||||
| 	return lw.uuid | ||||
| } | ||||
| 
 | ||||
| func (luc *layerUploadController) StartedAt() time.Time { | ||||
| 	return luc.startedAt | ||||
| func (lw *layerWriter) StartedAt() time.Time { | ||||
| 	return lw.startedAt | ||||
| } | ||||
| 
 | ||||
| // Finish marks the upload as completed, returning a valid handle to the
 | ||||
| // uploaded layer. The final size and checksum are validated against the
 | ||||
| // contents of the uploaded layer. The checksum should be provided in the
 | ||||
| // format <algorithm>:<hex digest>.
 | ||||
| func (luc *layerUploadController) Finish(digest digest.Digest) (distribution.Layer, error) { | ||||
| 	ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Finish") | ||||
| func (lw *layerWriter) Finish(digest digest.Digest) (distribution.Layer, error) { | ||||
| 	ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Finish") | ||||
| 
 | ||||
| 	err := luc.bufferedFileWriter.Close() | ||||
| 	if err := lw.bufferedFileWriter.Close(); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	canonical, err := lw.validateLayer(digest) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	canonical, err := luc.validateLayer(digest) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if err := luc.moveLayer(canonical); err != nil { | ||||
| 	if err := lw.moveLayer(canonical); err != nil { | ||||
| 		// TODO(stevvooe): Cleanup?
 | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	// Link the layer blob into the repository.
 | ||||
| 	if err := luc.linkLayer(canonical, digest); err != nil { | ||||
| 	if err := lw.linkLayer(canonical, digest); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if err := luc.removeResources(); err != nil { | ||||
| 	if err := lw.removeResources(); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return luc.layerStore.Fetch(canonical) | ||||
| 	return lw.layerStore.Fetch(canonical) | ||||
| } | ||||
| 
 | ||||
| // Cancel the layer upload process.
 | ||||
| func (luc *layerUploadController) Cancel() error { | ||||
| 	ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Cancel") | ||||
| 	if err := luc.removeResources(); err != nil { | ||||
| func (lw *layerWriter) Cancel() error { | ||||
| 	ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Cancel") | ||||
| 	if err := lw.removeResources(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	luc.Close() | ||||
| 	lw.Close() | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // validateLayer checks the layer data against the digest, returning an error
 | ||||
| // if it does not match. The canonical digest is returned.
 | ||||
| func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Digest, error) { | ||||
| func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) { | ||||
| 	digestVerifier, err := digest.NewDigestVerifier(dgst) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
|  | @ -96,7 +97,7 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige | |||
| 	// then only have to fetch the difference.
 | ||||
| 
 | ||||
| 	// Read the file from the backend driver and validate it.
 | ||||
| 	fr, err := newFileReader(luc.bufferedFileWriter.driver, luc.path) | ||||
| 	fr, err := newFileReader(lw.bufferedFileWriter.driver, lw.path) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
|  | @ -125,8 +126,8 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige | |||
| // moveLayer moves the data into its final, hash-qualified destination,
 | ||||
| // identified by dgst. The layer should be validated before commencing the
 | ||||
| // move.
 | ||||
| func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { | ||||
| 	blobPath, err := luc.layerStore.repository.registry.pm.path(blobDataPathSpec{ | ||||
| func (lw *layerWriter) moveLayer(dgst digest.Digest) error { | ||||
| 	blobPath, err := lw.layerStore.repository.registry.pm.path(blobDataPathSpec{ | ||||
| 		digest: dgst, | ||||
| 	}) | ||||
| 
 | ||||
|  | @ -135,7 +136,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { | |||
| 	} | ||||
| 
 | ||||
| 	// Check for existence
 | ||||
| 	if _, err := luc.driver.Stat(blobPath); err != nil { | ||||
| 	if _, err := lw.driver.Stat(blobPath); err != nil { | ||||
| 		switch err := err.(type) { | ||||
| 		case storagedriver.PathNotFoundError: | ||||
| 			break // ensure that it doesn't exist.
 | ||||
|  | @ -154,7 +155,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { | |||
| 	// the size here and write a zero-length file to blobPath if this is the
 | ||||
| 	// case. For the most part, this should only ever happen with zero-length
 | ||||
| 	// tars.
 | ||||
| 	if _, err := luc.driver.Stat(luc.path); err != nil { | ||||
| 	if _, err := lw.driver.Stat(lw.path); err != nil { | ||||
| 		switch err := err.(type) { | ||||
| 		case storagedriver.PathNotFoundError: | ||||
| 			// HACK(stevvooe): This is slightly dangerous: if we verify above,
 | ||||
|  | @ -163,24 +164,24 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { | |||
| 			// prevent this horrid thing, we employ the hack of only allowing
 | ||||
| 			// to this happen for the zero tarsum.
 | ||||
| 			if dgst == digest.DigestSha256EmptyTar { | ||||
| 				return luc.driver.PutContent(blobPath, []byte{}) | ||||
| 				return lw.driver.PutContent(blobPath, []byte{}) | ||||
| 			} | ||||
| 
 | ||||
| 			// We let this fail during the move below.
 | ||||
| 			logrus. | ||||
| 				WithField("upload.uuid", luc.UUID()). | ||||
| 				WithField("upload.uuid", lw.UUID()). | ||||
| 				WithField("digest", dgst).Warnf("attempted to move zero-length content with non-zero digest") | ||||
| 		default: | ||||
| 			return err // unrelated error
 | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return luc.driver.Move(luc.path, blobPath) | ||||
| 	return lw.driver.Move(lw.path, blobPath) | ||||
| } | ||||
| 
 | ||||
| // linkLayer links a valid, written layer blob into the registry under the
 | ||||
| // named repository for the upload controller.
 | ||||
| func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ...digest.Digest) error { | ||||
| func (lw *layerWriter) linkLayer(canonical digest.Digest, aliases ...digest.Digest) error { | ||||
| 	dgsts := append([]digest.Digest{canonical}, aliases...) | ||||
| 
 | ||||
| 	// Don't make duplicate links.
 | ||||
|  | @ -192,8 +193,8 @@ func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ... | |||
| 		} | ||||
| 		seenDigests[dgst] = struct{}{} | ||||
| 
 | ||||
| 		layerLinkPath, err := luc.layerStore.repository.registry.pm.path(layerLinkPathSpec{ | ||||
| 			name:   luc.layerStore.repository.Name(), | ||||
| 		layerLinkPath, err := lw.layerStore.repository.registry.pm.path(layerLinkPathSpec{ | ||||
| 			name:   lw.layerStore.repository.Name(), | ||||
| 			digest: dgst, | ||||
| 		}) | ||||
| 
 | ||||
|  | @ -201,7 +202,7 @@ func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ... | |||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		if err := luc.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil { | ||||
| 		if err := lw.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
|  | @ -212,10 +213,10 @@ func (luc *layerUploadController) linkLayer(canonical digest.Digest, aliases ... | |||
| // removeResources should clean up all resources associated with the upload
 | ||||
| // instance. An error will be returned if the clean up cannot proceed. If the
 | ||||
| // resources are already not present, no error will be returned.
 | ||||
| func (luc *layerUploadController) removeResources() error { | ||||
| 	dataPath, err := luc.layerStore.repository.registry.pm.path(uploadDataPathSpec{ | ||||
| 		name: luc.layerStore.repository.Name(), | ||||
| 		uuid: luc.uuid, | ||||
| func (lw *layerWriter) removeResources() error { | ||||
| 	dataPath, err := lw.layerStore.repository.registry.pm.path(uploadDataPathSpec{ | ||||
| 		name: lw.layerStore.repository.Name(), | ||||
| 		uuid: lw.uuid, | ||||
| 	}) | ||||
| 
 | ||||
| 	if err != nil { | ||||
|  | @ -226,7 +227,7 @@ func (luc *layerUploadController) removeResources() error { | |||
| 	// upload related files.
 | ||||
| 	dirPath := path.Dir(dataPath) | ||||
| 
 | ||||
| 	if err := luc.driver.Delete(dirPath); err != nil { | ||||
| 	if err := lw.driver.Delete(dirPath); err != nil { | ||||
| 		switch err := err.(type) { | ||||
| 		case storagedriver.PathNotFoundError: | ||||
| 			break // already gone!
 | ||||
		Loading…
	
		Reference in New Issue