Pool buffers used in S3.WriteStream
Signed-off-by: Stephen J Day <stephen.day@docker.com>master
							parent
							
								
									aa09c6c262
								
							
						
					
					
						commit
						1adab029f6
					
				|  | @ -22,6 +22,7 @@ import ( | |||
| 	"net/http" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/AdRoll/goamz/aws" | ||||
|  | @ -72,6 +73,9 @@ type driver struct { | |||
| 	ChunkSize     int64 | ||||
| 	Encrypt       bool | ||||
| 	RootDirectory string | ||||
| 
 | ||||
| 	pool  sync.Pool // pool []byte buffers used for WriteStream
 | ||||
| 	zeros []byte    // shared, zero-valued buffer used for WriteStream
 | ||||
| } | ||||
| 
 | ||||
| type baseEmbed struct { | ||||
|  | @ -224,6 +228,11 @@ func New(params DriverParameters) (*Driver, error) { | |||
| 		ChunkSize:     params.ChunkSize, | ||||
| 		Encrypt:       params.Encrypt, | ||||
| 		RootDirectory: params.RootDirectory, | ||||
| 		zeros:         make([]byte, params.ChunkSize), | ||||
| 	} | ||||
| 
 | ||||
| 	d.pool.New = func() interface{} { | ||||
| 		return make([]byte, d.ChunkSize) | ||||
| 	} | ||||
| 
 | ||||
| 	return &Driver{ | ||||
|  | @ -287,8 +296,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total | |||
| 		return 0, err | ||||
| 	} | ||||
| 
 | ||||
| 	buf := make([]byte, d.ChunkSize) | ||||
| 	zeroBuf := make([]byte, d.ChunkSize) | ||||
| 	buf := d.getbuf() | ||||
| 
 | ||||
| 	// We never want to leave a dangling multipart upload, our only consistent state is
 | ||||
| 	// when there is a whole object at path. This is in order to remain consistent with
 | ||||
|  | @ -314,6 +322,8 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total | |||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		d.putbuf(buf) // needs to be here to pick up new buf value
 | ||||
| 	}() | ||||
| 
 | ||||
| 	// Fills from 0 to total from current
 | ||||
|  | @ -367,6 +377,8 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total | |||
| 		} | ||||
| 
 | ||||
| 		go func(bytesRead int, from int64, buf []byte) { | ||||
| 			defer d.putbuf(buf) // this buffer gets dropped after this call
 | ||||
| 
 | ||||
| 			// parts and partNumber are safe, because this function is the only one modifying them and we
 | ||||
| 			// force it to be executed serially.
 | ||||
| 			if bytesRead > 0 { | ||||
|  | @ -381,7 +393,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total | |||
| 			putErrChan <- nil | ||||
| 		}(bytesRead, from, buf) | ||||
| 
 | ||||
| 		buf = make([]byte, d.ChunkSize) | ||||
| 		buf = d.getbuf() // use a new buffer for the next call
 | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
|  | @ -429,7 +441,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total | |||
| 			fromZeroFillSmall := func(from, to int64) error { | ||||
| 				bytesRead = 0 | ||||
| 				for from+int64(bytesRead) < to { | ||||
| 					nn, err := bytes.NewReader(zeroBuf).Read(buf[from+int64(bytesRead) : to]) | ||||
| 					nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to]) | ||||
| 					bytesRead += nn | ||||
| 					if err != nil { | ||||
| 						return err | ||||
|  | @ -443,7 +455,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total | |||
| 			fromZeroFillLarge := func(from, to int64) error { | ||||
| 				bytesRead64 := int64(0) | ||||
| 				for to-(from+bytesRead64) >= d.ChunkSize { | ||||
| 					part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf)) | ||||
| 					part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros)) | ||||
| 					if err != nil { | ||||
| 						return err | ||||
| 					} | ||||
|  | @ -724,3 +736,13 @@ func getPermissions() s3.ACL { | |||
| func (d *driver) getContentType() string { | ||||
| 	return "application/octet-stream" | ||||
| } | ||||
| 
 | ||||
| // getbuf returns a buffer from the driver's pool with length d.ChunkSize.
 | ||||
| func (d *driver) getbuf() []byte { | ||||
| 	return d.pool.Get().([]byte) | ||||
| } | ||||
| 
 | ||||
| func (d *driver) putbuf(p []byte) { | ||||
| 	copy(p, d.zeros) | ||||
| 	d.pool.Put(p) | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue