Allows layers to be partially pulled and resumed
Adds a sort of contrived test for resumable pullsmaster
							parent
							
								
									73d6e8af84
								
							
						
					
					
						commit
						50d64ac63a
					
				|  | @ -117,6 +117,7 @@ func TestPush(t *testing.T) { | |||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 
 | ||||
| 		writer.SetSize(len(blob.contents)) | ||||
| 		writer.Write(blob.contents) | ||||
| 		writer.Close() | ||||
| 	} | ||||
|  | @ -235,3 +236,133 @@ func TestPull(t *testing.T) { | |||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestPullResume(t *testing.T) { | ||||
| 	name := "hello/world" | ||||
| 	tag := "sometag" | ||||
| 	testBlobs := []testBlob{ | ||||
| 		{ | ||||
| 			digest:   "12345", | ||||
| 			contents: []byte("some contents"), | ||||
| 		}, | ||||
| 		{ | ||||
| 			digest:   "98765", | ||||
| 			contents: []byte("some other contents"), | ||||
| 		}, | ||||
| 	} | ||||
| 	layers := make([]registry.FSLayer, len(testBlobs)) | ||||
| 	history := make([]registry.ManifestHistory, len(testBlobs)) | ||||
| 
 | ||||
| 	for i, layer := range testBlobs { | ||||
| 		layers[i] = registry.FSLayer{BlobSum: layer.digest} | ||||
| 		history[i] = registry.ManifestHistory{V1Compatibility: layer.digest.String()} | ||||
| 	} | ||||
| 
 | ||||
| 	manifest := ®istry.ImageManifest{ | ||||
| 		Name:          name, | ||||
| 		Tag:           tag, | ||||
| 		Architecture:  "x86", | ||||
| 		FSLayers:      layers, | ||||
| 		History:       history, | ||||
| 		SchemaVersion: 1, | ||||
| 	} | ||||
| 	manifestBytes, err := json.Marshal(manifest) | ||||
| 
 | ||||
| 	layerRequestResponseMappings := make([]testutil.RequestResponseMapping, 2*len(testBlobs)) | ||||
| 	for i, blob := range testBlobs { | ||||
| 		layerRequestResponseMappings[2*i] = testutil.RequestResponseMapping{ | ||||
| 			Request: testutil.Request{ | ||||
| 				Method: "GET", | ||||
| 				Route:  "/v2/" + name + "/blob/" + blob.digest.String(), | ||||
| 			}, | ||||
| 			Response: testutil.Response{ | ||||
| 				StatusCode: http.StatusOK, | ||||
| 				Body:       blob.contents[:len(blob.contents)/2], | ||||
| 				Headers: http.Header(map[string][]string{ | ||||
| 					"Content-Length": {fmt.Sprint(len(blob.contents))}, | ||||
| 				}), | ||||
| 			}, | ||||
| 		} | ||||
| 		layerRequestResponseMappings[2*i+1] = testutil.RequestResponseMapping{ | ||||
| 			Request: testutil.Request{ | ||||
| 				Method: "GET", | ||||
| 				Route:  "/v2/" + name + "/blob/" + blob.digest.String(), | ||||
| 			}, | ||||
| 			Response: testutil.Response{ | ||||
| 				StatusCode: http.StatusOK, | ||||
| 				Body:       blob.contents[len(blob.contents)/2:], | ||||
| 			}, | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	for i := 0; i < 3; i++ { | ||||
| 		layerRequestResponseMappings = append(layerRequestResponseMappings, testutil.RequestResponseMap{ | ||||
| 			testutil.RequestResponseMapping{ | ||||
| 				Request: testutil.Request{ | ||||
| 					Method: "GET", | ||||
| 					Route:  "/v2/" + name + "/manifest/" + tag, | ||||
| 				}, | ||||
| 				Response: testutil.Response{ | ||||
| 					StatusCode: http.StatusOK, | ||||
| 					Body:       manifestBytes, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}...) | ||||
| 	} | ||||
| 
 | ||||
| 	handler := testutil.NewHandler(layerRequestResponseMappings) | ||||
| 	server := httptest.NewServer(handler) | ||||
| 	client := New(server.URL) | ||||
| 	objectStore := &memoryObjectStore{ | ||||
| 		mutex:           new(sync.Mutex), | ||||
| 		manifestStorage: make(map[string]*registry.ImageManifest), | ||||
| 		layerStorage:    make(map[digest.Digest]Layer), | ||||
| 	} | ||||
| 
 | ||||
| 	for attempts := 0; attempts < 3; attempts++ { | ||||
| 		err = Pull(client, objectStore, name, tag) | ||||
| 		if err == nil { | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	m, err := objectStore.Manifest(name, tag) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	mBytes, err := json.Marshal(m) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	if string(mBytes) != string(manifestBytes) { | ||||
| 		t.Fatal("Incorrect manifest") | ||||
| 	} | ||||
| 
 | ||||
| 	for _, blob := range testBlobs { | ||||
| 		l, err := objectStore.Layer(blob.digest) | ||||
| 		if err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 
 | ||||
| 		reader, err := l.Reader() | ||||
| 		if err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 		defer reader.Close() | ||||
| 
 | ||||
| 		layerBytes, err := ioutil.ReadAll(reader) | ||||
| 		if err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 
 | ||||
| 		if string(layerBytes) != string(blob.contents) { | ||||
| 			t.Fatal("Incorrect blob") | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -5,7 +5,6 @@ import ( | |||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"io/ioutil" | ||||
| 	"sync" | ||||
| 
 | ||||
| 	"github.com/docker/docker-registry" | ||||
|  | @ -39,20 +38,49 @@ type ObjectStore interface { | |||
| } | ||||
| 
 | ||||
| // Layer is a generic image layer interface.
 | ||||
| // A Layer may only be written to once
 | ||||
| // A Layer may not be written to if it is already complete.
 | ||||
| type Layer interface { | ||||
| 	// Reader returns an io.ReadCloser which reads the contents of the layer
 | ||||
| 	Reader() (io.ReadCloser, error) | ||||
| 	// Reader returns a LayerReader or an error if the layer has not been
 | ||||
| 	// written to or is currently being written to.
 | ||||
| 	Reader() (LayerReader, error) | ||||
| 
 | ||||
| 	// Writer returns an io.WriteCloser which may write the contents of the
 | ||||
| 	// layer. This method may only be called once per Layer, and the contents
 | ||||
| 	// are made available on Close
 | ||||
| 	Writer() (io.WriteCloser, error) | ||||
| 	// Writer returns a LayerWriter or an error if the layer has been fully
 | ||||
| 	// written to or is currently being written to.
 | ||||
| 	Writer() (LayerWriter, error) | ||||
| 
 | ||||
| 	// Wait blocks until the Layer can be read from
 | ||||
| 	// Wait blocks until the Layer can be read from.
 | ||||
| 	Wait() error | ||||
| } | ||||
| 
 | ||||
| // LayerReader is a read-only handle to a Layer, which exposes the CurrentSize
 | ||||
| // and full Size in addition to implementing the io.ReadCloser interface.
 | ||||
| type LayerReader interface { | ||||
| 	io.ReadCloser | ||||
| 
 | ||||
| 	// CurrentSize returns the number of bytes written to the underlying Layer
 | ||||
| 	CurrentSize() int | ||||
| 
 | ||||
| 	// Size returns the full size of the underlying Layer
 | ||||
| 	Size() int | ||||
| } | ||||
| 
 | ||||
| // LayerWriter is a write-only handle to a Layer, which exposes the CurrentSize
 | ||||
| // and full Size in addition to implementing the io.WriteCloser interface.
 | ||||
| // SetSize must be called on this LayerWriter before it can be written to.
 | ||||
| type LayerWriter interface { | ||||
| 	io.WriteCloser | ||||
| 
 | ||||
| 	// CurrentSize returns the number of bytes written to the underlying Layer
 | ||||
| 	CurrentSize() int | ||||
| 
 | ||||
| 	// Size returns the full size of the underlying Layer
 | ||||
| 	Size() int | ||||
| 
 | ||||
| 	// SetSize sets the full size of the underlying Layer.
 | ||||
| 	// This must be called before any calls to Write
 | ||||
| 	SetSize(int) error | ||||
| } | ||||
| 
 | ||||
| // memoryObjectStore is an in-memory implementation of the ObjectStore interface
 | ||||
| type memoryObjectStore struct { | ||||
| 	mutex           *sync.Mutex | ||||
|  | @ -93,67 +121,113 @@ func (objStore *memoryObjectStore) Layer(dgst digest.Digest) (Layer, error) { | |||
| } | ||||
| 
 | ||||
| type memoryLayer struct { | ||||
| 	cond    *sync.Cond | ||||
| 	buffer  *bytes.Buffer | ||||
| 	written bool | ||||
| 	cond         *sync.Cond | ||||
| 	contents     []byte | ||||
| 	expectedSize int | ||||
| 	writing      bool | ||||
| } | ||||
| 
 | ||||
| func (ml *memoryLayer) Writer() (io.WriteCloser, error) { | ||||
| func (ml *memoryLayer) Reader() (LayerReader, error) { | ||||
| 	ml.cond.L.Lock() | ||||
| 	defer ml.cond.L.Unlock() | ||||
| 
 | ||||
| 	if ml.buffer != nil { | ||||
| 		if !ml.written { | ||||
| 			return nil, ErrLayerLocked | ||||
| 		} | ||||
| 		return nil, ErrLayerAlreadyExists | ||||
| 	} | ||||
| 
 | ||||
| 	ml.buffer = new(bytes.Buffer) | ||||
| 	return &memoryLayerWriter{cond: ml.cond, buffer: ml.buffer, done: &ml.written}, nil | ||||
| } | ||||
| 
 | ||||
| func (ml *memoryLayer) Reader() (io.ReadCloser, error) { | ||||
| 	ml.cond.L.Lock() | ||||
| 	defer ml.cond.L.Unlock() | ||||
| 
 | ||||
| 	if ml.buffer == nil { | ||||
| 	if ml.contents == nil { | ||||
| 		return nil, fmt.Errorf("Layer has not been written to yet") | ||||
| 	} | ||||
| 	if !ml.written { | ||||
| 	if ml.writing { | ||||
| 		return nil, ErrLayerLocked | ||||
| 	} | ||||
| 
 | ||||
| 	return ioutil.NopCloser(bytes.NewReader(ml.buffer.Bytes())), nil | ||||
| 	return &memoryLayerReader{ml: ml, reader: bytes.NewReader(ml.contents)}, nil | ||||
| } | ||||
| 
 | ||||
| func (ml *memoryLayer) Writer() (LayerWriter, error) { | ||||
| 	ml.cond.L.Lock() | ||||
| 	defer ml.cond.L.Unlock() | ||||
| 
 | ||||
| 	if ml.contents != nil { | ||||
| 		if ml.writing { | ||||
| 			return nil, ErrLayerLocked | ||||
| 		} | ||||
| 		if ml.expectedSize == len(ml.contents) { | ||||
| 			return nil, ErrLayerAlreadyExists | ||||
| 		} | ||||
| 	} else { | ||||
| 		ml.contents = make([]byte, 0) | ||||
| 	} | ||||
| 
 | ||||
| 	ml.writing = true | ||||
| 	return &memoryLayerWriter{ml: ml, buffer: bytes.NewBuffer(ml.contents)}, nil | ||||
| } | ||||
| 
 | ||||
| func (ml *memoryLayer) Wait() error { | ||||
| 	ml.cond.L.Lock() | ||||
| 	defer ml.cond.L.Unlock() | ||||
| 
 | ||||
| 	if ml.buffer == nil { | ||||
| 	if ml.contents == nil { | ||||
| 		return fmt.Errorf("No writer to wait on") | ||||
| 	} | ||||
| 
 | ||||
| 	for !ml.written { | ||||
| 	for ml.writing { | ||||
| 		ml.cond.Wait() | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| type memoryLayerReader struct { | ||||
| 	ml     *memoryLayer | ||||
| 	reader *bytes.Reader | ||||
| } | ||||
| 
 | ||||
| func (mlr *memoryLayerReader) Read(p []byte) (int, error) { | ||||
| 	return mlr.reader.Read(p) | ||||
| } | ||||
| 
 | ||||
| func (mlr *memoryLayerReader) Close() error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (mlr *memoryLayerReader) CurrentSize() int { | ||||
| 	return len(mlr.ml.contents) | ||||
| } | ||||
| 
 | ||||
| func (mlr *memoryLayerReader) Size() int { | ||||
| 	return mlr.ml.expectedSize | ||||
| } | ||||
| 
 | ||||
| type memoryLayerWriter struct { | ||||
| 	cond   *sync.Cond | ||||
| 	ml     *memoryLayer | ||||
| 	buffer *bytes.Buffer | ||||
| 	done   *bool | ||||
| } | ||||
| 
 | ||||
| func (mlw *memoryLayerWriter) Write(p []byte) (int, error) { | ||||
| 	return mlw.buffer.Write(p) | ||||
| 	if mlw.ml.expectedSize == 0 { | ||||
| 		return 0, fmt.Errorf("Must set size before writing to layer") | ||||
| 	} | ||||
| 	wrote, err := mlw.buffer.Write(p) | ||||
| 	mlw.ml.contents = mlw.buffer.Bytes() | ||||
| 	return wrote, err | ||||
| } | ||||
| 
 | ||||
| func (mlw *memoryLayerWriter) Close() error { | ||||
| 	*mlw.done = true | ||||
| 	mlw.cond.Broadcast() | ||||
| 	mlw.ml.writing = false | ||||
| 	mlw.ml.cond.Broadcast() | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (mlw *memoryLayerWriter) CurrentSize() int { | ||||
| 	return len(mlw.ml.contents) | ||||
| } | ||||
| 
 | ||||
| func (mlw *memoryLayerWriter) Size() int { | ||||
| 	return mlw.ml.expectedSize | ||||
| } | ||||
| 
 | ||||
| func (mlw *memoryLayerWriter) SetSize(size int) error { | ||||
| 	if !mlw.ml.writing { | ||||
| 		return fmt.Errorf("Layer is closed for writing") | ||||
| 	} | ||||
| 	mlw.ml.expectedSize = size | ||||
| 	return nil | ||||
| } | ||||
|  |  | |||
|  | @ -89,7 +89,7 @@ func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. | |||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	writer, err := layer.Writer() | ||||
| 	layerWriter, err := layer.Writer() | ||||
| 	if err == ErrLayerAlreadyExists { | ||||
| 		log.WithField("layer", fsLayer).Info("Layer already exists") | ||||
| 		return nil | ||||
|  | @ -106,9 +106,17 @@ func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. | |||
| 		}).Warn("Unable to write local layer") | ||||
| 		return err | ||||
| 	} | ||||
| 	defer writer.Close() | ||||
| 	defer layerWriter.Close() | ||||
| 
 | ||||
| 	layerReader, length, err := c.GetBlob(name, fsLayer.BlobSum, 0) | ||||
| 	if layerWriter.CurrentSize() > 0 { | ||||
| 		log.WithFields(log.Fields{ | ||||
| 			"layer":       fsLayer, | ||||
| 			"currentSize": layerWriter.CurrentSize(), | ||||
| 			"size":        layerWriter.Size(), | ||||
| 		}).Info("Layer partially downloaded, resuming") | ||||
| 	} | ||||
| 
 | ||||
| 	layerReader, length, err := c.GetBlob(name, fsLayer.BlobSum, layerWriter.CurrentSize()) | ||||
| 	if err != nil { | ||||
| 		log.WithFields(log.Fields{ | ||||
| 			"error": err, | ||||
|  | @ -118,7 +126,9 @@ func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. | |||
| 	} | ||||
| 	defer layerReader.Close() | ||||
| 
 | ||||
| 	copied, err := io.Copy(writer, layerReader) | ||||
| 	layerWriter.SetSize(layerWriter.CurrentSize() + length) | ||||
| 
 | ||||
| 	_, err = io.Copy(layerWriter, layerReader) | ||||
| 	if err != nil { | ||||
| 		log.WithFields(log.Fields{ | ||||
| 			"error": err, | ||||
|  | @ -126,15 +136,15 @@ func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. | |||
| 		}).Warn("Unable to download layer") | ||||
| 		return err | ||||
| 	} | ||||
| 	if copied != int64(length) { | ||||
| 	if layerWriter.CurrentSize() != layerWriter.Size() { | ||||
| 		log.WithFields(log.Fields{ | ||||
| 			"expected": length, | ||||
| 			"written":  copied, | ||||
| 			"layer":    fsLayer, | ||||
| 		}).Warn("Wrote incorrect number of bytes for layer") | ||||
| 			"size":        layerWriter.Size(), | ||||
| 			"currentSize": layerWriter.CurrentSize(), | ||||
| 			"layer":       fsLayer, | ||||
| 		}).Warn("Layer invalid size") | ||||
| 		return fmt.Errorf( | ||||
| 			"Wrote incorrect number of bytes for layer %v. Expected %d, Wrote %d", | ||||
| 			fsLayer, length, copied, | ||||
| 			fsLayer, layerWriter.Size(), layerWriter.CurrentSize(), | ||||
| 		) | ||||
| 	} | ||||
| 	return nil | ||||
|  |  | |||
|  | @ -1,9 +1,7 @@ | |||
| package client | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"io" | ||||
| 	"io/ioutil" | ||||
| 	"errors" | ||||
| 
 | ||||
| 	"github.com/docker/docker-registry" | ||||
| 
 | ||||
|  | @ -96,14 +94,13 @@ func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. | |||
| 	} | ||||
| 	defer layerReader.Close() | ||||
| 
 | ||||
| 	layerBuffer := new(bytes.Buffer) | ||||
| 	layerSize, err := io.Copy(layerBuffer, layerReader) | ||||
| 	if err != nil { | ||||
| 	if layerReader.CurrentSize() != layerReader.Size() { | ||||
| 		log.WithFields(log.Fields{ | ||||
| 			"error": err, | ||||
| 			"layer": fsLayer, | ||||
| 		}).Warn("Unable to read local layer") | ||||
| 		return err | ||||
| 			"layer":       fsLayer, | ||||
| 			"currentSize": layerReader.CurrentSize(), | ||||
| 			"size":        layerReader.Size(), | ||||
| 		}).Warn("Local layer incomplete") | ||||
| 		return errors.New("Local layer incomplete") | ||||
| 	} | ||||
| 
 | ||||
| 	length, err := c.BlobLength(name, fsLayer.BlobSum) | ||||
|  | @ -128,7 +125,7 @@ func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer registry. | |||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	err = c.UploadBlob(location, ioutil.NopCloser(layerBuffer), int(layerSize), fsLayer.BlobSum) | ||||
| 	err = c.UploadBlob(location, layerReader, int(layerReader.CurrentSize()), fsLayer.BlobSum) | ||||
| 	if err != nil { | ||||
| 		log.WithFields(log.Fields{ | ||||
| 			"error": err, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue