Storage driver for: Google Cloud Storage (gcs)
Signed-off-by: Arthur Baars <arthur@semmle.com>master
							parent
							
								
									41e71e9a46
								
							
						
					
					
						commit
						59784ecdd0
					
				|  | @ -6,7 +6,7 @@ RUN apt-get update && \ | ||||||
| 
 | 
 | ||||||
| ENV DISTRIBUTION_DIR /go/src/github.com/docker/distribution | ENV DISTRIBUTION_DIR /go/src/github.com/docker/distribution | ||||||
| ENV GOPATH $DISTRIBUTION_DIR/Godeps/_workspace:$GOPATH | ENV GOPATH $DISTRIBUTION_DIR/Godeps/_workspace:$GOPATH | ||||||
| ENV DOCKER_BUILDTAGS include_rados include_oss | ENV DOCKER_BUILDTAGS include_rados include_oss include_gcs | ||||||
| 
 | 
 | ||||||
| WORKDIR $DISTRIBUTION_DIR | WORKDIR $DISTRIBUTION_DIR | ||||||
| COPY . $DISTRIBUTION_DIR | COPY . $DISTRIBUTION_DIR | ||||||
|  |  | ||||||
|  | @ -21,7 +21,7 @@ machine: | ||||||
|     BASE_OLD: ../../../$HOME/.gvm/pkgsets/old/global/$BASE_DIR |     BASE_OLD: ../../../$HOME/.gvm/pkgsets/old/global/$BASE_DIR | ||||||
|     BASE_STABLE: ../../../$HOME/.gvm/pkgsets/stable/global/$BASE_DIR |     BASE_STABLE: ../../../$HOME/.gvm/pkgsets/stable/global/$BASE_DIR | ||||||
|   # BASE_BLEED: ../../../$HOME/.gvm/pkgsets/bleed/global/$BASE_DIR |   # BASE_BLEED: ../../../$HOME/.gvm/pkgsets/bleed/global/$BASE_DIR | ||||||
|     DOCKER_BUILDTAGS: "include_rados include_oss" |     DOCKER_BUILDTAGS: "include_rados include_oss include_gcs" | ||||||
|   # Workaround Circle parsing dumb bugs and/or YAML wonkyness |   # Workaround Circle parsing dumb bugs and/or YAML wonkyness | ||||||
|     CIRCLE_PAIN: "mode: set" |     CIRCLE_PAIN: "mode: set" | ||||||
|   # Ceph config |   # Ceph config | ||||||
|  |  | ||||||
|  | @ -10,6 +10,7 @@ import ( | ||||||
| 	_ "github.com/docker/distribution/registry/proxy" | 	_ "github.com/docker/distribution/registry/proxy" | ||||||
| 	_ "github.com/docker/distribution/registry/storage/driver/azure" | 	_ "github.com/docker/distribution/registry/storage/driver/azure" | ||||||
| 	_ "github.com/docker/distribution/registry/storage/driver/filesystem" | 	_ "github.com/docker/distribution/registry/storage/driver/filesystem" | ||||||
|  | 	_ "github.com/docker/distribution/registry/storage/driver/gcs" | ||||||
| 	_ "github.com/docker/distribution/registry/storage/driver/inmemory" | 	_ "github.com/docker/distribution/registry/storage/driver/inmemory" | ||||||
| 	_ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront" | 	_ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront" | ||||||
| 	_ "github.com/docker/distribution/registry/storage/driver/oss" | 	_ "github.com/docker/distribution/registry/storage/driver/oss" | ||||||
|  |  | ||||||
|  | @ -82,6 +82,10 @@ information about each option that appears later in this page. | ||||||
|         accountname: accountname |         accountname: accountname | ||||||
|         accountkey: base64encodedaccountkey |         accountkey: base64encodedaccountkey | ||||||
|         container: containername |         container: containername | ||||||
|  |       gcs: | ||||||
|  |         bucket: bucketname | ||||||
|  |         keyfile: /path/to/keyfile | ||||||
|  |         rootdirectory: /gcs/object/name/prefix | ||||||
|       s3: |       s3: | ||||||
|         accesskey: awsaccesskey |         accesskey: awsaccesskey | ||||||
|         secretkey: awssecretkey |         secretkey: awssecretkey | ||||||
|  | @ -330,6 +334,10 @@ Permitted values are `error`, `warn`, `info` and `debug`. The default is | ||||||
|         accountname: accountname |         accountname: accountname | ||||||
|         accountkey: base64encodedaccountkey |         accountkey: base64encodedaccountkey | ||||||
|         container: containername |         container: containername | ||||||
|  |       gcs: | ||||||
|  |         bucket: bucketname | ||||||
|  |         keyfile: /path/to/keyfile | ||||||
|  |         rootdirectory: /gcs/object/name/prefix | ||||||
|       s3: |       s3: | ||||||
|         accesskey: awsaccesskey |         accesskey: awsaccesskey | ||||||
|         secretkey: awssecretkey |         secretkey: awssecretkey | ||||||
|  | @ -482,6 +490,50 @@ This storage backend uses Microsoft's Azure Blob Storage. | ||||||
| 
 | 
 | ||||||
| </table> | </table> | ||||||
| 
 | 
 | ||||||
|  | # gcs | ||||||
|  | 
 | ||||||
|  | This storage backend uses Google Cloud Storage. | ||||||
|  | 
 | ||||||
|  | <table> | ||||||
|  |   <tr> | ||||||
|  |     <th>Parameter</th> | ||||||
|  |     <th>Required</th> | ||||||
|  |     <th>Description</th> | ||||||
|  |   </tr> | ||||||
|  |   <tr> | ||||||
|  |     <td> | ||||||
|  |       <code>bucket</code> | ||||||
|  |     </td> | ||||||
|  |     <td> | ||||||
|  |       yes | ||||||
|  |     </td> | ||||||
|  |     <td> | ||||||
|  |       Storage bucket name. | ||||||
|  |     </td> | ||||||
|  |   </tr> | ||||||
|  |   <tr> | ||||||
|  |     <td> | ||||||
|  |       <code>keyfile</code> | ||||||
|  |     </td> | ||||||
|  |     <td> | ||||||
|  |       no | ||||||
|  |     </td> | ||||||
|  |     <td> | ||||||
|  |       A private service account key file in JSON format. Instead of a key file <a href="https://developers.google.com/identity/protocols/application-default-credentials">Google Application Default Credentials</a> can be used. | ||||||
|  |     </td> | ||||||
|  |   </tr> | ||||||
|  |    <tr> | ||||||
|  |     <td> | ||||||
|  |       <code>rootdirectory</code> | ||||||
|  |     </td> | ||||||
|  |     <td> | ||||||
|  |       no | ||||||
|  |     </td> | ||||||
|  |     <td> | ||||||
|  |       This is a prefix that will be applied to all Google Cloud Storage keys to allow you to segment data in your bucket if necessary. | ||||||
|  |   </tr> | ||||||
|  | 
 | ||||||
|  | </table> | ||||||
| 
 | 
 | ||||||
| ### rados | ### rados | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -0,0 +1,22 @@ | ||||||
|  | <!--[metadata]> | ||||||
|  | +++ | ||||||
|  | title = "GCS storage driver" | ||||||
|  | description = "Explains how to use the Google Cloud Storage drivers" | ||||||
|  | keywords = ["registry, service, driver, images, storage,  gcs, google, cloud"] | ||||||
|  | +++ | ||||||
|  | <![end-metadata]--> | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # Google Cloud Storage driver | ||||||
|  | 
 | ||||||
|  | An implementation of the `storagedriver.StorageDriver` interface which uses Google Cloud for object storage. | ||||||
|  | 
 | ||||||
|  | ## Parameters | ||||||
|  | 
 | ||||||
|  | `bucket`: The name of your Google Cloud Storage bucket where you wish to store objects (needs to already be created prior to driver initialization). | ||||||
|  | 
 | ||||||
|  | `keyfile`: (optional) A private key file in JSON format, used for [Service Account Authentication](https://cloud.google.com/storage/docs/authentication#service_accounts). | ||||||
|  | 
 | ||||||
|  | **Note** Instead of a key file you can use [Google Application Default Credentials](https://developers.google.com/identity/protocols/application-default-credentials). | ||||||
|  | 
 | ||||||
|  | `rootdirectory`: (optional) The root directory tree in which all registry files will be stored. Defaults to the empty string (bucket root). | ||||||
|  | @ -24,6 +24,7 @@ This storage driver package comes bundled with several drivers: | ||||||
| - [rados](storage-drivers/rados.md): A driver storing objects in a [Ceph Object Storage](http://ceph.com/docs/master/rados/) pool. | - [rados](storage-drivers/rados.md): A driver storing objects in a [Ceph Object Storage](http://ceph.com/docs/master/rados/) pool. | ||||||
| - [swift](storage-drivers/swift.md): A driver storing objects in [Openstack Swift](http://docs.openstack.org/developer/swift/). | - [swift](storage-drivers/swift.md): A driver storing objects in [Openstack Swift](http://docs.openstack.org/developer/swift/). | ||||||
| - [oss](storage-drivers/oss.md): A driver storing objects in [Aliyun OSS](http://www.aliyun.com/product/oss). | - [oss](storage-drivers/oss.md): A driver storing objects in [Aliyun OSS](http://www.aliyun.com/product/oss). | ||||||
|  | - [gcs](storage-drivers/gcs.md): A driver storing objects in a [Google Cloud Storage](https://cloud.google.com/storage/) bucket. | ||||||
| 
 | 
 | ||||||
| ## Storage Driver API | ## Storage Driver API | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -0,0 +1,3 @@ | ||||||
|  | // Package gcs implements the Google Cloud Storage driver backend. Support can be
 | ||||||
|  | // enabled by including the "include_gcs" build tag.
 | ||||||
|  | package gcs | ||||||
|  | @ -0,0 +1,623 @@ | ||||||
|  | // Package gcs provides a storagedriver.StorageDriver implementation to
 | ||||||
|  | // store blobs in Google cloud storage.
 | ||||||
|  | //
 | ||||||
|  | // This package leverages the google.golang.org/cloud/storage client library
 | ||||||
|  | //for interfacing with gcs.
 | ||||||
|  | //
 | ||||||
|  | // Because gcs is a key, value store the Stat call does not support last modification
 | ||||||
|  | // time for directories (directories are an abstraction for key, value stores)
 | ||||||
|  | //
 | ||||||
|  | // Keep in mind that gcs guarantees only eventual consistency, so do not assume
 | ||||||
|  | // that a successful write will mean immediate access to the data written (although
 | ||||||
|  | // in most regions a new object put has guaranteed read after write). The only true
 | ||||||
|  | // guarantee is that once you call Stat and receive a certain file size, that much of
 | ||||||
|  | // the file is already accessible.
 | ||||||
|  | //
 | ||||||
|  | // +build include_gcs
 | ||||||
|  | 
 | ||||||
|  | package gcs | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"bytes" | ||||||
|  | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"io/ioutil" | ||||||
|  | 	"math/rand" | ||||||
|  | 	"net/http" | ||||||
|  | 	"net/url" | ||||||
|  | 	"sort" | ||||||
|  | 	"strings" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"golang.org/x/net/context" | ||||||
|  | 	"golang.org/x/oauth2" | ||||||
|  | 	"golang.org/x/oauth2/google" | ||||||
|  | 
 | ||||||
|  | 	"google.golang.org/api/googleapi" | ||||||
|  | 	storageapi "google.golang.org/api/storage/v1" | ||||||
|  | 	"google.golang.org/cloud" | ||||||
|  | 	"google.golang.org/cloud/storage" | ||||||
|  | 
 | ||||||
|  | 	ctx "github.com/docker/distribution/context" | ||||||
|  | 	storagedriver "github.com/docker/distribution/registry/storage/driver" | ||||||
|  | 	"github.com/docker/distribution/registry/storage/driver/base" | ||||||
|  | 	"github.com/docker/distribution/registry/storage/driver/factory" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const driverName = "gcs" | ||||||
|  | const dummyProjectID = "<unknown>" | ||||||
|  | 
 | ||||||
|  | //DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
 | ||||||
|  | type driverParameters struct { | ||||||
|  | 	bucket        string | ||||||
|  | 	keyfile       string | ||||||
|  | 	rootDirectory string | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func init() { | ||||||
|  | 	factory.Register(driverName, &gcsDriverFactory{}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // gcsDriverFactory implements the factory.StorageDriverFactory interface
 | ||||||
|  | type gcsDriverFactory struct{} | ||||||
|  | 
 | ||||||
|  | // Create StorageDriver from parameters
 | ||||||
|  | func (factory *gcsDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { | ||||||
|  | 	return FromParameters(parameters) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // driver is a storagedriver.StorageDriver implementation backed by GCS
 | ||||||
|  | // Objects are stored at absolute keys in the provided bucket.
 | ||||||
|  | type driver struct { | ||||||
|  | 	client        *http.Client | ||||||
|  | 	bucket        string | ||||||
|  | 	email         string | ||||||
|  | 	privateKey    []byte | ||||||
|  | 	rootDirectory string | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // FromParameters constructs a new Driver with a given parameters map
 | ||||||
|  | // Required parameters:
 | ||||||
|  | // - bucket
 | ||||||
|  | func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { | ||||||
|  | 
 | ||||||
|  | 	bucket, ok := parameters["bucket"] | ||||||
|  | 	if !ok || fmt.Sprint(bucket) == "" { | ||||||
|  | 		return nil, fmt.Errorf("No bucket parameter provided") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	keyfile, ok := parameters["keyfile"] | ||||||
|  | 	if !ok { | ||||||
|  | 		keyfile = "" | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	rootDirectory, ok := parameters["rootdirectory"] | ||||||
|  | 	if !ok { | ||||||
|  | 		rootDirectory = "" | ||||||
|  | 	} | ||||||
|  | 	params := driverParameters{ | ||||||
|  | 		fmt.Sprint(bucket), | ||||||
|  | 		fmt.Sprint(keyfile), | ||||||
|  | 		fmt.Sprint(rootDirectory), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return New(params) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // New constructs a new driver
 | ||||||
|  | func New(params driverParameters) (storagedriver.StorageDriver, error) { | ||||||
|  | 	var ts oauth2.TokenSource | ||||||
|  | 	var err error | ||||||
|  | 	rootDirectory := strings.Trim(params.rootDirectory, "/") | ||||||
|  | 	if rootDirectory != "" { | ||||||
|  | 		rootDirectory += "/" | ||||||
|  | 	} | ||||||
|  | 	d := &driver{ | ||||||
|  | 		bucket:        params.bucket, | ||||||
|  | 		rootDirectory: rootDirectory, | ||||||
|  | 	} | ||||||
|  | 	if params.keyfile == "" { | ||||||
|  | 		ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		jsonKey, err := ioutil.ReadFile(params.keyfile) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 		conf, err := google.JWTConfigFromJSON( | ||||||
|  | 			jsonKey, | ||||||
|  | 			storage.ScopeFullControl, | ||||||
|  | 		) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 		ts = conf.TokenSource(context.Background()) | ||||||
|  | 		d.email = conf.Email | ||||||
|  | 		d.privateKey = conf.PrivateKey | ||||||
|  | 	} | ||||||
|  | 	client := oauth2.NewClient(context.Background(), ts) | ||||||
|  | 	d.client = client | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return &base.Base{ | ||||||
|  | 		StorageDriver: d, | ||||||
|  | 	}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Implement the storagedriver.StorageDriver interface
 | ||||||
|  | 
 | ||||||
|  | func (d *driver) Name() string { | ||||||
|  | 	return driverName | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // GetContent retrieves the content stored at "path" as a []byte.
 | ||||||
|  | // This should primarily be used for small objects.
 | ||||||
|  | func (d *driver) GetContent(context ctx.Context, path string) ([]byte, error) { | ||||||
|  | 	rc, err := d.ReadStream(context, path, 0) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	defer rc.Close() | ||||||
|  | 
 | ||||||
|  | 	p, err := ioutil.ReadAll(rc) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return p, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // PutContent stores the []byte content at a location designated by "path".
 | ||||||
|  | // This should primarily be used for small objects.
 | ||||||
|  | func (d *driver) PutContent(context ctx.Context, path string, contents []byte) error { | ||||||
|  | 	wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path)) | ||||||
|  | 	wc.ContentType = "application/octet-stream" | ||||||
|  | 	defer wc.Close() | ||||||
|  | 	_, err := wc.Write(contents) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // ReadStream retrieves an io.ReadCloser for the content stored at "path"
 | ||||||
|  | // with a given byte offset.
 | ||||||
|  | // May be used to resume reading a stream by providing a nonzero offset.
 | ||||||
|  | func (d *driver) ReadStream(context ctx.Context, path string, offset int64) (io.ReadCloser, error) { | ||||||
|  | 	name := d.pathToKey(path) | ||||||
|  | 
 | ||||||
|  | 	// copied from google.golang.org/cloud/storage#NewReader :
 | ||||||
|  | 	// to set the additional "Range" header
 | ||||||
|  | 	u := &url.URL{ | ||||||
|  | 		Scheme: "https", | ||||||
|  | 		Host:   "storage.googleapis.com", | ||||||
|  | 		Path:   fmt.Sprintf("/%s/%s", d.bucket, name), | ||||||
|  | 	} | ||||||
|  | 	req, err := http.NewRequest("GET", u.String(), nil) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	if offset > 0 { | ||||||
|  | 		req.Header.Set("Range", fmt.Sprintf("bytes=%v-", offset)) | ||||||
|  | 	} | ||||||
|  | 	res, err := d.client.Do(req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	if res.StatusCode == http.StatusNotFound { | ||||||
|  | 		res.Body.Close() | ||||||
|  | 		return nil, storagedriver.PathNotFoundError{Path: path} | ||||||
|  | 	} | ||||||
|  | 	if res.StatusCode == http.StatusRequestedRangeNotSatisfiable { | ||||||
|  | 		res.Body.Close() | ||||||
|  | 		obj, err := storage.StatObject(d.context(context), d.bucket, name) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 		if offset == int64(obj.Size) { | ||||||
|  | 			return ioutil.NopCloser(bytes.NewReader([]byte{})), nil | ||||||
|  | 		} | ||||||
|  | 		return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} | ||||||
|  | 	} | ||||||
|  | 	if res.StatusCode < 200 || res.StatusCode > 299 { | ||||||
|  | 		res.Body.Close() | ||||||
|  | 		return nil, fmt.Errorf("storage: can't read object %v/%v, status code: %v", d.bucket, name, res.Status) | ||||||
|  | 	} | ||||||
|  | 	return res.Body, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // WriteStream stores the contents of the provided io.ReadCloser at a
 | ||||||
|  | // location designated by the given path.
 | ||||||
|  | // May be used to resume writing a stream by providing a nonzero offset.
 | ||||||
|  | // The offset must be no larger than the CurrentSize for this path.
 | ||||||
|  | func (d *driver) WriteStream(context ctx.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) { | ||||||
|  | 	if offset < 0 { | ||||||
|  | 		return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if offset == 0 { | ||||||
|  | 		return d.writeCompletely(context, path, 0, reader) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	service, err := storageapi.New(d.client) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return 0, err | ||||||
|  | 	} | ||||||
|  | 	objService := storageapi.NewObjectsService(service) | ||||||
|  | 	var obj *storageapi.Object | ||||||
|  | 	err = retry(5, func() error { | ||||||
|  | 		o, err := objService.Get(d.bucket, d.pathToKey(path)).Do() | ||||||
|  | 		obj = o | ||||||
|  | 		return err | ||||||
|  | 	}) | ||||||
|  | 	//	obj, err := retry(5, objService.Get(d.bucket, d.pathToKey(path)).Do)
 | ||||||
|  | 	if err != nil { | ||||||
|  | 		return 0, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// cannot append more chunks, so redo from scratch
 | ||||||
|  | 	if obj.ComponentCount >= 1023 { | ||||||
|  | 		return d.writeCompletely(context, path, offset, reader) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// skip from reader
 | ||||||
|  | 	objSize := int64(obj.Size) | ||||||
|  | 	nn, err := skip(reader, objSize-offset) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nn, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Size <= offset
 | ||||||
|  | 	partName := fmt.Sprintf("%v#part-%d#", d.pathToKey(path), obj.ComponentCount) | ||||||
|  | 	gcsContext := d.context(context) | ||||||
|  | 	wc := storage.NewWriter(gcsContext, d.bucket, partName) | ||||||
|  | 	wc.ContentType = "application/octet-stream" | ||||||
|  | 
 | ||||||
|  | 	if objSize < offset { | ||||||
|  | 		err = writeZeros(wc, offset-objSize) | ||||||
|  | 		if err != nil { | ||||||
|  | 			wc.CloseWithError(err) | ||||||
|  | 			return nn, err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	n, err := io.Copy(wc, reader) | ||||||
|  | 	if err != nil { | ||||||
|  | 		wc.CloseWithError(err) | ||||||
|  | 		return nn, err | ||||||
|  | 	} | ||||||
|  | 	err = wc.Close() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nn, err | ||||||
|  | 	} | ||||||
|  | 	// wc was closed succesfully, so the temporary part exists, schedule it for deletion at the end
 | ||||||
|  | 	// of the function
 | ||||||
|  | 	defer storage.DeleteObject(gcsContext, d.bucket, partName) | ||||||
|  | 
 | ||||||
|  | 	req := &storageapi.ComposeRequest{ | ||||||
|  | 		Destination: &storageapi.Object{Bucket: obj.Bucket, Name: obj.Name, ContentType: obj.ContentType}, | ||||||
|  | 		SourceObjects: []*storageapi.ComposeRequestSourceObjects{ | ||||||
|  | 			{ | ||||||
|  | 				Name:       obj.Name, | ||||||
|  | 				Generation: obj.Generation, | ||||||
|  | 			}, { | ||||||
|  | 				Name:       partName, | ||||||
|  | 				Generation: wc.Object().Generation, | ||||||
|  | 			}}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	err = retry(5, func() error { _, err := objService.Compose(d.bucket, obj.Name, req).Do(); return err }) | ||||||
|  | 	if err == nil { | ||||||
|  | 		nn = nn + n | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nn, err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type request func() error | ||||||
|  | 
 | ||||||
|  | func retry(maxTries int, req request) error { | ||||||
|  | 	backoff := time.Second | ||||||
|  | 	var err error | ||||||
|  | 	for i := 0; i < maxTries; i++ { | ||||||
|  | 		err := req() | ||||||
|  | 		if err == nil { | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		status := err.(*googleapi.Error) | ||||||
|  | 		if status == nil || (status.Code != 429 && status.Code < http.StatusInternalServerError) { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		time.Sleep(backoff - time.Second + (time.Duration(rand.Int31n(1000)) * time.Millisecond)) | ||||||
|  | 		if i <= 4 { | ||||||
|  | 			backoff = backoff * 2 | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (d *driver) writeCompletely(context ctx.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) { | ||||||
|  | 	wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path)) | ||||||
|  | 	wc.ContentType = "application/octet-stream" | ||||||
|  | 	defer wc.Close() | ||||||
|  | 
 | ||||||
|  | 	// Copy the first offset bytes of the existing contents
 | ||||||
|  | 	// (padded with zeros if needed) into the writer
 | ||||||
|  | 	if offset > 0 { | ||||||
|  | 		existing, err := d.ReadStream(context, path, 0) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return 0, err | ||||||
|  | 		} | ||||||
|  | 		defer existing.Close() | ||||||
|  | 		n, err := io.CopyN(wc, existing, offset) | ||||||
|  | 		if err == io.EOF { | ||||||
|  | 			err = writeZeros(wc, offset-n) | ||||||
|  | 		} | ||||||
|  | 		if err != nil { | ||||||
|  | 			return 0, err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return io.Copy(wc, reader) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func skip(reader io.Reader, count int64) (int64, error) { | ||||||
|  | 	if count <= 0 { | ||||||
|  | 		return 0, nil | ||||||
|  | 	} | ||||||
|  | 	return io.CopyN(ioutil.Discard, reader, count) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func writeZeros(wc io.Writer, count int64) error { | ||||||
|  | 	buf := make([]byte, 32*1024) | ||||||
|  | 	for count > 0 { | ||||||
|  | 		size := cap(buf) | ||||||
|  | 		if int64(size) > count { | ||||||
|  | 			size = int(count) | ||||||
|  | 		} | ||||||
|  | 		n, err := wc.Write(buf[0:size]) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		count = count - int64(n) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Stat retrieves the FileInfo for the given path, including the current
 | ||||||
|  | // size in bytes and the creation time.
 | ||||||
|  | func (d *driver) Stat(context ctx.Context, path string) (storagedriver.FileInfo, error) { | ||||||
|  | 	var fi storagedriver.FileInfoFields | ||||||
|  | 	//try to get as file
 | ||||||
|  | 	gcsContext := d.context(context) | ||||||
|  | 	obj, err := storage.StatObject(gcsContext, d.bucket, d.pathToKey(path)) | ||||||
|  | 	if err == nil { | ||||||
|  | 		fi = storagedriver.FileInfoFields{ | ||||||
|  | 			Path:    path, | ||||||
|  | 			Size:    obj.Size, | ||||||
|  | 			ModTime: obj.Updated, | ||||||
|  | 			IsDir:   false, | ||||||
|  | 		} | ||||||
|  | 		return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil | ||||||
|  | 	} | ||||||
|  | 	//try to get as folder
 | ||||||
|  | 	dirpath := d.pathToDirKey(path) | ||||||
|  | 
 | ||||||
|  | 	var query *storage.Query | ||||||
|  | 	query = &storage.Query{} | ||||||
|  | 	query.Prefix = dirpath | ||||||
|  | 	query.MaxResults = 1 | ||||||
|  | 
 | ||||||
|  | 	objects, err := storage.ListObjects(gcsContext, d.bucket, query) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	if len(objects.Results) < 1 { | ||||||
|  | 		return nil, storagedriver.PathNotFoundError{Path: path} | ||||||
|  | 	} | ||||||
|  | 	fi = storagedriver.FileInfoFields{ | ||||||
|  | 		Path:  path, | ||||||
|  | 		IsDir: true, | ||||||
|  | 	} | ||||||
|  | 	obj = objects.Results[0] | ||||||
|  | 	if obj.Name == dirpath { | ||||||
|  | 		fi.Size = obj.Size | ||||||
|  | 		fi.ModTime = obj.Updated | ||||||
|  | 	} | ||||||
|  | 	return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // List returns a list of the objects that are direct descendants of the
 | ||||||
|  | //given path.
 | ||||||
|  | func (d *driver) List(context ctx.Context, path string) ([]string, error) { | ||||||
|  | 	var query *storage.Query | ||||||
|  | 	query = &storage.Query{} | ||||||
|  | 	query.Delimiter = "/" | ||||||
|  | 	query.Prefix = d.pathToDirKey(path) | ||||||
|  | 	list := make([]string, 0, 64) | ||||||
|  | 	for { | ||||||
|  | 		objects, err := storage.ListObjects(d.context(context), d.bucket, query) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 		for _, object := range objects.Results { | ||||||
|  | 			// GCS does not guarantee strong consistency between
 | ||||||
|  | 			// DELETE and LIST operationsCheck that the object is not deleted,
 | ||||||
|  | 			// so filter out any objects with a non-zero time-deleted
 | ||||||
|  | 			if object.Deleted.IsZero() { | ||||||
|  | 				name := object.Name | ||||||
|  | 				// Ignore objects with names that end with '#' (these are uploaded parts)
 | ||||||
|  | 				if name[len(name)-1] != '#' { | ||||||
|  | 					name = d.keyToPath(name) | ||||||
|  | 					list = append(list, name) | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		for _, subpath := range objects.Prefixes { | ||||||
|  | 			subpath = d.keyToPath(subpath) | ||||||
|  | 			list = append(list, subpath) | ||||||
|  | 		} | ||||||
|  | 		query = objects.Next | ||||||
|  | 		if query == nil { | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return list, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Move moves an object stored at sourcePath to destPath, removing the
 | ||||||
|  | // original object.
 | ||||||
|  | func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) error { | ||||||
|  | 	prefix := d.pathToDirKey(sourcePath) | ||||||
|  | 	gcsContext := d.context(context) | ||||||
|  | 	keys, err := d.listAll(gcsContext, prefix) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if len(keys) > 0 { | ||||||
|  | 		destPrefix := d.pathToDirKey(destPath) | ||||||
|  | 		copies := make([]string, 0, len(keys)) | ||||||
|  | 		sort.Strings(keys) | ||||||
|  | 		var err error | ||||||
|  | 		for _, key := range keys { | ||||||
|  | 			dest := destPrefix + key[len(prefix):] | ||||||
|  | 			_, err = storage.CopyObject(gcsContext, d.bucket, key, d.bucket, dest, nil) | ||||||
|  | 			if err == nil { | ||||||
|  | 				copies = append(copies, dest) | ||||||
|  | 			} else { | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		// if an error occurred, attempt to cleanup the copies made
 | ||||||
|  | 		if err != nil { | ||||||
|  | 			for i := len(copies) - 1; i >= 0; i-- { | ||||||
|  | 				_ = storage.DeleteObject(gcsContext, d.bucket, copies[i]) | ||||||
|  | 			} | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		// delete originals
 | ||||||
|  | 		for i := len(keys) - 1; i >= 0; i-- { | ||||||
|  | 			err2 := storage.DeleteObject(gcsContext, d.bucket, keys[i]) | ||||||
|  | 			if err2 != nil { | ||||||
|  | 				err = err2 | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	_, err = storage.CopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil) | ||||||
|  | 	if err != nil { | ||||||
|  | 		if status := err.(*googleapi.Error); status != nil { | ||||||
|  | 			if status.Code == http.StatusNotFound { | ||||||
|  | 				return storagedriver.PathNotFoundError{Path: sourcePath} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return storage.DeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath)) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // listAll recursively lists all names of objects stored at "prefix" and its subpaths.
 | ||||||
|  | func (d *driver) listAll(context context.Context, prefix string) ([]string, error) { | ||||||
|  | 	list := make([]string, 0, 64) | ||||||
|  | 	query := &storage.Query{} | ||||||
|  | 	query.Prefix = prefix | ||||||
|  | 	query.Versions = false | ||||||
|  | 	for { | ||||||
|  | 		objects, err := storage.ListObjects(d.context(context), d.bucket, query) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 		for _, obj := range objects.Results { | ||||||
|  | 			// GCS does not guarantee strong consistency between
 | ||||||
|  | 			// DELETE and LIST operationsCheck that the object is not deleted,
 | ||||||
|  | 			// so filter out any objects with a non-zero time-deleted
 | ||||||
|  | 			if obj.Deleted.IsZero() { | ||||||
|  | 				list = append(list, obj.Name) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		query = objects.Next | ||||||
|  | 		if query == nil { | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return list, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Delete recursively deletes all objects stored at "path" and its subpaths.
 | ||||||
|  | func (d *driver) Delete(context ctx.Context, path string) error { | ||||||
|  | 	prefix := d.pathToDirKey(path) | ||||||
|  | 	gcsContext := d.context(context) | ||||||
|  | 	keys, err := d.listAll(gcsContext, prefix) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if len(keys) > 0 { | ||||||
|  | 		sort.Sort(sort.Reverse(sort.StringSlice(keys))) | ||||||
|  | 		for _, key := range keys { | ||||||
|  | 			if err := storage.DeleteObject(gcsContext, d.bucket, key); err != nil { | ||||||
|  | 				return err | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	err = storage.DeleteObject(gcsContext, d.bucket, d.pathToKey(path)) | ||||||
|  | 	if err != nil { | ||||||
|  | 		if status := err.(*googleapi.Error); status != nil { | ||||||
|  | 			if status.Code == http.StatusNotFound { | ||||||
|  | 				return storagedriver.PathNotFoundError{Path: path} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // URLFor returns a URL which may be used to retrieve the content stored at
 | ||||||
|  | // the given path, possibly using the given options.
 | ||||||
|  | // Returns ErrUnsupportedMethod if this driver has no privateKey
 | ||||||
|  | func (d *driver) URLFor(context ctx.Context, path string, options map[string]interface{}) (string, error) { | ||||||
|  | 	if d.privateKey == nil { | ||||||
|  | 		return "", storagedriver.ErrUnsupportedMethod | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	name := d.pathToKey(path) | ||||||
|  | 	methodString := "GET" | ||||||
|  | 	method, ok := options["method"] | ||||||
|  | 	if ok { | ||||||
|  | 		methodString, ok = method.(string) | ||||||
|  | 		if !ok || (methodString != "GET" && methodString != "HEAD") { | ||||||
|  | 			return "", storagedriver.ErrUnsupportedMethod | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	expiresTime := time.Now().Add(20 * time.Minute) | ||||||
|  | 	expires, ok := options["expiry"] | ||||||
|  | 	if ok { | ||||||
|  | 		et, ok := expires.(time.Time) | ||||||
|  | 		if ok { | ||||||
|  | 			expiresTime = et | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	opts := &storage.SignedURLOptions{ | ||||||
|  | 		GoogleAccessID: d.email, | ||||||
|  | 		PrivateKey:     d.privateKey, | ||||||
|  | 		Method:         methodString, | ||||||
|  | 		Expires:        expiresTime, | ||||||
|  | 	} | ||||||
|  | 	return storage.SignedURL(d.bucket, name, opts) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (d *driver) context(context ctx.Context) context.Context { | ||||||
|  | 	return cloud.WithContext(context, dummyProjectID, d.client) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (d *driver) pathToKey(path string) string { | ||||||
|  | 	return strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (d *driver) pathToDirKey(path string) string { | ||||||
|  | 	return d.pathToKey(path) + "/" | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (d *driver) keyToPath(key string) string { | ||||||
|  | 	return "/" + strings.Trim(strings.TrimPrefix(key, d.rootDirectory), "/") | ||||||
|  | } | ||||||
|  | @ -0,0 +1,106 @@ | ||||||
|  | // +build include_gcs
 | ||||||
|  | 
 | ||||||
|  | package gcs | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"io/ioutil" | ||||||
|  | 	"os" | ||||||
|  | 	"testing" | ||||||
|  | 
 | ||||||
|  | 	ctx "github.com/docker/distribution/context" | ||||||
|  | 	storagedriver "github.com/docker/distribution/registry/storage/driver" | ||||||
|  | 	"github.com/docker/distribution/registry/storage/driver/testsuites" | ||||||
|  | 
 | ||||||
|  | 	"gopkg.in/check.v1" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // Hook up gocheck into the "go test" runner.
 | ||||||
|  | func Test(t *testing.T) { check.TestingT(t) } | ||||||
|  | 
 | ||||||
|  | var gcsDriverConstructor func(rootDirectory string) (storagedriver.StorageDriver, error) | ||||||
|  | var skipGCS func() string | ||||||
|  | 
 | ||||||
|  | func init() { | ||||||
|  | 	bucket := os.Getenv("REGISTRY_STORAGE_GCS_BUCKET") | ||||||
|  | 	keyfile := os.Getenv("REGISTRY_STORAGE_GCS_KEYFILE") | ||||||
|  | 	credentials := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") | ||||||
|  | 
 | ||||||
|  | 	root, err := ioutil.TempDir("", "driver-") | ||||||
|  | 	if err != nil { | ||||||
|  | 		panic(err) | ||||||
|  | 	} | ||||||
|  | 	defer os.Remove(root) | ||||||
|  | 
 | ||||||
|  | 	gcsDriverConstructor = func(rootDirectory string) (storagedriver.StorageDriver, error) { | ||||||
|  | 
 | ||||||
|  | 		parameters := driverParameters{ | ||||||
|  | 			bucket, | ||||||
|  | 			keyfile, | ||||||
|  | 			rootDirectory, | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		return New(parameters) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Skip GCS storage driver tests if environment variable parameters are not provided
 | ||||||
|  | 	skipGCS = func() string { | ||||||
|  | 		if bucket == "" || (credentials == "" && keyfile == "") { | ||||||
|  | 			return "Must set REGISTRY_STORAGE_GCS_BUCKET and (GOOGLE_APPLICATION_CREDENTIALS or REGISTRY_STORAGE_GCS_KEYFILE) to run GCS tests" | ||||||
|  | 		} | ||||||
|  | 		return "" | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) { | ||||||
|  | 		return gcsDriverConstructor(root) | ||||||
|  | 	}, skipGCS) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestEmptyRootList(t *testing.T) { | ||||||
|  | 	if skipGCS() != "" { | ||||||
|  | 		t.Skip(skipGCS()) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	validRoot, err := ioutil.TempDir("", "driver-") | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("unexpected error creating temporary directory: %v", err) | ||||||
|  | 	} | ||||||
|  | 	defer os.Remove(validRoot) | ||||||
|  | 
 | ||||||
|  | 	rootedDriver, err := gcsDriverConstructor(validRoot) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("unexpected error creating rooted driver: %v", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	emptyRootDriver, err := gcsDriverConstructor("") | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("unexpected error creating empty root driver: %v", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	slashRootDriver, err := gcsDriverConstructor("/") | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("unexpected error creating slash root driver: %v", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	filename := "/test" | ||||||
|  | 	contents := []byte("contents") | ||||||
|  | 	ctx := ctx.Background() | ||||||
|  | 	err = rootedDriver.PutContent(ctx, filename, contents) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("unexpected error creating content: %v", err) | ||||||
|  | 	} | ||||||
|  | 	defer rootedDriver.Delete(ctx, filename) | ||||||
|  | 
 | ||||||
|  | 	keys, err := emptyRootDriver.List(ctx, "/") | ||||||
|  | 	for _, path := range keys { | ||||||
|  | 		if !storagedriver.PathRegexp.MatchString(path) { | ||||||
|  | 			t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	keys, err = slashRootDriver.List(ctx, "/") | ||||||
|  | 	for _, path := range keys { | ||||||
|  | 		if !storagedriver.PathRegexp.MatchString(path) { | ||||||
|  | 			t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
		Loading…
	
		Reference in New Issue