wrap buffered writer around filewriter
benchmarks added to filewriter_test, demonstrate buffered version is ~5x faster on my hardware. Signed-off-by: David Lawrence <david.lawrence@docker.com> (github: endophage)master
							parent
							
								
									b1c8952c1a
								
							
						
					
					
						commit
						b870e3fdfb
					
				| 
						 | 
				
			
			@ -1,6 +1,7 @@
 | 
			
		|||
package storage
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
| 
						 | 
				
			
			@ -9,6 +10,10 @@ import (
 | 
			
		|||
	storagedriver "github.com/docker/distribution/registry/storage/driver"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	fileWriterBufferSize = 5 << 20
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// fileWriter implements a remote file writer backed by a storage driver.
 | 
			
		||||
type fileWriter struct {
 | 
			
		||||
	driver storagedriver.StorageDriver
 | 
			
		||||
| 
						 | 
				
			
			@ -22,6 +27,11 @@ type fileWriter struct {
 | 
			
		|||
	err    error // terminal error, if set, reader is closed
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type bufferedFileWriter struct {
 | 
			
		||||
	fileWriter
 | 
			
		||||
	bw *bufio.Writer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// fileWriterInterface makes the desired io compliant interface that the
 | 
			
		||||
// filewriter should implement.
 | 
			
		||||
type fileWriterInterface interface {
 | 
			
		||||
| 
						 | 
				
			
			@ -35,7 +45,7 @@ var _ fileWriterInterface = &fileWriter{}
 | 
			
		|||
 | 
			
		||||
// newFileWriter returns a prepared fileWriter for the driver and path. This
 | 
			
		||||
// could be considered similar to an "open" call on a regular filesystem.
 | 
			
		||||
func newFileWriter(driver storagedriver.StorageDriver, path string) (*fileWriter, error) {
 | 
			
		||||
func newFileWriter(driver storagedriver.StorageDriver, path string) (*bufferedFileWriter, error) {
 | 
			
		||||
	fw := fileWriter{
 | 
			
		||||
		driver: driver,
 | 
			
		||||
		path:   path,
 | 
			
		||||
| 
						 | 
				
			
			@ -56,7 +66,42 @@ func newFileWriter(driver storagedriver.StorageDriver, path string) (*fileWriter
 | 
			
		|||
		fw.size = fi.Size()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &fw, nil
 | 
			
		||||
	buffered := bufferedFileWriter{
 | 
			
		||||
		fileWriter: fw,
 | 
			
		||||
	}
 | 
			
		||||
	buffered.bw = bufio.NewWriterSize(&buffered.fileWriter, fileWriterBufferSize)
 | 
			
		||||
 | 
			
		||||
	return &buffered, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// wraps the fileWriter.Write method to buffer small writes
 | 
			
		||||
func (bfw *bufferedFileWriter) Write(p []byte) (int, error) {
 | 
			
		||||
	return bfw.bw.Write(p)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// wraps fileWriter.Close to ensure the buffer is flushed
 | 
			
		||||
// before we close the writer.
 | 
			
		||||
func (bfw *bufferedFileWriter) Close() (err error) {
 | 
			
		||||
	if err = bfw.Flush(); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	err = bfw.fileWriter.Close()
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// wraps fileWriter.Seek to ensure offset is handled
 | 
			
		||||
// correctly in respect to pending data in the buffer
 | 
			
		||||
func (bfw *bufferedFileWriter) Seek(offset int64, whence int) (int64, error) {
 | 
			
		||||
	if err := bfw.Flush(); err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
	return bfw.fileWriter.Seek(offset, whence)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// wraps bufio.Writer.Flush to allow intermediate flushes
 | 
			
		||||
// of the bufferedFileWriter
 | 
			
		||||
func (bfw *bufferedFileWriter) Flush() error {
 | 
			
		||||
	return bfw.bw.Flush()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Write writes the buffer p at the current write offset.
 | 
			
		||||
| 
						 | 
				
			
			@ -108,6 +153,9 @@ func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
// Close closes the fileWriter for writing.
 | 
			
		||||
// Calling it once is valid and correct and it will
 | 
			
		||||
// return a nil error. Calling it subsequent times will
 | 
			
		||||
// detect that fw.err has been set and will return the error.
 | 
			
		||||
func (fw *fileWriter) Close() error {
 | 
			
		||||
	if fw.err != nil {
 | 
			
		||||
		return fw.err
 | 
			
		||||
| 
						 | 
				
			
			@ -115,7 +163,7 @@ func (fw *fileWriter) Close() error {
 | 
			
		|||
 | 
			
		||||
	fw.err = fmt.Errorf("filewriter@%v: closed", fw.path)
 | 
			
		||||
 | 
			
		||||
	return fw.err
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// readFromAt writes to fw from r at the specified offset. If offset is less
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -8,6 +8,7 @@ import (
 | 
			
		|||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/digest"
 | 
			
		||||
	storagedriver "github.com/docker/distribution/registry/storage/driver"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver/inmemory"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -42,6 +43,7 @@ func TestSimpleWrite(t *testing.T) {
 | 
			
		|||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error writing content: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	fw.Flush()
 | 
			
		||||
 | 
			
		||||
	if n != len(content) {
 | 
			
		||||
		t.Fatalf("unexpected write length: %d != %d", n, len(content))
 | 
			
		||||
| 
						 | 
				
			
			@ -146,3 +148,99 @@ func TestSimpleWrite(t *testing.T) {
 | 
			
		|||
		t.Fatalf("unable to verify write data")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestBufferedFileWriter(t *testing.T) {
 | 
			
		||||
	writer, err := newFileWriter(inmemory.New(), "/random")
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// write one byte and ensure the offset hasn't been incremented.
 | 
			
		||||
	// offset will only get incremented when the buffer gets flushed
 | 
			
		||||
	short := []byte{byte(1)}
 | 
			
		||||
 | 
			
		||||
	writer.Write(short)
 | 
			
		||||
 | 
			
		||||
	if writer.offset > 0 {
 | 
			
		||||
		t.Fatalf("WriteStream called prematurely")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// write enough data to cause the buffer to flush and confirm
 | 
			
		||||
	// the offset has been incremented
 | 
			
		||||
	long := make([]byte, fileWriterBufferSize)
 | 
			
		||||
	_, err = rand.Read(long)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error building random data: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	for i := range long {
 | 
			
		||||
		long[i] = byte(i)
 | 
			
		||||
	}
 | 
			
		||||
	writer.Write(long)
 | 
			
		||||
	writer.Close()
 | 
			
		||||
	if writer.offset != (fileWriterBufferSize + 1) {
 | 
			
		||||
		t.Fatalf("WriteStream not called when buffer capacity reached")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func BenchmarkFileWriter(b *testing.B) {
 | 
			
		||||
	b.StopTimer() // not sure how long setup above will take
 | 
			
		||||
	for i := 0; i < b.N; i++ {
 | 
			
		||||
		// Start basic fileWriter initialization
 | 
			
		||||
		fw := fileWriter{
 | 
			
		||||
			driver: inmemory.New(),
 | 
			
		||||
			path:   "/random",
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if fi, err := fw.driver.Stat(fw.path); err != nil {
 | 
			
		||||
			switch err := err.(type) {
 | 
			
		||||
			case storagedriver.PathNotFoundError:
 | 
			
		||||
				// ignore, offset is zero
 | 
			
		||||
			default:
 | 
			
		||||
				b.Fatalf("Failed to initialize fileWriter: %v", err.Error())
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			if fi.IsDir() {
 | 
			
		||||
				b.Fatalf("Cannot write to a directory")
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			fw.size = fi.Size()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		randomBytes := make([]byte, 1<<20)
 | 
			
		||||
		_, err := rand.Read(randomBytes)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			b.Fatalf("unexpected error building random data: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		// End basic file writer initialization
 | 
			
		||||
 | 
			
		||||
		b.StartTimer()
 | 
			
		||||
		for j := 0; j < 100; j++ {
 | 
			
		||||
			fw.Write(randomBytes)
 | 
			
		||||
		}
 | 
			
		||||
		b.StopTimer()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func BenchmarkBufferedFileWriter(b *testing.B) {
 | 
			
		||||
	b.StopTimer() // not sure how long setup above will take
 | 
			
		||||
	for i := 0; i < b.N; i++ {
 | 
			
		||||
		bfw, err := newFileWriter(inmemory.New(), "/random")
 | 
			
		||||
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			b.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error())
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		randomBytes := make([]byte, 1<<20)
 | 
			
		||||
		_, err = rand.Read(randomBytes)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			b.Fatalf("unexpected error building random data: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		b.StartTimer()
 | 
			
		||||
		for j := 0; j < 100; j++ {
 | 
			
		||||
			bfw.Write(randomBytes)
 | 
			
		||||
		}
 | 
			
		||||
		b.StopTimer()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -139,10 +139,10 @@ func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (di
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	return &layerUploadController{
 | 
			
		||||
		layerStore: ls,
 | 
			
		||||
		uuid:       uuid,
 | 
			
		||||
		startedAt:  startedAt,
 | 
			
		||||
		fileWriter: *fw,
 | 
			
		||||
		layerStore:         ls,
 | 
			
		||||
		uuid:               uuid,
 | 
			
		||||
		startedAt:          startedAt,
 | 
			
		||||
		bufferedFileWriter: *fw,
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -22,7 +22,9 @@ type layerUploadController struct {
 | 
			
		|||
	uuid      string
 | 
			
		||||
	startedAt time.Time
 | 
			
		||||
 | 
			
		||||
	fileWriter
 | 
			
		||||
	// implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisy
 | 
			
		||||
	// LayerUpload Interface
 | 
			
		||||
	bufferedFileWriter
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ distribution.LayerUpload = &layerUploadController{}
 | 
			
		||||
| 
						 | 
				
			
			@ -42,6 +44,12 @@ func (luc *layerUploadController) StartedAt() time.Time {
 | 
			
		|||
// format <algorithm>:<hex digest>.
 | 
			
		||||
func (luc *layerUploadController) Finish(digest digest.Digest) (distribution.Layer, error) {
 | 
			
		||||
	ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Finish")
 | 
			
		||||
 | 
			
		||||
	err := luc.bufferedFileWriter.Close()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	canonical, err := luc.validateLayer(digest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
| 
						 | 
				
			
			@ -103,7 +111,7 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
 | 
			
		|||
	// then only have to fetch the difference.
 | 
			
		||||
 | 
			
		||||
	// Read the file from the backend driver and validate it.
 | 
			
		||||
	fr, err := newFileReader(luc.fileWriter.driver, luc.path)
 | 
			
		||||
	fr, err := newFileReader(luc.bufferedFileWriter.driver, luc.path)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", err
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue