Add Openstack Swift storage driver
Signed-off-by: Sylvain Baubeau <sbaubeau@redhat.com>master
							parent
							
								
									5ee441cdc7
								
							
						
					
					
						commit
						ea7c53df08
					
				| 
						 | 
				
			
			@ -81,6 +81,9 @@
 | 
			
		|||
		{
 | 
			
		||||
			"ImportPath": "github.com/stevvooe/resumable",
 | 
			
		||||
			"Rev": "51ad44105773cafcbe91927f70ac68e1bf78f8b4"
 | 
			
		||||
                },
 | 
			
		||||
			"ImportPath": "github.com/lebauce/swift",
 | 
			
		||||
			"Rev": "677cb70f5d40fa1a81ddb32f872615a57bb42381"
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			"ImportPath": "github.com/yvasiyarov/go-metrics",
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,32 @@
 | 
			
		|||
// +build ignore
 | 
			
		||||
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"os"
 | 
			
		||||
 | 
			
		||||
	"github.com/Sirupsen/logrus"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver/ipc"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver/swift"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// An out-of-process Swift driver, intended to be run by ipc.NewDriverClient
 | 
			
		||||
func main() {
 | 
			
		||||
	parametersBytes := []byte(os.Args[1])
 | 
			
		||||
	var parameters map[string]string
 | 
			
		||||
	err := json.Unmarshal(parametersBytes, ¶meters)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	driver, err := swift.FromParameters(parameters)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := ipc.StorageDriverServer(driver); err != nil {
 | 
			
		||||
		logrus.Fatalln(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -28,6 +28,7 @@ import (
 | 
			
		|||
	_ "github.com/docker/distribution/registry/storage/driver/inmemory"
 | 
			
		||||
	_ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront"
 | 
			
		||||
	_ "github.com/docker/distribution/registry/storage/driver/s3"
 | 
			
		||||
	_ "github.com/docker/distribution/registry/storage/driver/swift"
 | 
			
		||||
	"github.com/docker/distribution/version"
 | 
			
		||||
	gorhandlers "github.com/gorilla/handlers"
 | 
			
		||||
	"github.com/yvasiyarov/gorelic"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -49,6 +49,7 @@ This section lists all the registry configuration options. Some options in
 | 
			
		|||
the list are mutually exclusive. So, make sure to read the detailed reference
 | 
			
		||||
information about each option that appears later in this page.
 | 
			
		||||
 | 
			
		||||
<<<<<<< HEAD
 | 
			
		||||
    version: 0.1
 | 
			
		||||
    log:
 | 
			
		||||
      level: debug
 | 
			
		||||
| 
						 | 
				
			
			@ -92,6 +93,14 @@ information about each option that appears later in this page.
 | 
			
		|||
        poolname: radospool
 | 
			
		||||
        username: radosuser
 | 
			
		||||
        chunksize: 4194304
 | 
			
		||||
      swift:
 | 
			
		||||
        username: username
 | 
			
		||||
        password: password
 | 
			
		||||
        authurl: https://storage.myprovider.com/v2.0
 | 
			
		||||
        tenant: tenantname
 | 
			
		||||
        region: fr
 | 
			
		||||
        container: containername
 | 
			
		||||
        rootdirectory: /swift/object/name/prefix
 | 
			
		||||
      cache:
 | 
			
		||||
        blobdescriptor: redis
 | 
			
		||||
      maintenance:
 | 
			
		||||
| 
						 | 
				
			
			@ -580,6 +589,107 @@ must be set.
 | 
			
		|||
 | 
			
		||||
Note: `age` and `interval` are strings containing a number with optional fraction and a unit suffix: e.g. 45m, 2h10m, 168h (1 week).
 | 
			
		||||
 | 
			
		||||
### Openstack Swift
 | 
			
		||||
 | 
			
		||||
This storage backend uses Openstack Swift object storage.
 | 
			
		||||
 | 
			
		||||
<table>
 | 
			
		||||
  <tr>
 | 
			
		||||
    <th>Parameter</th>
 | 
			
		||||
    <th>Required</th>
 | 
			
		||||
    <th>Description</th>
 | 
			
		||||
  </tr>
 | 
			
		||||
  <tr>
 | 
			
		||||
    <td>
 | 
			
		||||
      <code>authurl</code>
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      yes
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      URL for obtaining an auth token.
 | 
			
		||||
    </td>
 | 
			
		||||
  </tr>
 | 
			
		||||
  <tr>
 | 
			
		||||
    <td>
 | 
			
		||||
      <code>username</code>
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      yes
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      Your Openstack user name.
 | 
			
		||||
    </td>
 | 
			
		||||
  </tr>
 | 
			
		||||
  <tr>
 | 
			
		||||
    <td>
 | 
			
		||||
      <code>password</code>
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      yes
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      Your Openstack password.
 | 
			
		||||
    </td>
 | 
			
		||||
  </tr>
 | 
			
		||||
  <tr>
 | 
			
		||||
    <td>
 | 
			
		||||
      <code>region</code>
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      no
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      The Openstack region in which your container exists.
 | 
			
		||||
    </td>
 | 
			
		||||
  </tr>
 | 
			
		||||
  <tr>
 | 
			
		||||
    <td>
 | 
			
		||||
      <code>container</code>
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      yes
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      The container name in which you want to store the registry's data.
 | 
			
		||||
    </td>
 | 
			
		||||
  </tr>
 | 
			
		||||
  <tr>
 | 
			
		||||
    <td>
 | 
			
		||||
      <code>tenant</code>
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      no
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      Your Openstack tenant name.
 | 
			
		||||
    </td>
 | 
			
		||||
  </tr>
 | 
			
		||||
  <tr>
 | 
			
		||||
    <td>
 | 
			
		||||
      <code>chunksize</code>
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      no
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      Size of the data segments for the Swift Dynamic Large Objects. This value should be a number (defaults to 5M).
 | 
			
		||||
    </td>
 | 
			
		||||
  </tr>
 | 
			
		||||
  <tr>
 | 
			
		||||
    <td>
 | 
			
		||||
      <code>rootdirectory</code>
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      no
 | 
			
		||||
    </td>
 | 
			
		||||
    <td>
 | 
			
		||||
      This is a prefix that will be applied to all Swift keys to allow you to segment data in your container if necessary.
 | 
			
		||||
    </td>
 | 
			
		||||
  </tr>
 | 
			
		||||
</table>
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
## auth
 | 
			
		||||
 | 
			
		||||
    auth:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,21 @@
 | 
			
		|||
# Openstack Swift storage driver
 | 
			
		||||
 | 
			
		||||
An implementation of the `storagedriver.StorageDriver` interface which uses Openstack Swift for object storage.
 | 
			
		||||
 | 
			
		||||
## Parameters
 | 
			
		||||
 | 
			
		||||
`authurl`: URL for obtaining an auth token.
 | 
			
		||||
 | 
			
		||||
`username`: Your Openstack user name.
 | 
			
		||||
 | 
			
		||||
`password`: Your Openstack password.
 | 
			
		||||
 | 
			
		||||
`container`: The name of your Swift container where you wish to store objects. An additional container - named `<container>_segments` to store the data will be used. The driver will try to create both containers during its initialization.
 | 
			
		||||
 | 
			
		||||
`tenant`: (optional) Your Openstack tenant name.
 | 
			
		||||
 | 
			
		||||
`region`: (optional) The name of the Openstack region in which you would like to store objects (for example `fr`).
 | 
			
		||||
 | 
			
		||||
`chunksize`: (optional) The segment size for Dynamic Large Objects uploads (performed by WriteStream) to swift. The default is 5 MB. You might experience better performance for larger chunk sizes depending on the speed of your connection to Swift.
 | 
			
		||||
 | 
			
		||||
`rootdirectory`: (optional) The root directory tree in which all registry files will be stored. Defaults to the empty string (container root).
 | 
			
		||||
| 
						 | 
				
			
			@ -23,6 +23,7 @@ This storage driver package comes bundled with several drivers:
 | 
			
		|||
- [s3](storage-drivers/s3.md): A driver storing objects in an Amazon Simple Storage Solution (S3) bucket.
 | 
			
		||||
- [azure](storage-drivers/azure.md): A driver storing objects in [Microsoft Azure Blob Storage](http://azure.microsoft.com/en-us/services/storage/).
 | 
			
		||||
- [rados](storage-drivers/rados.md): A driver storing objects in a [Ceph Object Storage](http://ceph.com/docs/master/rados/) pool.
 | 
			
		||||
- [swift](storage-drivers/swift): A driver storing objects in Openstack Swift.
 | 
			
		||||
 | 
			
		||||
## Storage Driver API
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,519 @@
 | 
			
		|||
// Package swift provides a storagedriver.StorageDriver implementation to
 | 
			
		||||
// store blobs in Openstack Swift object storage.
 | 
			
		||||
//
 | 
			
		||||
// This package leverages the ncw/swift client library for interfacing with
 | 
			
		||||
// Swift.
 | 
			
		||||
//
 | 
			
		||||
// Because Swift is a key, value store the Stat call does not support last modification
 | 
			
		||||
// time for directories (directories are an abstraction for key, value stores)
 | 
			
		||||
package swift
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	gopath "path"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/lebauce/swift"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/context"
 | 
			
		||||
	storagedriver "github.com/docker/distribution/registry/storage/driver"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver/base"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver/factory"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const driverName = "swift"
 | 
			
		||||
 | 
			
		||||
const defaultChunkSize = 5 * 1024 * 1024
 | 
			
		||||
 | 
			
		||||
//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
 | 
			
		||||
type DriverParameters struct {
 | 
			
		||||
	Username  string
 | 
			
		||||
	Password  string
 | 
			
		||||
	AuthURL   string
 | 
			
		||||
	Tenant    string
 | 
			
		||||
	Region    string
 | 
			
		||||
	Container string
 | 
			
		||||
	Prefix    string
 | 
			
		||||
	ChunkSize int64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type swiftInfo map[string]interface{}
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	factory.Register(driverName, &swiftDriverFactory{})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// swiftDriverFactory implements the factory.StorageDriverFactory interface
 | 
			
		||||
type swiftDriverFactory struct{}
 | 
			
		||||
 | 
			
		||||
func (factory *swiftDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
 | 
			
		||||
	return FromParameters(parameters)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type driver struct {
 | 
			
		||||
	Conn              swift.Connection
 | 
			
		||||
	Container         string
 | 
			
		||||
	Prefix            string
 | 
			
		||||
	BulkDeleteSupport bool
 | 
			
		||||
	ChunkSize         int64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type baseEmbed struct {
 | 
			
		||||
	base.Base
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Driver is a storagedriver.StorageDriver implementation backed by Amazon Swift
 | 
			
		||||
// Objects are stored at absolute keys in the provided bucket.
 | 
			
		||||
type Driver struct {
 | 
			
		||||
	baseEmbed
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// FromParameters constructs a new Driver with a given parameters map
 | 
			
		||||
// Required parameters:
 | 
			
		||||
// - username
 | 
			
		||||
// - password
 | 
			
		||||
// - authurl
 | 
			
		||||
// - container
 | 
			
		||||
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
 | 
			
		||||
	username, ok := parameters["username"]
 | 
			
		||||
	if !ok || fmt.Sprint(username) == "" {
 | 
			
		||||
		return nil, fmt.Errorf("No username parameter provided")
 | 
			
		||||
	}
 | 
			
		||||
	password, ok := parameters["password"]
 | 
			
		||||
	if !ok || fmt.Sprint(password) == "" {
 | 
			
		||||
		return nil, fmt.Errorf("No password parameter provided")
 | 
			
		||||
	}
 | 
			
		||||
	authURL, ok := parameters["authurl"]
 | 
			
		||||
	if !ok || fmt.Sprint(authURL) == "" {
 | 
			
		||||
		return nil, fmt.Errorf("No container parameter provided")
 | 
			
		||||
	}
 | 
			
		||||
	container, ok := parameters["container"]
 | 
			
		||||
	if !ok || fmt.Sprint(container) == "" {
 | 
			
		||||
		return nil, fmt.Errorf("No container parameter provided")
 | 
			
		||||
	}
 | 
			
		||||
	tenant, ok := parameters["tenant"]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		tenant = ""
 | 
			
		||||
	}
 | 
			
		||||
	region, ok := parameters["region"]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		region = ""
 | 
			
		||||
	}
 | 
			
		||||
	rootDirectory, ok := parameters["rootdirectory"]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		rootDirectory = ""
 | 
			
		||||
	}
 | 
			
		||||
	chunkSize := int64(defaultChunkSize)
 | 
			
		||||
	chunkSizeParam, ok := parameters["chunksize"]
 | 
			
		||||
	if ok {
 | 
			
		||||
		chunkSize, ok = chunkSizeParam.(int64)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return nil, fmt.Errorf("The chunksize parameter should be a number")
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	params := DriverParameters{
 | 
			
		||||
		fmt.Sprint(username),
 | 
			
		||||
		fmt.Sprint(password),
 | 
			
		||||
		fmt.Sprint(authURL),
 | 
			
		||||
		fmt.Sprint(tenant),
 | 
			
		||||
		fmt.Sprint(region),
 | 
			
		||||
		fmt.Sprint(container),
 | 
			
		||||
		fmt.Sprint(rootDirectory),
 | 
			
		||||
		chunkSize,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return New(params)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// New constructs a new Driver with the given Openstack Swift credentials and container name
 | 
			
		||||
func New(params DriverParameters) (*Driver, error) {
 | 
			
		||||
	ct := swift.Connection{
 | 
			
		||||
		UserName:       params.Username,
 | 
			
		||||
		ApiKey:         params.Password,
 | 
			
		||||
		AuthUrl:        params.AuthURL,
 | 
			
		||||
		Region:         params.Region,
 | 
			
		||||
		UserAgent:      "distribution",
 | 
			
		||||
		Tenant:         params.Tenant,
 | 
			
		||||
		ConnectTimeout: 60 * time.Second,
 | 
			
		||||
		Timeout:        15 * 60 * time.Second,
 | 
			
		||||
	}
 | 
			
		||||
	err := ct.Authenticate()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("Swift authentication failed: %s", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := ct.ContainerCreate(params.Container, nil); err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("Failed to create container %s (%s)", params.Container, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := ct.ContainerCreate(params.Container + "_segments", nil); err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("Failed to create container %s (%s)", params.Container + "_segments", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	d := &driver{
 | 
			
		||||
		Conn:              ct,
 | 
			
		||||
		Container:         params.Container,
 | 
			
		||||
		Prefix:            params.Prefix,
 | 
			
		||||
		BulkDeleteSupport: detectBulkDelete(params.AuthURL),
 | 
			
		||||
		ChunkSize:         params.ChunkSize,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &Driver{
 | 
			
		||||
		baseEmbed: baseEmbed{
 | 
			
		||||
			Base: base.Base{
 | 
			
		||||
				StorageDriver: d,
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Implement the storagedriver.StorageDriver interface
 | 
			
		||||
 | 
			
		||||
func (d *driver) Name() string {
 | 
			
		||||
	return driverName
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetContent retrieves the content stored at "path" as a []byte.
 | 
			
		||||
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
 | 
			
		||||
	content, err := d.Conn.ObjectGetBytes(d.Container, d.swiftPath(path))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, parseError(path, err)
 | 
			
		||||
	}
 | 
			
		||||
	return content, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// PutContent stores the []byte content at a location designated by "path".
 | 
			
		||||
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
 | 
			
		||||
	if dir, err := d.createParentFolder(path); err != nil {
 | 
			
		||||
		return parseError(dir, err)
 | 
			
		||||
	}
 | 
			
		||||
	err := d.Conn.ObjectPutBytes(d.Container, d.swiftPath(path),
 | 
			
		||||
	                             contents, d.getContentType())
 | 
			
		||||
	return parseError(path, err)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
 | 
			
		||||
// given byte offset.
 | 
			
		||||
func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
 | 
			
		||||
	headers := make(swift.Headers)
 | 
			
		||||
	headers["Range"] = "bytes=" + strconv.FormatInt(offset, 10) + "-"
 | 
			
		||||
 | 
			
		||||
	file, _, err := d.Conn.ObjectOpen(d.Container, d.swiftPath(path), false, headers)
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == 416 {
 | 
			
		||||
			return ioutil.NopCloser(bytes.NewReader(nil)), nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return nil, parseError(path, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return file, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WriteStream stores the contents of the provided io.Reader at a
 | 
			
		||||
// location designated by the given path. The driver will know it has
 | 
			
		||||
// received the full contents when the reader returns io.EOF. The number
 | 
			
		||||
// of successfully READ bytes will be returned, even if an error is
 | 
			
		||||
// returned. May be used to resume writing a stream by providing a nonzero
 | 
			
		||||
// offset. Offsets past the current size will write from the position
 | 
			
		||||
// beyond the end of the file.
 | 
			
		||||
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (int64, error) {
 | 
			
		||||
	var (
 | 
			
		||||
		segments      []swift.Object
 | 
			
		||||
		paddingReader io.Reader
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	partNumber := int64(1)
 | 
			
		||||
	bytesRead := int64(0)
 | 
			
		||||
	currentLength := int64(0)
 | 
			
		||||
	zeroBuf := make([]byte, d.ChunkSize)
 | 
			
		||||
	segmentsContainer := d.Container + "_segments"
 | 
			
		||||
	cursor := int64(0)
 | 
			
		||||
 | 
			
		||||
	getSegment := func() string {
 | 
			
		||||
		return d.swiftPath(path) + "/" + fmt.Sprintf("%016d", partNumber)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	max := func(a int64, b int64) int64 {
 | 
			
		||||
		if a > b {
 | 
			
		||||
			return a
 | 
			
		||||
		}
 | 
			
		||||
		return b
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	info, _, err := d.Conn.Object(d.Container, d.swiftPath(path))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if swiftErr, ok := err.(*swift.Error); ok {
 | 
			
		||||
			if swiftErr.StatusCode == 404 {
 | 
			
		||||
				// Create a object manifest
 | 
			
		||||
				if dir, err := d.createParentFolder(path); err != nil {
 | 
			
		||||
					return bytesRead, parseError(dir, err)
 | 
			
		||||
				}
 | 
			
		||||
				headers := make(swift.Headers)
 | 
			
		||||
				headers["X-Object-Manifest"] = segmentsContainer + "/" + d.swiftPath(path)
 | 
			
		||||
				manifest, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "",
 | 
			
		||||
				                                     d.getContentType(), headers)
 | 
			
		||||
				manifest.Close()
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return bytesRead, parseError(path, err)
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				return bytesRead, parseError(path, err)
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			return bytesRead, parseError(path, err)
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		// The manifest already exists. Get all the segments
 | 
			
		||||
		currentLength = info.Bytes
 | 
			
		||||
		headers := make(swift.Headers)
 | 
			
		||||
		headers["Content-Type"] = "application/json"
 | 
			
		||||
		opts := &swift.ObjectsOpts{Prefix: d.swiftPath(path), Headers: headers}
 | 
			
		||||
		segments, err = d.Conn.Objects(d.Container + "_segments", opts)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return bytesRead, parseError(path, err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// First, we skip the existing segments that are not modified by this call
 | 
			
		||||
	for i := range segments {
 | 
			
		||||
		if offset < cursor + segments[i].Bytes {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		cursor += segments[i].Bytes
 | 
			
		||||
		partNumber++
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// We reached the end of the file but we haven't reached 'offset' yet
 | 
			
		||||
	// Therefore we add blocks of zeros
 | 
			
		||||
	if offset >= currentLength {
 | 
			
		||||
		for offset - currentLength >= d.ChunkSize {
 | 
			
		||||
			// Insert a block a zero
 | 
			
		||||
			d.Conn.ObjectPut(segmentsContainer, getSegment(),
 | 
			
		||||
			                 bytes.NewReader(zeroBuf), false, "",
 | 
			
		||||
			                 d.getContentType(), nil)
 | 
			
		||||
			currentLength += d.ChunkSize
 | 
			
		||||
			partNumber++
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		cursor = currentLength
 | 
			
		||||
		paddingReader = bytes.NewReader(zeroBuf)
 | 
			
		||||
	} else {
 | 
			
		||||
		// Offset is inside the current segment : we need to read the
 | 
			
		||||
		// data from the beginning of the segment to offset
 | 
			
		||||
		paddingReader, _, err = d.Conn.ObjectOpen(segmentsContainer, getSegment(), false, nil)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return bytesRead, parseError(getSegment(), err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	multi := io.MultiReader(
 | 
			
		||||
		io.LimitReader(paddingReader, offset - cursor),
 | 
			
		||||
		io.LimitReader(reader, d.ChunkSize - (offset - cursor)),
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		currentSegment, err := d.Conn.ObjectCreate(segmentsContainer, getSegment(), false, "", d.getContentType(), nil)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return bytesRead, parseError(path, err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		n, err := io.Copy(currentSegment, multi)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return bytesRead, parseError(path, err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if n < d.ChunkSize {
 | 
			
		||||
			// We wrote all the data
 | 
			
		||||
			if cursor + n < currentLength {
 | 
			
		||||
				// Copy the end of the chunk
 | 
			
		||||
				headers := make(swift.Headers)
 | 
			
		||||
				headers["Range"] = "bytes=" + strconv.FormatInt(cursor + n, 10) + "-" + strconv.FormatInt(cursor + d.ChunkSize, 10)
 | 
			
		||||
				file, _, err := d.Conn.ObjectOpen(d.Container, d.swiftPath(path), false, headers)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return bytesRead, parseError(path, err)
 | 
			
		||||
				}
 | 
			
		||||
				io.Copy(currentSegment, file)
 | 
			
		||||
				file.Close()
 | 
			
		||||
			}
 | 
			
		||||
			if n > 0 {
 | 
			
		||||
				currentSegment.Close()
 | 
			
		||||
				bytesRead += n - max(0, offset - cursor)
 | 
			
		||||
			}
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		currentSegment.Close()
 | 
			
		||||
		bytesRead += n - max(0, offset - cursor)
 | 
			
		||||
		multi = io.MultiReader(io.LimitReader(reader, d.ChunkSize))
 | 
			
		||||
		cursor += d.ChunkSize
 | 
			
		||||
		partNumber++
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return bytesRead, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Stat retrieves the FileInfo for the given path, including the current size
 | 
			
		||||
// in bytes and the creation time.
 | 
			
		||||
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
 | 
			
		||||
	info, _, err := d.Conn.Object(d.Container, d.swiftPath(path))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, parseError(path, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fi := storagedriver.FileInfoFields{
 | 
			
		||||
		Path:    path,
 | 
			
		||||
		IsDir:   info.ContentType == "application/directory",
 | 
			
		||||
		Size:    info.Bytes,
 | 
			
		||||
		ModTime: info.LastModified,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// List returns a list of the objects that are direct descendants of the given path.
 | 
			
		||||
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
 | 
			
		||||
	prefix := d.swiftPath(path)
 | 
			
		||||
	if prefix != "" {
 | 
			
		||||
		prefix += "/"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	opts := &swift.ObjectsOpts{
 | 
			
		||||
		Path:      prefix,
 | 
			
		||||
		Delimiter: '/',
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	files, err := d.Conn.ObjectNames(d.Container, opts)
 | 
			
		||||
	for index, name := range files {
 | 
			
		||||
		files[index] = "/" + strings.TrimSuffix(name, "/")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return files, parseError(path, err)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Move moves an object stored at sourcePath to destPath, removing the original
 | 
			
		||||
// object.
 | 
			
		||||
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
 | 
			
		||||
	err := d.Conn.ObjectMove(d.Container, d.swiftPath(sourcePath),
 | 
			
		||||
	                         d.Container, d.swiftPath(destPath))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return parseError(sourcePath, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Delete recursively deletes all objects stored at "path" and its subpaths.
 | 
			
		||||
func (d *driver) Delete(ctx context.Context, path string) error {
 | 
			
		||||
	opts := swift.ObjectsOpts{
 | 
			
		||||
		Prefix: d.swiftPath(path),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	objects, err := d.Conn.ObjectNamesAll(d.Container, &opts)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return parseError(path, err)
 | 
			
		||||
	}
 | 
			
		||||
	if len(objects) == 0 {
 | 
			
		||||
		return storagedriver.PathNotFoundError{Path: path}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for index, name := range objects {
 | 
			
		||||
		objects[index] = name[len(d.Prefix):]
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var multiDelete = true
 | 
			
		||||
	if d.BulkDeleteSupport {
 | 
			
		||||
		_, err := d.Conn.BulkDelete(d.Container, objects)
 | 
			
		||||
		multiDelete = err != nil
 | 
			
		||||
	}
 | 
			
		||||
	if multiDelete {
 | 
			
		||||
		for _, name := range objects {
 | 
			
		||||
			if _, headers, err := d.Conn.Object(d.Container, name); err == nil {
 | 
			
		||||
				manifest, ok := headers["X-Object-Manifest"]
 | 
			
		||||
				if ok {
 | 
			
		||||
					components := strings.SplitN(manifest, "/", 2)
 | 
			
		||||
					segContainer := components[0]
 | 
			
		||||
					segments, err := d.Conn.ObjectNamesAll(segContainer, &swift.ObjectsOpts{ Prefix: components[1] })
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						return parseError(name, err)
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
					for _, s := range segments {
 | 
			
		||||
						if err := d.Conn.ObjectDelete(segContainer, s); err != nil {
 | 
			
		||||
							return parseError(s, err)
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				return parseError(name, err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if err := d.Conn.ObjectDelete(d.Container, name); err != nil {
 | 
			
		||||
				return parseError(name, err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
 | 
			
		||||
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
 | 
			
		||||
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
 | 
			
		||||
	return "", storagedriver.ErrUnsupportedMethod
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *driver) swiftPath(path string) string {
 | 
			
		||||
	return strings.TrimLeft(strings.TrimRight(d.Prefix, "/")+path, "/")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *driver) createParentFolder(path string) (string, error) {
 | 
			
		||||
	dir := gopath.Dir(path)
 | 
			
		||||
	if dir != "/" {
 | 
			
		||||
		_, _, err := d.Conn.Object(d.Container, d.swiftPath(dir))
 | 
			
		||||
		if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == 404 {
 | 
			
		||||
			_, err := d.Conn.ObjectPut(d.Container, d.swiftPath(dir), bytes.NewReader(make([]byte, 0)),
 | 
			
		||||
			                           false, "", "application/directory", nil)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return dir, err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return dir, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *driver) getContentType() string {
 | 
			
		||||
	return "application/octet-stream"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func detectBulkDelete(authURL string) (bulkDelete bool) {
 | 
			
		||||
	resp, err := http.Get(filepath.Join(authURL, "..", "..") + "/info")
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		defer resp.Body.Close()
 | 
			
		||||
		decoder := json.NewDecoder(resp.Body)
 | 
			
		||||
		var infos swiftInfo
 | 
			
		||||
		if decoder.Decode(&infos) == nil {
 | 
			
		||||
			_, bulkDelete = infos["bulk_delete"]
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func parseError(path string, err error) error {
 | 
			
		||||
	if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == 404 {
 | 
			
		||||
		return storagedriver.PathNotFoundError{Path: path}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,141 @@
 | 
			
		|||
package swift
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"os"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/lebauce/swift/swifttest"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/context"
 | 
			
		||||
	storagedriver "github.com/docker/distribution/registry/storage/driver"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver/testsuites"
 | 
			
		||||
 | 
			
		||||
	"gopkg.in/check.v1"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Hook up gocheck into the "go test" runner.
 | 
			
		||||
func Test(t *testing.T) { check.TestingT(t) }
 | 
			
		||||
 | 
			
		||||
type SwiftDriverConstructor func(rootDirectory string) (*Driver, error)
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	var (
 | 
			
		||||
		username     string
 | 
			
		||||
		password     string
 | 
			
		||||
		authURL      string
 | 
			
		||||
		tenant       string
 | 
			
		||||
		container    string
 | 
			
		||||
		region       string
 | 
			
		||||
		prefix       string
 | 
			
		||||
		swiftServer *swifttest.SwiftServer
 | 
			
		||||
		err          error
 | 
			
		||||
	)
 | 
			
		||||
	if username = os.Getenv("OS_USERNAME"); username == "" {
 | 
			
		||||
		username = os.Getenv("ST_USER")
 | 
			
		||||
	}
 | 
			
		||||
	if password = os.Getenv("OS_PASSWORD"); password == "" {
 | 
			
		||||
		password = os.Getenv("ST_KEY")
 | 
			
		||||
	}
 | 
			
		||||
	if authURL = os.Getenv("OS_AUTH_URL"); authURL == "" {
 | 
			
		||||
		authURL = os.Getenv("ST_AUTH")
 | 
			
		||||
	}
 | 
			
		||||
	tenant = os.Getenv("OS_TENANT_NAME")
 | 
			
		||||
	container = os.Getenv("OS_CONTAINER_NAME")
 | 
			
		||||
	region = os.Getenv("OS_REGION_NAME")
 | 
			
		||||
	prefix = os.Getenv("OS_CONTAINER_PREFIX")
 | 
			
		||||
 | 
			
		||||
	if username == "" || password == "" || authURL == "" || container == "" {
 | 
			
		||||
		if swiftServer, err = swifttest.NewSwiftServer("localhost"); err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
		username = "swifttest"
 | 
			
		||||
		password = "swifttest"
 | 
			
		||||
		authURL = swiftServer.AuthURL
 | 
			
		||||
		container = "test"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	root, err := ioutil.TempDir("", "driver-")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	defer os.Remove(root)
 | 
			
		||||
 | 
			
		||||
	swiftDriverConstructor := func(rootDirectory string) (*Driver, error) {
 | 
			
		||||
		parameters := DriverParameters{
 | 
			
		||||
			username,
 | 
			
		||||
			password,
 | 
			
		||||
			authURL,
 | 
			
		||||
			tenant,
 | 
			
		||||
			region,
 | 
			
		||||
			container,
 | 
			
		||||
			prefix,
 | 
			
		||||
			defaultChunkSize,
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return New(parameters)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	skipCheck := func() string {
 | 
			
		||||
		return ""
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	driverConstructor := func() (storagedriver.StorageDriver, error) {
 | 
			
		||||
		return swiftDriverConstructor(root)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	testsuites.RegisterInProcessSuite(driverConstructor, skipCheck)
 | 
			
		||||
 | 
			
		||||
	RegisterSwiftDriverSuite(swiftDriverConstructor, skipCheck, swiftServer)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func RegisterSwiftDriverSuite(swiftDriverConstructor SwiftDriverConstructor, skipCheck testsuites.SkipCheck,
 | 
			
		||||
	swiftServer *swifttest.SwiftServer) {
 | 
			
		||||
	check.Suite(&SwiftDriverSuite{
 | 
			
		||||
		Constructor: swiftDriverConstructor,
 | 
			
		||||
		SkipCheck:   skipCheck,
 | 
			
		||||
		SwiftServer: swiftServer,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type SwiftDriverSuite struct {
 | 
			
		||||
	Constructor SwiftDriverConstructor
 | 
			
		||||
	SwiftServer *swifttest.SwiftServer
 | 
			
		||||
	testsuites.SkipCheck
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (suite *SwiftDriverSuite) SetUpSuite(c *check.C) {
 | 
			
		||||
	if reason := suite.SkipCheck(); reason != "" {
 | 
			
		||||
		c.Skip(reason)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (suite *SwiftDriverSuite) TestEmptyRootList(c *check.C) {
 | 
			
		||||
	validRoot, err := ioutil.TempDir("", "driver-")
 | 
			
		||||
	c.Assert(err, check.IsNil)
 | 
			
		||||
	defer os.Remove(validRoot)
 | 
			
		||||
 | 
			
		||||
	rootedDriver, err := suite.Constructor(validRoot)
 | 
			
		||||
	c.Assert(err, check.IsNil)
 | 
			
		||||
	emptyRootDriver, err := suite.Constructor("")
 | 
			
		||||
	c.Assert(err, check.IsNil)
 | 
			
		||||
	slashRootDriver, err := suite.Constructor("/")
 | 
			
		||||
	c.Assert(err, check.IsNil)
 | 
			
		||||
 | 
			
		||||
	filename := "/test"
 | 
			
		||||
	contents := []byte("contents")
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	err = rootedDriver.PutContent(ctx, filename, contents)
 | 
			
		||||
	c.Assert(err, check.IsNil)
 | 
			
		||||
	defer rootedDriver.Delete(ctx, filename)
 | 
			
		||||
 | 
			
		||||
	keys, err := emptyRootDriver.List(ctx, "/")
 | 
			
		||||
	for _, path := range keys {
 | 
			
		||||
		c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	keys, err = slashRootDriver.List(ctx, "/")
 | 
			
		||||
	for _, path := range keys {
 | 
			
		||||
		c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
		Reference in New Issue