938 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			938 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Go
		
	
	
// Package swift provides a storagedriver.StorageDriver implementation to
// store blobs in Openstack Swift object storage.
//
// This package leverages the ncw/swift client library for interfacing with
// Swift.
//
// It supports both TempAuth authentication and Keystone authentication
// (up to version 3).
//
// As Swift has a limit on the size of a single uploaded object (by default
// this is 5GB), the driver makes use of the Swift Large Object Support
// (http://docs.openstack.org/developer/swift/overview_large_objects.html).
// Only one container is used for both manifests and data objects. Manifests
// are stored in the 'files' pseudo directory, data objects are stored under
// 'segments'.
package swift
 | 
						|
 | 
						|
import (
 | 
						|
	"bufio"
 | 
						|
	"bytes"
 | 
						|
	"context"
 | 
						|
	"crypto/rand"
 | 
						|
	"crypto/sha1"
 | 
						|
	"crypto/tls"
 | 
						|
	"encoding/hex"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"io/ioutil"
 | 
						|
	"net/http"
 | 
						|
	"net/url"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/mitchellh/mapstructure"
 | 
						|
	"github.com/ncw/swift"
 | 
						|
 | 
						|
	storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
 | 
						|
	"github.com/distribution/distribution/v3/registry/storage/driver/base"
 | 
						|
	"github.com/distribution/distribution/v3/registry/storage/driver/factory"
 | 
						|
	"github.com/distribution/distribution/v3/version"
 | 
						|
)
 | 
						|
 | 
						|
const (
	// driverName is the name under which this driver is registered with the
	// storage driver factory.
	driverName = "swift"

	// defaultChunkSize defines the default size of a segment.
	defaultChunkSize = 20 * 1024 * 1024

	// minChunkSize defines the minimum size of a segment.
	minChunkSize = 1 << 20

	// contentType defines the Content-Type header associated with stored segments.
	contentType = "application/octet-stream"
)

var (
	// readAfterWriteTimeout defines the time we wait before an object appears
	// after having been uploaded.
	readAfterWriteTimeout = 15 * time.Second

	// readAfterWriteWait defines the time to sleep between two retries.
	readAfterWriteWait = 200 * time.Millisecond
)
 | 
						|
 | 
						|
// Parameters is a struct that encapsulates all of the driver parameters after
// all values have been set.
type Parameters struct {
	// Credentials and identity settings handed to the Swift connection.
	Username       string
	Password       string
	AuthURL        string
	Tenant         string
	TenantID       string
	Domain         string
	DomainID       string
	TenantDomain   string
	TenantDomainID string
	TrustID        string
	Region         string
	AuthVersion    int

	// Container is the Swift container holding all objects; Prefix is
	// prepended to every object name built by the driver.
	Container string
	Prefix    string

	EndpointType string

	// InsecureSkipVerify is forwarded to the TLS client configuration.
	InsecureSkipVerify bool

	// ChunkSize is the segment size used by writers.
	ChunkSize int

	// SecretKey and AccessKey are used when building temporary URLs.
	SecretKey string
	AccessKey string

	// TempURLContainerKey and TempURLMethods are only used as a fallback when
	// the capabilities of the Swift cluster cannot be queried.
	TempURLContainerKey bool
	TempURLMethods      []string
}
 | 
						|
 | 
						|
// swiftInfo maps the JSON structure returned by Swift /info endpoint.
// Only the entries the driver cares about are declared.
type swiftInfo struct {
	Swift struct {
		Version string `mapstructure:"version"`
	}
	Tempurl struct {
		Methods []string `mapstructure:"methods"`
	}
	BulkDelete struct {
		MaxDeletesPerRequest int `mapstructure:"max_deletes_per_request"`
	} `mapstructure:"bulk_delete"`
}
 | 
						|
 | 
						|
func init() {
 | 
						|
	factory.Register(driverName, &swiftDriverFactory{})
 | 
						|
}
 | 
						|
 | 
						|
// swiftDriverFactory implements the factory.StorageDriverFactory interface
 | 
						|
type swiftDriverFactory struct{}
 | 
						|
 | 
						|
func (factory *swiftDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
 | 
						|
	return FromParameters(parameters)
 | 
						|
}
 | 
						|
 | 
						|
type driver struct {
 | 
						|
	Conn                 *swift.Connection
 | 
						|
	Container            string
 | 
						|
	Prefix               string
 | 
						|
	BulkDeleteSupport    bool
 | 
						|
	BulkDeleteMaxDeletes int
 | 
						|
	ChunkSize            int
 | 
						|
	SecretKey            string
 | 
						|
	AccessKey            string
 | 
						|
	TempURLContainerKey  bool
 | 
						|
	TempURLMethods       []string
 | 
						|
}
 | 
						|
 | 
						|
type baseEmbed struct {
 | 
						|
	base.Base
 | 
						|
}
 | 
						|
 | 
						|
// Driver is a storagedriver.StorageDriver implementation backed by Openstack Swift
 | 
						|
// Objects are stored at absolute keys in the provided container.
 | 
						|
type Driver struct {
 | 
						|
	baseEmbed
 | 
						|
}
 | 
						|
 | 
						|
// FromParameters constructs a new Driver with a given parameters map
 | 
						|
// Required parameters:
 | 
						|
// - username
 | 
						|
// - password
 | 
						|
// - authurl
 | 
						|
// - container
 | 
						|
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
 | 
						|
	params := Parameters{
 | 
						|
		ChunkSize:          defaultChunkSize,
 | 
						|
		InsecureSkipVerify: false,
 | 
						|
	}
 | 
						|
 | 
						|
	// Sanitize some entries before trying to decode parameters with mapstructure
 | 
						|
	// TenantID and Tenant when integers only and passed as ENV variables
 | 
						|
	// are considered as integer and not string. The parser fails in this
 | 
						|
	// case.
 | 
						|
	_, ok := parameters["tenant"]
 | 
						|
	if ok {
 | 
						|
		parameters["tenant"] = fmt.Sprint(parameters["tenant"])
 | 
						|
	}
 | 
						|
	_, ok = parameters["tenantid"]
 | 
						|
	if ok {
 | 
						|
		parameters["tenantid"] = fmt.Sprint(parameters["tenantid"])
 | 
						|
	}
 | 
						|
 | 
						|
	if err := mapstructure.Decode(parameters, ¶ms); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	if params.Username == "" {
 | 
						|
		return nil, fmt.Errorf("no username parameter provided")
 | 
						|
	}
 | 
						|
 | 
						|
	if params.Password == "" {
 | 
						|
		return nil, fmt.Errorf("no password parameter provided")
 | 
						|
	}
 | 
						|
 | 
						|
	if params.AuthURL == "" {
 | 
						|
		return nil, fmt.Errorf("no authurl parameter provided")
 | 
						|
	}
 | 
						|
 | 
						|
	if params.Container == "" {
 | 
						|
		return nil, fmt.Errorf("no container parameter provided")
 | 
						|
	}
 | 
						|
 | 
						|
	if params.ChunkSize < minChunkSize {
 | 
						|
		return nil, fmt.Errorf("the chunksize %#v parameter should be a number that is larger than or equal to %d", params.ChunkSize, minChunkSize)
 | 
						|
	}
 | 
						|
 | 
						|
	return New(params)
 | 
						|
}
 | 
						|
 | 
						|
// New constructs a new Driver with the given Openstack Swift credentials and container name
 | 
						|
func New(params Parameters) (*Driver, error) {
 | 
						|
	transport := &http.Transport{
 | 
						|
		Proxy:               http.ProxyFromEnvironment,
 | 
						|
		MaxIdleConnsPerHost: 2048,
 | 
						|
		TLSClientConfig:     &tls.Config{InsecureSkipVerify: params.InsecureSkipVerify},
 | 
						|
	}
 | 
						|
 | 
						|
	ct := &swift.Connection{
 | 
						|
		UserName:       params.Username,
 | 
						|
		ApiKey:         params.Password,
 | 
						|
		AuthUrl:        params.AuthURL,
 | 
						|
		Region:         params.Region,
 | 
						|
		AuthVersion:    params.AuthVersion,
 | 
						|
		UserAgent:      "distribution/" + version.Version,
 | 
						|
		Tenant:         params.Tenant,
 | 
						|
		TenantId:       params.TenantID,
 | 
						|
		Domain:         params.Domain,
 | 
						|
		DomainId:       params.DomainID,
 | 
						|
		TenantDomain:   params.TenantDomain,
 | 
						|
		TenantDomainId: params.TenantDomainID,
 | 
						|
		TrustId:        params.TrustID,
 | 
						|
		EndpointType:   swift.EndpointType(params.EndpointType),
 | 
						|
		Transport:      transport,
 | 
						|
		ConnectTimeout: 60 * time.Second,
 | 
						|
		Timeout:        15 * 60 * time.Second,
 | 
						|
	}
 | 
						|
	err := ct.Authenticate()
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("swift authentication failed: %s", err)
 | 
						|
	}
 | 
						|
 | 
						|
	if _, _, err := ct.Container(params.Container); err == swift.ContainerNotFound {
 | 
						|
		if err := ct.ContainerCreate(params.Container, nil); err != nil {
 | 
						|
			return nil, fmt.Errorf("failed to create container %s (%s)", params.Container, err)
 | 
						|
		}
 | 
						|
	} else if err != nil {
 | 
						|
		return nil, fmt.Errorf("failed to retrieve info about container %s (%s)", params.Container, err)
 | 
						|
	}
 | 
						|
 | 
						|
	d := &driver{
 | 
						|
		Conn:           ct,
 | 
						|
		Container:      params.Container,
 | 
						|
		Prefix:         params.Prefix,
 | 
						|
		ChunkSize:      params.ChunkSize,
 | 
						|
		TempURLMethods: make([]string, 0),
 | 
						|
		AccessKey:      params.AccessKey,
 | 
						|
	}
 | 
						|
 | 
						|
	info := swiftInfo{}
 | 
						|
	if config, err := d.Conn.QueryInfo(); err == nil {
 | 
						|
		_, d.BulkDeleteSupport = config["bulk_delete"]
 | 
						|
 | 
						|
		if err := mapstructure.Decode(config, &info); err == nil {
 | 
						|
			d.TempURLContainerKey = info.Swift.Version >= "2.3.0"
 | 
						|
			d.TempURLMethods = info.Tempurl.Methods
 | 
						|
			if d.BulkDeleteSupport {
 | 
						|
				d.BulkDeleteMaxDeletes = info.BulkDelete.MaxDeletesPerRequest
 | 
						|
			}
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		d.TempURLContainerKey = params.TempURLContainerKey
 | 
						|
		d.TempURLMethods = params.TempURLMethods
 | 
						|
	}
 | 
						|
 | 
						|
	if len(d.TempURLMethods) > 0 {
 | 
						|
		secretKey := params.SecretKey
 | 
						|
		if secretKey == "" {
 | 
						|
			secretKey, _ = generateSecret()
 | 
						|
		}
 | 
						|
 | 
						|
		// Since Swift 2.2.2, we can now set secret keys on containers
 | 
						|
		// in addition to the account secret keys. Use them in preference.
 | 
						|
		if d.TempURLContainerKey {
 | 
						|
			_, containerHeaders, err := d.Conn.Container(d.Container)
 | 
						|
			if err != nil {
 | 
						|
				return nil, fmt.Errorf("failed to fetch container info %s (%s)", d.Container, err)
 | 
						|
			}
 | 
						|
 | 
						|
			d.SecretKey = containerHeaders["X-Container-Meta-Temp-Url-Key"]
 | 
						|
			if d.SecretKey == "" || (params.SecretKey != "" && d.SecretKey != params.SecretKey) {
 | 
						|
				m := swift.Metadata{}
 | 
						|
				m["temp-url-key"] = secretKey
 | 
						|
				if d.Conn.ContainerUpdate(d.Container, m.ContainerHeaders()); err == nil {
 | 
						|
					d.SecretKey = secretKey
 | 
						|
				}
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			// Use the account secret key
 | 
						|
			_, accountHeaders, err := d.Conn.Account()
 | 
						|
			if err != nil {
 | 
						|
				return nil, fmt.Errorf("failed to fetch account info (%s)", err)
 | 
						|
			}
 | 
						|
 | 
						|
			d.SecretKey = accountHeaders["X-Account-Meta-Temp-Url-Key"]
 | 
						|
			if d.SecretKey == "" || (params.SecretKey != "" && d.SecretKey != params.SecretKey) {
 | 
						|
				m := swift.Metadata{}
 | 
						|
				m["temp-url-key"] = secretKey
 | 
						|
				if err := d.Conn.AccountUpdate(m.AccountHeaders()); err == nil {
 | 
						|
					d.SecretKey = secretKey
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return &Driver{
 | 
						|
		baseEmbed: baseEmbed{
 | 
						|
			Base: base.Base{
 | 
						|
				StorageDriver: d,
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
// Implement the storagedriver.StorageDriver interface
 | 
						|
 | 
						|
func (d *driver) Name() string {
 | 
						|
	return driverName
 | 
						|
}
 | 
						|
 | 
						|
// GetContent retrieves the content stored at "path" as a []byte.
 | 
						|
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
 | 
						|
	content, err := d.Conn.ObjectGetBytes(d.Container, d.swiftPath(path))
 | 
						|
	if err == swift.ObjectNotFound {
 | 
						|
		return nil, storagedriver.PathNotFoundError{Path: path}
 | 
						|
	}
 | 
						|
	return content, err
 | 
						|
}
 | 
						|
 | 
						|
// PutContent stores the []byte content at a location designated by "path".
 | 
						|
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
 | 
						|
	err := d.Conn.ObjectPutBytes(d.Container, d.swiftPath(path), contents, contentType)
 | 
						|
	if err == swift.ObjectNotFound {
 | 
						|
		return storagedriver.PathNotFoundError{Path: path}
 | 
						|
	}
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
 | 
						|
// given byte offset.
 | 
						|
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
 | 
						|
	headers := make(swift.Headers)
 | 
						|
	headers["Range"] = "bytes=" + strconv.FormatInt(offset, 10) + "-"
 | 
						|
 | 
						|
	waitingTime := readAfterWriteWait
 | 
						|
	endTime := time.Now().Add(readAfterWriteTimeout)
 | 
						|
 | 
						|
	for {
 | 
						|
		file, headers, err := d.Conn.ObjectOpen(d.Container, d.swiftPath(path), false, headers)
 | 
						|
		if err != nil {
 | 
						|
			if err == swift.ObjectNotFound {
 | 
						|
				return nil, storagedriver.PathNotFoundError{Path: path}
 | 
						|
			}
 | 
						|
			if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == http.StatusRequestedRangeNotSatisfiable {
 | 
						|
				return ioutil.NopCloser(bytes.NewReader(nil)), nil
 | 
						|
			}
 | 
						|
			return file, err
 | 
						|
		}
 | 
						|
 | 
						|
		//if this is a DLO and it is clear that segments are still missing,
 | 
						|
		//wait until they show up
 | 
						|
		_, isDLO := headers["X-Object-Manifest"]
 | 
						|
		size, err := file.Length()
 | 
						|
		if err != nil {
 | 
						|
			return file, err
 | 
						|
		}
 | 
						|
		if isDLO && size == 0 {
 | 
						|
			if time.Now().Add(waitingTime).After(endTime) {
 | 
						|
				return nil, fmt.Errorf("timeout expired while waiting for segments of %s to show up", path)
 | 
						|
			}
 | 
						|
			time.Sleep(waitingTime)
 | 
						|
			waitingTime *= 2
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		//if not, then this reader will be fine
 | 
						|
		return file, nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Writer returns a FileWriter which will store the content written to it
 | 
						|
// at the location designated by "path" after the call to Commit.
 | 
						|
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
 | 
						|
	var (
 | 
						|
		segments     []swift.Object
 | 
						|
		segmentsPath string
 | 
						|
		err          error
 | 
						|
	)
 | 
						|
 | 
						|
	if !append {
 | 
						|
		segmentsPath, err = d.swiftSegmentPath(path)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		info, headers, err := d.Conn.Object(d.Container, d.swiftPath(path))
 | 
						|
		if err == swift.ObjectNotFound {
 | 
						|
			return nil, storagedriver.PathNotFoundError{Path: path}
 | 
						|
		} else if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		manifest, ok := headers["X-Object-Manifest"]
 | 
						|
		if !ok {
 | 
						|
			segmentsPath, err = d.swiftSegmentPath(path)
 | 
						|
			if err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
			if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, getSegmentPath(segmentsPath, len(segments))); err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
			segments = []swift.Object{info}
 | 
						|
		} else {
 | 
						|
			_, segmentsPath = parseManifest(manifest)
 | 
						|
			if segments, err = d.getAllSegments(segmentsPath); err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return d.newWriter(path, segmentsPath, segments), nil
 | 
						|
}
 | 
						|
 | 
						|
// Stat retrieves the FileInfo for the given path, including the current size
 | 
						|
// in bytes and the creation time.
 | 
						|
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
 | 
						|
	swiftPath := d.swiftPath(path)
 | 
						|
	opts := &swift.ObjectsOpts{
 | 
						|
		Prefix:    swiftPath,
 | 
						|
		Delimiter: '/',
 | 
						|
	}
 | 
						|
 | 
						|
	objects, err := d.Conn.ObjectsAll(d.Container, opts)
 | 
						|
	if err != nil {
 | 
						|
		if err == swift.ContainerNotFound {
 | 
						|
			return nil, storagedriver.PathNotFoundError{Path: path}
 | 
						|
		}
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	fi := storagedriver.FileInfoFields{
 | 
						|
		Path: strings.TrimPrefix(strings.TrimSuffix(swiftPath, "/"), d.swiftPath("/")),
 | 
						|
	}
 | 
						|
 | 
						|
	for _, obj := range objects {
 | 
						|
		if obj.PseudoDirectory && obj.Name == swiftPath+"/" {
 | 
						|
			fi.IsDir = true
 | 
						|
			return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
 | 
						|
		} else if obj.Name == swiftPath {
 | 
						|
			// The file exists. But on Swift 1.12, the 'bytes' field is always 0 so
 | 
						|
			// we need to do a separate HEAD request.
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	//Don't trust an empty `objects` slice. A container listing can be
 | 
						|
	//outdated. For files, we can make a HEAD request on the object which
 | 
						|
	//reports existence (at least) much more reliably.
 | 
						|
	waitingTime := readAfterWriteWait
 | 
						|
	endTime := time.Now().Add(readAfterWriteTimeout)
 | 
						|
 | 
						|
	for {
 | 
						|
		info, headers, err := d.Conn.Object(d.Container, swiftPath)
 | 
						|
		if err != nil {
 | 
						|
			if err == swift.ObjectNotFound {
 | 
						|
				return nil, storagedriver.PathNotFoundError{Path: path}
 | 
						|
			}
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		//if this is a DLO and it is clear that segments are still missing,
 | 
						|
		//wait until they show up
 | 
						|
		_, isDLO := headers["X-Object-Manifest"]
 | 
						|
		if isDLO && info.Bytes == 0 {
 | 
						|
			if time.Now().Add(waitingTime).After(endTime) {
 | 
						|
				return nil, fmt.Errorf("timeout expired while waiting for segments of %s to show up", path)
 | 
						|
			}
 | 
						|
			time.Sleep(waitingTime)
 | 
						|
			waitingTime *= 2
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		//otherwise, accept the result
 | 
						|
		fi.IsDir = false
 | 
						|
		fi.Size = info.Bytes
 | 
						|
		fi.ModTime = info.LastModified
 | 
						|
		return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// List returns a list of the objects that are direct descendants of the given path.
 | 
						|
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
 | 
						|
	var files []string
 | 
						|
 | 
						|
	prefix := d.swiftPath(path)
 | 
						|
	if prefix != "" {
 | 
						|
		prefix += "/"
 | 
						|
	}
 | 
						|
 | 
						|
	opts := &swift.ObjectsOpts{
 | 
						|
		Prefix:    prefix,
 | 
						|
		Delimiter: '/',
 | 
						|
	}
 | 
						|
 | 
						|
	objects, err := d.Conn.ObjectsAll(d.Container, opts)
 | 
						|
	for _, obj := range objects {
 | 
						|
		files = append(files, strings.TrimPrefix(strings.TrimSuffix(obj.Name, "/"), d.swiftPath("/")))
 | 
						|
	}
 | 
						|
 | 
						|
	if err == swift.ContainerNotFound || (len(objects) == 0 && path != "/") {
 | 
						|
		return files, storagedriver.PathNotFoundError{Path: path}
 | 
						|
	}
 | 
						|
	return files, err
 | 
						|
}
 | 
						|
 | 
						|
// Move moves an object stored at sourcePath to destPath, removing the original
 | 
						|
// object.
 | 
						|
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
 | 
						|
	_, headers, err := d.Conn.Object(d.Container, d.swiftPath(sourcePath))
 | 
						|
	if err == nil {
 | 
						|
		if manifest, ok := headers["X-Object-Manifest"]; ok {
 | 
						|
			if err = d.createManifest(destPath, manifest); err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
			err = d.Conn.ObjectDelete(d.Container, d.swiftPath(sourcePath))
 | 
						|
		} else {
 | 
						|
			err = d.Conn.ObjectMove(d.Container, d.swiftPath(sourcePath), d.Container, d.swiftPath(destPath))
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if err == swift.ObjectNotFound {
 | 
						|
		return storagedriver.PathNotFoundError{Path: sourcePath}
 | 
						|
	}
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// Delete recursively deletes all objects stored at "path" and its subpaths.
 | 
						|
func (d *driver) Delete(ctx context.Context, path string) error {
 | 
						|
	opts := swift.ObjectsOpts{
 | 
						|
		Prefix: d.swiftPath(path) + "/",
 | 
						|
	}
 | 
						|
 | 
						|
	objects, err := d.Conn.ObjectsAll(d.Container, &opts)
 | 
						|
	if err != nil {
 | 
						|
		if err == swift.ContainerNotFound {
 | 
						|
			return storagedriver.PathNotFoundError{Path: path}
 | 
						|
		}
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	for _, obj := range objects {
 | 
						|
		if obj.PseudoDirectory {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if _, headers, err := d.Conn.Object(d.Container, obj.Name); err == nil {
 | 
						|
			manifest, ok := headers["X-Object-Manifest"]
 | 
						|
			if ok {
 | 
						|
				_, prefix := parseManifest(manifest)
 | 
						|
				segments, err := d.getAllSegments(prefix)
 | 
						|
				if err != nil {
 | 
						|
					return err
 | 
						|
				}
 | 
						|
				objects = append(objects, segments...)
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			if err == swift.ObjectNotFound {
 | 
						|
				return storagedriver.PathNotFoundError{Path: obj.Name}
 | 
						|
			}
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if d.BulkDeleteSupport && len(objects) > 0 && d.BulkDeleteMaxDeletes > 0 {
 | 
						|
		filenames := make([]string, len(objects))
 | 
						|
		for i, obj := range objects {
 | 
						|
			filenames[i] = obj.Name
 | 
						|
		}
 | 
						|
 | 
						|
		chunks, err := chunkFilenames(filenames, d.BulkDeleteMaxDeletes)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		for _, chunk := range chunks {
 | 
						|
			_, err := d.Conn.BulkDelete(d.Container, chunk)
 | 
						|
			// Don't fail on ObjectNotFound because eventual consistency
 | 
						|
			// makes this situation normal.
 | 
						|
			if err != nil && err != swift.Forbidden && err != swift.ObjectNotFound {
 | 
						|
				if err == swift.ContainerNotFound {
 | 
						|
					return storagedriver.PathNotFoundError{Path: path}
 | 
						|
				}
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		for _, obj := range objects {
 | 
						|
			if err := d.Conn.ObjectDelete(d.Container, obj.Name); err != nil {
 | 
						|
				if err == swift.ObjectNotFound {
 | 
						|
					return storagedriver.PathNotFoundError{Path: obj.Name}
 | 
						|
				}
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	_, _, err = d.Conn.Object(d.Container, d.swiftPath(path))
 | 
						|
	if err == nil {
 | 
						|
		if err := d.Conn.ObjectDelete(d.Container, d.swiftPath(path)); err != nil {
 | 
						|
			if err == swift.ObjectNotFound {
 | 
						|
				return storagedriver.PathNotFoundError{Path: path}
 | 
						|
			}
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	} else if err == swift.ObjectNotFound {
 | 
						|
		if len(objects) == 0 {
 | 
						|
			return storagedriver.PathNotFoundError{Path: path}
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
 | 
						|
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
 | 
						|
	if d.SecretKey == "" {
 | 
						|
		return "", storagedriver.ErrUnsupportedMethod{}
 | 
						|
	}
 | 
						|
 | 
						|
	methodString := "GET"
 | 
						|
	method, ok := options["method"]
 | 
						|
	if ok {
 | 
						|
		if methodString, ok = method.(string); !ok {
 | 
						|
			return "", storagedriver.ErrUnsupportedMethod{}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if methodString == "HEAD" {
 | 
						|
		// A "HEAD" request on a temporary URL is allowed if the
 | 
						|
		// signature was generated with "GET", "POST" or "PUT"
 | 
						|
		methodString = "GET"
 | 
						|
	}
 | 
						|
 | 
						|
	supported := false
 | 
						|
	for _, method := range d.TempURLMethods {
 | 
						|
		if method == methodString {
 | 
						|
			supported = true
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if !supported {
 | 
						|
		return "", storagedriver.ErrUnsupportedMethod{}
 | 
						|
	}
 | 
						|
 | 
						|
	expiresTime := time.Now().Add(20 * time.Minute)
 | 
						|
	expires, ok := options["expiry"]
 | 
						|
	if ok {
 | 
						|
		et, ok := expires.(time.Time)
 | 
						|
		if ok {
 | 
						|
			expiresTime = et
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	tempURL := d.Conn.ObjectTempUrl(d.Container, d.swiftPath(path), d.SecretKey, methodString, expiresTime)
 | 
						|
 | 
						|
	if d.AccessKey != "" {
 | 
						|
		// On HP Cloud, the signature must be in the form of tenant_id:access_key:signature
 | 
						|
		url, _ := url.Parse(tempURL)
 | 
						|
		query := url.Query()
 | 
						|
		query.Set("temp_url_sig", fmt.Sprintf("%s:%s:%s", d.Conn.TenantId, d.AccessKey, query.Get("temp_url_sig")))
 | 
						|
		url.RawQuery = query.Encode()
 | 
						|
		tempURL = url.String()
 | 
						|
	}
 | 
						|
 | 
						|
	return tempURL, nil
 | 
						|
}
 | 
						|
 | 
						|
// Walk traverses a filesystem defined within driver, starting
 | 
						|
// from the given path, calling f on each file and directory
 | 
						|
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
 | 
						|
	return storagedriver.WalkFallback(ctx, d, path, f)
 | 
						|
}
 | 
						|
 | 
						|
func (d *driver) swiftPath(path string) string {
 | 
						|
	return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
 | 
						|
}
 | 
						|
 | 
						|
// swiftSegmentPath returns a randomly generated path in the segments directory.
 | 
						|
func (d *driver) swiftSegmentPath(path string) (string, error) {
 | 
						|
	checksum := sha1.New()
 | 
						|
	checksum.Write([]byte(path))
 | 
						|
 | 
						|
	if _, err := io.CopyN(checksum, rand.Reader, 32); err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	path = hex.EncodeToString(checksum.Sum(nil))
 | 
						|
	return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
 | 
						|
}
 | 
						|
 | 
						|
func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
 | 
						|
	//a simple container listing works 99.9% of the time
 | 
						|
	segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path})
 | 
						|
	if err != nil {
 | 
						|
		if err == swift.ContainerNotFound {
 | 
						|
			return nil, storagedriver.PathNotFoundError{Path: path}
 | 
						|
		}
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	//build a lookup table by object name
 | 
						|
	hasObjectName := make(map[string]struct{})
 | 
						|
	for _, segment := range segments {
 | 
						|
		hasObjectName[segment.Name] = struct{}{}
 | 
						|
	}
 | 
						|
 | 
						|
	//The container listing might be outdated (i.e. not contain all existing
 | 
						|
	//segment objects yet) because of temporary inconsistency (Swift is only
 | 
						|
	//eventually consistent!). Check its completeness.
 | 
						|
	segmentNumber := 0
 | 
						|
	for {
 | 
						|
		segmentNumber++
 | 
						|
		segmentPath := getSegmentPath(path, segmentNumber)
 | 
						|
 | 
						|
		if _, seen := hasObjectName[segmentPath]; seen {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		//This segment is missing in the container listing. Use a more reliable
 | 
						|
		//request to check its existence. (HEAD requests on segments are
 | 
						|
		//guaranteed to return the correct metadata, except for the pathological
 | 
						|
		//case of an outage of large parts of the Swift cluster or its network,
 | 
						|
		//since every segment is only written once.)
 | 
						|
		segment, _, err := d.Conn.Object(d.Container, segmentPath)
 | 
						|
		switch err {
 | 
						|
		case nil:
 | 
						|
			//found new segment -> keep going, more might be missing
 | 
						|
			segments = append(segments, segment)
 | 
						|
			continue
 | 
						|
		case swift.ObjectNotFound:
 | 
						|
			//This segment is missing. Since we upload segments sequentially,
 | 
						|
			//there won't be any more segments after it.
 | 
						|
			return segments, nil
 | 
						|
		default:
 | 
						|
			return nil, err //unexpected error
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (d *driver) createManifest(path string, segments string) error {
 | 
						|
	headers := make(swift.Headers)
 | 
						|
	headers["X-Object-Manifest"] = segments
 | 
						|
	manifest, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "", contentType, headers)
 | 
						|
	if err != nil {
 | 
						|
		if err == swift.ObjectNotFound {
 | 
						|
			return storagedriver.PathNotFoundError{Path: path}
 | 
						|
		}
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if err := manifest.Close(); err != nil {
 | 
						|
		if err == swift.ObjectNotFound {
 | 
						|
			return storagedriver.PathNotFoundError{Path: path}
 | 
						|
		}
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// chunkFilenames splits slice into consecutive chunks of at most maxSize
// elements. The chunks share the backing array of slice. It returns an error
// when maxSize is not positive, and nil chunks for an empty slice.
func chunkFilenames(slice []string, maxSize int) (chunks [][]string, err error) {
	if maxSize <= 0 {
		return nil, fmt.Errorf("max chunk size must be > 0")
	}
	for offset := 0; offset < len(slice); offset += maxSize {
		end := offset + maxSize
		if end > len(slice) {
			end = len(slice)
		}
		chunks = append(chunks, slice[offset:end])
	}
	return chunks, nil
}
 | 
						|
 | 
						|
// parseManifest splits manifest at its first "/" into the part before it
// (container) and the part after it (prefix). When manifest contains no "/",
// the whole string is returned as container and prefix is empty.
func parseManifest(manifest string) (container string, prefix string) {
	slash := strings.Index(manifest, "/")
	if slash < 0 {
		return manifest, ""
	}
	return manifest[:slash], manifest[slash+1:]
}
 | 
						|
 | 
						|
// generateSecret returns 32 bytes read from crypto/rand, hex-encoded into a
// 64-character string. If the random source fails, the returned error wraps
// the underlying cause so callers can inspect it with errors.Is/errors.As.
func generateSecret() (string, error) {
	var secretBytes [32]byte
	if _, err := rand.Read(secretBytes[:]); err != nil {
		return "", fmt.Errorf("could not generate random bytes for Swift secret key: %w", err)
	}
	return hex.EncodeToString(secretBytes[:]), nil
}
 | 
						|
 | 
						|
// getSegmentPath returns the object name of segment number partNumber below
// segmentsPath. The number is zero-padded to 16 characters, so segment names
// of one upload have equal length for non-negative part numbers.
func getSegmentPath(segmentsPath string, partNumber int) string {
	paddedNumber := fmt.Sprintf("%016d", partNumber)
	return segmentsPath + "/" + paddedNumber
}
 | 
						|
 | 
						|
type writer struct {
 | 
						|
	driver       *driver
 | 
						|
	path         string
 | 
						|
	segmentsPath string
 | 
						|
	size         int64
 | 
						|
	bw           *bufio.Writer
 | 
						|
	closed       bool
 | 
						|
	committed    bool
 | 
						|
	cancelled    bool
 | 
						|
}
 | 
						|
 | 
						|
func (d *driver) newWriter(path, segmentsPath string, segments []swift.Object) storagedriver.FileWriter {
 | 
						|
	var size int64
 | 
						|
	for _, segment := range segments {
 | 
						|
		size += segment.Bytes
 | 
						|
	}
 | 
						|
	return &writer{
 | 
						|
		driver:       d,
 | 
						|
		path:         path,
 | 
						|
		segmentsPath: segmentsPath,
 | 
						|
		size:         size,
 | 
						|
		bw: bufio.NewWriterSize(&segmentWriter{
 | 
						|
			conn:          d.Conn,
 | 
						|
			container:     d.Container,
 | 
						|
			segmentsPath:  segmentsPath,
 | 
						|
			segmentNumber: len(segments) + 1,
 | 
						|
			maxChunkSize:  d.ChunkSize,
 | 
						|
		}, d.ChunkSize),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (w *writer) Write(p []byte) (int, error) {
 | 
						|
	if w.closed {
 | 
						|
		return 0, fmt.Errorf("already closed")
 | 
						|
	} else if w.committed {
 | 
						|
		return 0, fmt.Errorf("already committed")
 | 
						|
	} else if w.cancelled {
 | 
						|
		return 0, fmt.Errorf("already cancelled")
 | 
						|
	}
 | 
						|
 | 
						|
	n, err := w.bw.Write(p)
 | 
						|
	w.size += int64(n)
 | 
						|
	return n, err
 | 
						|
}
 | 
						|
 | 
						|
func (w *writer) Size() int64 {
 | 
						|
	return w.size
 | 
						|
}
 | 
						|
 | 
						|
func (w *writer) Close() error {
 | 
						|
	if w.closed {
 | 
						|
		return fmt.Errorf("already closed")
 | 
						|
	}
 | 
						|
 | 
						|
	if err := w.bw.Flush(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	if !w.committed && !w.cancelled {
 | 
						|
		if err := w.driver.createManifest(w.path, w.driver.Container+"/"+w.segmentsPath); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if err := w.waitForSegmentsToShowUp(); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	w.closed = true
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (w *writer) Cancel() error {
 | 
						|
	if w.closed {
 | 
						|
		return fmt.Errorf("already closed")
 | 
						|
	} else if w.committed {
 | 
						|
		return fmt.Errorf("already committed")
 | 
						|
	}
 | 
						|
	w.cancelled = true
 | 
						|
	return w.driver.Delete(context.Background(), w.path)
 | 
						|
}
 | 
						|
 | 
						|
func (w *writer) Commit() error {
 | 
						|
	if w.closed {
 | 
						|
		return fmt.Errorf("already closed")
 | 
						|
	} else if w.committed {
 | 
						|
		return fmt.Errorf("already committed")
 | 
						|
	} else if w.cancelled {
 | 
						|
		return fmt.Errorf("already cancelled")
 | 
						|
	}
 | 
						|
 | 
						|
	if err := w.bw.Flush(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	if err := w.driver.createManifest(w.path, w.driver.Container+"/"+w.segmentsPath); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	w.committed = true
 | 
						|
	return w.waitForSegmentsToShowUp()
 | 
						|
}
 | 
						|
 | 
						|
func (w *writer) waitForSegmentsToShowUp() error {
 | 
						|
	var err error
 | 
						|
	waitingTime := readAfterWriteWait
 | 
						|
	endTime := time.Now().Add(readAfterWriteTimeout)
 | 
						|
 | 
						|
	for {
 | 
						|
		var info swift.Object
 | 
						|
		if info, _, err = w.driver.Conn.Object(w.driver.Container, w.driver.swiftPath(w.path)); err == nil {
 | 
						|
			if info.Bytes == w.size {
 | 
						|
				break
 | 
						|
			}
 | 
						|
			err = fmt.Errorf("timeout expired while waiting for segments of %s to show up", w.path)
 | 
						|
		}
 | 
						|
		if time.Now().Add(waitingTime).After(endTime) {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		time.Sleep(waitingTime)
 | 
						|
		waitingTime *= 2
 | 
						|
	}
 | 
						|
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
type segmentWriter struct {
 | 
						|
	conn          *swift.Connection
 | 
						|
	container     string
 | 
						|
	segmentsPath  string
 | 
						|
	segmentNumber int
 | 
						|
	maxChunkSize  int
 | 
						|
}
 | 
						|
 | 
						|
func (sw *segmentWriter) Write(p []byte) (int, error) {
 | 
						|
	n := 0
 | 
						|
	for offset := 0; offset < len(p); offset += sw.maxChunkSize {
 | 
						|
		chunkSize := sw.maxChunkSize
 | 
						|
		if offset+chunkSize > len(p) {
 | 
						|
			chunkSize = len(p) - offset
 | 
						|
		}
 | 
						|
		_, err := sw.conn.ObjectPut(sw.container, getSegmentPath(sw.segmentsPath, sw.segmentNumber), bytes.NewReader(p[offset:offset+chunkSize]), false, "", contentType, nil)
 | 
						|
		if err != nil {
 | 
						|
			return n, err
 | 
						|
		}
 | 
						|
 | 
						|
		sw.segmentNumber++
 | 
						|
		n += chunkSize
 | 
						|
	}
 | 
						|
 | 
						|
	return n, nil
 | 
						|
}
 |