933 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			933 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Go
		
	
	
| // Package gcs provides a storagedriver.StorageDriver implementation to
 | |
| // store blobs in Google cloud storage.
 | |
| //
 | |
| // This package leverages the google.golang.org/cloud/storage client library
 | |
| //for interfacing with gcs.
 | |
| //
 | |
| // Because gcs is a key, value store the Stat call does not support last modification
 | |
| // time for directories (directories are an abstraction for key, value stores)
 | |
| //
 | |
| // Note that the contents of incomplete uploads are not accessible even though
 | |
| // Stat returns their length
 | |
| //
 | |
| //go:build include_gcs
 | |
| // +build include_gcs
 | |
| 
 | |
| package gcs
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"io/ioutil"
 | |
| 	"math/rand"
 | |
| 	"net/http"
 | |
| 	"net/url"
 | |
| 	"reflect"
 | |
| 	"regexp"
 | |
| 	"sort"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
 | |
| 	"github.com/distribution/distribution/v3/registry/storage/driver/base"
 | |
| 	"github.com/distribution/distribution/v3/registry/storage/driver/factory"
 | |
| 	"github.com/sirupsen/logrus"
 | |
| 	"golang.org/x/oauth2"
 | |
| 	"golang.org/x/oauth2/google"
 | |
| 	"golang.org/x/oauth2/jwt"
 | |
| 	"google.golang.org/api/googleapi"
 | |
| 	"google.golang.org/cloud"
 | |
| 	"google.golang.org/cloud/storage"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	driverName     = "gcs"
 | |
| 	dummyProjectID = "<unknown>"
 | |
| 
 | |
| 	uploadSessionContentType = "application/x-docker-upload-session"
 | |
| 	minChunkSize             = 256 * 1024
 | |
| 	defaultChunkSize         = 20 * minChunkSize
 | |
| 	defaultMaxConcurrency    = 50
 | |
| 	minConcurrency           = 25
 | |
| 
 | |
| 	maxTries = 5
 | |
| )
 | |
| 
 | |
| var rangeHeader = regexp.MustCompile(`^bytes=([0-9])+-([0-9]+)$`)
 | |
| 
 | |
| // driverParameters is a struct that encapsulates all of the driver parameters after all values have been set
 | |
| type driverParameters struct {
 | |
| 	bucket        string
 | |
| 	config        *jwt.Config
 | |
| 	email         string
 | |
| 	privateKey    []byte
 | |
| 	client        *http.Client
 | |
| 	rootDirectory string
 | |
| 	chunkSize     int
 | |
| 
 | |
| 	// maxConcurrency limits the number of concurrent driver operations
 | |
| 	// to GCS, which ultimately increases reliability of many simultaneous
 | |
| 	// pushes by ensuring we aren't DoSing our own server with many
 | |
| 	// connections.
 | |
| 	maxConcurrency uint64
 | |
| }
 | |
| 
 | |
| func init() {
 | |
| 	factory.Register(driverName, &gcsDriverFactory{})
 | |
| }
 | |
| 
 | |
| // gcsDriverFactory implements the factory.StorageDriverFactory interface
 | |
| type gcsDriverFactory struct{}
 | |
| 
 | |
| // Create StorageDriver from parameters
 | |
| func (factory *gcsDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
 | |
| 	return FromParameters(parameters)
 | |
| }
 | |
| 
 | |
| // driver is a storagedriver.StorageDriver implementation backed by GCS
 | |
| // Objects are stored at absolute keys in the provided bucket.
 | |
| type driver struct {
 | |
| 	client        *http.Client
 | |
| 	bucket        string
 | |
| 	email         string
 | |
| 	privateKey    []byte
 | |
| 	rootDirectory string
 | |
| 	chunkSize     int
 | |
| }
 | |
| 
 | |
| // Wrapper wraps `driver` with a throttler, ensuring that no more than N
 | |
| // GCS actions can occur concurrently. The default limit is 75.
 | |
| type Wrapper struct {
 | |
| 	baseEmbed
 | |
| }
 | |
| 
 | |
| type baseEmbed struct {
 | |
| 	base.Base
 | |
| }
 | |
| 
 | |
| // FromParameters constructs a new Driver with a given parameters map
 | |
| // Required parameters:
 | |
| // - bucket
 | |
| func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
 | |
| 	bucket, ok := parameters["bucket"]
 | |
| 	if !ok || fmt.Sprint(bucket) == "" {
 | |
| 		return nil, fmt.Errorf("No bucket parameter provided")
 | |
| 	}
 | |
| 
 | |
| 	rootDirectory, ok := parameters["rootdirectory"]
 | |
| 	if !ok {
 | |
| 		rootDirectory = ""
 | |
| 	}
 | |
| 
 | |
| 	chunkSize := defaultChunkSize
 | |
| 	chunkSizeParam, ok := parameters["chunksize"]
 | |
| 	if ok {
 | |
| 		switch v := chunkSizeParam.(type) {
 | |
| 		case string:
 | |
| 			vv, err := strconv.Atoi(v)
 | |
| 			if err != nil {
 | |
| 				return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
 | |
| 			}
 | |
| 			chunkSize = vv
 | |
| 		case int, uint, int32, uint32, uint64, int64:
 | |
| 			chunkSize = int(reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int())
 | |
| 		default:
 | |
| 			return nil, fmt.Errorf("invalid valud for chunksize: %#v", chunkSizeParam)
 | |
| 		}
 | |
| 
 | |
| 		if chunkSize < minChunkSize {
 | |
| 			return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
 | |
| 		}
 | |
| 
 | |
| 		if chunkSize%minChunkSize != 0 {
 | |
| 			return nil, fmt.Errorf("chunksize should be a multiple of %d", minChunkSize)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	var ts oauth2.TokenSource
 | |
| 	jwtConf := new(jwt.Config)
 | |
| 	if keyfile, ok := parameters["keyfile"]; ok {
 | |
| 		jsonKey, err := ioutil.ReadFile(fmt.Sprint(keyfile))
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		jwtConf, err = google.JWTConfigFromJSON(jsonKey, storage.ScopeFullControl)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		ts = jwtConf.TokenSource(context.Background())
 | |
| 	} else if credentials, ok := parameters["credentials"]; ok {
 | |
| 		credentialMap, ok := credentials.(map[interface{}]interface{})
 | |
| 		if !ok {
 | |
| 			return nil, fmt.Errorf("The credentials were not specified in the correct format")
 | |
| 		}
 | |
| 
 | |
| 		stringMap := map[string]interface{}{}
 | |
| 		for k, v := range credentialMap {
 | |
| 			key, ok := k.(string)
 | |
| 			if !ok {
 | |
| 				return nil, fmt.Errorf("One of the credential keys was not a string: %s", fmt.Sprint(k))
 | |
| 			}
 | |
| 			stringMap[key] = v
 | |
| 		}
 | |
| 
 | |
| 		data, err := json.Marshal(stringMap)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("Failed to marshal gcs credentials to json")
 | |
| 		}
 | |
| 
 | |
| 		jwtConf, err = google.JWTConfigFromJSON(data, storage.ScopeFullControl)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		ts = jwtConf.TokenSource(context.Background())
 | |
| 	} else {
 | |
| 		var err error
 | |
| 		ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("maxconcurrency config error: %s", err)
 | |
| 	}
 | |
| 
 | |
| 	params := driverParameters{
 | |
| 		bucket:         fmt.Sprint(bucket),
 | |
| 		rootDirectory:  fmt.Sprint(rootDirectory),
 | |
| 		email:          jwtConf.Email,
 | |
| 		privateKey:     jwtConf.PrivateKey,
 | |
| 		client:         oauth2.NewClient(context.Background(), ts),
 | |
| 		chunkSize:      chunkSize,
 | |
| 		maxConcurrency: maxConcurrency,
 | |
| 	}
 | |
| 
 | |
| 	return New(params)
 | |
| }
 | |
| 
 | |
| // New constructs a new driver
 | |
| func New(params driverParameters) (storagedriver.StorageDriver, error) {
 | |
| 	rootDirectory := strings.Trim(params.rootDirectory, "/")
 | |
| 	if rootDirectory != "" {
 | |
| 		rootDirectory += "/"
 | |
| 	}
 | |
| 	if params.chunkSize <= 0 || params.chunkSize%minChunkSize != 0 {
 | |
| 		return nil, fmt.Errorf("Invalid chunksize: %d is not a positive multiple of %d", params.chunkSize, minChunkSize)
 | |
| 	}
 | |
| 	d := &driver{
 | |
| 		bucket:        params.bucket,
 | |
| 		rootDirectory: rootDirectory,
 | |
| 		email:         params.email,
 | |
| 		privateKey:    params.privateKey,
 | |
| 		client:        params.client,
 | |
| 		chunkSize:     params.chunkSize,
 | |
| 	}
 | |
| 
 | |
| 	return &Wrapper{
 | |
| 		baseEmbed: baseEmbed{
 | |
| 			Base: base.Base{
 | |
| 				StorageDriver: base.NewRegulator(d, params.maxConcurrency),
 | |
| 			},
 | |
| 		},
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // Implement the storagedriver.StorageDriver interface
 | |
| 
 | |
| func (d *driver) Name() string {
 | |
| 	return driverName
 | |
| }
 | |
| 
 | |
| // GetContent retrieves the content stored at "path" as a []byte.
 | |
| // This should primarily be used for small objects.
 | |
| func (d *driver) GetContent(context context.Context, path string) ([]byte, error) {
 | |
| 	gcsContext := d.context(context)
 | |
| 	name := d.pathToKey(path)
 | |
| 	var rc io.ReadCloser
 | |
| 	err := retry(func() error {
 | |
| 		var err error
 | |
| 		rc, err = storage.NewReader(gcsContext, d.bucket, name)
 | |
| 		return err
 | |
| 	})
 | |
| 	if err == storage.ErrObjectNotExist {
 | |
| 		return nil, storagedriver.PathNotFoundError{Path: path}
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer rc.Close()
 | |
| 
 | |
| 	p, err := ioutil.ReadAll(rc)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return p, nil
 | |
| }
 | |
| 
 | |
| // PutContent stores the []byte content at a location designated by "path".
 | |
| // This should primarily be used for small objects.
 | |
| func (d *driver) PutContent(context context.Context, path string, contents []byte) error {
 | |
| 	return retry(func() error {
 | |
| 		wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path))
 | |
| 		wc.ContentType = "application/octet-stream"
 | |
| 		return putContentsClose(wc, contents)
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // Reader retrieves an io.ReadCloser for the content stored at "path"
 | |
| // with a given byte offset.
 | |
| // May be used to resume reading a stream by providing a nonzero offset.
 | |
| func (d *driver) Reader(context context.Context, path string, offset int64) (io.ReadCloser, error) {
 | |
| 	res, err := getObject(d.client, d.bucket, d.pathToKey(path), offset)
 | |
| 	if err != nil {
 | |
| 		if res != nil {
 | |
| 			if res.StatusCode == http.StatusNotFound {
 | |
| 				res.Body.Close()
 | |
| 				return nil, storagedriver.PathNotFoundError{Path: path}
 | |
| 			}
 | |
| 
 | |
| 			if res.StatusCode == http.StatusRequestedRangeNotSatisfiable {
 | |
| 				res.Body.Close()
 | |
| 				obj, err := storageStatObject(d.context(context), d.bucket, d.pathToKey(path))
 | |
| 				if err != nil {
 | |
| 					return nil, err
 | |
| 				}
 | |
| 				if offset == int64(obj.Size) {
 | |
| 					return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
 | |
| 				}
 | |
| 				return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
 | |
| 			}
 | |
| 		}
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if res.Header.Get("Content-Type") == uploadSessionContentType {
 | |
| 		defer res.Body.Close()
 | |
| 		return nil, storagedriver.PathNotFoundError{Path: path}
 | |
| 	}
 | |
| 	return res.Body, nil
 | |
| }
 | |
| 
 | |
| func getObject(client *http.Client, bucket string, name string, offset int64) (*http.Response, error) {
 | |
| 	// copied from google.golang.org/cloud/storage#NewReader :
 | |
| 	// to set the additional "Range" header
 | |
| 	u := &url.URL{
 | |
| 		Scheme: "https",
 | |
| 		Host:   "storage.googleapis.com",
 | |
| 		Path:   fmt.Sprintf("/%s/%s", bucket, name),
 | |
| 	}
 | |
| 	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if offset > 0 {
 | |
| 		req.Header.Set("Range", fmt.Sprintf("bytes=%v-", offset))
 | |
| 	}
 | |
| 	var res *http.Response
 | |
| 	err = retry(func() error {
 | |
| 		var err error
 | |
| 		res, err = client.Do(req)
 | |
| 		return err
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return res, googleapi.CheckMediaResponse(res)
 | |
| }
 | |
| 
 | |
| // Writer returns a FileWriter which will store the content written to it
 | |
| // at the location designated by "path" after the call to Commit.
 | |
| func (d *driver) Writer(context context.Context, path string, append bool) (storagedriver.FileWriter, error) {
 | |
| 	writer := &writer{
 | |
| 		client: d.client,
 | |
| 		bucket: d.bucket,
 | |
| 		name:   d.pathToKey(path),
 | |
| 		buffer: make([]byte, d.chunkSize),
 | |
| 	}
 | |
| 
 | |
| 	if append {
 | |
| 		err := writer.init(path)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	return writer, nil
 | |
| }
 | |
| 
 | |
| type writer struct {
 | |
| 	client     *http.Client
 | |
| 	bucket     string
 | |
| 	name       string
 | |
| 	size       int64
 | |
| 	offset     int64
 | |
| 	closed     bool
 | |
| 	sessionURI string
 | |
| 	buffer     []byte
 | |
| 	buffSize   int
 | |
| }
 | |
| 
 | |
| // Cancel removes any written content from this FileWriter.
 | |
| func (w *writer) Cancel() error {
 | |
| 	w.closed = true
 | |
| 	err := storageDeleteObject(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
 | |
| 	if err != nil {
 | |
| 		if status, ok := err.(*googleapi.Error); ok {
 | |
| 			if status.Code == http.StatusNotFound {
 | |
| 				err = nil
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (w *writer) Close() error {
 | |
| 	if w.closed {
 | |
| 		return nil
 | |
| 	}
 | |
| 	w.closed = true
 | |
| 
 | |
| 	err := w.writeChunk()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Copy the remaining bytes from the buffer to the upload session
 | |
| 	// Normally buffSize will be smaller than minChunkSize. However, in the
 | |
| 	// unlikely event that the upload session failed to start, this number could be higher.
 | |
| 	// In this case we can safely clip the remaining bytes to the minChunkSize
 | |
| 	if w.buffSize > minChunkSize {
 | |
| 		w.buffSize = minChunkSize
 | |
| 	}
 | |
| 
 | |
| 	// commit the writes by updating the upload session
 | |
| 	err = retry(func() error {
 | |
| 		wc := storage.NewWriter(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
 | |
| 		wc.ContentType = uploadSessionContentType
 | |
| 		wc.Metadata = map[string]string{
 | |
| 			"Session-URI": w.sessionURI,
 | |
| 			"Offset":      strconv.FormatInt(w.offset, 10),
 | |
| 		}
 | |
| 		return putContentsClose(wc, w.buffer[0:w.buffSize])
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	w.size = w.offset + int64(w.buffSize)
 | |
| 	w.buffSize = 0
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func putContentsClose(wc *storage.Writer, contents []byte) error {
 | |
| 	size := len(contents)
 | |
| 	var nn int
 | |
| 	var err error
 | |
| 	for nn < size {
 | |
| 		var n int
 | |
| 		n, err = wc.Write(contents[nn:size])
 | |
| 		nn += n
 | |
| 		if err != nil {
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		wc.CloseWithError(err)
 | |
| 		return err
 | |
| 	}
 | |
| 	return wc.Close()
 | |
| }
 | |
| 
 | |
| // Commit flushes all content written to this FileWriter and makes it
 | |
| // available for future calls to StorageDriver.GetContent and
 | |
| // StorageDriver.Reader.
 | |
| func (w *writer) Commit() error {
 | |
| 
 | |
| 	if err := w.checkClosed(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	w.closed = true
 | |
| 
 | |
| 	// no session started yet just perform a simple upload
 | |
| 	if w.sessionURI == "" {
 | |
| 		err := retry(func() error {
 | |
| 			wc := storage.NewWriter(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
 | |
| 			wc.ContentType = "application/octet-stream"
 | |
| 			return putContentsClose(wc, w.buffer[0:w.buffSize])
 | |
| 		})
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		w.size = w.offset + int64(w.buffSize)
 | |
| 		w.buffSize = 0
 | |
| 		return nil
 | |
| 	}
 | |
| 	size := w.offset + int64(w.buffSize)
 | |
| 	var nn int
 | |
| 	// loop must be performed at least once to ensure the file is committed even when
 | |
| 	// the buffer is empty
 | |
| 	for {
 | |
| 		n, err := putChunk(w.client, w.sessionURI, w.buffer[nn:w.buffSize], w.offset, size)
 | |
| 		nn += int(n)
 | |
| 		w.offset += n
 | |
| 		w.size = w.offset
 | |
| 		if err != nil {
 | |
| 			w.buffSize = copy(w.buffer, w.buffer[nn:w.buffSize])
 | |
| 			return err
 | |
| 		}
 | |
| 		if nn == w.buffSize {
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 	w.buffSize = 0
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (w *writer) checkClosed() error {
 | |
| 	if w.closed {
 | |
| 		return fmt.Errorf("Writer already closed")
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (w *writer) writeChunk() error {
 | |
| 	var err error
 | |
| 	// chunks can be uploaded only in multiples of minChunkSize
 | |
| 	// chunkSize is a multiple of minChunkSize less than or equal to buffSize
 | |
| 	chunkSize := w.buffSize - (w.buffSize % minChunkSize)
 | |
| 	if chunkSize == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 	// if their is no sessionURI yet, obtain one by starting the session
 | |
| 	if w.sessionURI == "" {
 | |
| 		w.sessionURI, err = startSession(w.client, w.bucket, w.name)
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	nn, err := putChunk(w.client, w.sessionURI, w.buffer[0:chunkSize], w.offset, -1)
 | |
| 	w.offset += nn
 | |
| 	if w.offset > w.size {
 | |
| 		w.size = w.offset
 | |
| 	}
 | |
| 	// shift the remaining bytes to the start of the buffer
 | |
| 	w.buffSize = copy(w.buffer, w.buffer[int(nn):w.buffSize])
 | |
| 
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (w *writer) Write(p []byte) (int, error) {
 | |
| 	err := w.checkClosed()
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 
 | |
| 	var nn int
 | |
| 	for nn < len(p) {
 | |
| 		n := copy(w.buffer[w.buffSize:], p[nn:])
 | |
| 		w.buffSize += n
 | |
| 		if w.buffSize == cap(w.buffer) {
 | |
| 			err = w.writeChunk()
 | |
| 			if err != nil {
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 		nn += n
 | |
| 	}
 | |
| 	return nn, err
 | |
| }
 | |
| 
 | |
| // Size returns the number of bytes written to this FileWriter.
 | |
| func (w *writer) Size() int64 {
 | |
| 	return w.size
 | |
| }
 | |
| 
 | |
| func (w *writer) init(path string) error {
 | |
| 	res, err := getObject(w.client, w.bucket, w.name, 0)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer res.Body.Close()
 | |
| 	if res.Header.Get("Content-Type") != uploadSessionContentType {
 | |
| 		return storagedriver.PathNotFoundError{Path: path}
 | |
| 	}
 | |
| 	offset, err := strconv.ParseInt(res.Header.Get("X-Goog-Meta-Offset"), 10, 64)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	buffer, err := ioutil.ReadAll(res.Body)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	w.sessionURI = res.Header.Get("X-Goog-Meta-Session-URI")
 | |
| 	w.buffSize = copy(w.buffer, buffer)
 | |
| 	w.offset = offset
 | |
| 	w.size = offset + int64(w.buffSize)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| type request func() error
 | |
| 
 | |
| func retry(req request) error {
 | |
| 	backoff := time.Second
 | |
| 	var err error
 | |
| 	for i := 0; i < maxTries; i++ {
 | |
| 		err = req()
 | |
| 		if err == nil {
 | |
| 			return nil
 | |
| 		}
 | |
| 
 | |
| 		status, ok := err.(*googleapi.Error)
 | |
| 		if !ok || (status.Code != 429 && status.Code < http.StatusInternalServerError) {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		time.Sleep(backoff - time.Second + (time.Duration(rand.Int31n(1000)) * time.Millisecond))
 | |
| 		if i <= 4 {
 | |
| 			backoff = backoff * 2
 | |
| 		}
 | |
| 	}
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Stat retrieves the FileInfo for the given path, including the current
 | |
| // size in bytes and the creation time.
 | |
| func (d *driver) Stat(context context.Context, path string) (storagedriver.FileInfo, error) {
 | |
| 	var fi storagedriver.FileInfoFields
 | |
| 	//try to get as file
 | |
| 	gcsContext := d.context(context)
 | |
| 	obj, err := storageStatObject(gcsContext, d.bucket, d.pathToKey(path))
 | |
| 	if err == nil {
 | |
| 		if obj.ContentType == uploadSessionContentType {
 | |
| 			return nil, storagedriver.PathNotFoundError{Path: path}
 | |
| 		}
 | |
| 		fi = storagedriver.FileInfoFields{
 | |
| 			Path:    path,
 | |
| 			Size:    obj.Size,
 | |
| 			ModTime: obj.Updated,
 | |
| 			IsDir:   false,
 | |
| 		}
 | |
| 		return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
 | |
| 	}
 | |
| 	//try to get as folder
 | |
| 	dirpath := d.pathToDirKey(path)
 | |
| 
 | |
| 	var query *storage.Query
 | |
| 	query = &storage.Query{}
 | |
| 	query.Prefix = dirpath
 | |
| 	query.MaxResults = 1
 | |
| 
 | |
| 	objects, err := storageListObjects(gcsContext, d.bucket, query)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if len(objects.Results) < 1 {
 | |
| 		return nil, storagedriver.PathNotFoundError{Path: path}
 | |
| 	}
 | |
| 	fi = storagedriver.FileInfoFields{
 | |
| 		Path:  path,
 | |
| 		IsDir: true,
 | |
| 	}
 | |
| 	obj = objects.Results[0]
 | |
| 	if obj.Name == dirpath {
 | |
| 		fi.Size = obj.Size
 | |
| 		fi.ModTime = obj.Updated
 | |
| 	}
 | |
| 	return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
 | |
| }
 | |
| 
 | |
| // List returns a list of the objects that are direct descendants of the
 | |
| //given path.
 | |
| func (d *driver) List(context context.Context, path string) ([]string, error) {
 | |
| 	var query *storage.Query
 | |
| 	query = &storage.Query{}
 | |
| 	query.Delimiter = "/"
 | |
| 	query.Prefix = d.pathToDirKey(path)
 | |
| 	list := make([]string, 0, 64)
 | |
| 	for {
 | |
| 		objects, err := storageListObjects(d.context(context), d.bucket, query)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		for _, object := range objects.Results {
 | |
| 			// GCS does not guarantee strong consistency between
 | |
| 			// DELETE and LIST operations. Check that the object is not deleted,
 | |
| 			// and filter out any objects with a non-zero time-deleted
 | |
| 			if object.Deleted.IsZero() && object.ContentType != uploadSessionContentType {
 | |
| 				list = append(list, d.keyToPath(object.Name))
 | |
| 			}
 | |
| 		}
 | |
| 		for _, subpath := range objects.Prefixes {
 | |
| 			subpath = d.keyToPath(subpath)
 | |
| 			list = append(list, subpath)
 | |
| 		}
 | |
| 		query = objects.Next
 | |
| 		if query == nil {
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 	if path != "/" && len(list) == 0 {
 | |
| 		// Treat empty response as missing directory, since we don't actually
 | |
| 		// have directories in Google Cloud Storage.
 | |
| 		return nil, storagedriver.PathNotFoundError{Path: path}
 | |
| 	}
 | |
| 	return list, nil
 | |
| }
 | |
| 
 | |
| // Move moves an object stored at sourcePath to destPath, removing the
 | |
| // original object.
 | |
| func (d *driver) Move(context context.Context, sourcePath string, destPath string) error {
 | |
| 	gcsContext := d.context(context)
 | |
| 	_, err := storageCopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil)
 | |
| 	if err != nil {
 | |
| 		if status, ok := err.(*googleapi.Error); ok {
 | |
| 			if status.Code == http.StatusNotFound {
 | |
| 				return storagedriver.PathNotFoundError{Path: sourcePath}
 | |
| 			}
 | |
| 		}
 | |
| 		return err
 | |
| 	}
 | |
| 	err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath))
 | |
| 	// if deleting the file fails, log the error, but do not fail; the file was successfully copied,
 | |
| 	// and the original should eventually be cleaned when purging the uploads folder.
 | |
| 	if err != nil {
 | |
| 		logrus.Infof("error deleting file: %v due to %v", sourcePath, err)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // listAll recursively lists all names of objects stored at "prefix" and its subpaths.
 | |
| func (d *driver) listAll(context context.Context, prefix string) ([]string, error) {
 | |
| 	list := make([]string, 0, 64)
 | |
| 	query := &storage.Query{}
 | |
| 	query.Prefix = prefix
 | |
| 	query.Versions = false
 | |
| 	for {
 | |
| 		objects, err := storageListObjects(d.context(context), d.bucket, query)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		for _, obj := range objects.Results {
 | |
| 			// GCS does not guarantee strong consistency between
 | |
| 			// DELETE and LIST operations. Check that the object is not deleted,
 | |
| 			// and filter out any objects with a non-zero time-deleted
 | |
| 			if obj.Deleted.IsZero() {
 | |
| 				list = append(list, obj.Name)
 | |
| 			}
 | |
| 		}
 | |
| 		query = objects.Next
 | |
| 		if query == nil {
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 	return list, nil
 | |
| }
 | |
| 
 | |
| // Delete recursively deletes all objects stored at "path" and its subpaths.
 | |
| func (d *driver) Delete(context context.Context, path string) error {
 | |
| 	prefix := d.pathToDirKey(path)
 | |
| 	gcsContext := d.context(context)
 | |
| 	keys, err := d.listAll(gcsContext, prefix)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if len(keys) > 0 {
 | |
| 		sort.Sort(sort.Reverse(sort.StringSlice(keys)))
 | |
| 		for _, key := range keys {
 | |
| 			err := storageDeleteObject(gcsContext, d.bucket, key)
 | |
| 			// GCS only guarantees eventual consistency, so listAll might return
 | |
| 			// paths that no longer exist. If this happens, just ignore any not
 | |
| 			// found error
 | |
| 			if status, ok := err.(*googleapi.Error); ok {
 | |
| 				if status.Code == http.StatusNotFound {
 | |
| 					err = nil
 | |
| 				}
 | |
| 			}
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| 	err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(path))
 | |
| 	if err != nil {
 | |
| 		if status, ok := err.(*googleapi.Error); ok {
 | |
| 			if status.Code == http.StatusNotFound {
 | |
| 				return storagedriver.PathNotFoundError{Path: path}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func storageDeleteObject(context context.Context, bucket string, name string) error {
 | |
| 	return retry(func() error {
 | |
| 		return storage.DeleteObject(context, bucket, name)
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func storageStatObject(context context.Context, bucket string, name string) (*storage.Object, error) {
 | |
| 	var obj *storage.Object
 | |
| 	err := retry(func() error {
 | |
| 		var err error
 | |
| 		obj, err = storage.StatObject(context, bucket, name)
 | |
| 		return err
 | |
| 	})
 | |
| 	return obj, err
 | |
| }
 | |
| 
 | |
| func storageListObjects(context context.Context, bucket string, q *storage.Query) (*storage.Objects, error) {
 | |
| 	var objs *storage.Objects
 | |
| 	err := retry(func() error {
 | |
| 		var err error
 | |
| 		objs, err = storage.ListObjects(context, bucket, q)
 | |
| 		return err
 | |
| 	})
 | |
| 	return objs, err
 | |
| }
 | |
| 
 | |
| func storageCopyObject(context context.Context, srcBucket, srcName string, destBucket, destName string, attrs *storage.ObjectAttrs) (*storage.Object, error) {
 | |
| 	var obj *storage.Object
 | |
| 	err := retry(func() error {
 | |
| 		var err error
 | |
| 		obj, err = storage.CopyObject(context, srcBucket, srcName, destBucket, destName, attrs)
 | |
| 		return err
 | |
| 	})
 | |
| 	return obj, err
 | |
| }
 | |
| 
 | |
| // URLFor returns a URL which may be used to retrieve the content stored at
 | |
| // the given path, possibly using the given options.
 | |
| // Returns ErrUnsupportedMethod if this driver has no privateKey
 | |
| func (d *driver) URLFor(context context.Context, path string, options map[string]interface{}) (string, error) {
 | |
| 	if d.privateKey == nil {
 | |
| 		return "", storagedriver.ErrUnsupportedMethod{}
 | |
| 	}
 | |
| 
 | |
| 	name := d.pathToKey(path)
 | |
| 	methodString := http.MethodGet
 | |
| 	method, ok := options["method"]
 | |
| 	if ok {
 | |
| 		methodString, ok = method.(string)
 | |
| 		if !ok || (methodString != http.MethodGet && methodString != http.MethodHead) {
 | |
| 			return "", storagedriver.ErrUnsupportedMethod{}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	expiresTime := time.Now().Add(20 * time.Minute)
 | |
| 	expires, ok := options["expiry"]
 | |
| 	if ok {
 | |
| 		et, ok := expires.(time.Time)
 | |
| 		if ok {
 | |
| 			expiresTime = et
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	opts := &storage.SignedURLOptions{
 | |
| 		GoogleAccessID: d.email,
 | |
| 		PrivateKey:     d.privateKey,
 | |
| 		Method:         methodString,
 | |
| 		Expires:        expiresTime,
 | |
| 	}
 | |
| 	return storage.SignedURL(d.bucket, name, opts)
 | |
| }
 | |
| 
 | |
| // Walk traverses a filesystem defined within driver, starting
 | |
| // from the given path, calling f on each file
 | |
| func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
 | |
| 	return storagedriver.WalkFallback(ctx, d, path, f)
 | |
| }
 | |
| 
 | |
| func startSession(client *http.Client, bucket string, name string) (uri string, err error) {
 | |
| 	u := &url.URL{
 | |
| 		Scheme:   "https",
 | |
| 		Host:     "www.googleapis.com",
 | |
| 		Path:     fmt.Sprintf("/upload/storage/v1/b/%v/o", bucket),
 | |
| 		RawQuery: fmt.Sprintf("uploadType=resumable&name=%v", name),
 | |
| 	}
 | |
| 	err = retry(func() error {
 | |
| 		req, err := http.NewRequest(http.MethodPost, u.String(), nil)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		req.Header.Set("X-Upload-Content-Type", "application/octet-stream")
 | |
| 		req.Header.Set("Content-Length", "0")
 | |
| 		resp, err := client.Do(req)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		defer resp.Body.Close()
 | |
| 		err = googleapi.CheckMediaResponse(resp)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		uri = resp.Header.Get("Location")
 | |
| 		return nil
 | |
| 	})
 | |
| 	return uri, err
 | |
| }
 | |
| 
 | |
| func putChunk(client *http.Client, sessionURI string, chunk []byte, from int64, totalSize int64) (int64, error) {
 | |
| 	bytesPut := int64(0)
 | |
| 	err := retry(func() error {
 | |
| 		req, err := http.NewRequest(http.MethodPut, sessionURI, bytes.NewReader(chunk))
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		length := int64(len(chunk))
 | |
| 		to := from + length - 1
 | |
| 		size := "*"
 | |
| 		if totalSize >= 0 {
 | |
| 			size = strconv.FormatInt(totalSize, 10)
 | |
| 		}
 | |
| 		req.Header.Set("Content-Type", "application/octet-stream")
 | |
| 		if from == to+1 {
 | |
| 			req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", size))
 | |
| 		} else {
 | |
| 			req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", from, to, size))
 | |
| 		}
 | |
| 		req.Header.Set("Content-Length", strconv.FormatInt(length, 10))
 | |
| 
 | |
| 		resp, err := client.Do(req)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		defer resp.Body.Close()
 | |
| 		if totalSize < 0 && resp.StatusCode == 308 {
 | |
| 			groups := rangeHeader.FindStringSubmatch(resp.Header.Get("Range"))
 | |
| 			end, err := strconv.ParseInt(groups[2], 10, 64)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			bytesPut = end - from + 1
 | |
| 			return nil
 | |
| 		}
 | |
| 		err = googleapi.CheckMediaResponse(resp)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		bytesPut = to - from + 1
 | |
| 		return nil
 | |
| 	})
 | |
| 	return bytesPut, err
 | |
| }
 | |
| 
 | |
| func (d *driver) context(context context.Context) context.Context {
 | |
| 	return cloud.WithContext(context, dummyProjectID, d.client)
 | |
| }
 | |
| 
 | |
| func (d *driver) pathToKey(path string) string {
 | |
| 	return strings.TrimSpace(strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/"))
 | |
| }
 | |
| 
 | |
| func (d *driver) pathToDirKey(path string) string {
 | |
| 	return d.pathToKey(path) + "/"
 | |
| }
 | |
| 
 | |
| func (d *driver) keyToPath(key string) string {
 | |
| 	return "/" + strings.Trim(strings.TrimPrefix(key, d.rootDirectory), "/")
 | |
| }
 |