commit b60ee6dfa0
				|  | @ -0,0 +1,236 @@ | |||
| package registry | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 	"net/http/httptest" | ||||
| 	"net/http/httputil" | ||||
| 	"net/url" | ||||
| 	"os" | ||||
| 	"testing" | ||||
| 
 | ||||
| 	"github.com/Sirupsen/logrus" | ||||
| 	_ "github.com/docker/docker-registry/storagedriver/inmemory" | ||||
| 
 | ||||
| 	"github.com/gorilla/handlers" | ||||
| 
 | ||||
| 	"github.com/docker/docker-registry/common/testutil" | ||||
| 	"github.com/docker/docker-registry/configuration" | ||||
| 	"github.com/docker/docker-registry/digest" | ||||
| ) | ||||
| 
 | ||||
| // TestLayerAPI conducts a full test of the layer API.
 | ||||
| func TestLayerAPI(t *testing.T) { | ||||
| 	// TODO(stevvooe): This test code is complete junk but it should cover the
 | ||||
| 	// complete flow. This must be broken down and checked against the
 | ||||
| 	// specification *before* we submit the final to docker core.
 | ||||
| 
 | ||||
| 	config := configuration.Configuration{ | ||||
| 		Storage: configuration.Storage{ | ||||
| 			"inmemory": configuration.Parameters{}, | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	app := NewApp(config) | ||||
| 	server := httptest.NewServer(handlers.CombinedLoggingHandler(os.Stderr, app)) | ||||
| 	router := v2APIRouter() | ||||
| 
 | ||||
| 	u, err := url.Parse(server.URL) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("error parsing server url: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	imageName := "foo/bar" | ||||
| 	// "build" our layer file
 | ||||
| 	layerFile, tarSumStr, err := testutil.CreateRandomTarFile() | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("error creating random layer file: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	layerDigest := digest.Digest(tarSumStr) | ||||
| 
 | ||||
| 	// -----------------------------------
 | ||||
| 	// Test fetch for non-existent content
 | ||||
| 	r, err := router.GetRoute(routeNameBlob).Host(u.Host). | ||||
| 		URL("name", imageName, | ||||
| 		"digest", tarSumStr) | ||||
| 
 | ||||
| 	resp, err := http.Get(r.String()) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error fetching non-existent layer: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	switch resp.StatusCode { | ||||
| 	case http.StatusNotFound: | ||||
| 		break // expected
 | ||||
| 	default: | ||||
| 		d, err := httputil.DumpResponse(resp, true) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("unexpected status fetching non-existent layer: %v, %v", resp.StatusCode, resp.Status) | ||||
| 		} | ||||
| 
 | ||||
| 		t.Logf("response:\n%s", string(d)) | ||||
| 		t.Fatalf("unexpected status fetching non-existent layer: %v, %v", resp.StatusCode, resp.Status) | ||||
| 	} | ||||
| 
 | ||||
| 	// ------------------------------------------
 | ||||
| 	// Test head request for non-existent content
 | ||||
| 	resp, err = http.Head(r.String()) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error checking head on non-existent layer: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	switch resp.StatusCode { | ||||
| 	case http.StatusNotFound: | ||||
| 		break // expected
 | ||||
| 	default: | ||||
| 		d, err := httputil.DumpResponse(resp, true) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("unexpected status checking head on non-existent layer: %v, %v", resp.StatusCode, resp.Status) | ||||
| 		} | ||||
| 
 | ||||
| 		t.Logf("response:\n%s", string(d)) | ||||
| 		t.Fatalf("unexpected status checking head on non-existent layer: %v, %v", resp.StatusCode, resp.Status) | ||||
| 	} | ||||
| 
 | ||||
| 	// ------------------------------------------
 | ||||
| 	// Upload a layer
 | ||||
| 	r, err = router.GetRoute(routeNameBlobUpload).Host(u.Host). | ||||
| 		URL("name", imageName) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("error starting layer upload: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	resp, err = http.Post(r.String(), "", nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("error starting layer upload: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if resp.StatusCode != http.StatusAccepted { | ||||
| 		d, err := httputil.DumpResponse(resp, true) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("unexpected status starting layer upload: %v, %v", resp.StatusCode, resp.Status) | ||||
| 		} | ||||
| 
 | ||||
| 		t.Logf("response:\n%s", string(d)) | ||||
| 		t.Fatalf("unexpected status starting layer upload: %v, %v", resp.StatusCode, resp.Status) | ||||
| 	} | ||||
| 
 | ||||
| 	if resp.Header.Get("Location") == "" { // TODO(stevvooe): Need better check here.
 | ||||
| 		t.Fatalf("unexpected Location: %q != %q", resp.Header.Get("Location"), "foo") | ||||
| 	} | ||||
| 
 | ||||
| 	if resp.Header.Get("Content-Length") != "0" { | ||||
| 		t.Fatalf("unexpected content-length: %q != %q", resp.Header.Get("Content-Length"), "0") | ||||
| 	} | ||||
| 
 | ||||
| 	layerLength, _ := layerFile.Seek(0, os.SEEK_END) | ||||
| 	layerFile.Seek(0, os.SEEK_SET) | ||||
| 
 | ||||
| 	uploadURLStr := resp.Header.Get("Location") | ||||
| 
 | ||||
| 	// TODO(sday): Cancel the layer upload here and restart.
 | ||||
| 
 | ||||
| 	query := url.Values{ | ||||
| 		"digest": []string{layerDigest.String()}, | ||||
| 		"length": []string{fmt.Sprint(layerLength)}, | ||||
| 	} | ||||
| 
 | ||||
| 	uploadURL, err := url.Parse(uploadURLStr) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error parsing url: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	uploadURL.RawQuery = query.Encode() | ||||
| 
 | ||||
| 	// Just do a monolithic upload
 | ||||
| 	req, err := http.NewRequest("PUT", uploadURL.String(), layerFile) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error creating new request: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	resp, err = http.DefaultClient.Do(req) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error doing put: %v", err) | ||||
| 	} | ||||
| 	defer resp.Body.Close() | ||||
| 
 | ||||
| 	switch resp.StatusCode { | ||||
| 	case http.StatusCreated: | ||||
| 		break // expected
 | ||||
| 	default: | ||||
| 		d, err := httputil.DumpResponse(resp, true) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("unexpected status putting chunk: %v, %v", resp.StatusCode, resp.Status) | ||||
| 		} | ||||
| 
 | ||||
| 		t.Logf("response:\n%s", string(d)) | ||||
| 		t.Fatalf("unexpected status putting chunk: %v, %v", resp.StatusCode, resp.Status) | ||||
| 	} | ||||
| 
 | ||||
| 	if resp.Header.Get("Location") == "" { | ||||
| 		t.Fatalf("unexpected Location: %q", resp.Header.Get("Location")) | ||||
| 	} | ||||
| 
 | ||||
| 	if resp.Header.Get("Content-Length") != "0" { | ||||
| 		t.Fatalf("unexpected content-length: %q != %q", resp.Header.Get("Content-Length"), "0") | ||||
| 	} | ||||
| 
 | ||||
| 	layerURL := resp.Header.Get("Location") | ||||
| 
 | ||||
| 	// ------------------------
 | ||||
| 	// Use a head request to see if the layer exists.
 | ||||
| 	resp, err = http.Head(layerURL) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error checking head on non-existent layer: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	switch resp.StatusCode { | ||||
| 	case http.StatusOK: | ||||
| 		break // expected
 | ||||
| 	default: | ||||
| 		d, err := httputil.DumpResponse(resp, true) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("unexpected status checking head on layer: %v, %v", resp.StatusCode, resp.Status) | ||||
| 		} | ||||
| 
 | ||||
| 		t.Logf("response:\n%s", string(d)) | ||||
| 		t.Fatalf("unexpected status checking head on layer: %v, %v", resp.StatusCode, resp.Status) | ||||
| 	} | ||||
| 
 | ||||
| 	logrus.Infof("fetch the layer") | ||||
| 	// ----------------
 | ||||
| 	// Fetch the layer!
 | ||||
| 	resp, err = http.Get(layerURL) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error fetching layer: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	switch resp.StatusCode { | ||||
| 	case http.StatusOK: | ||||
| 		break // expected
 | ||||
| 	default: | ||||
| 		d, err := httputil.DumpResponse(resp, true) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("unexpected status fetching layer: %v, %v", resp.StatusCode, resp.Status) | ||||
| 		} | ||||
| 
 | ||||
| 		t.Logf("response:\n%s", string(d)) | ||||
| 		t.Fatalf("unexpected status fetching layer: %v, %v", resp.StatusCode, resp.Status) | ||||
| 	} | ||||
| 
 | ||||
| 	// Verify the body
 | ||||
| 	verifier := digest.NewDigestVerifier(layerDigest) | ||||
| 	io.Copy(verifier, resp.Body) | ||||
| 
 | ||||
| 	if !verifier.Verified() { | ||||
| 		d, err := httputil.DumpResponse(resp, true) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("unexpected status fetching layer: %v, %v", resp.StatusCode, resp.Status) | ||||
| 		} | ||||
| 
 | ||||
| 		t.Logf("response:\n%s", string(d)) | ||||
| 		t.Fatalf("response body did not pass verification") | ||||
| 	} | ||||
| } | ||||
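
For orientation, the client-side flow the test walks through — POST to open an upload session, then a single PUT carrying the whole body plus digest and length query parameters — can be sketched as a standalone helper. This is a rough illustration using only the standard library; the package and function names are invented for the sketch, the endpoint URL is assumed to come from the v2 router shown elsewhere, and there is no retry or cancellation handling.

package registryclient // illustrative package name, not part of the registry

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

// monolithicUpload mirrors the flow exercised by TestLayerAPI: POST to the
// blob upload endpoint to open a session, then PUT the entire layer body to
// the returned Location with "digest" and "length" query parameters to
// complete the upload in one shot.
func monolithicUpload(uploadEndpoint, dgst string, layer io.Reader, length int64) (string, error) {
	resp, err := http.Post(uploadEndpoint, "", nil)
	if err != nil {
		return "", err
	}
	resp.Body.Close()
	if resp.StatusCode != http.StatusAccepted {
		return "", fmt.Errorf("unexpected status starting upload: %v", resp.Status)
	}

	// The upload session URL comes back in the Location header.
	uploadURL, err := url.Parse(resp.Header.Get("Location"))
	if err != nil {
		return "", err
	}
	uploadURL.RawQuery = url.Values{
		"digest": []string{dgst},
		"length": []string{fmt.Sprint(length)},
	}.Encode()

	req, err := http.NewRequest("PUT", uploadURL.String(), layer)
	if err != nil {
		return "", err
	}
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusCreated {
		return "", fmt.Errorf("unexpected status completing upload: %v", resp.Status)
	}

	// On success, Location points at the finished blob, usable for GET/HEAD.
	return resp.Header.Get("Location"), nil
}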
							
								
								
									
app.go (49 changed lines)
							|  | @ -3,7 +3,11 @@ package registry | |||
| import ( | ||||
| 	"net/http" | ||||
| 
 | ||||
| 	"github.com/docker/docker-registry/storagedriver" | ||||
| 	"github.com/docker/docker-registry/storagedriver/factory" | ||||
| 
 | ||||
| 	"github.com/docker/docker-registry/configuration" | ||||
| 	"github.com/docker/docker-registry/storage" | ||||
| 
 | ||||
| 	log "github.com/Sirupsen/logrus" | ||||
| 	"github.com/gorilla/mux" | ||||
|  | @ -16,6 +20,12 @@ type App struct { | |||
| 	Config configuration.Configuration | ||||
| 
 | ||||
| 	router *mux.Router | ||||
| 
 | ||||
| 	// driver maintains the app global storage driver instance.
 | ||||
| 	driver storagedriver.StorageDriver | ||||
| 
 | ||||
| 	// services contains the main services instance for the application.
 | ||||
| 	services *storage.Services | ||||
| } | ||||
| 
 | ||||
| // NewApp takes a configuration and returns a configured app, ready to serve
 | ||||
|  | @ -29,11 +39,23 @@ func NewApp(configuration configuration.Configuration) *App { | |||
| 
 | ||||
| 	// Register the handler dispatchers.
 | ||||
| 	app.register(routeNameImageManifest, imageManifestDispatcher) | ||||
| 	app.register(routeNameBlob, layerDispatcher) | ||||
| 	app.register(routeNameTags, tagsDispatcher) | ||||
| 	app.register(routeNameBlob, layerDispatcher) | ||||
| 	app.register(routeNameBlobUpload, layerUploadDispatcher) | ||||
| 	app.register(routeNameBlobUploadResume, layerUploadDispatcher) | ||||
| 
 | ||||
| 	driver, err := factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters()) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		// TODO(stevvooe): Move the creation of a service into a protected
 | ||||
| 		// method, where this is created lazily. Its status can be queried via
 | ||||
| 		// a health check.
 | ||||
| 		panic(err) | ||||
| 	} | ||||
| 
 | ||||
| 	app.driver = driver | ||||
| 	app.services = storage.NewServices(app.driver) | ||||
| 
 | ||||
| 	return app | ||||
| } | ||||
| 
 | ||||
|  | @ -64,6 +86,22 @@ type dispatchFunc func(ctx *Context, r *http.Request) http.Handler | |||
| // TODO(stevvooe): dispatchers should probably have some validation error
 | ||||
| // chain with proper error reporting.
 | ||||
| 
 | ||||
| // singleStatusResponseWriter only allows the first status written to become
 | ||||
| // the valid request status. The current use case of this type should be
 | ||||
| // factored out.
 | ||||
| type singleStatusResponseWriter struct { | ||||
| 	http.ResponseWriter | ||||
| 	status int | ||||
| } | ||||
| 
 | ||||
| func (ssrw *singleStatusResponseWriter) WriteHeader(status int) { | ||||
| 	if ssrw.status != 0 { | ||||
| 		return | ||||
| 	} | ||||
| 	ssrw.status = status | ||||
| 	ssrw.ResponseWriter.WriteHeader(status) | ||||
| } | ||||
| 
 | ||||
| // dispatcher returns a handler that constructs a request specific context and
 | ||||
| // handler, using the dispatch factory function.
 | ||||
| func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { | ||||
|  | @ -80,14 +118,17 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { | |||
| 		context.log = log.WithField("name", context.Name) | ||||
| 		handler := dispatch(context, r) | ||||
| 
 | ||||
| 		ssrw := &singleStatusResponseWriter{ResponseWriter: w} | ||||
| 		context.log.Infoln("handler", resolveHandlerName(r.Method, handler)) | ||||
| 		handler.ServeHTTP(w, r) | ||||
| 		handler.ServeHTTP(ssrw, r) | ||||
| 
 | ||||
| 		// Automated error response handling here. Handlers may return their
 | ||||
| 		// own errors if they need different behavior (such as range errors
 | ||||
| 		// for layer upload).
 | ||||
| 		if len(context.Errors.Errors) > 0 { | ||||
| 			w.WriteHeader(http.StatusBadRequest) | ||||
| 		if context.Errors.Len() > 0 { | ||||
| 			if ssrw.status == 0 { | ||||
| 				w.WriteHeader(http.StatusBadRequest) | ||||
| 			} | ||||
| 			serveJSON(w, context.Errors) | ||||
| 		} | ||||
| 	}) | ||||
|  |  | |||
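
The singleStatusResponseWriter introduced above lets the dispatcher append its generic 400 only when the wrapped handler has not already committed a status. A minimal standalone sketch of that behavior, checked with httptest.NewRecorder (the type below is re-declared for illustration rather than imported from the registry):

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// singleStatusResponseWriter lets only the first WriteHeader call through,
// so a later, generic error status cannot clobber one a handler already set.
type singleStatusResponseWriter struct {
	http.ResponseWriter
	status int
}

func (ssrw *singleStatusResponseWriter) WriteHeader(status int) {
	if ssrw.status != 0 {
		return // a status has already been committed; ignore the rest
	}
	ssrw.status = status
	ssrw.ResponseWriter.WriteHeader(status)
}

func main() {
	rec := httptest.NewRecorder()
	ssrw := &singleStatusResponseWriter{ResponseWriter: rec}

	ssrw.WriteHeader(http.StatusNotFound)   // the handler's own status wins
	ssrw.WriteHeader(http.StatusBadRequest) // the dispatcher fallback is ignored

	fmt.Println(rec.Code) // 404
}

Recording the status on the wrapper, rather than inspecting the ResponseWriter, is what allows the dispatcher to check ssrw.status == 0 before writing its fallback.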
|  | @ -1,8 +1,6 @@ | |||
| package registry | ||||
| 
 | ||||
| import ( | ||||
| 	"github.com/Sirupsen/logrus" | ||||
| ) | ||||
| import "github.com/Sirupsen/logrus" | ||||
| 
 | ||||
| // Context should contain the request specific context for use across
 | ||||
| // handlers. Resources that don't need to be shared across handlers should not
 | ||||
|  | @ -20,11 +18,6 @@ type Context struct { | |||
| 	// handler *must not* start the response via http.ResponseWriter.
 | ||||
| 	Errors Errors | ||||
| 
 | ||||
| 	// TODO(stevvooe): Context would be a good place to create a
 | ||||
| 	// representation of the "authorized resource". Perhaps, rather than
 | ||||
| 	// having fields like "name", the context should be a set of parameters
 | ||||
| 	// then we do routing from there.
 | ||||
| 
 | ||||
| 	// vars contains the extracted gorilla/mux variables that can be used for
 | ||||
| 	// assignment.
 | ||||
| 	vars map[string]string | ||||
|  |  | |||
							
								
								
									
errors.go (43 changed lines)
							|  | @ -17,20 +17,14 @@ const ( | |||
| 
 | ||||
| 	// The following errors can happen during a layer upload.
 | ||||
| 
 | ||||
| 	// ErrorCodeInvalidChecksum is returned when uploading a layer if the
 | ||||
| 	// provided checksum does not match the layer contents.
 | ||||
| 	ErrorCodeInvalidChecksum | ||||
| 	// ErrorCodeInvalidDigest is returned when uploading a layer if the
 | ||||
| 	// provided digest does not match the layer contents.
 | ||||
| 	ErrorCodeInvalidDigest | ||||
| 
 | ||||
| 	// ErrorCodeInvalidLength is returned when uploading a layer if the provided
 | ||||
| 	// length does not match the content length.
 | ||||
| 	ErrorCodeInvalidLength | ||||
| 
 | ||||
| 	// ErrorCodeInvalidTarsum is returned when the provided tarsum does not
 | ||||
| 	// match the computed tarsum of the contents.
 | ||||
| 	ErrorCodeInvalidTarsum | ||||
| 
 | ||||
| 	// The following errors can happen during manifest upload.
 | ||||
| 
 | ||||
| 	// ErrorCodeInvalidName is returned when the name in the manifest does not
 | ||||
| 	// match the provided name.
 | ||||
| 	ErrorCodeInvalidName | ||||
|  | @ -47,6 +41,9 @@ const ( | |||
| 	// nonexistent layer.
 | ||||
| 	ErrorCodeUnknownLayer | ||||
| 
 | ||||
| 	// ErrorCodeUnknownLayerUpload is returned when an unknown layer upload is accessed.
 | ||||
| 	ErrorCodeUnknownLayerUpload | ||||
| 
 | ||||
| 	// ErrorCodeUntrustedSignature is returned when the manifest is signed by an
 | ||||
| 	// untrusted source.
 | ||||
| 	ErrorCodeUntrustedSignature | ||||
|  | @ -54,25 +51,25 @@ const ( | |||
| 
 | ||||
| var errorCodeStrings = map[ErrorCode]string{ | ||||
| 	ErrorCodeUnknown:            "UNKNOWN", | ||||
| 	ErrorCodeInvalidChecksum:    "INVALID_CHECKSUM", | ||||
| 	ErrorCodeInvalidDigest:      "INVALID_DIGEST", | ||||
| 	ErrorCodeInvalidLength:      "INVALID_LENGTH", | ||||
| 	ErrorCodeInvalidTarsum:      "INVALID_TARSUM", | ||||
| 	ErrorCodeInvalidName:        "INVALID_NAME", | ||||
| 	ErrorCodeInvalidTag:         "INVALID_TAG", | ||||
| 	ErrorCodeUnverifiedManifest: "UNVERIFIED_MANIFEST", | ||||
| 	ErrorCodeUnknownLayer:       "UNKNOWN_LAYER", | ||||
| 	ErrorCodeUnknownLayerUpload: "UNKNOWN_LAYER_UPLOAD", | ||||
| 	ErrorCodeUntrustedSignature: "UNTRUSTED_SIGNATURE", | ||||
| } | ||||
| 
 | ||||
| var errorCodesMessages = map[ErrorCode]string{ | ||||
| 	ErrorCodeUnknown:            "unknown error", | ||||
| 	ErrorCodeInvalidChecksum:    "provided checksum did not match uploaded content", | ||||
| 	ErrorCodeInvalidDigest:      "provided digest did not match uploaded content", | ||||
| 	ErrorCodeInvalidLength:      "provided length did not match content length", | ||||
| 	ErrorCodeInvalidTarsum:      "provided tarsum did not match binary content", | ||||
| 	ErrorCodeInvalidName:        "Manifest name did not match URI", | ||||
| 	ErrorCodeInvalidTag:         "Manifest tag did not match URI", | ||||
| 	ErrorCodeUnverifiedManifest: "Manifest failed signature validation", | ||||
| 	ErrorCodeUnknownLayer:       "Referenced layer not available", | ||||
| 	ErrorCodeUnknownLayerUpload: "cannot resume unknown layer upload", | ||||
| 	ErrorCodeUntrustedSignature: "Manifest signed by untrusted source", | ||||
| } | ||||
| 
 | ||||
|  | @ -136,7 +133,7 @@ func (ec *ErrorCode) UnmarshalText(text []byte) error { | |||
| 
 | ||||
| // Error provides a wrapper around ErrorCode with extra Details provided.
 | ||||
| type Error struct { | ||||
| 	Code    ErrorCode   `json:"code,omitempty"` | ||||
| 	Code    ErrorCode   `json:"code"` | ||||
| 	Message string      `json:"message,omitempty"` | ||||
| 	Detail  interface{} `json:"detail,omitempty"` | ||||
| } | ||||
|  | @ -144,7 +141,7 @@ type Error struct { | |||
| // Error returns a human readable representation of the error.
 | ||||
| func (e Error) Error() string { | ||||
| 	return fmt.Sprintf("%s: %s", | ||||
| 		strings.Title(strings.Replace(e.Code.String(), "_", " ", -1)), | ||||
| 		strings.ToLower(strings.Replace(e.Code.String(), "_", " ", -1)), | ||||
| 		e.Message) | ||||
| } | ||||
| 
 | ||||
|  | @ -167,6 +164,10 @@ func (errs *Errors) Push(code ErrorCode, details ...interface{}) { | |||
| 		detail = details[0] | ||||
| 	} | ||||
| 
 | ||||
| 	if err, ok := detail.(error); ok { | ||||
| 		detail = err.Error() | ||||
| 	} | ||||
| 
 | ||||
| 	errs.PushErr(Error{ | ||||
| 		Code:    code, | ||||
| 		Message: code.Message(), | ||||
|  | @ -180,7 +181,7 @@ func (errs *Errors) PushErr(err error) { | |||
| } | ||||
| 
 | ||||
| func (errs *Errors) Error() string { | ||||
| 	switch len(errs.Errors) { | ||||
| 	switch errs.Len() { | ||||
| 	case 0: | ||||
| 		return "<nil>" | ||||
| 	case 1: | ||||
|  | @ -194,6 +195,16 @@ func (errs *Errors) Error() string { | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Clear clears the errors.
 | ||||
| func (errs *Errors) Clear() { | ||||
| 	errs.Errors = errs.Errors[:0] | ||||
| } | ||||
| 
 | ||||
| // Len returns the current number of errors.
 | ||||
| func (errs *Errors) Len() int { | ||||
| 	return len(errs.Errors) | ||||
| } | ||||
| 
 | ||||
| // DetailUnknownLayer provides detail for unknown layer errors, returned by
 | ||||
| // image manifest push for layers that are not yet transferred. This is intended
 | ||||
| // to only be used on the backend to return detail for this specific error.
 | ||||
|  |  | |||
|  | @ -56,7 +56,7 @@ func TestErrorCodes(t *testing.T) { | |||
| func TestErrorsManagement(t *testing.T) { | ||||
| 	var errs Errors | ||||
| 
 | ||||
| 	errs.Push(ErrorCodeInvalidChecksum) | ||||
| 	errs.Push(ErrorCodeInvalidDigest) | ||||
| 
 | ||||
| 	var detail DetailUnknownLayer | ||||
| 	detail.Unknown.BlobSum = "sometestblobsumdoesntmatter" | ||||
|  | @ -69,7 +69,20 @@ func TestErrorsManagement(t *testing.T) { | |||
| 		t.Fatalf("error marshaling errors: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	expectedJSON := "{\"errors\":[{\"code\":\"INVALID_CHECKSUM\",\"message\":\"provided checksum did not match uploaded content\"},{\"code\":\"UNKNOWN_LAYER\",\"message\":\"Referenced layer not available\",\"detail\":{\"unknown\":{\"blobSum\":\"sometestblobsumdoesntmatter\"}}}]}" | ||||
| 	expectedJSON := "{\"errors\":[{\"code\":\"INVALID_DIGEST\",\"message\":\"provided digest did not match uploaded content\"},{\"code\":\"UNKNOWN_LAYER\",\"message\":\"Referenced layer not available\",\"detail\":{\"unknown\":{\"blobSum\":\"sometestblobsumdoesntmatter\"}}}]}" | ||||
| 
 | ||||
| 	if string(p) != expectedJSON { | ||||
| 		t.Fatalf("unexpected json: %q != %q", string(p), expectedJSON) | ||||
| 	} | ||||
| 
 | ||||
| 	errs.Clear() | ||||
| 	errs.Push(ErrorCodeUnknown) | ||||
| 	expectedJSON = "{\"errors\":[{\"code\":\"UNKNOWN\",\"message\":\"unknown error\"}]}" | ||||
| 	p, err = json.Marshal(errs) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("error marshaling errors: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if string(p) != expectedJSON { | ||||
| 		t.Fatalf("unexpected json: %q != %q", string(p), expectedJSON) | ||||
|  |  | |||
							
								
								
									
helpers.go (21 changed lines)
							|  | @ -2,7 +2,10 @@ package registry | |||
| 
 | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 
 | ||||
| 	"github.com/gorilla/mux" | ||||
| ) | ||||
| 
 | ||||
| // serveJSON marshals v and sets the content-type header to
 | ||||
|  | @ -18,3 +21,21 @@ func serveJSON(w http.ResponseWriter, v interface{}) error { | |||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // closeResources closes all the provided resources after running the target
 | ||||
| // handler.
 | ||||
| func closeResources(handler http.Handler, closers ...io.Closer) http.Handler { | ||||
| 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		for _, closer := range closers { | ||||
| 			defer closer.Close() | ||||
| 		} | ||||
| 		handler.ServeHTTP(w, r) | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| // clonedRoute returns a clone of the named route from the router.
 | ||||
| func clonedRoute(router *mux.Router, name string) *mux.Route { | ||||
| 	route := new(mux.Route) | ||||
| 	*route = *router.GetRoute(name) // clone the route
 | ||||
| 	return route | ||||
| } | ||||
|  |  | |||
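
closeResources is what lets the upload dispatcher (further down) attach the resumed LayerUpload to the handler chain and still be sure it gets closed after the request is served. A small self-contained sketch of the same wrapper, with a stand-in closer instead of a real upload:

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// closeResources wraps a handler so every provided io.Closer is closed after
// the handler returns, even if it panics, thanks to defer.
func closeResources(handler http.Handler, closers ...io.Closer) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		for _, closer := range closers {
			defer closer.Close()
		}
		handler.ServeHTTP(w, r)
	})
}

// fakeUpload is a stand-in for an upload session that must be released.
type fakeUpload struct{ closed bool }

func (f *fakeUpload) Close() error { f.closed = true; return nil }

func main() {
	upload := &fakeUpload{}
	h := closeResources(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusNoContent)
	}), upload)

	h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/", nil))
	fmt.Println(upload.closed) // true
}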
							
								
								
									
layer.go (54 changed lines)
							|  | @ -3,17 +3,28 @@ package registry | |||
| import ( | ||||
| 	"net/http" | ||||
| 
 | ||||
| 	"github.com/docker/docker-registry/digest" | ||||
| 	"github.com/docker/docker-registry/storage" | ||||
| 	"github.com/gorilla/handlers" | ||||
| 	"github.com/gorilla/mux" | ||||
| ) | ||||
| 
 | ||||
| // layerDispatcher uses the request context to build a layerHandler.
 | ||||
| func layerDispatcher(ctx *Context, r *http.Request) http.Handler { | ||||
| 	layerHandler := &layerHandler{ | ||||
| 		Context: ctx, | ||||
| 		TarSum:  ctx.vars["tarsum"], | ||||
| 	dgst, err := digest.ParseDigest(ctx.vars["digest"]) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 			ctx.Errors.Push(ErrorCodeInvalidDigest, err) | ||||
| 		}) | ||||
| 	} | ||||
| 
 | ||||
| 	layerHandler.log = layerHandler.log.WithField("tarsum", layerHandler.TarSum) | ||||
| 	layerHandler := &layerHandler{ | ||||
| 		Context: ctx, | ||||
| 		Digest:  dgst, | ||||
| 	} | ||||
| 
 | ||||
| 	layerHandler.log = layerHandler.log.WithField("digest", dgst) | ||||
| 
 | ||||
| 	return handlers.MethodHandler{ | ||||
| 		"GET":  http.HandlerFunc(layerHandler.GetLayer), | ||||
|  | @ -25,11 +36,44 @@ func layerDispatcher(ctx *Context, r *http.Request) http.Handler { | |||
| type layerHandler struct { | ||||
| 	*Context | ||||
| 
 | ||||
| 	TarSum string | ||||
| 	Digest digest.Digest | ||||
| } | ||||
| 
 | ||||
| // GetLayer fetches the binary data from backend storage and returns it in the
 | ||||
| // response.
 | ||||
| func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) { | ||||
| 	layers := lh.services.Layers() | ||||
| 
 | ||||
| 	layer, err := layers.Fetch(lh.Name, lh.Digest) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		switch err { | ||||
| 		case storage.ErrLayerUnknown: | ||||
| 			w.WriteHeader(http.StatusNotFound) | ||||
| 			lh.Errors.Push(ErrorCodeUnknownLayer, | ||||
| 				map[string]interface{}{ | ||||
| 					"unknown": FSLayer{BlobSum: lh.Digest}, | ||||
| 				}) | ||||
| 			return | ||||
| 		default: | ||||
| 			lh.Errors.Push(ErrorCodeUnknown, err) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 	defer layer.Close() | ||||
| 
 | ||||
| 	http.ServeContent(w, r, layer.Digest().String(), layer.CreatedAt(), layer) | ||||
| } | ||||
| 
 | ||||
| func buildLayerURL(router *mux.Router, r *http.Request, layer storage.Layer) (string, error) { | ||||
| 	route := clonedRoute(router, routeNameBlob) | ||||
| 
 | ||||
| 	layerURL, err := route.Schemes(r.URL.Scheme).Host(r.Host). | ||||
| 		URL("name", layer.Name(), | ||||
| 		"digest", layer.Digest().String()) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	return layerURL.String(), nil | ||||
| } | ||||
|  |  | |||
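
GetLayer hands the layer straight to http.ServeContent. Because the layer is an io.ReadSeeker (see fileReader below), this buys HTTP Range and conditional-request handling without any extra code in the handler. A tiny standalone illustration, with a bytes.Reader standing in for the layer:

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

func main() {
	content := []byte("0123456789")

	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Any io.ReadSeeker works here; the registry passes the layer itself.
		http.ServeContent(w, r, "layer", time.Now(), bytes.NewReader(content))
	})

	// A ranged request is honored without any extra handler code.
	req := httptest.NewRequest("GET", "/", nil)
	req.Header.Set("Range", "bytes=2-5")
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)

	fmt.Println(rec.Code)          // 206
	fmt.Println(rec.Body.String()) // "2345"
}

Range support is the point of the read-seeker plumbing: resumable and concurrent layer downloads are expected to go through HTTP range requests, as the seek-related test notes further down also say.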
							
								
								
									
layerupload.go (191 changed lines)
							|  | @ -1,64 +1,225 @@ | |||
| package registry | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 	"strconv" | ||||
| 
 | ||||
| 	"github.com/Sirupsen/logrus" | ||||
| 	"github.com/docker/docker-registry/digest" | ||||
| 	"github.com/docker/docker-registry/storage" | ||||
| 	"github.com/gorilla/handlers" | ||||
| 	"github.com/gorilla/mux" | ||||
| ) | ||||
| 
 | ||||
| // layerUploadDispatcher constructs and returns the layer upload handler for
 | ||||
| // the given request context.
 | ||||
| func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler { | ||||
| 	layerUploadHandler := &layerUploadHandler{ | ||||
| 	luh := &layerUploadHandler{ | ||||
| 		Context: ctx, | ||||
| 		TarSum:  ctx.vars["tarsum"], | ||||
| 		UUID:    ctx.vars["uuid"], | ||||
| 	} | ||||
| 
 | ||||
| 	layerUploadHandler.log = layerUploadHandler.log.WithField("tarsum", layerUploadHandler.TarSum) | ||||
| 	handler := http.Handler(handlers.MethodHandler{ | ||||
| 		"POST":   http.HandlerFunc(luh.StartLayerUpload), | ||||
| 		"GET":    http.HandlerFunc(luh.GetUploadStatus), | ||||
| 		"HEAD":   http.HandlerFunc(luh.GetUploadStatus), | ||||
| 		"PUT":    http.HandlerFunc(luh.PutLayerChunk), | ||||
| 		"DELETE": http.HandlerFunc(luh.CancelLayerUpload), | ||||
| 	}) | ||||
| 
 | ||||
| 	if layerUploadHandler.UUID != "" { | ||||
| 		layerUploadHandler.log = layerUploadHandler.log.WithField("uuid", layerUploadHandler.UUID) | ||||
| 	if luh.UUID != "" { | ||||
| 		luh.log = luh.log.WithField("uuid", luh.UUID) | ||||
| 
 | ||||
| 		layers := ctx.services.Layers() | ||||
| 		upload, err := layers.Resume(luh.UUID) | ||||
| 
 | ||||
| 		if err != nil && err != storage.ErrLayerUploadUnknown { | ||||
| 			return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 				logrus.Infof("error resolving upload: %v", err) | ||||
| 				w.WriteHeader(http.StatusInternalServerError) | ||||
| 				luh.Errors.Push(ErrorCodeUnknown, err) | ||||
| 			}) | ||||
| 		} | ||||
| 
 | ||||
| 		luh.Upload = upload | ||||
| 		handler = closeResources(handler, luh.Upload) | ||||
| 	} | ||||
| 
 | ||||
| 	return handlers.MethodHandler{ | ||||
| 		"POST":   http.HandlerFunc(layerUploadHandler.StartLayerUpload), | ||||
| 		"GET":    http.HandlerFunc(layerUploadHandler.GetUploadStatus), | ||||
| 		"HEAD":   http.HandlerFunc(layerUploadHandler.GetUploadStatus), | ||||
| 		"PUT":    http.HandlerFunc(layerUploadHandler.PutLayerChunk), | ||||
| 		"DELETE": http.HandlerFunc(layerUploadHandler.CancelLayerUpload), | ||||
| 	} | ||||
| 	return handler | ||||
| } | ||||
| 
 | ||||
| // layerUploadHandler handles the http layer upload process.
 | ||||
| type layerUploadHandler struct { | ||||
| 	*Context | ||||
| 
 | ||||
| 	// TarSum is the unique identifier of the layer being uploaded.
 | ||||
| 	TarSum string | ||||
| 
 | ||||
| 	// UUID identifies the upload instance for the current request.
 | ||||
| 	UUID string | ||||
| 
 | ||||
| 	Upload storage.LayerUpload | ||||
| } | ||||
| 
 | ||||
| // StartLayerUpload begins the layer upload process and allocates a server-
 | ||||
| // side upload session.
 | ||||
| func (luh *layerUploadHandler) StartLayerUpload(w http.ResponseWriter, r *http.Request) { | ||||
| 	layers := luh.services.Layers() | ||||
| 	upload, err := layers.Upload(luh.Name) | ||||
| 	if err != nil { | ||||
| 		w.WriteHeader(http.StatusInternalServerError) // Error conditions here?
 | ||||
| 		luh.Errors.Push(ErrorCodeUnknown, err) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	luh.Upload = upload | ||||
| 	defer luh.Upload.Close() | ||||
| 
 | ||||
| 	if err := luh.layerUploadResponse(w, r); err != nil { | ||||
| 		w.WriteHeader(http.StatusInternalServerError) // Error conditions here?
 | ||||
| 		luh.Errors.Push(ErrorCodeUnknown, err) | ||||
| 		return | ||||
| 	} | ||||
| 	w.WriteHeader(http.StatusAccepted) | ||||
| } | ||||
| 
 | ||||
| // GetUploadStatus returns the status of a given upload, identified by uuid.
 | ||||
| func (luh *layerUploadHandler) GetUploadStatus(w http.ResponseWriter, r *http.Request) { | ||||
| 	if luh.Upload == nil { | ||||
| 		w.WriteHeader(http.StatusNotFound) | ||||
| 		luh.Errors.Push(ErrorCodeUnknownLayerUpload) | ||||
| 	} | ||||
| 
 | ||||
| 	if err := luh.layerUploadResponse(w, r); err != nil { | ||||
| 		w.WriteHeader(http.StatusInternalServerError) // Error conditions here?
 | ||||
| 		luh.Errors.Push(ErrorCodeUnknown, err) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	w.WriteHeader(http.StatusNoContent) | ||||
| } | ||||
| 
 | ||||
| // PutLayerChunk receives a layer chunk during the layer upload process,
 | ||||
| // possibly completing the upload with a checksum and length.
 | ||||
| func (luh *layerUploadHandler) PutLayerChunk(w http.ResponseWriter, r *http.Request) { | ||||
| 	if luh.Upload == nil { | ||||
| 		w.WriteHeader(http.StatusNotFound) | ||||
| 		luh.Errors.Push(ErrorCodeUnknownLayerUpload) | ||||
| 	} | ||||
| 
 | ||||
| 	var finished bool | ||||
| 
 | ||||
| 	// TODO(stevvooe): This is woefully incomplete. Missing stuff:
 | ||||
| 	//
 | ||||
| 	// 1. Extract information from range header, if present.
 | ||||
| 	// 2. Check offset of current layer.
 | ||||
| 	// 3. Emit correct error responses.
 | ||||
| 
 | ||||
| 	// Read in the chunk
 | ||||
| 	io.Copy(luh.Upload, r.Body) | ||||
| 
 | ||||
| 	if err := luh.maybeCompleteUpload(w, r); err != nil { | ||||
| 		if err != errNotReadyToComplete { | ||||
| 			w.WriteHeader(http.StatusInternalServerError) | ||||
| 			luh.Errors.Push(ErrorCodeUnknown, err) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if err := luh.layerUploadResponse(w, r); err != nil { | ||||
| 		w.WriteHeader(http.StatusInternalServerError) // Error conditions here?
 | ||||
| 		luh.Errors.Push(ErrorCodeUnknown, err) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	if finished { | ||||
| 		w.WriteHeader(http.StatusCreated) | ||||
| 	} else { | ||||
| 		w.WriteHeader(http.StatusAccepted) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // CancelLayerUpload cancels an in-progress upload of a layer.
 | ||||
| func (luh *layerUploadHandler) CancelLayerUpload(w http.ResponseWriter, r *http.Request) { | ||||
| 	if luh.Upload == nil { | ||||
| 		w.WriteHeader(http.StatusNotFound) | ||||
| 		luh.Errors.Push(ErrorCodeUnknownLayerUpload) | ||||
| 	} | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| // layerUploadResponse provides a standard response for uploading layers and
 | ||||
| // chunk responses. This sets the correct headers but the response status is
 | ||||
| // left to the caller.
 | ||||
| func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *http.Request) error { | ||||
| 	uploadURL, err := buildLayerUploadURL(luh.router, r, luh.Upload) | ||||
| 	if err != nil { | ||||
| 		logrus.Infof("error building upload url: %s", err) | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	w.Header().Set("Location", uploadURL) | ||||
| 	w.Header().Set("Content-Length", "0") | ||||
| 	w.Header().Set("Range", fmt.Sprintf("0-%d", luh.Upload.Offset())) | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| var errNotReadyToComplete = fmt.Errorf("not ready to complete upload") | ||||
| 
 | ||||
| // maybeCompleteUpload tries to complete the upload if the correct parameters
 | ||||
| // are available. Returns errNotReadyToComplete if not ready to complete.
 | ||||
| func (luh *layerUploadHandler) maybeCompleteUpload(w http.ResponseWriter, r *http.Request) error { | ||||
| 	// If we get a digest and length, we can finish the upload.
 | ||||
| 	dgstStr := r.FormValue("digest") // TODO(stevvooe): Support multiple digest parameters!
 | ||||
| 	sizeStr := r.FormValue("length") | ||||
| 
 | ||||
| 	if dgstStr == "" || sizeStr == "" { | ||||
| 		return errNotReadyToComplete | ||||
| 	} | ||||
| 
 | ||||
| 	dgst, err := digest.ParseDigest(dgstStr) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	size, err := strconv.ParseInt(sizeStr, 10, 64) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	luh.completeUpload(w, r, size, dgst) | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // completeUpload finishes out the upload with the correct response.
 | ||||
| func (luh *layerUploadHandler) completeUpload(w http.ResponseWriter, r *http.Request, size int64, dgst digest.Digest) { | ||||
| 	layer, err := luh.Upload.Finish(size, dgst) | ||||
| 	if err != nil { | ||||
| 		luh.Errors.Push(ErrorCodeUnknown, err) | ||||
| 		w.WriteHeader(http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	layerURL, err := buildLayerURL(luh.router, r, layer) | ||||
| 	if err != nil { | ||||
| 		luh.Errors.Push(ErrorCodeUnknown, err) | ||||
| 		w.WriteHeader(http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	w.Header().Set("Location", layerURL) | ||||
| 	w.Header().Set("Content-Length", "0") | ||||
| 	w.WriteHeader(http.StatusCreated) | ||||
| } | ||||
| 
 | ||||
| func buildLayerUploadURL(router *mux.Router, r *http.Request, upload storage.LayerUpload) (string, error) { | ||||
| 	route := clonedRoute(router, routeNameBlobUploadResume) | ||||
| 
 | ||||
| 	uploadURL, err := route.Schemes(r.URL.Scheme).Host(r.Host). | ||||
| 		URL("name", upload.Name(), "uuid", upload.UUID()) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	return uploadURL.String(), nil | ||||
| } | ||||
|  |  | |||
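
maybeCompleteUpload leans on a sentinel error, errNotReadyToComplete, to tell "the request simply doesn't carry digest and length yet" apart from a real failure, which is what lets PutLayerChunk keep accepting chunks. The parameter-parsing half of that decision can be sketched on its own; the digest check below is a crude stand-in for digest.ParseDigest, not its real behavior:

package main

import (
	"errors"
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// errNotReadyToComplete signals that the request is fine but does not yet
// carry the parameters needed to close out the upload.
var errNotReadyToComplete = errors.New("not ready to complete upload")

// completionParams extracts digest and length from the query values,
// returning errNotReadyToComplete when either is absent.
func completionParams(values url.Values) (dgst string, size int64, err error) {
	dgst = values.Get("digest")
	sizeStr := values.Get("length")

	if dgst == "" || sizeStr == "" {
		return "", 0, errNotReadyToComplete
	}

	// Stand-in for digest.ParseDigest: require an "algorithm:hex" shape.
	if !strings.Contains(dgst, ":") {
		return "", 0, fmt.Errorf("invalid digest: %q", dgst)
	}

	size, err = strconv.ParseInt(sizeStr, 10, 64)
	if err != nil {
		return "", 0, err
	}
	return dgst, size, nil
}

func main() {
	_, _, err := completionParams(url.Values{})
	fmt.Println(err == errNotReadyToComplete) // true: keep accepting chunks

	d, n, err := completionParams(url.Values{
		"digest": []string{"tarsum+sha256:deadbeef"},
		"length": []string{"1024"},
	})
	fmt.Println(d, n, err) // tarsum+sha256:deadbeef 1024 <nil>
}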
|  | @ -0,0 +1,163 @@ | |||
| package storage | ||||
| 
 | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"os" | ||||
| 
 | ||||
| 	"github.com/docker/docker-registry/storagedriver" | ||||
| ) | ||||
| 
 | ||||
| // fileReader provides a read seeker interface to files stored in
 | ||||
| // storagedriver. Used to implement part of layer interface and will be used
 | ||||
| // to implement read side of LayerUpload.
 | ||||
| type fileReader struct { | ||||
| 	driver storagedriver.StorageDriver | ||||
| 
 | ||||
| 	// identifying fields
 | ||||
| 	path string | ||||
| 	size int64 // size is the total layer size, must be set.
 | ||||
| 
 | ||||
| 	// mutable fields
 | ||||
| 	rc     io.ReadCloser // remote read closer
 | ||||
| 	brd    *bufio.Reader // internal buffered io
 | ||||
| 	offset int64         // offset is the current read offset
 | ||||
| 	err    error         // terminal error, if set, reader is closed
 | ||||
| } | ||||
| 
 | ||||
| func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) { | ||||
| 	// Grab the size of the layer file, ensuring existence.
 | ||||
| 	size, err := driver.CurrentSize(path) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return &fileReader{ | ||||
| 		driver: driver, | ||||
| 		path:   path, | ||||
| 		size:   int64(size), | ||||
| 	}, nil | ||||
| } | ||||
| 
 | ||||
| func (fr *fileReader) Read(p []byte) (n int, err error) { | ||||
| 	if fr.err != nil { | ||||
| 		return 0, fr.err | ||||
| 	} | ||||
| 
 | ||||
| 	rd, err := fr.reader() | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 
 | ||||
| 	n, err = rd.Read(p) | ||||
| 	fr.offset += int64(n) | ||||
| 
 | ||||
| 	// Simulate io.EOF error if we reach filesize.
 | ||||
| 	if err == nil && fr.offset >= fr.size { | ||||
| 		err = io.EOF | ||||
| 	} | ||||
| 
 | ||||
| 	return n, err | ||||
| } | ||||
| 
 | ||||
| func (fr *fileReader) Seek(offset int64, whence int) (int64, error) { | ||||
| 	if fr.err != nil { | ||||
| 		return 0, fr.err | ||||
| 	} | ||||
| 
 | ||||
| 	var err error | ||||
| 	newOffset := fr.offset | ||||
| 
 | ||||
| 	switch whence { | ||||
| 	case os.SEEK_CUR: | ||||
| 		newOffset += int64(offset) | ||||
| 	case os.SEEK_END: | ||||
| 		newOffset = fr.size + int64(offset) | ||||
| 	case os.SEEK_SET: | ||||
| 		newOffset = int64(offset) | ||||
| 	} | ||||
| 
 | ||||
| 	if newOffset < 0 { | ||||
| 		err = fmt.Errorf("cannot seek to negative position") | ||||
| 	} else if newOffset > fr.size { | ||||
| 		err = fmt.Errorf("cannot seek past end of file") | ||||
| 	} else { | ||||
| 		if fr.offset != newOffset { | ||||
| 			fr.reset() | ||||
| 		} | ||||
| 
 | ||||
| 		// No problems, set the offset.
 | ||||
| 		fr.offset = newOffset | ||||
| 	} | ||||
| 
 | ||||
| 	return fr.offset, err | ||||
| } | ||||
| 
 | ||||
| // Close the layer. Should be called when the resource is no longer needed.
 | ||||
| func (fr *fileReader) Close() error { | ||||
| 	if fr.err != nil { | ||||
| 		return fr.err | ||||
| 	} | ||||
| 
 | ||||
| 	fr.err = ErrLayerClosed | ||||
| 
 | ||||
| 	// close and release reader chain
 | ||||
| 	if fr.rc != nil { | ||||
| 		fr.rc.Close() | ||||
| 	} | ||||
| 
 | ||||
| 	fr.rc = nil | ||||
| 	fr.brd = nil | ||||
| 
 | ||||
| 	return fr.err | ||||
| } | ||||
| 
 | ||||
| // reader prepares the current reader at the fr offset, ensuring it's buffered
 | ||||
| // and ready to go.
 | ||||
| func (fr *fileReader) reader() (io.Reader, error) { | ||||
| 	if fr.err != nil { | ||||
| 		return nil, fr.err | ||||
| 	} | ||||
| 
 | ||||
| 	if fr.rc != nil { | ||||
| 		return fr.brd, nil | ||||
| 	} | ||||
| 
 | ||||
| 	// If we don't have a reader, open one up.
 | ||||
| 	rc, err := fr.driver.ReadStream(fr.path, uint64(fr.offset)) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	fr.rc = rc | ||||
| 
 | ||||
| 	if fr.brd == nil { | ||||
| 		// TODO(stevvooe): Set an optimal buffer size here. We'll have to
 | ||||
| 		// understand the latency characteristics of the underlying network to
 | ||||
| 		// set this correctly, so we may want to leave it to the driver. For
 | ||||
| 		// out of process drivers, we'll have to optimize this buffer size for
 | ||||
| 		// local communication.
 | ||||
| 		fr.brd = bufio.NewReader(fr.rc) | ||||
| 	} else { | ||||
| 		fr.brd.Reset(fr.rc) | ||||
| 	} | ||||
| 
 | ||||
| 	return fr.brd, nil | ||||
| } | ||||
| 
 | ||||
| // reset resets the reader, forcing the read method to open up a new
 | ||||
| // connection and rebuild the buffered reader. This should be called when the
 | ||||
| // offset and the reader will become out of sync, such as during a seek
 | ||||
| // operation.
 | ||||
| func (fr *fileReader) reset() { | ||||
| 	if fr.err != nil { | ||||
| 		return | ||||
| 	} | ||||
| 	if fr.rc != nil { | ||||
| 		fr.rc.Close() | ||||
| 		fr.rc = nil | ||||
| 	} | ||||
| } | ||||
|  | @ -0,0 +1,158 @@ | |||
| package storage | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"crypto/rand" | ||||
| 	"io" | ||||
| 	mrand "math/rand" | ||||
| 	"os" | ||||
| 	"testing" | ||||
| 
 | ||||
| 	"github.com/docker/docker-registry/digest" | ||||
| 
 | ||||
| 	"github.com/docker/docker-registry/storagedriver/inmemory" | ||||
| ) | ||||
| 
 | ||||
| func TestSimpleRead(t *testing.T) { | ||||
| 	content := make([]byte, 1<<20) | ||||
| 	n, err := rand.Read(content) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error building random data: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if n != len(content) { | ||||
| 		t.Fatalf("random read didn't fill buffer") | ||||
| 	} | ||||
| 
 | ||||
| 	dgst, err := digest.FromReader(bytes.NewReader(content)) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error digesting random content: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	driver := inmemory.New() | ||||
| 	path := "/random" | ||||
| 
 | ||||
| 	if err := driver.PutContent(path, content); err != nil { | ||||
| 		t.Fatalf("error putting patterned content: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	fr, err := newFileReader(driver, path) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("error allocating file reader: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	verifier := digest.NewDigestVerifier(dgst) | ||||
| 	io.Copy(verifier, fr) | ||||
| 
 | ||||
| 	if !verifier.Verified() { | ||||
| 		t.Fatalf("unable to verify read data") | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestFileReaderSeek(t *testing.T) { | ||||
| 	driver := inmemory.New() | ||||
| 	pattern := "01234567890ab" // prime length block
 | ||||
| 	repetitions := 1024 | ||||
| 	path := "/patterned" | ||||
| 	content := bytes.Repeat([]byte(pattern), repetitions) | ||||
| 
 | ||||
| 	if err := driver.PutContent(path, content); err != nil { | ||||
| 		t.Fatalf("error putting patterned content: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	fr, err := newFileReader(driver, path) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("unexpected error creating file reader: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	// Seek all over the place, in blocks of pattern size and make sure we get
 | ||||
| 	// the right data.
 | ||||
| 	for _, repetition := range mrand.Perm(repetitions - 1) { | ||||
| 		targetOffset := int64(len(pattern) * repetition) | ||||
| 		// Seek to a multiple of pattern size and read pattern size bytes
 | ||||
| 		offset, err := fr.Seek(targetOffset, os.SEEK_SET) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("unexpected error seeking: %v", err) | ||||
| 		} | ||||
| 
 | ||||
| 		if offset != targetOffset { | ||||
| 			t.Fatalf("did not seek to correct offset: %d != %d", offset, targetOffset) | ||||
| 		} | ||||
| 
 | ||||
| 		p := make([]byte, len(pattern)) | ||||
| 
 | ||||
| 		n, err := fr.Read(p) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("error reading pattern: %v", err) | ||||
| 		} | ||||
| 
 | ||||
| 		if n != len(pattern) { | ||||
| 			t.Fatalf("incorrect read length: %d != %d", n, len(pattern)) | ||||
| 		} | ||||
| 
 | ||||
| 		if string(p) != pattern { | ||||
| 			t.Fatalf("incorrect read content: %q != %q", p, pattern) | ||||
| 		} | ||||
| 
 | ||||
| 		// Check offset
 | ||||
| 		current, err := fr.Seek(0, os.SEEK_CUR) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("error checking current offset: %v", err) | ||||
| 		} | ||||
| 
 | ||||
| 		if current != targetOffset+int64(len(pattern)) { | ||||
| 			t.Fatalf("unexpected offset after read: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	start, err := fr.Seek(0, os.SEEK_SET) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("error seeking to start: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if start != 0 { | ||||
| 		t.Fatalf("expected to seek to start: %v != 0", start) | ||||
| 	} | ||||
| 
 | ||||
| 	end, err := fr.Seek(0, os.SEEK_END) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("error checking current offset: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if end != int64(len(content)) { | ||||
| 		t.Fatalf("expected to seek to end: %v != %v", end, len(content)) | ||||
| 	} | ||||
| 
 | ||||
| 	// 4. Seek past end and before start, ensure error.
 | ||||
| 
 | ||||
| 	// seek before start
 | ||||
| 	before, err := fr.Seek(-1, os.SEEK_SET) | ||||
| 	if err == nil { | ||||
| 		t.Fatalf("error expected, returned offset=%v", before) | ||||
| 	} | ||||
| 
 | ||||
| 	after, err := fr.Seek(int64(len(content)+1), os.SEEK_END) | ||||
| 	if err == nil { | ||||
| 		t.Fatalf("error expected, returned offset=%v", after) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // TestFileReaderErrors covers the various error return types for different
 | ||||
| // conditions that can arise when reading a layer.
 | ||||
| func TestFileReaderErrors(t *testing.T) { | ||||
| 	// TODO(stevvooe): We need to cover error return types, driven by the
 | ||||
| 	// errors returned via the HTTP API. For now, here is a incomplete list:
 | ||||
| 	//
 | ||||
| 	// 	1. Layer Not Found: returned when layer is not found or access is
 | ||||
| 	//        denied.
 | ||||
| 	//	2. Layer Unavailable: returned when link references are unresolved,
 | ||||
| 	//     but layer is known to the registry.
 | ||||
| 	//  3. Layer Invalid: This may be split into more errors, but should be
 | ||||
| 	//     returned when the name or tarsum does not reference a valid layer. We
 | ||||
| 	//     may also need something to communicate layer verification errors
 | ||||
| 	//     for the inline tarsum check.
 | ||||
| 	//	4. Timeout: timeouts to backend. Need to better understand these
 | ||||
| 	//     failure cases and how the storage driver propagates these errors
 | ||||
| 	//     up the stack.
 | ||||
| } | ||||
|  | @ -87,4 +87,8 @@ var ( | |||
| 
 | ||||
| 	// ErrLayerInvalidLength returned when length check fails.
 | ||||
| 	ErrLayerInvalidLength = fmt.Errorf("invalid layer length") | ||||
| 
 | ||||
| 	// ErrLayerClosed returned when an operation is attempted on a closed
 | ||||
| 	// Layer or LayerUpload.
 | ||||
| 	ErrLayerClosed = fmt.Errorf("layer closed") | ||||
| ) | ||||
|  |  | |||
|  | @ -241,31 +241,6 @@ func TestSimpleLayerRead(t *testing.T) { | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestLayerReaderSeek(t *testing.T) { | ||||
| 	// TODO(stevvooe): Ensure that all relative seeks work as advertised.
 | ||||
| 	// Readers must close and re-open on command. This is important to support
 | ||||
| 	// resumable and concurrent downloads via HTTP range requests.
 | ||||
| } | ||||
| 
 | ||||
| // TestLayerReadErrors covers the various error return type for different
 | ||||
| // conditions that can arise when reading a layer.
 | ||||
| func TestLayerReadErrors(t *testing.T) { | ||||
| 	// TODO(stevvooe): We need to cover error return types, driven by the
 | ||||
| 	// errors returned via the HTTP API. For now, here is a incomplete list:
 | ||||
| 	//
 | ||||
| 	// 	1. Layer Not Found: returned when layer is not found or access is
 | ||||
| 	//        denied.
 | ||||
| 	//	2. Layer Unavailable: returned when link references are unresolved,
 | ||||
| 	//     but layer is known to the registry.
 | ||||
| 	//  3. Layer Invalid: This may more split into more errors, but should be
 | ||||
| 	//     returned when name or tarsum does not reference a valid error. We
 | ||||
| 	//     may also need something to communication layer verification errors
 | ||||
| 	//     for the inline tarsum check.
 | ||||
| 	//	4. Timeout: timeouts to backend. Need to better understand these
 | ||||
| 	//     failure cases and how the storage driver propagates these errors
 | ||||
| 	//     up the stack.
 | ||||
| } | ||||
| 
 | ||||
| // writeRandomLayer creates a random layer under name and tarSum using driver
 | ||||
| // and pathMapper. An io.ReadSeeker with the data is returned, along with the
 | ||||
| // sha256 hex digest.
 | ||||
|  |  | |||
|  | @ -1,10 +1,6 @@ | |||
| package storage | ||||
| 
 | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"os" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/docker/docker-registry/digest" | ||||
|  | @ -13,22 +9,11 @@ import ( | |||
| // layerReadSeeker implements Layer and provides facilities for reading and
 | ||||
| // seeking.
 | ||||
| type layerReader struct { | ||||
| 	layerStore *layerStore | ||||
| 	rc         io.ReadCloser | ||||
| 	brd        *bufio.Reader | ||||
| 	fileReader | ||||
| 
 | ||||
| 	name      string // repo name of this layer
 | ||||
| 	digest    digest.Digest | ||||
| 	path      string | ||||
| 	createdAt time.Time | ||||
| 
 | ||||
| 	// offset is the current read offset
 | ||||
| 	offset int64 | ||||
| 
 | ||||
| 	// size is the total layer size, if available.
 | ||||
| 	size int64 | ||||
| 
 | ||||
| 	closedErr error // terminal error, if set, reader is closed
 | ||||
| } | ||||
| 
 | ||||
| var _ Layer = &layerReader{} | ||||
|  | @ -44,131 +29,3 @@ func (lrs *layerReader) Digest() digest.Digest { | |||
| func (lrs *layerReader) CreatedAt() time.Time { | ||||
| 	return lrs.createdAt | ||||
| } | ||||
| 
 | ||||
| func (lrs *layerReader) Read(p []byte) (n int, err error) { | ||||
| 	if err := lrs.closed(); err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 
 | ||||
| 	rd, err := lrs.reader() | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 
 | ||||
| 	n, err = rd.Read(p) | ||||
| 	lrs.offset += int64(n) | ||||
| 
 | ||||
| 	// Simulate io.EOR error if we reach filesize.
 | ||||
| 	if err == nil && lrs.offset >= lrs.size { | ||||
| 		err = io.EOF | ||||
| 	} | ||||
| 
 | ||||
| 	// TODO(stevvooe): More error checking is required here. If the reader
 | ||||
| 	// times out for some reason, we should reset the reader so we re-open the
 | ||||
| 	// connection.
 | ||||
| 
 | ||||
| 	return n, err | ||||
| } | ||||
| 
 | ||||
| func (lrs *layerReader) Seek(offset int64, whence int) (int64, error) { | ||||
| 	if err := lrs.closed(); err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 
 | ||||
| 	var err error | ||||
| 	newOffset := lrs.offset | ||||
| 
 | ||||
| 	switch whence { | ||||
| 	case os.SEEK_CUR: | ||||
| 		newOffset += int64(whence) | ||||
| 	case os.SEEK_END: | ||||
| 		newOffset = lrs.size + int64(whence) | ||||
| 	case os.SEEK_SET: | ||||
| 		newOffset = int64(whence) | ||||
| 	} | ||||
| 
 | ||||
| 	if newOffset < 0 { | ||||
| 		err = fmt.Errorf("cannot seek to negative position") | ||||
| 	} else if newOffset >= lrs.size { | ||||
| 		err = fmt.Errorf("cannot seek passed end of layer") | ||||
| 	} else { | ||||
| 		if lrs.offset != newOffset { | ||||
| 			lrs.resetReader() | ||||
| 		} | ||||
| 
 | ||||
| 		// No problems, set the offset.
 | ||||
| 		lrs.offset = newOffset | ||||
| 	} | ||||
| 
 | ||||
| 	return lrs.offset, err | ||||
| } | ||||
| 
 | ||||
| // Close the layer. Should be called when the resource is no longer needed.
 | ||||
| func (lrs *layerReader) Close() error { | ||||
| 	if lrs.closedErr != nil { | ||||
| 		return lrs.closedErr | ||||
| 	} | ||||
| 	// TODO(sday): Must export this error.
 | ||||
| 	lrs.closedErr = fmt.Errorf("layer closed") | ||||
| 
 | ||||
| 	// close and release reader chain
 | ||||
| 	if lrs.rc != nil { | ||||
| 		lrs.rc.Close() | ||||
| 		lrs.rc = nil | ||||
| 	} | ||||
| 	lrs.brd = nil | ||||
| 
 | ||||
| 	return lrs.closedErr | ||||
| } | ||||
| 
 | ||||
| // reader prepares the current reader at the lrs offset, ensuring its buffered
 | ||||
| // and ready to go.
 | ||||
| func (lrs *layerReader) reader() (io.Reader, error) { | ||||
| 	if err := lrs.closed(); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if lrs.rc != nil { | ||||
| 		return lrs.brd, nil | ||||
| 	} | ||||
| 
 | ||||
| 	// If we don't have a reader, open one up.
 | ||||
| 	rc, err := lrs.layerStore.driver.ReadStream(lrs.path, uint64(lrs.offset)) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	lrs.rc = rc | ||||
| 
 | ||||
| 	if lrs.brd == nil { | ||||
| 		// TODO(stevvooe): Set an optimal buffer size here. We'll have to
 | ||||
| 		// understand the latency characteristics of the underlying network to
 | ||||
| 		// set this correctly, so we may want to leave it to the driver. For
 | ||||
| 		// out of process drivers, we'll have to optimize this buffer size for
 | ||||
| 		// local communication.
 | ||||
| 		lrs.brd = bufio.NewReader(lrs.rc) | ||||
| 	} else { | ||||
| 		lrs.brd.Reset(lrs.rc) | ||||
| 	} | ||||
| 
 | ||||
| 	return lrs.brd, nil | ||||
| } | ||||
| 
 | ||||
| // resetReader resets the reader, forcing the read method to open up a new
 | ||||
| // connection and rebuild the buffered reader. This should be called when the
 | ||||
| // offset and the reader will become out of sync, such as during a seek
 | ||||
| // operation.
 | ||||
| func (lrs *layerReader) resetReader() { | ||||
| 	if err := lrs.closed(); err != nil { | ||||
| 		return | ||||
| 	} | ||||
| 	if lrs.rc != nil { | ||||
| 		lrs.rc.Close() | ||||
| 		lrs.rc = nil | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (lrs *layerReader) closed() error { | ||||
| 	return lrs.closedErr | ||||
| } | ||||
|  |  | |||
|  | @ -57,33 +57,26 @@ func (ls *layerStore) Fetch(name string, digest digest.Digest) (Layer, error) { | |||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	// Grab the size of the layer file, ensuring that it exists, among other
 | ||||
| 	// things.
 | ||||
| 	size, err := ls.driver.CurrentSize(p) | ||||
| 
 | ||||
| 	fr, err := newFileReader(ls.driver, p) | ||||
| 	if err != nil { | ||||
| 		// TODO(stevvooe): Handle blob/path does not exist here.
 | ||||
| 		// TODO(stevvooe): Get a better understanding of the error cases here
 | ||||
| 		// that don't stem from unknown path.
 | ||||
| 		return nil, err | ||||
| 		switch err := err.(type) { | ||||
| 		case storagedriver.PathNotFoundError, *storagedriver.PathNotFoundError: | ||||
| 			return nil, ErrLayerUnknown | ||||
| 		default: | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// Build the layer reader and return to the client.
 | ||||
| 	layer := &layerReader{ | ||||
| 		layerStore: ls, | ||||
| 		path:       p, | ||||
| 	return &layerReader{ | ||||
| 		fileReader: *fr, | ||||
| 		name:       name, | ||||
| 		digest:     digest, | ||||
| 
 | ||||
| 		// TODO(stevvooe): Storage backend does not support modification time
 | ||||
| 		// queries yet. Layers "never" change, so just return the zero value.
 | ||||
| 		createdAt: time.Time{}, | ||||
| 
 | ||||
| 		offset: 0, | ||||
| 		size:   int64(size), | ||||
| 	} | ||||
| 
 | ||||
| 	return layer, nil | ||||
| 		// queries yet. Layers "never" change, so just return the zero value
 | ||||
| 		// plus a nano-second.
 | ||||
| 		createdAt: (time.Time{}).Add(time.Nanosecond), | ||||
| 	}, nil | ||||
| } | ||||
| 
 | ||||
| // Upload begins a layer upload, returning a handle. If the layer upload
 | ||||
|  |  | |||
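
The reworked Fetch above translates the driver's path-not-found error into the storage package's own ErrLayerUnknown, so HTTP handlers never have to import storagedriver error types. The shape of that translation, with stand-in error types, looks roughly like this:

package main

import (
	"errors"
	"fmt"
)

// PathNotFoundError stands in for storagedriver.PathNotFoundError.
type PathNotFoundError struct{ Path string }

func (e PathNotFoundError) Error() string { return "path not found: " + e.Path }

// ErrLayerUnknown stands in for the storage package's domain-level error.
var ErrLayerUnknown = errors.New("unknown layer")

// translate maps low-level driver errors onto the storage package's
// vocabulary; anything unrecognized is passed through unchanged.
func translate(err error) error {
	switch err.(type) {
	case PathNotFoundError, *PathNotFoundError:
		return ErrLayerUnknown
	default:
		return err
	}
}

func main() {
	fmt.Println(translate(PathNotFoundError{Path: "/repositories/foo/bar"})) // unknown layer
	fmt.Println(translate(errors.New("i/o timeout")))                        // i/o timeout
}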
|  | @ -429,6 +429,10 @@ func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) | |||
| 		return lus, err | ||||
| 	} | ||||
| 
 | ||||
| 	if err := llufs.SaveState(lus); err != nil { | ||||
| 		return lus, err | ||||
| 	} | ||||
| 
 | ||||
| 	return lus, nil | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -15,7 +15,6 @@ type Services struct { | |||
| // NewServices creates a new Services object to access docker objects stored
 | ||||
| // in the underlying driver.
 | ||||
| func NewServices(driver storagedriver.StorageDriver) *Services { | ||||
| 
 | ||||
| 	layerUploadStore, err := newTemporaryLocalFSLayerUploadStore() | ||||
| 
 | ||||
| 	if err != nil { | ||||
|  | @ -40,5 +39,5 @@ func NewServices(driver storagedriver.StorageDriver) *Services { | |||
| // may be context sensitive in the future. The instance should be used similar
 | ||||
| // to a request local.
 | ||||
| func (ss *Services) Layers() LayerService { | ||||
| 	return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper} | ||||
| 	return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper, uploadStore: ss.layerUploadStore} | ||||
| } | ||||
|  |  | |||