Pool buffers used in S3.WriteStream
Signed-off-by: Stephen J Day <stephen.day@docker.com>master
							parent
							
								
									06fcc9213a
								
							
						
					
					
						commit
						c49f7cd015
					
				| 
						 | 
				
			
			@ -22,6 +22,7 @@ import (
 | 
			
		|||
	"net/http"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/AdRoll/goamz/aws"
 | 
			
		||||
| 
						 | 
				
			
			@ -72,6 +73,9 @@ type driver struct {
 | 
			
		|||
	ChunkSize     int64
 | 
			
		||||
	Encrypt       bool
 | 
			
		||||
	RootDirectory string
 | 
			
		||||
 | 
			
		||||
	pool  sync.Pool // pool []byte buffers used for WriteStream
 | 
			
		||||
	zeros []byte    // shared, zero-valued buffer used for WriteStream
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type baseEmbed struct {
 | 
			
		||||
| 
						 | 
				
			
			@ -224,6 +228,11 @@ func New(params DriverParameters) (*Driver, error) {
 | 
			
		|||
		ChunkSize:     params.ChunkSize,
 | 
			
		||||
		Encrypt:       params.Encrypt,
 | 
			
		||||
		RootDirectory: params.RootDirectory,
 | 
			
		||||
		zeros:         make([]byte, params.ChunkSize),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	d.pool.New = func() interface{} {
 | 
			
		||||
		return make([]byte, d.ChunkSize)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &Driver{
 | 
			
		||||
| 
						 | 
				
			
			@ -287,8 +296,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
			
		|||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	buf := make([]byte, d.ChunkSize)
 | 
			
		||||
	zeroBuf := make([]byte, d.ChunkSize)
 | 
			
		||||
	buf := d.getbuf()
 | 
			
		||||
 | 
			
		||||
	// We never want to leave a dangling multipart upload, our only consistent state is
 | 
			
		||||
	// when there is a whole object at path. This is in order to remain consistent with
 | 
			
		||||
| 
						 | 
				
			
			@ -314,6 +322,8 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
			
		|||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		d.putbuf(buf) // needs to be here to pick up new buf value
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// Fills from 0 to total from current
 | 
			
		||||
| 
						 | 
				
			
			@ -367,6 +377,8 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
			
		|||
		}
 | 
			
		||||
 | 
			
		||||
		go func(bytesRead int, from int64, buf []byte) {
 | 
			
		||||
			defer d.putbuf(buf) // this buffer gets dropped after this call
 | 
			
		||||
 | 
			
		||||
			// parts and partNumber are safe, because this function is the only one modifying them and we
 | 
			
		||||
			// force it to be executed serially.
 | 
			
		||||
			if bytesRead > 0 {
 | 
			
		||||
| 
						 | 
				
			
			@ -381,7 +393,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
			
		|||
			putErrChan <- nil
 | 
			
		||||
		}(bytesRead, from, buf)
 | 
			
		||||
 | 
			
		||||
		buf = make([]byte, d.ChunkSize)
 | 
			
		||||
		buf = d.getbuf() // use a new buffer for the next call
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -429,7 +441,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
			
		|||
			fromZeroFillSmall := func(from, to int64) error {
 | 
			
		||||
				bytesRead = 0
 | 
			
		||||
				for from+int64(bytesRead) < to {
 | 
			
		||||
					nn, err := bytes.NewReader(zeroBuf).Read(buf[from+int64(bytesRead) : to])
 | 
			
		||||
					nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to])
 | 
			
		||||
					bytesRead += nn
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						return err
 | 
			
		||||
| 
						 | 
				
			
			@ -443,7 +455,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
			
		|||
			fromZeroFillLarge := func(from, to int64) error {
 | 
			
		||||
				bytesRead64 := int64(0)
 | 
			
		||||
				for to-(from+bytesRead64) >= d.ChunkSize {
 | 
			
		||||
					part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf))
 | 
			
		||||
					part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros))
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						return err
 | 
			
		||||
					}
 | 
			
		||||
| 
						 | 
				
			
			@ -724,3 +736,13 @@ func getPermissions() s3.ACL {
 | 
			
		|||
func (d *driver) getContentType() string {
 | 
			
		||||
	return "application/octet-stream"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getbuf returns a buffer from the driver's pool with length d.ChunkSize.
 | 
			
		||||
func (d *driver) getbuf() []byte {
 | 
			
		||||
	return d.pool.Get().([]byte)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *driver) putbuf(p []byte) {
 | 
			
		||||
	copy(p, d.zeros)
 | 
			
		||||
	d.pool.Put(p)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue