commit
						0281f4dce5
					
				| 
						 | 
				
			
			@ -23,6 +23,7 @@ import (
 | 
			
		|||
	"reflect"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/AdRoll/goamz/aws"
 | 
			
		||||
| 
						 | 
				
			
			@ -73,6 +74,9 @@ type driver struct {
 | 
			
		|||
	ChunkSize     int64
 | 
			
		||||
	Encrypt       bool
 | 
			
		||||
	RootDirectory string
 | 
			
		||||
 | 
			
		||||
	pool  sync.Pool // pool []byte buffers used for WriteStream
 | 
			
		||||
	zeros []byte    // shared, zero-valued buffer used for WriteStream
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type baseEmbed struct {
 | 
			
		||||
| 
						 | 
				
			
			@ -239,6 +243,11 @@ func New(params DriverParameters) (*Driver, error) {
 | 
			
		|||
		ChunkSize:     params.ChunkSize,
 | 
			
		||||
		Encrypt:       params.Encrypt,
 | 
			
		||||
		RootDirectory: params.RootDirectory,
 | 
			
		||||
		zeros:         make([]byte, params.ChunkSize),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	d.pool.New = func() interface{} {
 | 
			
		||||
		return make([]byte, d.ChunkSize)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &Driver{
 | 
			
		||||
| 
						 | 
				
			
			@ -302,8 +311,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
			
		|||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	buf := make([]byte, d.ChunkSize)
 | 
			
		||||
	zeroBuf := make([]byte, d.ChunkSize)
 | 
			
		||||
	buf := d.getbuf()
 | 
			
		||||
 | 
			
		||||
	// We never want to leave a dangling multipart upload, our only consistent state is
 | 
			
		||||
	// when there is a whole object at path. This is in order to remain consistent with
 | 
			
		||||
| 
						 | 
				
			
			@ -329,6 +337,8 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
			
		|||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		d.putbuf(buf) // needs to be here to pick up new buf value
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// Fills from 0 to total from current
 | 
			
		||||
| 
						 | 
				
			
			@ -382,6 +392,8 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
			
		|||
		}
 | 
			
		||||
 | 
			
		||||
		go func(bytesRead int, from int64, buf []byte) {
 | 
			
		||||
			defer d.putbuf(buf) // this buffer gets dropped after this call
 | 
			
		||||
 | 
			
		||||
			// parts and partNumber are safe, because this function is the only one modifying them and we
 | 
			
		||||
			// force it to be executed serially.
 | 
			
		||||
			if bytesRead > 0 {
 | 
			
		||||
| 
						 | 
				
			
			@ -396,7 +408,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
			
		|||
			putErrChan <- nil
 | 
			
		||||
		}(bytesRead, from, buf)
 | 
			
		||||
 | 
			
		||||
		buf = make([]byte, d.ChunkSize)
 | 
			
		||||
		buf = d.getbuf() // use a new buffer for the next call
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -444,7 +456,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
			
		|||
			fromZeroFillSmall := func(from, to int64) error {
 | 
			
		||||
				bytesRead = 0
 | 
			
		||||
				for from+int64(bytesRead) < to {
 | 
			
		||||
					nn, err := bytes.NewReader(zeroBuf).Read(buf[from+int64(bytesRead) : to])
 | 
			
		||||
					nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to])
 | 
			
		||||
					bytesRead += nn
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						return err
 | 
			
		||||
| 
						 | 
				
			
			@ -458,7 +470,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
			
		|||
			fromZeroFillLarge := func(from, to int64) error {
 | 
			
		||||
				bytesRead64 := int64(0)
 | 
			
		||||
				for to-(from+bytesRead64) >= d.ChunkSize {
 | 
			
		||||
					part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf))
 | 
			
		||||
					part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros))
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						return err
 | 
			
		||||
					}
 | 
			
		||||
| 
						 | 
				
			
			@ -739,3 +751,13 @@ func getPermissions() s3.ACL {
 | 
			
		|||
func (d *driver) getContentType() string {
 | 
			
		||||
	return "application/octet-stream"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getbuf returns a buffer from the driver's pool with length d.ChunkSize.
 | 
			
		||||
func (d *driver) getbuf() []byte {
 | 
			
		||||
	return d.pool.Get().([]byte)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *driver) putbuf(p []byte) {
 | 
			
		||||
	copy(p, d.zeros)
 | 
			
		||||
	d.pool.Put(p)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue