Updates Swift driver to support new storagedriver.FileWriter interface
Signed-off-by: Brian Bland <brian.bland@docker.com>master
							parent
							
								
									a9bf7a2aae
								
							
						
					
					
						commit
						7fd1db9312
					
				|  | @ -16,7 +16,7 @@ import ( | ||||||
| 	_ "github.com/docker/distribution/registry/storage/driver/oss" | 	_ "github.com/docker/distribution/registry/storage/driver/oss" | ||||||
| 	_ "github.com/docker/distribution/registry/storage/driver/s3-aws" | 	_ "github.com/docker/distribution/registry/storage/driver/s3-aws" | ||||||
| 	_ "github.com/docker/distribution/registry/storage/driver/s3-goamz" | 	_ "github.com/docker/distribution/registry/storage/driver/s3-goamz" | ||||||
| 	// _ "github.com/docker/distribution/registry/storage/driver/swift"
 | 	_ "github.com/docker/distribution/registry/storage/driver/swift" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func main() { | func main() { | ||||||
|  |  | ||||||
|  | @ -16,8 +16,8 @@ | ||||||
| package swift | package swift | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
|  | 	"bufio" | ||||||
| 	"bytes" | 	"bytes" | ||||||
| 	"crypto/md5" |  | ||||||
| 	"crypto/rand" | 	"crypto/rand" | ||||||
| 	"crypto/sha1" | 	"crypto/sha1" | ||||||
| 	"crypto/tls" | 	"crypto/tls" | ||||||
|  | @ -49,6 +49,9 @@ const defaultChunkSize = 20 * 1024 * 1024 | ||||||
| // minChunkSize defines the minimum size of a segment
 | // minChunkSize defines the minimum size of a segment
 | ||||||
| const minChunkSize = 1 << 20 | const minChunkSize = 1 << 20 | ||||||
| 
 | 
 | ||||||
|  | // contentType defines the Content-Type header associated with stored segments
 | ||||||
|  | const contentType = "application/octet-stream" | ||||||
|  | 
 | ||||||
| // readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded
 | // readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded
 | ||||||
| var readAfterWriteTimeout = 15 * time.Second | var readAfterWriteTimeout = 15 * time.Second | ||||||
| 
 | 
 | ||||||
|  | @ -282,16 +285,16 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { | ||||||
| 
 | 
 | ||||||
| // PutContent stores the []byte content at a location designated by "path".
 | // PutContent stores the []byte content at a location designated by "path".
 | ||||||
| func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { | func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { | ||||||
| 	err := d.Conn.ObjectPutBytes(d.Container, d.swiftPath(path), contents, d.getContentType()) | 	err := d.Conn.ObjectPutBytes(d.Container, d.swiftPath(path), contents, contentType) | ||||||
| 	if err == swift.ObjectNotFound { | 	if err == swift.ObjectNotFound { | ||||||
| 		return storagedriver.PathNotFoundError{Path: path} | 		return storagedriver.PathNotFoundError{Path: path} | ||||||
| 	} | 	} | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
 | // Reader retrieves an io.ReadCloser for the content stored at "path" with a
 | ||||||
| // given byte offset.
 | // given byte offset.
 | ||||||
| func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { | func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { | ||||||
| 	headers := make(swift.Headers) | 	headers := make(swift.Headers) | ||||||
| 	headers["Range"] = "bytes=" + strconv.FormatInt(offset, 10) + "-" | 	headers["Range"] = "bytes=" + strconv.FormatInt(offset, 10) + "-" | ||||||
| 
 | 
 | ||||||
|  | @ -305,224 +308,46 @@ func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io. | ||||||
| 	return file, err | 	return file, err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // WriteStream stores the contents of the provided io.Reader at a
 | // Writer returns a FileWriter which will store the content written to it
 | ||||||
| // location designated by the given path. The driver will know it has
 | // at the location designated by "path" after the call to Commit.
 | ||||||
| // received the full contents when the reader returns io.EOF. The number
 | func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { | ||||||
| // of successfully READ bytes will be returned, even if an error is
 |  | ||||||
| // returned. May be used to resume writing a stream by providing a nonzero
 |  | ||||||
| // offset. Offsets past the current size will write from the position
 |  | ||||||
| // beyond the end of the file.
 |  | ||||||
| func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (int64, error) { |  | ||||||
| 	var ( | 	var ( | ||||||
| 		segments     []swift.Object | 		segments     []swift.Object | ||||||
| 		multi         io.Reader | 		segmentsPath string | ||||||
| 		paddingReader io.Reader | 		err          error | ||||||
| 		currentLength int64 |  | ||||||
| 		cursor        int64 |  | ||||||
| 		segmentPath   string |  | ||||||
| 	) | 	) | ||||||
| 
 | 
 | ||||||
| 	partNumber := 1 | 	if !append { | ||||||
| 	chunkSize := int64(d.ChunkSize) | 		segmentsPath, err = d.swiftSegmentPath(path) | ||||||
| 	zeroBuf := make([]byte, d.ChunkSize) | 		if err != nil { | ||||||
| 	hash := md5.New() | 			return nil, err | ||||||
| 
 |  | ||||||
| 	getSegment := func() string { |  | ||||||
| 		return fmt.Sprintf("%s/%016d", segmentPath, partNumber) |  | ||||||
| 		} | 		} | ||||||
| 
 | 	} else { | ||||||
| 	max := func(a int64, b int64) int64 { |  | ||||||
| 		if a > b { |  | ||||||
| 			return a |  | ||||||
| 		} |  | ||||||
| 		return b |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	createManifest := true |  | ||||||
| 		info, headers, err := d.Conn.Object(d.Container, d.swiftPath(path)) | 		info, headers, err := d.Conn.Object(d.Container, d.swiftPath(path)) | ||||||
| 	if err == nil { | 		if err == swift.ObjectNotFound { | ||||||
|  | 			return nil, storagedriver.PathNotFoundError{Path: path} | ||||||
|  | 		} else if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
| 		manifest, ok := headers["X-Object-Manifest"] | 		manifest, ok := headers["X-Object-Manifest"] | ||||||
| 		if !ok { | 		if !ok { | ||||||
| 			if segmentPath, err = d.swiftSegmentPath(path); err != nil { | 			segmentsPath, err = d.swiftSegmentPath(path) | ||||||
| 				return 0, err | 			if err != nil { | ||||||
|  | 				return nil, err | ||||||
| 			} | 			} | ||||||
| 			if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, getSegment()); err != nil { | 			if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, segmentPath(segmentsPath, len(segments))); err != nil { | ||||||
| 				return 0, err | 				return nil, err | ||||||
| 			} | 			} | ||||||
| 			segments = append(segments, info) | 			segments = []swift.Object{info} | ||||||
| 		} else { | 		} else { | ||||||
| 			_, segmentPath = parseManifest(manifest) | 			_, segmentsPath = parseManifest(manifest) | ||||||
| 			if segments, err = d.getAllSegments(segmentPath); err != nil { | 			if segments, err = d.getAllSegments(segmentsPath); err != nil { | ||||||
| 				return 0, err | 				return nil, err | ||||||
| 			} | 			} | ||||||
| 			createManifest = false |  | ||||||
| 		} |  | ||||||
| 		currentLength = info.Bytes |  | ||||||
| 	} else if err == swift.ObjectNotFound { |  | ||||||
| 		if segmentPath, err = d.swiftSegmentPath(path); err != nil { |  | ||||||
| 			return 0, err |  | ||||||
| 		} |  | ||||||
| 	} else { |  | ||||||
| 		return 0, err |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// First, we skip the existing segments that are not modified by this call
 |  | ||||||
| 	for i := range segments { |  | ||||||
| 		if offset < cursor+segments[i].Bytes { |  | ||||||
| 			break |  | ||||||
| 		} |  | ||||||
| 		cursor += segments[i].Bytes |  | ||||||
| 		hash.Write([]byte(segments[i].Hash)) |  | ||||||
| 		partNumber++ |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// We reached the end of the file but we haven't reached 'offset' yet
 |  | ||||||
| 	// Therefore we add blocks of zeros
 |  | ||||||
| 	if offset >= currentLength { |  | ||||||
| 		for offset-currentLength >= chunkSize { |  | ||||||
| 			// Insert a block a zero
 |  | ||||||
| 			headers, err := d.Conn.ObjectPut(d.Container, getSegment(), bytes.NewReader(zeroBuf), false, "", d.getContentType(), nil) |  | ||||||
| 			if err != nil { |  | ||||||
| 				if err == swift.ObjectNotFound { |  | ||||||
| 					return 0, storagedriver.PathNotFoundError{Path: getSegment()} |  | ||||||
| 				} |  | ||||||
| 				return 0, err |  | ||||||
| 			} |  | ||||||
| 			currentLength += chunkSize |  | ||||||
| 			partNumber++ |  | ||||||
| 			hash.Write([]byte(headers["Etag"])) |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		cursor = currentLength |  | ||||||
| 		paddingReader = bytes.NewReader(zeroBuf) |  | ||||||
| 	} else if offset-cursor > 0 { |  | ||||||
| 		// Offset is inside the current segment : we need to read the
 |  | ||||||
| 		// data from the beginning of the segment to offset
 |  | ||||||
| 		file, _, err := d.Conn.ObjectOpen(d.Container, getSegment(), false, nil) |  | ||||||
| 		if err != nil { |  | ||||||
| 			if err == swift.ObjectNotFound { |  | ||||||
| 				return 0, storagedriver.PathNotFoundError{Path: getSegment()} |  | ||||||
| 			} |  | ||||||
| 			return 0, err |  | ||||||
| 		} |  | ||||||
| 		defer file.Close() |  | ||||||
| 		paddingReader = file |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	readers := []io.Reader{} |  | ||||||
| 	if paddingReader != nil { |  | ||||||
| 		readers = append(readers, io.LimitReader(paddingReader, offset-cursor)) |  | ||||||
| 	} |  | ||||||
| 	readers = append(readers, io.LimitReader(reader, chunkSize-(offset-cursor))) |  | ||||||
| 	multi = io.MultiReader(readers...) |  | ||||||
| 
 |  | ||||||
| 	writeSegment := func(segment string) (finished bool, bytesRead int64, err error) { |  | ||||||
| 		currentSegment, err := d.Conn.ObjectCreate(d.Container, segment, false, "", d.getContentType(), nil) |  | ||||||
| 		if err != nil { |  | ||||||
| 			if err == swift.ObjectNotFound { |  | ||||||
| 				return false, bytesRead, storagedriver.PathNotFoundError{Path: segment} |  | ||||||
| 			} |  | ||||||
| 			return false, bytesRead, err |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		segmentHash := md5.New() |  | ||||||
| 		writer := io.MultiWriter(currentSegment, segmentHash) |  | ||||||
| 
 |  | ||||||
| 		n, err := io.Copy(writer, multi) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return false, bytesRead, err |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		if n > 0 { |  | ||||||
| 			defer func() { |  | ||||||
| 				closeError := currentSegment.Close() |  | ||||||
| 				if err != nil { |  | ||||||
| 					err = closeError |  | ||||||
| 				} |  | ||||||
| 				hexHash := hex.EncodeToString(segmentHash.Sum(nil)) |  | ||||||
| 				hash.Write([]byte(hexHash)) |  | ||||||
| 			}() |  | ||||||
| 			bytesRead += n - max(0, offset-cursor) |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		if n < chunkSize { |  | ||||||
| 			// We wrote all the data
 |  | ||||||
| 			if cursor+n < currentLength { |  | ||||||
| 				// Copy the end of the chunk
 |  | ||||||
| 				headers := make(swift.Headers) |  | ||||||
| 				headers["Range"] = "bytes=" + strconv.FormatInt(cursor+n, 10) + "-" + strconv.FormatInt(cursor+chunkSize, 10) |  | ||||||
| 				file, _, err := d.Conn.ObjectOpen(d.Container, d.swiftPath(path), false, headers) |  | ||||||
| 				if err != nil { |  | ||||||
| 					if err == swift.ObjectNotFound { |  | ||||||
| 						return false, bytesRead, storagedriver.PathNotFoundError{Path: path} |  | ||||||
| 					} |  | ||||||
| 					return false, bytesRead, err |  | ||||||
| 				} |  | ||||||
| 
 |  | ||||||
| 				_, copyErr := io.Copy(writer, file) |  | ||||||
| 
 |  | ||||||
| 				if err := file.Close(); err != nil { |  | ||||||
| 					if err == swift.ObjectNotFound { |  | ||||||
| 						return false, bytesRead, storagedriver.PathNotFoundError{Path: path} |  | ||||||
| 					} |  | ||||||
| 					return false, bytesRead, err |  | ||||||
| 				} |  | ||||||
| 
 |  | ||||||
| 				if copyErr != nil { |  | ||||||
| 					return false, bytesRead, copyErr |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 			return true, bytesRead, nil | 	return d.newWriter(path, segmentsPath, segments), nil | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		multi = io.LimitReader(reader, chunkSize) |  | ||||||
| 		cursor += chunkSize |  | ||||||
| 		partNumber++ |  | ||||||
| 
 |  | ||||||
| 		return false, bytesRead, nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	finished := false |  | ||||||
| 	read := int64(0) |  | ||||||
| 	bytesRead := int64(0) |  | ||||||
| 	for finished == false { |  | ||||||
| 		finished, read, err = writeSegment(getSegment()) |  | ||||||
| 		bytesRead += read |  | ||||||
| 		if err != nil { |  | ||||||
| 			return bytesRead, err |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	for ; partNumber < len(segments); partNumber++ { |  | ||||||
| 		hash.Write([]byte(segments[partNumber].Hash)) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if createManifest { |  | ||||||
| 		if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil { |  | ||||||
| 			return 0, err |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	expectedHash := hex.EncodeToString(hash.Sum(nil)) |  | ||||||
| 	waitingTime := readAfterWriteWait |  | ||||||
| 	endTime := time.Now().Add(readAfterWriteTimeout) |  | ||||||
| 	for { |  | ||||||
| 		var infos swift.Object |  | ||||||
| 		if infos, _, err = d.Conn.Object(d.Container, d.swiftPath(path)); err == nil { |  | ||||||
| 			if strings.Trim(infos.Hash, "\"") == expectedHash { |  | ||||||
| 				return bytesRead, nil |  | ||||||
| 			} |  | ||||||
| 			err = fmt.Errorf("Timeout expired while waiting for segments of %s to show up", path) |  | ||||||
| 		} |  | ||||||
| 		if time.Now().Add(waitingTime).After(endTime) { |  | ||||||
| 			break |  | ||||||
| 		} |  | ||||||
| 		time.Sleep(waitingTime) |  | ||||||
| 		waitingTime *= 2 |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return bytesRead, err |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Stat retrieves the FileInfo for the given path, including the current size
 | // Stat retrieves the FileInfo for the given path, including the current size
 | ||||||
|  | @ -763,10 +588,6 @@ func (d *driver) swiftSegmentPath(path string) (string, error) { | ||||||
| 	return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil | 	return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (d *driver) getContentType() string { |  | ||||||
| 	return "application/octet-stream" |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (d *driver) getAllSegments(path string) ([]swift.Object, error) { | func (d *driver) getAllSegments(path string) ([]swift.Object, error) { | ||||||
| 	segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path}) | 	segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path}) | ||||||
| 	if err == swift.ContainerNotFound { | 	if err == swift.ContainerNotFound { | ||||||
|  | @ -778,7 +599,7 @@ func (d *driver) getAllSegments(path string) ([]swift.Object, error) { | ||||||
| func (d *driver) createManifest(path string, segments string) error { | func (d *driver) createManifest(path string, segments string) error { | ||||||
| 	headers := make(swift.Headers) | 	headers := make(swift.Headers) | ||||||
| 	headers["X-Object-Manifest"] = segments | 	headers["X-Object-Manifest"] = segments | ||||||
| 	manifest, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "", d.getContentType(), headers) | 	manifest, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "", contentType, headers) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		if err == swift.ObjectNotFound { | 		if err == swift.ObjectNotFound { | ||||||
| 			return storagedriver.PathNotFoundError{Path: path} | 			return storagedriver.PathNotFoundError{Path: path} | ||||||
|  | @ -810,3 +631,152 @@ func generateSecret() (string, error) { | ||||||
| 	} | 	} | ||||||
| 	return hex.EncodeToString(secretBytes[:]), nil | 	return hex.EncodeToString(secretBytes[:]), nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func segmentPath(segmentsPath string, partNumber int) string { | ||||||
|  | 	return fmt.Sprintf("%s/%016d", segmentsPath, partNumber) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type writer struct { | ||||||
|  | 	driver       *driver | ||||||
|  | 	path         string | ||||||
|  | 	segmentsPath string | ||||||
|  | 	size         int64 | ||||||
|  | 	bw           *bufio.Writer | ||||||
|  | 	closed       bool | ||||||
|  | 	committed    bool | ||||||
|  | 	cancelled    bool | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (d *driver) newWriter(path, segmentsPath string, segments []swift.Object) storagedriver.FileWriter { | ||||||
|  | 	var size int64 | ||||||
|  | 	for _, segment := range segments { | ||||||
|  | 		size += segment.Bytes | ||||||
|  | 	} | ||||||
|  | 	return &writer{ | ||||||
|  | 		driver:       d, | ||||||
|  | 		path:         path, | ||||||
|  | 		segmentsPath: segmentsPath, | ||||||
|  | 		size:         size, | ||||||
|  | 		bw: bufio.NewWriterSize(&segmentWriter{ | ||||||
|  | 			conn:          d.Conn, | ||||||
|  | 			container:     d.Container, | ||||||
|  | 			segmentsPath:  segmentsPath, | ||||||
|  | 			segmentNumber: len(segments) + 1, | ||||||
|  | 			maxChunkSize:  d.ChunkSize, | ||||||
|  | 		}, d.ChunkSize), | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (w *writer) Write(p []byte) (int, error) { | ||||||
|  | 	if w.closed { | ||||||
|  | 		return 0, fmt.Errorf("already closed") | ||||||
|  | 	} else if w.committed { | ||||||
|  | 		return 0, fmt.Errorf("already committed") | ||||||
|  | 	} else if w.cancelled { | ||||||
|  | 		return 0, fmt.Errorf("already cancelled") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	n, err := w.bw.Write(p) | ||||||
|  | 	w.size += int64(n) | ||||||
|  | 	return n, err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (w *writer) Size() int64 { | ||||||
|  | 	return w.size | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (w *writer) Close() error { | ||||||
|  | 	if w.closed { | ||||||
|  | 		return fmt.Errorf("already closed") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := w.bw.Flush(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if !w.committed && !w.cancelled { | ||||||
|  | 		if err := w.driver.createManifest(w.path, w.driver.Container+"/"+w.segmentsPath); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	w.closed = true | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (w *writer) Cancel() error { | ||||||
|  | 	if w.closed { | ||||||
|  | 		return fmt.Errorf("already closed") | ||||||
|  | 	} else if w.committed { | ||||||
|  | 		return fmt.Errorf("already committed") | ||||||
|  | 	} | ||||||
|  | 	w.cancelled = true | ||||||
|  | 	return w.driver.Delete(context.Background(), w.path) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (w *writer) Commit() error { | ||||||
|  | 	if w.closed { | ||||||
|  | 		return fmt.Errorf("already closed") | ||||||
|  | 	} else if w.committed { | ||||||
|  | 		return fmt.Errorf("already committed") | ||||||
|  | 	} else if w.cancelled { | ||||||
|  | 		return fmt.Errorf("already cancelled") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := w.bw.Flush(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := w.driver.createManifest(w.path, w.driver.Container+"/"+w.segmentsPath); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	w.committed = true | ||||||
|  | 
 | ||||||
|  | 	var err error | ||||||
|  | 	waitingTime := readAfterWriteWait | ||||||
|  | 	endTime := time.Now().Add(readAfterWriteTimeout) | ||||||
|  | 	for { | ||||||
|  | 		var info swift.Object | ||||||
|  | 		if info, _, err = w.driver.Conn.Object(w.driver.Container, w.driver.swiftPath(w.path)); err == nil { | ||||||
|  | 			if info.Bytes == w.size { | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 			err = fmt.Errorf("Timeout expired while waiting for segments of %s to show up", w.path) | ||||||
|  | 		} | ||||||
|  | 		if time.Now().Add(waitingTime).After(endTime) { | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
|  | 		time.Sleep(waitingTime) | ||||||
|  | 		waitingTime *= 2 | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type segmentWriter struct { | ||||||
|  | 	conn          swift.Connection | ||||||
|  | 	container     string | ||||||
|  | 	segmentsPath  string | ||||||
|  | 	segmentNumber int | ||||||
|  | 	maxChunkSize  int | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (sw *segmentWriter) Write(p []byte) (int, error) { | ||||||
|  | 	n := 0 | ||||||
|  | 	for offset := 0; offset < len(p); offset += sw.maxChunkSize { | ||||||
|  | 		chunkSize := sw.maxChunkSize | ||||||
|  | 		if offset+chunkSize > len(p) { | ||||||
|  | 			chunkSize = len(p) - offset | ||||||
|  | 		} | ||||||
|  | 		_, err := sw.conn.ObjectPut(sw.container, segmentPath(sw.segmentsPath, sw.segmentNumber), bytes.NewReader(p[offset:offset+chunkSize]), false, "", contentType, nil) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return n, err | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		sw.segmentNumber++ | ||||||
|  | 		n += chunkSize | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return n, nil | ||||||
|  | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue