Use only one Swift container for both files and manifests
Signed-off-by: Sylvain Baubeau <sbaubeau@redhat.com>master
							parent
							
								
									a1ae7f7122
								
							
						
					
					
						commit
						9ab55eae39
					
				| 
						 | 
				
			
			@ -156,10 +156,6 @@ func New(params DriverParameters) (*Driver, error) {
 | 
			
		|||
		return nil, fmt.Errorf("Failed to create container %s (%s)", params.Container, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := ct.ContainerCreate(params.Container+"_segments", nil); err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("Failed to create container %s (%s)", params.Container+"_segments", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	d := &driver{
 | 
			
		||||
		Conn:              ct,
 | 
			
		||||
		Container:         params.Container,
 | 
			
		||||
| 
						 | 
				
			
			@ -231,8 +227,8 @@ func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.
 | 
			
		|||
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (int64, error) {
 | 
			
		||||
	var (
 | 
			
		||||
		segments      []swift.Object
 | 
			
		||||
		multi         io.Reader
 | 
			
		||||
		paddingReader io.Reader
 | 
			
		||||
		bytesRead     int64
 | 
			
		||||
		currentLength int64
 | 
			
		||||
		cursor        int64
 | 
			
		||||
	)
 | 
			
		||||
| 
						 | 
				
			
			@ -240,10 +236,9 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
 | 
			
		|||
	partNumber := 1
 | 
			
		||||
	chunkSize := int64(d.ChunkSize)
 | 
			
		||||
	zeroBuf := make([]byte, d.ChunkSize)
 | 
			
		||||
	segmentsContainer := d.getSegmentsContainer()
 | 
			
		||||
 | 
			
		||||
	getSegment := func() string {
 | 
			
		||||
		return d.swiftPath(path) + "/" + fmt.Sprintf("%016d", partNumber)
 | 
			
		||||
		return d.swiftSegmentPath(path) + "/" + fmt.Sprintf("%016d", partNumber)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	max := func(a int64, b int64) int64 {
 | 
			
		||||
| 
						 | 
				
			
			@ -258,22 +253,22 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
 | 
			
		|||
		if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == 404 {
 | 
			
		||||
			// Create a object manifest
 | 
			
		||||
			if err := d.createParentFolders(path); err != nil {
 | 
			
		||||
				return bytesRead, err
 | 
			
		||||
				return 0, err
 | 
			
		||||
			}
 | 
			
		||||
			manifest, err := d.createManifest(path)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return bytesRead, parseError(path, err)
 | 
			
		||||
				return 0, parseError(path, err)
 | 
			
		||||
			}
 | 
			
		||||
			manifest.Close()
 | 
			
		||||
		} else {
 | 
			
		||||
			return bytesRead, parseError(path, err)
 | 
			
		||||
			return 0, parseError(path, err)
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		// The manifest already exists. Get all the segments
 | 
			
		||||
		currentLength = info.Bytes
 | 
			
		||||
		segments, err = d.getAllSegments(segmentsContainer, path)
 | 
			
		||||
		segments, err = d.getAllSegments(path)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return bytesRead, parseError(path, err)
 | 
			
		||||
			return 0, parseError(path, err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -291,7 +286,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
 | 
			
		|||
	if offset >= currentLength {
 | 
			
		||||
		for offset-currentLength >= chunkSize {
 | 
			
		||||
			// Insert a block a zero
 | 
			
		||||
			d.Conn.ObjectPut(segmentsContainer, getSegment(),
 | 
			
		||||
			d.Conn.ObjectPut(d.Container, getSegment(),
 | 
			
		||||
				bytes.NewReader(zeroBuf), false, "",
 | 
			
		||||
				d.getContentType(), nil)
 | 
			
		||||
			currentLength += chunkSize
 | 
			
		||||
| 
						 | 
				
			
			@ -303,26 +298,34 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
 | 
			
		|||
	} else {
 | 
			
		||||
		// Offset is inside the current segment : we need to read the
 | 
			
		||||
		// data from the beginning of the segment to offset
 | 
			
		||||
		paddingReader, _, err = d.Conn.ObjectOpen(segmentsContainer, getSegment(), false, nil)
 | 
			
		||||
		file, _, err := d.Conn.ObjectOpen(d.Container, getSegment(), false, nil)
 | 
			
		||||
		defer file.Close()
 | 
			
		||||
		paddingReader = file
 | 
			
		||||
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return bytesRead, parseError(getSegment(), err)
 | 
			
		||||
			return 0, parseError(getSegment(), err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	multi := io.MultiReader(
 | 
			
		||||
	multi = io.MultiReader(
 | 
			
		||||
		io.LimitReader(paddingReader, offset-cursor),
 | 
			
		||||
		io.LimitReader(reader, chunkSize-(offset-cursor)),
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		currentSegment, err := d.Conn.ObjectCreate(segmentsContainer, getSegment(), false, "", d.getContentType(), nil)
 | 
			
		||||
	writeSegment := func(segment string) (finished bool, bytesRead int64, err error) {
 | 
			
		||||
		currentSegment, err := d.Conn.ObjectCreate(d.Container, segment, false, "", d.getContentType(), nil)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return bytesRead, parseError(path, err)
 | 
			
		||||
			return false, bytesRead, parseError(path, err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		n, err := io.Copy(currentSegment, multi)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return bytesRead, parseError(path, err)
 | 
			
		||||
			return false, bytesRead, parseError(path, err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if n > 0 {
 | 
			
		||||
			defer currentSegment.Close()
 | 
			
		||||
			bytesRead += n - max(0, offset-cursor)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if n < chunkSize {
 | 
			
		||||
| 
						 | 
				
			
			@ -333,25 +336,39 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
 | 
			
		|||
				headers["Range"] = "bytes=" + strconv.FormatInt(cursor+n, 10) + "-" + strconv.FormatInt(cursor+chunkSize, 10)
 | 
			
		||||
				file, _, err := d.Conn.ObjectOpen(d.Container, d.swiftPath(path), false, headers)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return bytesRead, parseError(path, err)
 | 
			
		||||
					return false, bytesRead, parseError(path, err)
 | 
			
		||||
				}
 | 
			
		||||
				if _, err := io.Copy(currentSegment, file); err != nil {
 | 
			
		||||
					return bytesRead, parseError(path, err)
 | 
			
		||||
 | 
			
		||||
				_, copyErr := io.Copy(currentSegment, file)
 | 
			
		||||
 | 
			
		||||
				if err := file.Close(); err != nil {
 | 
			
		||||
					return false, bytesRead, parseError(path, err)
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				if copyErr != nil {
 | 
			
		||||
					return false, bytesRead, parseError(path, copyErr)
 | 
			
		||||
				}
 | 
			
		||||
				file.Close()
 | 
			
		||||
			}
 | 
			
		||||
			if n > 0 {
 | 
			
		||||
				currentSegment.Close()
 | 
			
		||||
				bytesRead += n - max(0, offset-cursor)
 | 
			
		||||
			}
 | 
			
		||||
			break
 | 
			
		||||
 | 
			
		||||
			return true, bytesRead, nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		currentSegment.Close()
 | 
			
		||||
		bytesRead += n - max(0, offset-cursor)
 | 
			
		||||
		multi = io.MultiReader(io.LimitReader(reader, chunkSize))
 | 
			
		||||
		multi = io.LimitReader(reader, chunkSize)
 | 
			
		||||
		cursor += chunkSize
 | 
			
		||||
		partNumber++
 | 
			
		||||
 | 
			
		||||
		return false, bytesRead, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	finished := false
 | 
			
		||||
	read := int64(0)
 | 
			
		||||
	bytesRead := int64(0)
 | 
			
		||||
	for finished == false {
 | 
			
		||||
		finished, read, err = writeSegment(getSegment())
 | 
			
		||||
		bytesRead += read
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return bytesRead, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return bytesRead, nil
 | 
			
		||||
| 
						 | 
				
			
			@ -392,7 +409,7 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) {
 | 
			
		|||
	objects, err := d.Conn.Objects(d.Container, opts)
 | 
			
		||||
	for _, obj := range objects {
 | 
			
		||||
		if !obj.PseudoDirectory {
 | 
			
		||||
			files = append(files, "/"+strings.TrimSuffix(obj.Name, "/"))
 | 
			
		||||
			files = append(files, strings.TrimPrefix(strings.TrimSuffix(obj.Name, "/"), d.swiftPath("/")))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -425,40 +442,35 @@ func (d *driver) Delete(ctx context.Context, path string) error {
 | 
			
		|||
		return storagedriver.PathNotFoundError{Path: path}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for index, name := range objects {
 | 
			
		||||
		objects[index] = name[len(d.Prefix):]
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var multiDelete = true
 | 
			
		||||
	if d.BulkDeleteSupport {
 | 
			
		||||
		_, err := d.Conn.BulkDelete(d.Container, objects)
 | 
			
		||||
		multiDelete = err != nil
 | 
			
		||||
		if _, err := d.Conn.BulkDelete(d.Container, objects); err != swift.Forbidden {
 | 
			
		||||
			return parseError(path, err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if multiDelete {
 | 
			
		||||
		for _, name := range objects {
 | 
			
		||||
			if _, headers, err := d.Conn.Object(d.Container, name); err == nil {
 | 
			
		||||
				manifest, ok := headers["X-Object-Manifest"]
 | 
			
		||||
				if ok {
 | 
			
		||||
					components := strings.SplitN(manifest, "/", 2)
 | 
			
		||||
					segContainer := components[0]
 | 
			
		||||
					segments, err := d.getAllSegments(segContainer, components[1])
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						return parseError(name, err)
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
					for _, s := range segments {
 | 
			
		||||
						if err := d.Conn.ObjectDelete(segContainer, s.Name); err != nil {
 | 
			
		||||
							return parseError(s.Name, err)
 | 
			
		||||
						}
 | 
			
		||||
	for _, name := range objects {
 | 
			
		||||
		if _, headers, err := d.Conn.Object(d.Container, name); err == nil {
 | 
			
		||||
			manifest, ok := headers["X-Object-Manifest"]
 | 
			
		||||
			if ok {
 | 
			
		||||
				components := strings.SplitN(manifest, "/", 2)
 | 
			
		||||
				segContainer := components[0]
 | 
			
		||||
				segments, err := d.getAllSegments(components[1])
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return parseError(name, err)
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				for _, s := range segments {
 | 
			
		||||
					if err := d.Conn.ObjectDelete(segContainer, s.Name); err != nil {
 | 
			
		||||
						return parseError(s.Name, err)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				return parseError(name, err)
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			return parseError(name, err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
			if err := d.Conn.ObjectDelete(d.Container, name); err != nil {
 | 
			
		||||
				return parseError(name, err)
 | 
			
		||||
			}
 | 
			
		||||
		if err := d.Conn.ObjectDelete(d.Container, name); err != nil {
 | 
			
		||||
			return parseError(name, err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -472,14 +484,18 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
func (d *driver) swiftPath(path string) string {
 | 
			
		||||
	return strings.TrimLeft(strings.TrimRight(d.Prefix, "/")+path, "/")
 | 
			
		||||
	return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *driver) swiftSegmentPath(path string) string {
 | 
			
		||||
	return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments"+path, "/"), "/")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *driver) createParentFolders(path string) error {
 | 
			
		||||
	dir := gopath.Dir(path)
 | 
			
		||||
	for dir != "/" {
 | 
			
		||||
		_, _, err := d.Conn.Object(d.Container, d.swiftPath(dir))
 | 
			
		||||
		if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == 404 {
 | 
			
		||||
		if err == swift.ContainerNotFound || err == swift.ObjectNotFound {
 | 
			
		||||
			_, err := d.Conn.ObjectPut(d.Container, d.swiftPath(dir), bytes.NewReader(make([]byte, 0)),
 | 
			
		||||
				false, "", directoryMimeType, nil)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
| 
						 | 
				
			
			@ -496,17 +512,13 @@ func (d *driver) getContentType() string {
 | 
			
		|||
	return "application/octet-stream"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *driver) getSegmentsContainer() string {
 | 
			
		||||
	return d.Container + "_segments"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *driver) getAllSegments(container string, path string) ([]swift.Object, error) {
 | 
			
		||||
	return d.Conn.Objects(container, &swift.ObjectsOpts{Prefix: d.swiftPath(path)})
 | 
			
		||||
func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
 | 
			
		||||
	return d.Conn.Objects(d.Container, &swift.ObjectsOpts{Prefix: d.swiftSegmentPath(path)})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *driver) createManifest(path string) (*swift.ObjectCreateFile, error) {
 | 
			
		||||
	headers := make(swift.Headers)
 | 
			
		||||
	headers["X-Object-Manifest"] = d.getSegmentsContainer() + "/" + d.swiftPath(path)
 | 
			
		||||
	headers["X-Object-Manifest"] = d.Container + "/" + d.swiftSegmentPath(path)
 | 
			
		||||
	return d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "",
 | 
			
		||||
		d.getContentType(), headers)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue