Implements zero fill behaviour for large offset in WriteStream
This requires a very intricate WriteStream test, which will be in the next commit.
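The heart of the change: when a write starts at an offset beyond the current end of the object, the bytes in between must read back as zeros, and with S3 multipart uploads that gap has to be materialized as real zero-filled parts. A minimal sketch of the strategy in isolation, assuming nothing about the driver's internals (chunkSize, emit, and zeroFill are illustrative stand-ins, not the driver's API; the real chunk size is S3's 5MB minimum part size):

package main

import "fmt"

// chunkSize stands in for the driver's chunk size (5MB minimum on S3);
// it is tiny here so the example prints something readable.
const chunkSize = 8

// zeroFill emits the zero bytes needed to cover the gap [from, to):
// whole zeroed chunks while at least one full chunk remains, then a
// short remainder that the next real part would absorb.
func zeroFill(from, to int64, emit func([]byte)) {
	zeroBuf := make([]byte, chunkSize)
	for to-from >= chunkSize {
		emit(zeroBuf)
		from += chunkSize
	}
	if to-from > 0 {
		emit(zeroBuf[:to-from])
	}
}

func main() {
	var parts [][]byte
	zeroFill(0, 20, func(p []byte) {
		parts = append(parts, append([]byte(nil), p...))
	})
	for i, p := range parts {
		fmt.Printf("part %d: %d zero bytes\n", i+1, len(p))
	}
}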
parent 8ca960a0b5
commit 11ed0515d0
@@ -192,6 +192,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
 	}
 
 	partNumber := 1
+	bytesRead := 0
 	parts := []s3.Part{}
 	var part s3.Part
 
@@ -201,6 +202,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
 	}
 
 	buf := make([]byte, chunkSize)
+	zeroBuf := make([]byte, chunkSize)
 
 	// We never want to leave a dangling multipart upload, our only consistent state is
 	// when there is a whole object at path. This is in order to remain consistent with
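Every fill loop in the new code below leans on the io.ReadFull contract: a short source yields io.ErrUnexpectedEOF (some bytes read) or io.EOF (none), and both are treated as a benign "source exhausted" signal rather than a failure. A small standalone demonstration of that contract:

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	buf := make([]byte, 8)

	// Short source: some bytes read, then io.ErrUnexpectedEOF.
	n, err := io.ReadFull(bytes.NewReader([]byte("abc")), buf)
	fmt.Println(n, err) // 3 unexpected EOF

	// Empty source: nothing read, plain io.EOF.
	n, err = io.ReadFull(bytes.NewReader(nil), buf)
	fmt.Println(n, err) // 0 EOF

	// Exactly enough bytes: no error at all.
	n, err = io.ReadFull(bytes.NewReader([]byte("abcdefgh")), buf)
	fmt.Println(n, err) // 8 <nil>
}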
@@ -211,64 +213,240 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total
 	// made prior to the machine crashing.
 	defer func() {
 		if len(parts) > 0 {
-			err = multi.Complete(parts)
-			if err != nil {
-				multi.Abort()
+			if multi == nil {
+				// Parts should be empty if the multi is not initialized
+				panic("Unreachable")
+			} else {
+				if multi.Complete(parts) != nil {
+					multi.Abort()
+				}
 			}
 		}
 	}()
 
+	// Fills from 0 to total from current
+	fromSmallCurrent := func(total int64) error {
+		current, err := d.ReadStream(path, 0)
+		if err != nil {
+			return err
+		}
+
+		bytesRead = 0
+		for int64(bytesRead) < total {
+			//The loop should very rarely enter a second iteration
+			nn, err := io.ReadFull(current, buf[bytesRead:total])
+			if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
+				return err
+			}
+
+			bytesRead += nn
+		}
+		return nil
+	}
+
+	// Fills from parameter to chunkSize from reader
+	fromReader := func(from int64) error {
+		bytesRead = 0
+		for int64(bytesRead) < chunkSize {
+			nn, err := io.ReadFull(reader, buf[from+int64(bytesRead):])
+			totalRead += int64(nn)
+			bytesRead += nn
+
+			if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
+				return err
+			}
+		}
+
+		part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
+		if err != nil {
+			return err
+		}
+
+		return nil
+	}
+
 	if offset > 0 {
 		resp, err := d.Bucket.Head(d.s3Path(path), nil)
 		if err != nil {
-			return 0, err
-		}
-		if resp.ContentLength < offset {
-			return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
-		}
-
-		if resp.ContentLength < chunkSize {
-			// If everything written so far is less than the minimum part size of 5MB, we need
-			// to fill out the first part up to that minimum.
-			current, err := d.ReadStream(path, 0)
-			if err != nil {
+			if s3Err, ok := err.(*s3.Error); !ok || s3Err.Code != "NoSuchKey" {
 				return 0, err
 			}
+		}
 
-			bytesRead, err := io.ReadFull(current, buf[0:offset])
-			if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
-				return 0, err
-			} else if int64(bytesRead) != offset {
-				//TODO Maybe a different error? I don't even think this case is reachable...
-				return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
-			}
-
-			bytesRead, err = io.ReadFull(reader, buf[offset:])
-			totalRead += int64(bytesRead)
-
-			if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
-				return totalRead, err
-			}
-
-			part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+offset]))
-			if err != nil {
-				return totalRead, err
+		currentLength := int64(0)
+		if err == nil {
+			currentLength = resp.ContentLength
+		}
+
+		if currentLength >= offset {
+			if offset < chunkSize {
+				// chunkSize > currentLength >= offset
+				if err = fromSmallCurrent(offset); err != nil {
+					return totalRead, err
+				}
+
+				if err = fromReader(offset); err != nil {
+					return totalRead, err
+				}
+			} else {
+				// currentLength >= offset >= chunkSize
+				_, part, err = multi.PutPartCopy(partNumber,
+					s3.CopyOptions{CopySourceOptions: "bytes=0-" + strconv.FormatInt(offset-1, 10)},
+					d.Bucket.Name+"/"+d.s3Path(path))
+				if err != nil {
+					return 0, err
+				}
+			}
+
+			parts = append(parts, part)
+			partNumber++
+
+			if totalRead+offset < chunkSize {
+				return totalRead, nil
 			}
 		} else {
-			fmt.Println("About to PutPartCopy")
-			// If the file that we already have is larger than 5MB, then we make it the first part
-			// of the new multipart upload.
-			_, part, err = multi.PutPartCopy(partNumber, s3.CopyOptions{}, d.Bucket.Name+"/"+d.s3Path(path))
-			if err != nil {
-				return 0, err
-			}
+			// Fills between parameters with 0s but only when to - from <= chunkSize
+			fromZeroFillSmall := func(from, to int64) error {
+				bytesRead = 0
+				for from+int64(bytesRead) < to {
+					nn, err := io.ReadFull(bytes.NewReader(zeroBuf), buf[from+int64(bytesRead):to])
+					bytesRead += nn
+
+					if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
+						return err
+					}
+				}
+
+				return nil
+			}
+
+			// Fills between parameters with 0s, making new parts
+			fromZeroFillLarge := func(from, to int64) error {
+				bytesRead64 := int64(0)
+				for to-(from+bytesRead64) >= chunkSize {
+					part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf))
+					if err != nil {
+						return err
+					}
+					bytesRead64 += chunkSize
+
+					parts = append(parts, part)
+					partNumber++
+				}
+
+				bytesRead = 0
+				for from+bytesRead64+int64(bytesRead) < to {
+					nn, err := io.ReadFull(bytes.NewReader(zeroBuf), buf[0+bytesRead:(to-from)%chunkSize])
+					bytesRead64 += int64(nn)
+
+					if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
+						return err
+					}
+				}
+
+				return nil
+			}
+
+			// currentLength < offset
+			if currentLength < chunkSize {
+				if offset < chunkSize {
+					// chunkSize > offset > currentLength
+					if err = fromSmallCurrent(currentLength); err != nil {
+						return totalRead, err
+					}
+
+					if err = fromZeroFillSmall(currentLength, offset); err != nil {
+						return totalRead, err
+					}
+
+					if err = fromReader(offset); err != nil {
+						return totalRead, err
+					}
+
+					part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf))
+					if err != nil {
+						return totalRead, err
+					}
+
+					parts = append(parts, part)
+					partNumber++
+
+					if totalRead+offset < chunkSize {
+						return totalRead, nil
+					}
+				} else {
+					// offset >= chunkSize > currentLength
+					if err = fromSmallCurrent(currentLength); err != nil {
+						return totalRead, err
+					}
+
+					if err = fromZeroFillSmall(currentLength, chunkSize); err != nil {
+						return totalRead, err
+					}
+
+					part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf))
+					if err != nil {
+						return totalRead, err
+					}
+
+					parts = append(parts, part)
+					partNumber++
+
+					//Zero fill from chunkSize up to offset, then some reader
+					if err = fromZeroFillLarge(chunkSize, offset); err != nil {
+						return totalRead, err
+					}
+
+					if err = fromReader(offset % chunkSize); err != nil {
+						return totalRead, err
+					}
+
+					part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf))
+					if err != nil {
+						return totalRead, err
+					}
+
+					parts = append(parts, part)
+					partNumber++
+
+					if totalRead+(offset%chunkSize) < chunkSize {
+						return totalRead, nil
+					}
+				}
+			} else {
+				// offset > currentLength >= chunkSize
+				_, part, err = multi.PutPartCopy(partNumber,
+					s3.CopyOptions{CopySourceOptions: "bytes=0-" + strconv.FormatInt(currentLength-1, 10)},
+					d.Bucket.Name+"/"+d.s3Path(path))
+				if err != nil {
+					return 0, err
+				}
+
+				parts = append(parts, part)
+				partNumber++
+
+				//Zero fill from currentLength up to offset, then some reader
+				if err = fromZeroFillLarge(currentLength, offset); err != nil {
+					return totalRead, err
+				}
+
+				if err = fromReader((offset - currentLength) % chunkSize); err != nil {
+					return totalRead, err
+				}
+
+				part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf))
+				if err != nil {
+					return totalRead, err
+				}
+
+				parts = append(parts, part)
+				partNumber++
+
+				if totalRead+((offset-currentLength)%chunkSize) < chunkSize {
+					return totalRead, nil
+				}
+			}
 		}
 
-		parts = append(parts, part)
-		partNumber++
-
 		if totalRead+offset < chunkSize {
 			return totalRead, nil
 		}
 	}
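The rewritten offset > 0 path is a five-way case analysis over currentLength (what is already stored), offset, and chunkSize. A compact restatement of just the branch structure, reusing the case labels from the comments above; this is purely illustrative, since the real branches interleave reads, zero fills, and part uploads:

package main

import "fmt"

// branch names which strategy the diff takes for a given combination of
// existing object length, requested offset, and chunk size.
func branch(currentLength, offset, chunkSize int64) string {
	if currentLength >= offset {
		if offset < chunkSize {
			return "chunkSize > currentLength >= offset: copy current into buf, then fill from reader"
		}
		return "currentLength >= offset >= chunkSize: PutPartCopy bytes=0-(offset-1)"
	}
	if currentLength < chunkSize {
		if offset < chunkSize {
			return "chunkSize > offset > currentLength: current + small zero fill + reader"
		}
		return "offset >= chunkSize > currentLength: current + zero fill to chunkSize, then whole zero parts + reader"
	}
	return "offset > currentLength >= chunkSize: PutPartCopy current, then whole zero parts + reader"
}

func main() {
	const chunkSize = 5 << 20 // S3's 5MB minimum part size
	fmt.Println(branch(1<<20, 2<<20, chunkSize))  // gap smaller than a chunk
	fmt.Println(branch(6<<20, 20<<20, chunkSize)) // multi-chunk zero fill
}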
@@ -916,9 +916,13 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int64) {
 	tf.Sync()
 	tf.Seek(0, os.SEEK_SET)
 
-	nn, err := suite.StorageDriver.WriteStream(filename, 0, tf)
-	c.Assert(err, check.IsNil)
-	c.Assert(nn, check.Equals, size)
+	totalRead := int64(0)
+	for totalRead < size {
+		nn, err := suite.StorageDriver.WriteStream(filename, 0, tf)
+		c.Assert(err, check.IsNil)
+		totalRead += nn
+	}
+	c.Assert(totalRead, check.Equals, size)
 
 	reader, err := suite.StorageDriver.ReadStream(filename, 0)
 	c.Assert(err, check.IsNil)