Ensure read after write for segments
Signed-off-by: Sylvain Baubeau <sbaubeau@redhat.com>master
							parent
							
								
									0737bb7175
								
							
						
					
					
						commit
						3ff8af326b
					
				|  | @ -20,6 +20,7 @@ package swift | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"bytes" | 	"bytes" | ||||||
|  | 	"crypto/md5" | ||||||
| 	"crypto/rand" | 	"crypto/rand" | ||||||
| 	"crypto/sha1" | 	"crypto/sha1" | ||||||
| 	"crypto/tls" | 	"crypto/tls" | ||||||
|  | @ -52,6 +53,12 @@ const defaultChunkSize = 20 * 1024 * 1024 | ||||||
| // minChunkSize defines the minimum size of a segment
 | // minChunkSize defines the minimum size of a segment
 | ||||||
| const minChunkSize = 1 << 20 | const minChunkSize = 1 << 20 | ||||||
| 
 | 
 | ||||||
|  | // readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded
 | ||||||
|  | var readAfterWriteTimeout = 15 * time.Second | ||||||
|  | 
 | ||||||
|  | // readAfterWriteWait defines the time to sleep between two retries
 | ||||||
|  | var readAfterWriteWait = 200 * time.Millisecond | ||||||
|  | 
 | ||||||
| // Parameters A struct that encapsulates all of the driver parameters after all values have been set
 | // Parameters A struct that encapsulates all of the driver parameters after all values have been set
 | ||||||
| type Parameters struct { | type Parameters struct { | ||||||
| 	Username           string | 	Username           string | ||||||
|  | @ -252,6 +259,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea | ||||||
| 	partNumber := 1 | 	partNumber := 1 | ||||||
| 	chunkSize := int64(d.ChunkSize) | 	chunkSize := int64(d.ChunkSize) | ||||||
| 	zeroBuf := make([]byte, d.ChunkSize) | 	zeroBuf := make([]byte, d.ChunkSize) | ||||||
|  | 	hash := md5.New() | ||||||
| 
 | 
 | ||||||
| 	getSegment := func() string { | 	getSegment := func() string { | ||||||
| 		return fmt.Sprintf("%s/%016d", segmentPath, partNumber) | 		return fmt.Sprintf("%s/%016d", segmentPath, partNumber) | ||||||
|  | @ -292,18 +300,13 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea | ||||||
| 		return 0, err | 		return 0, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if createManifest { |  | ||||||
| 		if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil { |  | ||||||
| 			return 0, err |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// First, we skip the existing segments that are not modified by this call
 | 	// First, we skip the existing segments that are not modified by this call
 | ||||||
| 	for i := range segments { | 	for i := range segments { | ||||||
| 		if offset < cursor+segments[i].Bytes { | 		if offset < cursor+segments[i].Bytes { | ||||||
| 			break | 			break | ||||||
| 		} | 		} | ||||||
| 		cursor += segments[i].Bytes | 		cursor += segments[i].Bytes | ||||||
|  | 		hash.Write([]byte(segments[i].Hash)) | ||||||
| 		partNumber++ | 		partNumber++ | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -312,7 +315,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea | ||||||
| 	if offset >= currentLength { | 	if offset >= currentLength { | ||||||
| 		for offset-currentLength >= chunkSize { | 		for offset-currentLength >= chunkSize { | ||||||
| 			// Insert a block a zero
 | 			// Insert a block a zero
 | ||||||
| 			_, err := d.Conn.ObjectPut(d.Container, getSegment(), bytes.NewReader(zeroBuf), false, "", d.getContentType(), nil) | 			headers, err := d.Conn.ObjectPut(d.Container, getSegment(), bytes.NewReader(zeroBuf), false, "", d.getContentType(), nil) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				if err == swift.ObjectNotFound { | 				if err == swift.ObjectNotFound { | ||||||
| 					return 0, storagedriver.PathNotFoundError{Path: getSegment()} | 					return 0, storagedriver.PathNotFoundError{Path: getSegment()} | ||||||
|  | @ -321,6 +324,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea | ||||||
| 			} | 			} | ||||||
| 			currentLength += chunkSize | 			currentLength += chunkSize | ||||||
| 			partNumber++ | 			partNumber++ | ||||||
|  | 			hash.Write([]byte(headers["Etag"])) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		cursor = currentLength | 		cursor = currentLength | ||||||
|  | @ -355,13 +359,23 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea | ||||||
| 			return false, bytesRead, err | 			return false, bytesRead, err | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		n, err := io.Copy(currentSegment, multi) | 		segmentHash := md5.New() | ||||||
|  | 		writer := io.MultiWriter(currentSegment, segmentHash) | ||||||
|  | 
 | ||||||
|  | 		n, err := io.Copy(writer, multi) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return false, bytesRead, err | 			return false, bytesRead, err | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if n > 0 { | 		if n > 0 { | ||||||
| 			defer currentSegment.Close() | 			defer func() { | ||||||
|  | 				closeError := currentSegment.Close() | ||||||
|  | 				if err != nil { | ||||||
|  | 					err = closeError | ||||||
|  | 				} | ||||||
|  | 				hexHash := hex.EncodeToString(segmentHash.Sum(nil)) | ||||||
|  | 				hash.Write([]byte(hexHash)) | ||||||
|  | 			}() | ||||||
| 			bytesRead += n - max(0, offset-cursor) | 			bytesRead += n - max(0, offset-cursor) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | @ -379,7 +393,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea | ||||||
| 					return false, bytesRead, err | 					return false, bytesRead, err | ||||||
| 				} | 				} | ||||||
| 
 | 
 | ||||||
| 				_, copyErr := io.Copy(currentSegment, file) | 				_, copyErr := io.Copy(writer, file) | ||||||
| 
 | 
 | ||||||
| 				if err := file.Close(); err != nil { | 				if err := file.Close(); err != nil { | ||||||
| 					if err == swift.ObjectNotFound { | 					if err == swift.ObjectNotFound { | ||||||
|  | @ -414,7 +428,35 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return bytesRead, nil | 	for ; partNumber < len(segments); partNumber++ { | ||||||
|  | 		hash.Write([]byte(segments[partNumber].Hash)) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if createManifest { | ||||||
|  | 		if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil { | ||||||
|  | 			return 0, err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	expectedHash := hex.EncodeToString(hash.Sum(nil)) | ||||||
|  | 	waitingTime := readAfterWriteWait | ||||||
|  | 	endTime := time.Now().Add(readAfterWriteTimeout) | ||||||
|  | 	for { | ||||||
|  | 		var infos swift.Object | ||||||
|  | 		if infos, _, err = d.Conn.Object(d.Container, d.swiftPath(path)); err == nil { | ||||||
|  | 			if strings.Trim(infos.Hash, "\"") == expectedHash { | ||||||
|  | 				return bytesRead, nil | ||||||
|  | 			} | ||||||
|  | 			err = fmt.Errorf("Timeout expired while waiting for segments of %s to show up", path) | ||||||
|  | 		} | ||||||
|  | 		if time.Now().Add(waitingTime).After(endTime) { | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
|  | 		time.Sleep(waitingTime) | ||||||
|  | 		waitingTime *= 2 | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return bytesRead, err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Stat retrieves the FileInfo for the given path, including the current size
 | // Stat retrieves the FileInfo for the given path, including the current size
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue