Split layer and upload from repository
Layer upload moved to its own file with its own unit tests Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)master
							parent
							
								
									b1ba2183ee
								
							
						
					
					
						commit
						6f9fbf99a9
					
				|  | @ -1,7 +1,6 @@ | |||
| package client | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
|  | @ -104,9 +103,8 @@ func parseHTTPErrorResponse(response *http.Response) error { | |||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	decoder := json.NewDecoder(bytes.NewReader(body)) | ||||
| 	err = decoder.Decode(&errors) | ||||
| 	if err != nil { | ||||
| 
 | ||||
| 	if err := json.Unmarshal(body, &errors); err != nil { | ||||
| 		return &UnexpectedHTTPResponseError{ | ||||
| 			ParseErr: err, | ||||
| 			Response: body, | ||||
|  |  | |||
|  | @ -0,0 +1,178 @@ | |||
| package client | ||||
| 
 | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"bytes" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"io/ioutil" | ||||
| 	"net/http" | ||||
| 	"os" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/docker/distribution/context" | ||||
| 	"github.com/docker/distribution/digest" | ||||
| ) | ||||
| 
 | ||||
| type httpLayer struct { | ||||
| 	*layers | ||||
| 
 | ||||
| 	size      int64 | ||||
| 	digest    digest.Digest | ||||
| 	createdAt time.Time | ||||
| 
 | ||||
| 	rc     io.ReadCloser // remote read closer
 | ||||
| 	brd    *bufio.Reader // internal buffered io
 | ||||
| 	offset int64 | ||||
| 	err    error | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) CreatedAt() time.Time { | ||||
| 	return hl.createdAt | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) Digest() digest.Digest { | ||||
| 	return hl.digest | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) Read(p []byte) (n int, err error) { | ||||
| 	if hl.err != nil { | ||||
| 		return 0, hl.err | ||||
| 	} | ||||
| 
 | ||||
| 	rd, err := hl.reader() | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 
 | ||||
| 	n, err = rd.Read(p) | ||||
| 	hl.offset += int64(n) | ||||
| 
 | ||||
| 	// Simulate io.EOR error if we reach filesize.
 | ||||
| 	if err == nil && hl.offset >= hl.size { | ||||
| 		err = io.EOF | ||||
| 	} | ||||
| 
 | ||||
| 	return n, err | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) Seek(offset int64, whence int) (int64, error) { | ||||
| 	if hl.err != nil { | ||||
| 		return 0, hl.err | ||||
| 	} | ||||
| 
 | ||||
| 	var err error | ||||
| 	newOffset := hl.offset | ||||
| 
 | ||||
| 	switch whence { | ||||
| 	case os.SEEK_CUR: | ||||
| 		newOffset += int64(offset) | ||||
| 	case os.SEEK_END: | ||||
| 		newOffset = hl.size + int64(offset) | ||||
| 	case os.SEEK_SET: | ||||
| 		newOffset = int64(offset) | ||||
| 	} | ||||
| 
 | ||||
| 	if newOffset < 0 { | ||||
| 		err = fmt.Errorf("cannot seek to negative position") | ||||
| 	} else { | ||||
| 		if hl.offset != newOffset { | ||||
| 			hl.reset() | ||||
| 		} | ||||
| 
 | ||||
| 		// No problems, set the offset.
 | ||||
| 		hl.offset = newOffset | ||||
| 	} | ||||
| 
 | ||||
| 	return hl.offset, err | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) Close() error { | ||||
| 	if hl.err != nil { | ||||
| 		return hl.err | ||||
| 	} | ||||
| 
 | ||||
| 	// close and release reader chain
 | ||||
| 	if hl.rc != nil { | ||||
| 		hl.rc.Close() | ||||
| 	} | ||||
| 
 | ||||
| 	hl.rc = nil | ||||
| 	hl.brd = nil | ||||
| 
 | ||||
| 	hl.err = fmt.Errorf("httpLayer: closed") | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) reset() { | ||||
| 	if hl.err != nil { | ||||
| 		return | ||||
| 	} | ||||
| 	if hl.rc != nil { | ||||
| 		hl.rc.Close() | ||||
| 		hl.rc = nil | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) reader() (io.Reader, error) { | ||||
| 	if hl.err != nil { | ||||
| 		return nil, hl.err | ||||
| 	} | ||||
| 
 | ||||
| 	if hl.rc != nil { | ||||
| 		return hl.brd, nil | ||||
| 	} | ||||
| 
 | ||||
| 	// If the offset is great than or equal to size, return a empty, noop reader.
 | ||||
| 	if hl.offset >= hl.size { | ||||
| 		return ioutil.NopCloser(bytes.NewReader([]byte{})), nil | ||||
| 	} | ||||
| 
 | ||||
| 	blobURL, err := hl.ub.BuildBlobURL(hl.name, hl.digest) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	req, err := http.NewRequest("GET", blobURL, nil) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if hl.offset > 0 { | ||||
| 		// TODO(stevvooe): Get this working correctly.
 | ||||
| 
 | ||||
| 		// If we are at different offset, issue a range request from there.
 | ||||
| 		req.Header.Add("Range", fmt.Sprintf("1-")) | ||||
| 		context.GetLogger(hl.context).Infof("Range: %s", req.Header.Get("Range")) | ||||
| 	} | ||||
| 
 | ||||
| 	resp, err := hl.client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	switch { | ||||
| 	case resp.StatusCode == 200: | ||||
| 		hl.rc = resp.Body | ||||
| 	default: | ||||
| 		defer resp.Body.Close() | ||||
| 		return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status) | ||||
| 	} | ||||
| 
 | ||||
| 	if hl.brd == nil { | ||||
| 		hl.brd = bufio.NewReader(hl.rc) | ||||
| 	} else { | ||||
| 		hl.brd.Reset(hl.rc) | ||||
| 	} | ||||
| 
 | ||||
| 	return hl.brd, nil | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) Length() int64 { | ||||
| 	return hl.size | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) Handler(r *http.Request) (http.Handler, error) { | ||||
| 	panic("Not implemented") | ||||
| } | ||||
|  | @ -0,0 +1,164 @@ | |||
| package client | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 	"os" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/docker/distribution" | ||||
| 	"github.com/docker/distribution/digest" | ||||
| ) | ||||
| 
 | ||||
| type httpLayerUpload struct { | ||||
| 	repo   distribution.Repository | ||||
| 	client *http.Client | ||||
| 
 | ||||
| 	uuid      string | ||||
| 	startedAt time.Time | ||||
| 
 | ||||
| 	location string // always the last value of the location header.
 | ||||
| 	offset   int64 | ||||
| 	closed   bool | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) handleErrorResponse(resp *http.Response) error { | ||||
| 	switch { | ||||
| 	case resp.StatusCode == http.StatusNotFound: | ||||
| 		return &BlobUploadNotFoundError{Location: hlu.location} | ||||
| 	case resp.StatusCode >= 400 && resp.StatusCode < 500: | ||||
| 		return parseHTTPErrorResponse(resp) | ||||
| 	default: | ||||
| 		return &UnexpectedHTTPStatusError{Status: resp.Status} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) ReadFrom(r io.Reader) (n int64, err error) { | ||||
| 	req, err := http.NewRequest("PATCH", hlu.location, r) | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 	defer req.Body.Close() | ||||
| 
 | ||||
| 	resp, err := hlu.client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 
 | ||||
| 	if resp.StatusCode != http.StatusAccepted { | ||||
| 		return 0, hlu.handleErrorResponse(resp) | ||||
| 	} | ||||
| 
 | ||||
| 	// TODO(dmcgowan): Validate headers
 | ||||
| 	hlu.uuid = resp.Header.Get("Docker-Upload-UUID") | ||||
| 	hlu.location, err = sanitizeLocation(resp.Header.Get("Location"), hlu.location) | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 	rng := resp.Header.Get("Range") | ||||
| 	var start, end int64 | ||||
| 	if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil { | ||||
| 		return 0, err | ||||
| 	} else if n != 2 || end < start { | ||||
| 		return 0, fmt.Errorf("bad range format: %s", rng) | ||||
| 	} | ||||
| 
 | ||||
| 	return (end - start + 1), nil | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) Write(p []byte) (n int, err error) { | ||||
| 	req, err := http.NewRequest("PATCH", hlu.location, bytes.NewReader(p)) | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 	req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", hlu.offset, hlu.offset+int64(len(p)-1))) | ||||
| 	req.Header.Set("Content-Length", fmt.Sprintf("%d", len(p))) | ||||
| 	req.Header.Set("Content-Type", "application/octet-stream") | ||||
| 
 | ||||
| 	resp, err := hlu.client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 
 | ||||
| 	if resp.StatusCode != http.StatusAccepted { | ||||
| 		return 0, hlu.handleErrorResponse(resp) | ||||
| 	} | ||||
| 
 | ||||
| 	// TODO(dmcgowan): Validate headers
 | ||||
| 	hlu.uuid = resp.Header.Get("Docker-Upload-UUID") | ||||
| 	hlu.location, err = sanitizeLocation(resp.Header.Get("Location"), hlu.location) | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 	rng := resp.Header.Get("Range") | ||||
| 	var start, end int | ||||
| 	if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil { | ||||
| 		return 0, err | ||||
| 	} else if n != 2 || end < start { | ||||
| 		return 0, fmt.Errorf("bad range format: %s", rng) | ||||
| 	} | ||||
| 
 | ||||
| 	return (end - start + 1), nil | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) Seek(offset int64, whence int) (int64, error) { | ||||
| 	newOffset := hlu.offset | ||||
| 
 | ||||
| 	switch whence { | ||||
| 	case os.SEEK_CUR: | ||||
| 		newOffset += int64(offset) | ||||
| 	case os.SEEK_END: | ||||
| 		return newOffset, errors.New("Cannot seek from end on incomplete upload") | ||||
| 	case os.SEEK_SET: | ||||
| 		newOffset = int64(offset) | ||||
| 	} | ||||
| 
 | ||||
| 	hlu.offset = newOffset | ||||
| 
 | ||||
| 	return hlu.offset, nil | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) UUID() string { | ||||
| 	return hlu.uuid | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) StartedAt() time.Time { | ||||
| 	return hlu.startedAt | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) Finish(digest digest.Digest) (distribution.Layer, error) { | ||||
| 	// TODO(dmcgowan): Check if already finished, if so just fetch
 | ||||
| 	req, err := http.NewRequest("PUT", hlu.location, nil) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	values := req.URL.Query() | ||||
| 	values.Set("digest", digest.String()) | ||||
| 	req.URL.RawQuery = values.Encode() | ||||
| 
 | ||||
| 	resp, err := hlu.client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if resp.StatusCode != http.StatusCreated { | ||||
| 		return nil, hlu.handleErrorResponse(resp) | ||||
| 	} | ||||
| 
 | ||||
| 	return hlu.repo.Layers().Fetch(digest) | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) Cancel() error { | ||||
| 	panic("not implemented") | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) Close() error { | ||||
| 	hlu.closed = true | ||||
| 	return nil | ||||
| } | ||||
|  | @ -0,0 +1,223 @@ | |||
| package client | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"testing" | ||||
| 
 | ||||
| 	"github.com/docker/distribution" | ||||
| 	"github.com/docker/distribution/registry/api/v2" | ||||
| 	"github.com/docker/distribution/testutil" | ||||
| ) | ||||
| 
 | ||||
| // Test implements distribution.LayerUpload
 | ||||
| var _ distribution.LayerUpload = &httpLayerUpload{} | ||||
| 
 | ||||
| func TestUploadReadFrom(t *testing.T) { | ||||
| 	_, b := newRandomBlob(64) | ||||
| 	repo := "test/upload/readfrom" | ||||
| 	locationPath := fmt.Sprintf("/v2/%s/uploads/testid", repo) | ||||
| 
 | ||||
| 	m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{ | ||||
| 		{ | ||||
| 			Request: testutil.Request{ | ||||
| 				Method: "GET", | ||||
| 				Route:  "/v2/", | ||||
| 			}, | ||||
| 			Response: testutil.Response{ | ||||
| 				StatusCode: http.StatusOK, | ||||
| 				Headers: http.Header(map[string][]string{ | ||||
| 					"Docker-Distribution-API-Version": {"registry/2.0"}, | ||||
| 				}), | ||||
| 			}, | ||||
| 		}, | ||||
| 		// Test Valid case
 | ||||
| 		{ | ||||
| 			Request: testutil.Request{ | ||||
| 				Method: "PATCH", | ||||
| 				Route:  locationPath, | ||||
| 				Body:   b, | ||||
| 			}, | ||||
| 			Response: testutil.Response{ | ||||
| 				StatusCode: http.StatusAccepted, | ||||
| 				Headers: http.Header(map[string][]string{ | ||||
| 					"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"}, | ||||
| 					"Location":           {locationPath}, | ||||
| 					"Range":              {"0-63"}, | ||||
| 				}), | ||||
| 			}, | ||||
| 		}, | ||||
| 		// Test invalid range
 | ||||
| 		{ | ||||
| 			Request: testutil.Request{ | ||||
| 				Method: "PATCH", | ||||
| 				Route:  locationPath, | ||||
| 				Body:   b, | ||||
| 			}, | ||||
| 			Response: testutil.Response{ | ||||
| 				StatusCode: http.StatusAccepted, | ||||
| 				Headers: http.Header(map[string][]string{ | ||||
| 					"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"}, | ||||
| 					"Location":           {locationPath}, | ||||
| 					"Range":              {""}, | ||||
| 				}), | ||||
| 			}, | ||||
| 		}, | ||||
| 		// Test 404
 | ||||
| 		{ | ||||
| 			Request: testutil.Request{ | ||||
| 				Method: "PATCH", | ||||
| 				Route:  locationPath, | ||||
| 				Body:   b, | ||||
| 			}, | ||||
| 			Response: testutil.Response{ | ||||
| 				StatusCode: http.StatusNotFound, | ||||
| 			}, | ||||
| 		}, | ||||
| 		// Test 400 valid json
 | ||||
| 		{ | ||||
| 			Request: testutil.Request{ | ||||
| 				Method: "PATCH", | ||||
| 				Route:  locationPath, | ||||
| 				Body:   b, | ||||
| 			}, | ||||
| 			Response: testutil.Response{ | ||||
| 				StatusCode: http.StatusBadRequest, | ||||
| 				Body: []byte(` | ||||
| 				{ | ||||
| 					"errors": [ | ||||
| 						{ | ||||
| 							"code": "BLOB_UPLOAD_INVALID", | ||||
| 							"message": "invalid upload identifier", | ||||
| 							"detail": "more detail" | ||||
| 						} | ||||
| 					] | ||||
| 				}`), | ||||
| 			}, | ||||
| 		}, | ||||
| 		// Test 400 invalid json
 | ||||
| 		{ | ||||
| 			Request: testutil.Request{ | ||||
| 				Method: "PATCH", | ||||
| 				Route:  locationPath, | ||||
| 				Body:   b, | ||||
| 			}, | ||||
| 			Response: testutil.Response{ | ||||
| 				StatusCode: http.StatusBadRequest, | ||||
| 				Body:       []byte("something bad happened"), | ||||
| 			}, | ||||
| 		}, | ||||
| 		// Test 500
 | ||||
| 		{ | ||||
| 			Request: testutil.Request{ | ||||
| 				Method: "PATCH", | ||||
| 				Route:  locationPath, | ||||
| 				Body:   b, | ||||
| 			}, | ||||
| 			Response: testutil.Response{ | ||||
| 				StatusCode: http.StatusInternalServerError, | ||||
| 			}, | ||||
| 		}, | ||||
| 	}) | ||||
| 
 | ||||
| 	e, c := testServer(m) | ||||
| 	defer c() | ||||
| 
 | ||||
| 	client, err := e.HTTPClient(repo) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error creating client: %s", err) | ||||
| 	} | ||||
| 	layerUpload := &httpLayerUpload{ | ||||
| 		client: client, | ||||
| 	} | ||||
| 
 | ||||
| 	// Valid case
 | ||||
| 	layerUpload.location = e.Endpoint + locationPath | ||||
| 	n, err := layerUpload.ReadFrom(bytes.NewReader(b)) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error calling ReadFrom: %s", err) | ||||
| 	} | ||||
| 	if n != 64 { | ||||
| 		t.Fatalf("Wrong length returned from ReadFrom: %d, expected 64", n) | ||||
| 	} | ||||
| 
 | ||||
| 	// Bad range
 | ||||
| 	layerUpload.location = e.Endpoint + locationPath | ||||
| 	_, err = layerUpload.ReadFrom(bytes.NewReader(b)) | ||||
| 	if err == nil { | ||||
| 		t.Fatalf("Expected error when bad range received") | ||||
| 	} | ||||
| 
 | ||||
| 	// 404
 | ||||
| 	layerUpload.location = e.Endpoint + locationPath | ||||
| 	_, err = layerUpload.ReadFrom(bytes.NewReader(b)) | ||||
| 	if err == nil { | ||||
| 		t.Fatalf("Expected error when not found") | ||||
| 	} | ||||
| 	if blobErr, ok := err.(*BlobUploadNotFoundError); !ok { | ||||
| 		t.Fatalf("Wrong error type %T: %s", err, err) | ||||
| 	} else if expected := e.Endpoint + locationPath; blobErr.Location != expected { | ||||
| 		t.Fatalf("Unexpected location: %s, expected %s", blobErr.Location, expected) | ||||
| 	} | ||||
| 
 | ||||
| 	// 400 valid json
 | ||||
| 	layerUpload.location = e.Endpoint + locationPath | ||||
| 	_, err = layerUpload.ReadFrom(bytes.NewReader(b)) | ||||
| 	if err == nil { | ||||
| 		t.Fatalf("Expected error when not found") | ||||
| 	} | ||||
| 	if uploadErr, ok := err.(*v2.Errors); !ok { | ||||
| 		t.Fatalf("Wrong error type %T: %s", err, err) | ||||
| 	} else if len(uploadErr.Errors) != 1 { | ||||
| 		t.Fatalf("Unexpected number of errors: %d, expected 1", len(uploadErr.Errors)) | ||||
| 	} else { | ||||
| 		v2Err := uploadErr.Errors[0] | ||||
| 		if v2Err.Code != v2.ErrorCodeBlobUploadInvalid { | ||||
| 			t.Fatalf("Unexpected error code: %s, expected %s", v2Err.Code.String(), v2.ErrorCodeBlobUploadInvalid.String()) | ||||
| 		} | ||||
| 		if expected := "invalid upload identifier"; v2Err.Message != expected { | ||||
| 			t.Fatalf("Unexpected error message: %s, expected %s", v2Err.Message, expected) | ||||
| 		} | ||||
| 		if expected := "more detail"; v2Err.Detail.(string) != expected { | ||||
| 			t.Fatalf("Unexpected error message: %s, expected %s", v2Err.Detail.(string), expected) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// 400 invalid json
 | ||||
| 	layerUpload.location = e.Endpoint + locationPath | ||||
| 	_, err = layerUpload.ReadFrom(bytes.NewReader(b)) | ||||
| 	if err == nil { | ||||
| 		t.Fatalf("Expected error when not found") | ||||
| 	} | ||||
| 	if uploadErr, ok := err.(*UnexpectedHTTPResponseError); !ok { | ||||
| 		t.Fatalf("Wrong error type %T: %s", err, err) | ||||
| 	} else { | ||||
| 		respStr := string(uploadErr.Response) | ||||
| 		if expected := "something bad happened"; respStr != expected { | ||||
| 			t.Fatalf("Unexpected response string: %s, expected: %s", respStr, expected) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// 500
 | ||||
| 	layerUpload.location = e.Endpoint + locationPath | ||||
| 	_, err = layerUpload.ReadFrom(bytes.NewReader(b)) | ||||
| 	if err == nil { | ||||
| 		t.Fatalf("Expected error when not found") | ||||
| 	} | ||||
| 	if uploadErr, ok := err.(*UnexpectedHTTPStatusError); !ok { | ||||
| 		t.Fatalf("Wrong error type %T: %s", err, err) | ||||
| 	} else if expected := "500 " + http.StatusText(http.StatusInternalServerError); uploadErr.Status != expected { | ||||
| 		t.Fatalf("Unexpected response status: %s, expected %s", uploadErr.Status, expected) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| //repo   distribution.Repository
 | ||||
| //client *http.Client
 | ||||
| 
 | ||||
| //uuid      string
 | ||||
| //startedAt time.Time
 | ||||
| 
 | ||||
| //location string // always the last value of the location header.
 | ||||
| //offset   int64
 | ||||
| //closed   bool
 | ||||
|  | @ -1,21 +1,14 @@ | |||
| package client | ||||
| 
 | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"bytes" | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"io/ioutil" | ||||
| 	"net/http" | ||||
| 	"net/url" | ||||
| 	"os" | ||||
| 	"strconv" | ||||
| 	"time" | ||||
| 
 | ||||
| 	ctxu "github.com/docker/distribution/context" | ||||
| 
 | ||||
| 	"github.com/docker/distribution/manifest" | ||||
| 
 | ||||
| 	"github.com/docker/distribution/digest" | ||||
|  | @ -276,7 +269,8 @@ func (ls *layers) Upload() (distribution.LayerUpload, error) { | |||
| 		} | ||||
| 
 | ||||
| 		return &httpLayerUpload{ | ||||
| 			layers:    ls, | ||||
| 			repo:      ls.repository, | ||||
| 			client:    ls.client, | ||||
| 			uuid:      uuid, | ||||
| 			startedAt: time.Now(), | ||||
| 			location:  location, | ||||
|  | @ -339,319 +333,3 @@ func (ls *layers) fetchLayer(dgst digest.Digest) (distribution.Layer, error) { | |||
| 		return nil, &UnexpectedHTTPStatusError{Status: resp.Status} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| type httpLayer struct { | ||||
| 	*layers | ||||
| 
 | ||||
| 	size      int64 | ||||
| 	digest    digest.Digest | ||||
| 	createdAt time.Time | ||||
| 
 | ||||
| 	rc     io.ReadCloser // remote read closer
 | ||||
| 	brd    *bufio.Reader // internal buffered io
 | ||||
| 	offset int64 | ||||
| 	err    error | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) CreatedAt() time.Time { | ||||
| 	return hl.createdAt | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) Digest() digest.Digest { | ||||
| 	return hl.digest | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) Read(p []byte) (n int, err error) { | ||||
| 	if hl.err != nil { | ||||
| 		return 0, hl.err | ||||
| 	} | ||||
| 
 | ||||
| 	rd, err := hl.reader() | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 
 | ||||
| 	n, err = rd.Read(p) | ||||
| 	hl.offset += int64(n) | ||||
| 
 | ||||
| 	// Simulate io.EOR error if we reach filesize.
 | ||||
| 	if err == nil && hl.offset >= hl.size { | ||||
| 		err = io.EOF | ||||
| 	} | ||||
| 
 | ||||
| 	return n, err | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) Seek(offset int64, whence int) (int64, error) { | ||||
| 	if hl.err != nil { | ||||
| 		return 0, hl.err | ||||
| 	} | ||||
| 
 | ||||
| 	var err error | ||||
| 	newOffset := hl.offset | ||||
| 
 | ||||
| 	switch whence { | ||||
| 	case os.SEEK_CUR: | ||||
| 		newOffset += int64(offset) | ||||
| 	case os.SEEK_END: | ||||
| 		newOffset = hl.size + int64(offset) | ||||
| 	case os.SEEK_SET: | ||||
| 		newOffset = int64(offset) | ||||
| 	} | ||||
| 
 | ||||
| 	if newOffset < 0 { | ||||
| 		err = fmt.Errorf("cannot seek to negative position") | ||||
| 	} else { | ||||
| 		if hl.offset != newOffset { | ||||
| 			hl.reset() | ||||
| 		} | ||||
| 
 | ||||
| 		// No problems, set the offset.
 | ||||
| 		hl.offset = newOffset | ||||
| 	} | ||||
| 
 | ||||
| 	return hl.offset, err | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) Close() error { | ||||
| 	if hl.err != nil { | ||||
| 		return hl.err | ||||
| 	} | ||||
| 
 | ||||
| 	// close and release reader chain
 | ||||
| 	if hl.rc != nil { | ||||
| 		hl.rc.Close() | ||||
| 	} | ||||
| 
 | ||||
| 	hl.rc = nil | ||||
| 	hl.brd = nil | ||||
| 
 | ||||
| 	hl.err = fmt.Errorf("httpLayer: closed") | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) reset() { | ||||
| 	if hl.err != nil { | ||||
| 		return | ||||
| 	} | ||||
| 	if hl.rc != nil { | ||||
| 		hl.rc.Close() | ||||
| 		hl.rc = nil | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) reader() (io.Reader, error) { | ||||
| 	if hl.err != nil { | ||||
| 		return nil, hl.err | ||||
| 	} | ||||
| 
 | ||||
| 	if hl.rc != nil { | ||||
| 		return hl.brd, nil | ||||
| 	} | ||||
| 
 | ||||
| 	// If the offset is great than or equal to size, return a empty, noop reader.
 | ||||
| 	if hl.offset >= hl.size { | ||||
| 		return ioutil.NopCloser(bytes.NewReader([]byte{})), nil | ||||
| 	} | ||||
| 
 | ||||
| 	blobURL, err := hl.ub.BuildBlobURL(hl.name, hl.digest) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	req, err := http.NewRequest("GET", blobURL, nil) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if hl.offset > 0 { | ||||
| 		// TODO(stevvooe): Get this working correctly.
 | ||||
| 
 | ||||
| 		// If we are at different offset, issue a range request from there.
 | ||||
| 		req.Header.Add("Range", fmt.Sprintf("1-")) | ||||
| 		ctxu.GetLogger(hl.context).Infof("Range: %s", req.Header.Get("Range")) | ||||
| 	} | ||||
| 
 | ||||
| 	resp, err := hl.client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	switch { | ||||
| 	case resp.StatusCode == 200: | ||||
| 		hl.rc = resp.Body | ||||
| 	default: | ||||
| 		defer resp.Body.Close() | ||||
| 		return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status) | ||||
| 	} | ||||
| 
 | ||||
| 	if hl.brd == nil { | ||||
| 		hl.brd = bufio.NewReader(hl.rc) | ||||
| 	} else { | ||||
| 		hl.brd.Reset(hl.rc) | ||||
| 	} | ||||
| 
 | ||||
| 	return hl.brd, nil | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) Length() int64 { | ||||
| 	return hl.size | ||||
| } | ||||
| 
 | ||||
| func (hl *httpLayer) Handler(r *http.Request) (http.Handler, error) { | ||||
| 	panic("Not implemented") | ||||
| } | ||||
| 
 | ||||
| type httpLayerUpload struct { | ||||
| 	*layers | ||||
| 
 | ||||
| 	uuid      string | ||||
| 	startedAt time.Time | ||||
| 
 | ||||
| 	location string // always the last value of the location header.
 | ||||
| 	offset   int64 | ||||
| 	closed   bool | ||||
| } | ||||
| 
 | ||||
| var _ distribution.LayerUpload = &httpLayerUpload{} | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) ReadFrom(r io.Reader) (n int64, err error) { | ||||
| 	req, err := http.NewRequest("PATCH", hlu.location, r) | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 	defer req.Body.Close() | ||||
| 
 | ||||
| 	resp, err := hlu.client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 
 | ||||
| 	switch { | ||||
| 	case resp.StatusCode == http.StatusAccepted: | ||||
| 		// TODO(dmcgowan): Validate headers
 | ||||
| 		hlu.uuid = resp.Header.Get("Docker-Upload-UUID") | ||||
| 		hlu.location, err = sanitizeLocation(resp.Header.Get("Location"), hlu.location) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		rng := resp.Header.Get("Range") | ||||
| 		var start, end int64 | ||||
| 		if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil { | ||||
| 			return 0, err | ||||
| 		} else if n != 2 || end < start { | ||||
| 			return 0, fmt.Errorf("bad range format: %s", rng) | ||||
| 		} | ||||
| 
 | ||||
| 		return (end - start + 1), nil | ||||
| 	case resp.StatusCode == http.StatusNotFound: | ||||
| 		return 0, &BlobUploadNotFoundError{Location: hlu.location} | ||||
| 	case resp.StatusCode >= 400 && resp.StatusCode < 500: | ||||
| 		return 0, parseHTTPErrorResponse(resp) | ||||
| 	default: | ||||
| 		return 0, &UnexpectedHTTPStatusError{Status: resp.Status} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) Write(p []byte) (n int, err error) { | ||||
| 	req, err := http.NewRequest("PATCH", hlu.location, bytes.NewReader(p)) | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 	req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", hlu.offset, hlu.offset+int64(len(p)-1))) | ||||
| 	req.Header.Set("Content-Length", fmt.Sprintf("%d", len(p))) | ||||
| 	req.Header.Set("Content-Type", "application/octet-stream") | ||||
| 
 | ||||
| 	resp, err := hlu.client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 
 | ||||
| 	switch { | ||||
| 	case resp.StatusCode == http.StatusAccepted: | ||||
| 		// TODO(dmcgowan): Validate headers
 | ||||
| 		hlu.uuid = resp.Header.Get("Docker-Upload-UUID") | ||||
| 		hlu.location, err = sanitizeLocation(resp.Header.Get("Location"), hlu.location) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		rng := resp.Header.Get("Range") | ||||
| 		var start, end int | ||||
| 		if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil { | ||||
| 			return 0, err | ||||
| 		} else if n != 2 || end < start { | ||||
| 			return 0, fmt.Errorf("bad range format: %s", rng) | ||||
| 		} | ||||
| 
 | ||||
| 		return (end - start + 1), nil | ||||
| 	case resp.StatusCode == http.StatusNotFound: | ||||
| 		return 0, &BlobUploadNotFoundError{Location: hlu.location} | ||||
| 	case resp.StatusCode >= 400 && resp.StatusCode < 500: | ||||
| 		return 0, parseHTTPErrorResponse(resp) | ||||
| 	default: | ||||
| 		return 0, &UnexpectedHTTPStatusError{Status: resp.Status} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) Seek(offset int64, whence int) (int64, error) { | ||||
| 	newOffset := hlu.offset | ||||
| 
 | ||||
| 	switch whence { | ||||
| 	case os.SEEK_CUR: | ||||
| 		newOffset += int64(offset) | ||||
| 	case os.SEEK_END: | ||||
| 		return newOffset, errors.New("Cannot seek from end on incomplete upload") | ||||
| 	case os.SEEK_SET: | ||||
| 		newOffset = int64(offset) | ||||
| 	} | ||||
| 
 | ||||
| 	hlu.offset = newOffset | ||||
| 
 | ||||
| 	return hlu.offset, nil | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) UUID() string { | ||||
| 	return hlu.uuid | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) StartedAt() time.Time { | ||||
| 	return hlu.startedAt | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) Finish(digest digest.Digest) (distribution.Layer, error) { | ||||
| 	// TODO(dmcgowan): Check if already finished, if so just fetch
 | ||||
| 	req, err := http.NewRequest("PUT", hlu.location, nil) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	values := req.URL.Query() | ||||
| 	values.Set("digest", digest.String()) | ||||
| 	req.URL.RawQuery = values.Encode() | ||||
| 
 | ||||
| 	resp, err := hlu.client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	switch { | ||||
| 	case resp.StatusCode == http.StatusCreated: | ||||
| 		return hlu.Layers().Fetch(digest) | ||||
| 	case resp.StatusCode == http.StatusNotFound: | ||||
| 		return nil, &BlobUploadNotFoundError{Location: hlu.location} | ||||
| 	case resp.StatusCode >= 400 && resp.StatusCode < 500: | ||||
| 		return nil, parseHTTPErrorResponse(resp) | ||||
| 	default: | ||||
| 		return nil, &UnexpectedHTTPStatusError{Status: resp.Status} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) Cancel() error { | ||||
| 	panic("not implemented") | ||||
| } | ||||
| 
 | ||||
| func (hlu *httpLayerUpload) Close() error { | ||||
| 	hlu.closed = true | ||||
| 	return nil | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue