456 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			456 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
package storage
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"io/ioutil"
 | 
						|
	"os"
 | 
						|
	"path/filepath"
 | 
						|
 | 
						|
	"code.google.com/p/go-uuid/uuid"
 | 
						|
 | 
						|
	"github.com/docker/docker-registry/digest"
 | 
						|
	"github.com/docker/docker-registry/storagedriver"
 | 
						|
	"github.com/docker/docker/pkg/tarsum"
 | 
						|
 | 
						|
	"io"
 | 
						|
)
 | 
						|
 | 
						|
// LayerUploadState captures the state serializable state of the layer upload.
 | 
						|
type LayerUploadState struct {
 | 
						|
	// name is the primary repository under which the layer will be linked.
 | 
						|
	Name string
 | 
						|
 | 
						|
	// UUID identifies the upload.
 | 
						|
	UUID string
 | 
						|
 | 
						|
	// offset contains the current progress of the upload.
 | 
						|
	Offset int64
 | 
						|
}
 | 
						|
 | 
						|
// layerUploadController is used to control the various aspects of resumable
 | 
						|
// layer upload. It implements the LayerUpload interface.
 | 
						|
type layerUploadController struct {
 | 
						|
	LayerUploadState
 | 
						|
 | 
						|
	layerStore  *layerStore
 | 
						|
	uploadStore layerUploadStore
 | 
						|
	fp          layerFile
 | 
						|
	err         error // terminal error, if set, controller is closed
 | 
						|
}
 | 
						|
 | 
						|
// layerFile documents the interface used while writing layer files, similar
 | 
						|
// to *os.File. This is separate from layerReader, for now, because we want to
 | 
						|
// store uploads on the local file system until we have write-through hashing
 | 
						|
// support. They should be combined once this is worked out.
 | 
						|
type layerFile interface {
 | 
						|
	io.WriteSeeker
 | 
						|
	io.Reader
 | 
						|
	io.Closer
 | 
						|
 | 
						|
	// Sync commits the contents of the writer to storage.
 | 
						|
	Sync() (err error)
 | 
						|
}
 | 
						|
 | 
						|
// layerUploadStore provides storage for temporary files and upload state of
 | 
						|
// layers. This is be used by the LayerService to manage the state of ongoing
 | 
						|
// uploads. This interface will definitely change and will most likely end up
 | 
						|
// being exported to the app layer. Move the layer.go when it's ready to go.
 | 
						|
type layerUploadStore interface {
 | 
						|
	New(name string) (LayerUploadState, error)
 | 
						|
	Open(uuid string) (layerFile, error)
 | 
						|
	GetState(uuid string) (LayerUploadState, error)
 | 
						|
	SaveState(lus LayerUploadState) error
 | 
						|
	DeleteState(uuid string) error
 | 
						|
}
 | 
						|
 | 
						|
var _ LayerUpload = &layerUploadController{}
 | 
						|
 | 
						|
// Name of the repository under which the layer will be linked.
 | 
						|
func (luc *layerUploadController) Name() string {
 | 
						|
	return luc.LayerUploadState.Name
 | 
						|
}
 | 
						|
 | 
						|
// UUID returns the identifier for this upload.
 | 
						|
func (luc *layerUploadController) UUID() string {
 | 
						|
	return luc.LayerUploadState.UUID
 | 
						|
}
 | 
						|
 | 
						|
// Offset returns the position of the last byte written to this layer.
 | 
						|
func (luc *layerUploadController) Offset() int64 {
 | 
						|
	return luc.LayerUploadState.Offset
 | 
						|
}
 | 
						|
 | 
						|
// Finish marks the upload as completed, returning a valid handle to the
 | 
						|
// uploaded layer. The final size and checksum are validated against the
 | 
						|
// contents of the uploaded layer. The checksum should be provided in the
 | 
						|
// format <algorithm>:<hex digest>.
 | 
						|
func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Layer, error) {
 | 
						|
 | 
						|
	// This section is going to be pretty ugly now. We will have to read the
 | 
						|
	// file twice. First, to get the tarsum and checksum. When those are
 | 
						|
	// available, and validated, we will upload it to the blob store and link
 | 
						|
	// it into the repository. In the future, we need to use resumable hash
 | 
						|
	// calculations for tarsum and checksum that can be calculated during the
 | 
						|
	// upload. This will allow us to cut the data directly into a temporary
 | 
						|
	// directory in the storage backend.
 | 
						|
 | 
						|
	fp, err := luc.file()
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		// Cleanup?
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	digest, err = luc.validateLayer(fp, size, digest)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	if nn, err := luc.writeLayer(fp, digest); err != nil {
 | 
						|
		// Cleanup?
 | 
						|
		return nil, err
 | 
						|
	} else if size >= 0 && nn != size {
 | 
						|
		// TODO(stevvooe): Short write. Will have to delete the location and
 | 
						|
		// report an error. This error needs to be reported to the client.
 | 
						|
		return nil, fmt.Errorf("short write writing layer")
 | 
						|
	}
 | 
						|
 | 
						|
	// Yes! We have written some layer data. Let's make it visible. Link the
 | 
						|
	// layer blob into the repository.
 | 
						|
	if err := luc.linkLayer(digest); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// Ok, the upload has completed and finished. Delete the state.
 | 
						|
	if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil {
 | 
						|
		// Can we ignore this error?
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return luc.layerStore.Fetch(luc.Name(), digest)
 | 
						|
}
 | 
						|
 | 
						|
// Cancel the layer upload process.
 | 
						|
func (luc *layerUploadController) Cancel() error {
 | 
						|
	if err := luc.layerStore.uploadStore.DeleteState(luc.UUID()); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return luc.Close()
 | 
						|
}
 | 
						|
 | 
						|
func (luc *layerUploadController) Write(p []byte) (int, error) {
 | 
						|
	wr, err := luc.file()
 | 
						|
	if err != nil {
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
 | 
						|
	n, err := wr.Write(p)
 | 
						|
 | 
						|
	// Because we expect the reported offset to be consistent with the storage
 | 
						|
	// state, unfortunately, we need to Sync on every call to write.
 | 
						|
	if err := wr.Sync(); err != nil {
 | 
						|
		// Effectively, ignore the write state if the Sync fails. Report that
 | 
						|
		// no bytes were written and seek back to the starting offset.
 | 
						|
		offset, seekErr := wr.Seek(luc.Offset(), os.SEEK_SET)
 | 
						|
		if seekErr != nil {
 | 
						|
			// What do we do here? Quite disasterous.
 | 
						|
			luc.reset()
 | 
						|
 | 
						|
			return 0, fmt.Errorf("multiple errors encounterd after Sync + Seek: %v then %v", err, seekErr)
 | 
						|
		}
 | 
						|
 | 
						|
		if offset != luc.Offset() {
 | 
						|
			return 0, fmt.Errorf("unexpected offset after seek")
 | 
						|
		}
 | 
						|
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
 | 
						|
	luc.LayerUploadState.Offset += int64(n)
 | 
						|
 | 
						|
	if err := luc.uploadStore.SaveState(luc.LayerUploadState); err != nil {
 | 
						|
		// TODO(stevvooe): This failure case may require more thought.
 | 
						|
		return n, err
 | 
						|
	}
 | 
						|
 | 
						|
	return n, err
 | 
						|
}
 | 
						|
 | 
						|
func (luc *layerUploadController) Close() error {
 | 
						|
	if luc.err != nil {
 | 
						|
		return luc.err
 | 
						|
	}
 | 
						|
 | 
						|
	if luc.fp != nil {
 | 
						|
		luc.err = luc.fp.Close()
 | 
						|
	}
 | 
						|
 | 
						|
	return luc.err
 | 
						|
}
 | 
						|
 | 
						|
func (luc *layerUploadController) file() (layerFile, error) {
 | 
						|
	if luc.fp != nil {
 | 
						|
		return luc.fp, nil
 | 
						|
	}
 | 
						|
 | 
						|
	fp, err := luc.uploadStore.Open(luc.UUID())
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// TODO(stevvooe): We may need a more aggressive check here to ensure that
 | 
						|
	// the file length is equal to the current offset. We may want to sync the
 | 
						|
	// offset before return the layer upload to the client so it can be
 | 
						|
	// validated before proceeding with any writes.
 | 
						|
 | 
						|
	// Seek to the current layer offset for good measure.
 | 
						|
	if _, err = fp.Seek(luc.Offset(), os.SEEK_SET); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	luc.fp = fp
 | 
						|
 | 
						|
	return luc.fp, nil
 | 
						|
}
 | 
						|
 | 
						|
// reset closes and drops the current writer.
 | 
						|
func (luc *layerUploadController) reset() {
 | 
						|
	if luc.fp != nil {
 | 
						|
		luc.fp.Close()
 | 
						|
		luc.fp = nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// validateLayer runs several checks on the layer file to ensure its validity.
 | 
						|
// This is currently very expensive and relies on fast io and fast seek on the
 | 
						|
// local host. If successful, the latest digest is returned, which should be
 | 
						|
// used over the passed in value.
 | 
						|
func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst digest.Digest) (digest.Digest, error) {
 | 
						|
	// First, check the incoming tarsum version of the digest.
 | 
						|
	version, err := tarsum.GetVersionFromTarsum(dgst.String())
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	// TODO(stevvooe): Should we push this down into the digest type?
 | 
						|
	switch version {
 | 
						|
	case tarsum.Version1:
 | 
						|
	default:
 | 
						|
		// version 0 and dev, for now.
 | 
						|
		return "", ErrLayerTarSumVersionUnsupported
 | 
						|
	}
 | 
						|
 | 
						|
	digestVerifier := digest.NewDigestVerifier(dgst)
 | 
						|
	lengthVerifier := digest.NewLengthVerifier(size)
 | 
						|
 | 
						|
	// First, seek to the end of the file, checking the size is as expected.
 | 
						|
	end, err := fp.Seek(0, os.SEEK_END)
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	// Only check size if it is greater than
 | 
						|
	if size >= 0 && end != size {
 | 
						|
		// Fast path length check.
 | 
						|
		return "", ErrLayerInvalidSize{Size: size}
 | 
						|
	}
 | 
						|
 | 
						|
	// Now seek back to start and take care of the digest.
 | 
						|
	if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	tr := io.TeeReader(fp, digestVerifier)
 | 
						|
 | 
						|
	// Only verify the size if a positive size argument has been passed.
 | 
						|
	if size >= 0 {
 | 
						|
		tr = io.TeeReader(tr, lengthVerifier)
 | 
						|
	}
 | 
						|
 | 
						|
	// TODO(stevvooe): This is one of the places we need a Digester write
 | 
						|
	// sink. Instead, its read driven. This migth be okay.
 | 
						|
 | 
						|
	// Calculate an updated digest with the latest version.
 | 
						|
	dgst, err = digest.FromReader(tr)
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	if size >= 0 && !lengthVerifier.Verified() {
 | 
						|
		return "", ErrLayerInvalidSize{Size: size}
 | 
						|
	}
 | 
						|
 | 
						|
	if !digestVerifier.Verified() {
 | 
						|
		return "", ErrLayerInvalidDigest{FSLayer{BlobSum: dgst}}
 | 
						|
	}
 | 
						|
 | 
						|
	return dgst, nil
 | 
						|
}
 | 
						|
 | 
						|
// writeLayer actually writes the the layer file into its final destination,
 | 
						|
// identified by dgst. The layer should be validated before commencing the
 | 
						|
// write.
 | 
						|
func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) {
 | 
						|
	blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
 | 
						|
		digest: dgst,
 | 
						|
	})
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
 | 
						|
	// Check for existence
 | 
						|
	if _, err := luc.layerStore.driver.Stat(blobPath); err != nil {
 | 
						|
		// TODO(stevvooe): This check is kind of problematic and very racy.
 | 
						|
		switch err := err.(type) {
 | 
						|
		case storagedriver.PathNotFoundError:
 | 
						|
			break // ensure that it doesn't exist.
 | 
						|
		default:
 | 
						|
			// TODO(stevvooe): This isn't actually an error: the blob store is
 | 
						|
			// content addressable and we should just use this to ensure we
 | 
						|
			// have it written. Although, we do need to verify that the
 | 
						|
			// content that is there is the correct length.
 | 
						|
			return 0, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Seek our local layer file back now.
 | 
						|
	if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
 | 
						|
		// Cleanup?
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
 | 
						|
	// Okay: we can write the file to the blob store.
 | 
						|
	return luc.layerStore.driver.WriteStream(blobPath, 0, fp)
 | 
						|
}
 | 
						|
 | 
						|
// linkLayer links a valid, written layer blob into the registry under the
 | 
						|
// named repository for the upload controller.
 | 
						|
func (luc *layerUploadController) linkLayer(digest digest.Digest) error {
 | 
						|
	layerLinkPath, err := luc.layerStore.pathMapper.path(layerLinkPathSpec{
 | 
						|
		name:   luc.Name(),
 | 
						|
		digest: digest,
 | 
						|
	})
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest))
 | 
						|
}
 | 
						|
 | 
						|
// localFSLayerUploadStore implements a local layerUploadStore. There are some
 | 
						|
// complexities around hashsums that make round tripping to the storage
 | 
						|
// backend problematic, so we'll store and read locally for now. By GO-beta,
 | 
						|
// this should be fully implemented on top of the backend storagedriver.
 | 
						|
//
 | 
						|
// For now, the directory layout is as follows:
 | 
						|
//
 | 
						|
// 	/<temp dir>/registry-layer-upload/
 | 
						|
// 		<uuid>/
 | 
						|
// 			-> state.json
 | 
						|
// 			-> data
 | 
						|
//
 | 
						|
// Each upload, identified by uuid, has its own directory with a state file
 | 
						|
// and a data file. The state file has a json representation of the current
 | 
						|
// state. The data file is the in-progress upload data.
 | 
						|
type localFSLayerUploadStore struct {
 | 
						|
	root string
 | 
						|
}
 | 
						|
 | 
						|
func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) {
 | 
						|
	path, err := ioutil.TempDir("", "registry-layer-upload")
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &localFSLayerUploadStore{
 | 
						|
		root: path,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) {
 | 
						|
	lus := LayerUploadState{
 | 
						|
		Name: name,
 | 
						|
		UUID: uuid.New(),
 | 
						|
	}
 | 
						|
 | 
						|
	if err := os.Mkdir(llufs.path(lus.UUID, ""), 0755); err != nil {
 | 
						|
		return lus, err
 | 
						|
	}
 | 
						|
 | 
						|
	if err := llufs.SaveState(lus); err != nil {
 | 
						|
		return lus, err
 | 
						|
	}
 | 
						|
 | 
						|
	return lus, nil
 | 
						|
}
 | 
						|
 | 
						|
func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) {
 | 
						|
	fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return fp, nil
 | 
						|
}
 | 
						|
 | 
						|
func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) {
 | 
						|
	// TODO(stevvoe): Storing this state on the local file system is an
 | 
						|
	// intermediate stop gap. This technique is unlikely to handle any kind of
 | 
						|
	// concurrency very well.
 | 
						|
 | 
						|
	var lus LayerUploadState
 | 
						|
	fp, err := os.Open(llufs.path(uuid, "state.json"))
 | 
						|
	if err != nil {
 | 
						|
		if os.IsNotExist(err) {
 | 
						|
			return lus, ErrLayerUploadUnknown
 | 
						|
		}
 | 
						|
 | 
						|
		return lus, err
 | 
						|
	}
 | 
						|
	defer fp.Close()
 | 
						|
 | 
						|
	dec := json.NewDecoder(fp)
 | 
						|
	if err := dec.Decode(&lus); err != nil {
 | 
						|
		return lus, err
 | 
						|
	}
 | 
						|
 | 
						|
	return lus, nil
 | 
						|
}
 | 
						|
 | 
						|
func (llufs *localFSLayerUploadStore) SaveState(lus LayerUploadState) error {
 | 
						|
	p, err := json.Marshal(lus)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	err = ioutil.WriteFile(llufs.path(lus.UUID, "state.json"), p, 0644)
 | 
						|
	if os.IsNotExist(err) {
 | 
						|
		return ErrLayerUploadUnknown
 | 
						|
	}
 | 
						|
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error {
 | 
						|
	if err := os.RemoveAll(llufs.path(uuid, "")); err != nil {
 | 
						|
		if os.IsNotExist(err) {
 | 
						|
			return ErrLayerUploadUnknown
 | 
						|
		}
 | 
						|
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (llufs *localFSLayerUploadStore) path(uuid, file string) string {
 | 
						|
	return filepath.Join(llufs.root, uuid, file)
 | 
						|
}
 |