Merge pull request #1141 from lebauce/swift-no-missing-segment
Ensure read after write for segmentsmaster
						commit
						aaf448b2de
					
				| 
						 | 
				
			
			@ -17,6 +17,7 @@ package swift
 | 
			
		|||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"crypto/md5"
 | 
			
		||||
	"crypto/rand"
 | 
			
		||||
	"crypto/sha1"
 | 
			
		||||
	"crypto/tls"
 | 
			
		||||
| 
						 | 
				
			
			@ -48,6 +49,12 @@ const defaultChunkSize = 20 * 1024 * 1024
 | 
			
		|||
// minChunkSize defines the minimum size of a segment
 | 
			
		||||
const minChunkSize = 1 << 20
 | 
			
		||||
 | 
			
		||||
// readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded
 | 
			
		||||
var readAfterWriteTimeout = 15 * time.Second
 | 
			
		||||
 | 
			
		||||
// readAfterWriteWait defines the time to sleep between two retries
 | 
			
		||||
var readAfterWriteWait = 200 * time.Millisecond
 | 
			
		||||
 | 
			
		||||
// Parameters A struct that encapsulates all of the driver parameters after all values have been set
 | 
			
		||||
type Parameters struct {
 | 
			
		||||
	Username            string
 | 
			
		||||
| 
						 | 
				
			
			@ -318,6 +325,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
 | 
			
		|||
	partNumber := 1
 | 
			
		||||
	chunkSize := int64(d.ChunkSize)
 | 
			
		||||
	zeroBuf := make([]byte, d.ChunkSize)
 | 
			
		||||
	hash := md5.New()
 | 
			
		||||
 | 
			
		||||
	getSegment := func() string {
 | 
			
		||||
		return fmt.Sprintf("%s/%016d", segmentPath, partNumber)
 | 
			
		||||
| 
						 | 
				
			
			@ -358,18 +366,13 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
 | 
			
		|||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if createManifest {
 | 
			
		||||
		if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil {
 | 
			
		||||
			return 0, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// First, we skip the existing segments that are not modified by this call
 | 
			
		||||
	for i := range segments {
 | 
			
		||||
		if offset < cursor+segments[i].Bytes {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		cursor += segments[i].Bytes
 | 
			
		||||
		hash.Write([]byte(segments[i].Hash))
 | 
			
		||||
		partNumber++
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -378,7 +381,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
 | 
			
		|||
	if offset >= currentLength {
 | 
			
		||||
		for offset-currentLength >= chunkSize {
 | 
			
		||||
			// Insert a block a zero
 | 
			
		||||
			_, err := d.Conn.ObjectPut(d.Container, getSegment(), bytes.NewReader(zeroBuf), false, "", d.getContentType(), nil)
 | 
			
		||||
			headers, err := d.Conn.ObjectPut(d.Container, getSegment(), bytes.NewReader(zeroBuf), false, "", d.getContentType(), nil)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				if err == swift.ObjectNotFound {
 | 
			
		||||
					return 0, storagedriver.PathNotFoundError{Path: getSegment()}
 | 
			
		||||
| 
						 | 
				
			
			@ -387,6 +390,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
 | 
			
		|||
			}
 | 
			
		||||
			currentLength += chunkSize
 | 
			
		||||
			partNumber++
 | 
			
		||||
			hash.Write([]byte(headers["Etag"]))
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		cursor = currentLength
 | 
			
		||||
| 
						 | 
				
			
			@ -421,13 +425,23 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
 | 
			
		|||
			return false, bytesRead, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		n, err := io.Copy(currentSegment, multi)
 | 
			
		||||
		segmentHash := md5.New()
 | 
			
		||||
		writer := io.MultiWriter(currentSegment, segmentHash)
 | 
			
		||||
 | 
			
		||||
		n, err := io.Copy(writer, multi)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, bytesRead, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if n > 0 {
 | 
			
		||||
			defer currentSegment.Close()
 | 
			
		||||
			defer func() {
 | 
			
		||||
				closeError := currentSegment.Close()
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					err = closeError
 | 
			
		||||
				}
 | 
			
		||||
				hexHash := hex.EncodeToString(segmentHash.Sum(nil))
 | 
			
		||||
				hash.Write([]byte(hexHash))
 | 
			
		||||
			}()
 | 
			
		||||
			bytesRead += n - max(0, offset-cursor)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -445,7 +459,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
 | 
			
		|||
					return false, bytesRead, err
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				_, copyErr := io.Copy(currentSegment, file)
 | 
			
		||||
				_, copyErr := io.Copy(writer, file)
 | 
			
		||||
 | 
			
		||||
				if err := file.Close(); err != nil {
 | 
			
		||||
					if err == swift.ObjectNotFound {
 | 
			
		||||
| 
						 | 
				
			
			@ -480,7 +494,35 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
 | 
			
		|||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return bytesRead, nil
 | 
			
		||||
	for ; partNumber < len(segments); partNumber++ {
 | 
			
		||||
		hash.Write([]byte(segments[partNumber].Hash))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if createManifest {
 | 
			
		||||
		if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil {
 | 
			
		||||
			return 0, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	expectedHash := hex.EncodeToString(hash.Sum(nil))
 | 
			
		||||
	waitingTime := readAfterWriteWait
 | 
			
		||||
	endTime := time.Now().Add(readAfterWriteTimeout)
 | 
			
		||||
	for {
 | 
			
		||||
		var infos swift.Object
 | 
			
		||||
		if infos, _, err = d.Conn.Object(d.Container, d.swiftPath(path)); err == nil {
 | 
			
		||||
			if strings.Trim(infos.Hash, "\"") == expectedHash {
 | 
			
		||||
				return bytesRead, nil
 | 
			
		||||
			}
 | 
			
		||||
			err = fmt.Errorf("Timeout expired while waiting for segments of %s to show up", path)
 | 
			
		||||
		}
 | 
			
		||||
		if time.Now().Add(waitingTime).After(endTime) {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		time.Sleep(waitingTime)
 | 
			
		||||
		waitingTime *= 2
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return bytesRead, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Stat retrieves the FileInfo for the given path, including the current size
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue