Implement a remote file writer for use with StorageDriver
This changeset implements a fileWriter type that can be used to managed writes to remote files in a StorageDriver. Basically, it manages a local seek position for a remote path. An efficient use of this implementation will write data in large blocks. Signed-off-by: Stephen J Day <stephen.day@docker.com>master
							parent
							
								
									e8714b9977
								
							
						
					
					
						commit
						09522d8535
					
				| 
						 | 
				
			
			@ -0,0 +1,153 @@
 | 
			
		|||
package storage
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
	"os"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/storagedriver"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// fileWriter implements a remote file writer backed by a storage driver.
 | 
			
		||||
type fileWriter struct {
 | 
			
		||||
	driver storagedriver.StorageDriver
 | 
			
		||||
 | 
			
		||||
	// identifying fields
 | 
			
		||||
	path string
 | 
			
		||||
 | 
			
		||||
	// mutable fields
 | 
			
		||||
	size   int64 // size of the file, aka the current end
 | 
			
		||||
	offset int64 // offset is the current write offset
 | 
			
		||||
	err    error // terminal error, if set, reader is closed
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// fileWriterInterface makes the desired io compliant interface that the
 | 
			
		||||
// filewriter should implement.
 | 
			
		||||
type fileWriterInterface interface {
 | 
			
		||||
	io.WriteSeeker
 | 
			
		||||
	io.WriterAt
 | 
			
		||||
	io.ReaderFrom
 | 
			
		||||
	io.Closer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ fileWriterInterface = &fileWriter{}
 | 
			
		||||
 | 
			
		||||
// newFileWriter returns a prepared fileWriter for the driver and path. This
 | 
			
		||||
// could be considered similar to an "open" call on a regular filesystem.
 | 
			
		||||
func newFileWriter(driver storagedriver.StorageDriver, path string) (*fileWriter, error) {
 | 
			
		||||
	fw := fileWriter{
 | 
			
		||||
		driver: driver,
 | 
			
		||||
		path:   path,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if fi, err := driver.Stat(path); err != nil {
 | 
			
		||||
		switch err := err.(type) {
 | 
			
		||||
		case storagedriver.PathNotFoundError:
 | 
			
		||||
			// ignore, offset is zero
 | 
			
		||||
		default:
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		if fi.IsDir() {
 | 
			
		||||
			return nil, fmt.Errorf("cannot write to a directory")
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		fw.size = fi.Size()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &fw, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Write writes the buffer p at the current write offset.
 | 
			
		||||
func (fw *fileWriter) Write(p []byte) (n int, err error) {
 | 
			
		||||
	nn, err := fw.readFromAt(bytes.NewReader(p), -1)
 | 
			
		||||
	return int(nn), err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WriteAt writes p at the specified offset. The underlying offset does not
 | 
			
		||||
// change.
 | 
			
		||||
func (fw *fileWriter) WriteAt(p []byte, offset int64) (n int, err error) {
 | 
			
		||||
	nn, err := fw.readFromAt(bytes.NewReader(p), offset)
 | 
			
		||||
	return int(nn), err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ReadFrom reads reader r until io.EOF writing the contents at the current
 | 
			
		||||
// offset.
 | 
			
		||||
func (fw *fileWriter) ReadFrom(r io.Reader) (n int64, err error) {
 | 
			
		||||
	return fw.readFromAt(r, -1)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Seek moves the write position do the requested offest based on the whence
 | 
			
		||||
// argument, which can be os.SEEK_CUR, os.SEEK_END, or os.SEEK_SET.
 | 
			
		||||
func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) {
 | 
			
		||||
	if fw.err != nil {
 | 
			
		||||
		return 0, fw.err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var err error
 | 
			
		||||
	newOffset := fw.offset
 | 
			
		||||
 | 
			
		||||
	switch whence {
 | 
			
		||||
	case os.SEEK_CUR:
 | 
			
		||||
		newOffset += int64(offset)
 | 
			
		||||
	case os.SEEK_END:
 | 
			
		||||
		newOffset = fw.size + int64(offset)
 | 
			
		||||
	case os.SEEK_SET:
 | 
			
		||||
		newOffset = int64(offset)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if newOffset < 0 {
 | 
			
		||||
		err = fmt.Errorf("cannot seek to negative position")
 | 
			
		||||
	} else if newOffset > fw.size {
 | 
			
		||||
		fw.offset = newOffset
 | 
			
		||||
		fw.size = newOffset
 | 
			
		||||
	} else {
 | 
			
		||||
		// No problems, set the offset.
 | 
			
		||||
		fw.offset = newOffset
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return fw.offset, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Close closes the fileWriter for writing.
 | 
			
		||||
func (fw *fileWriter) Close() error {
 | 
			
		||||
	if fw.err != nil {
 | 
			
		||||
		return fw.err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fw.err = fmt.Errorf("filewriter@%v: closed", fw.path)
 | 
			
		||||
 | 
			
		||||
	return fw.err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// readFromAt writes to fw from r at the specified offset. If offset is less
 | 
			
		||||
// than zero, the value of fw.offset is used and updated after the operation.
 | 
			
		||||
func (fw *fileWriter) readFromAt(r io.Reader, offset int64) (n int64, err error) {
 | 
			
		||||
	if fw.err != nil {
 | 
			
		||||
		return 0, fw.err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var updateOffset bool
 | 
			
		||||
	if offset < 0 {
 | 
			
		||||
		offset = fw.offset
 | 
			
		||||
		updateOffset = true
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	nn, err := fw.driver.WriteStream(fw.path, offset, r)
 | 
			
		||||
 | 
			
		||||
	if updateOffset {
 | 
			
		||||
		// We should forward the offset, whether or not there was an error.
 | 
			
		||||
		// Basically, we keep the filewriter in sync with the reader's head. If an
 | 
			
		||||
		// error is encountered, the whole thing should be retried but we proceed
 | 
			
		||||
		// from an expected offset, even if the data didn't make it to the
 | 
			
		||||
		// backend.
 | 
			
		||||
		fw.offset += nn
 | 
			
		||||
 | 
			
		||||
		if fw.offset > fw.size {
 | 
			
		||||
			fw.size = fw.offset
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nn, err
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,148 @@
 | 
			
		|||
package storage
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"crypto/rand"
 | 
			
		||||
	"io"
 | 
			
		||||
	"os"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/digest"
 | 
			
		||||
	"github.com/docker/distribution/storagedriver/inmemory"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// TestSimpleWrite takes the fileWriter through common write operations
 | 
			
		||||
// ensuring data integrity.
 | 
			
		||||
func TestSimpleWrite(t *testing.T) {
 | 
			
		||||
	content := make([]byte, 1<<20)
 | 
			
		||||
	n, err := rand.Read(content)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error building random data: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if n != len(content) {
 | 
			
		||||
		t.Fatalf("random read did't fill buffer")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	dgst, err := digest.FromReader(bytes.NewReader(content))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error digesting random content: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	driver := inmemory.New()
 | 
			
		||||
	path := "/random"
 | 
			
		||||
 | 
			
		||||
	fw, err := newFileWriter(driver, path)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error creating fileWriter: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer fw.Close()
 | 
			
		||||
 | 
			
		||||
	n, err = fw.Write(content)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error writing content: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if n != len(content) {
 | 
			
		||||
		t.Fatalf("unexpected write length: %d != %d", n, len(content))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fr, err := newFileReader(driver, path)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error creating fileReader: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer fr.Close()
 | 
			
		||||
 | 
			
		||||
	verifier := digest.NewDigestVerifier(dgst)
 | 
			
		||||
	io.Copy(verifier, fr)
 | 
			
		||||
 | 
			
		||||
	if !verifier.Verified() {
 | 
			
		||||
		t.Fatalf("unable to verify write data")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Check the seek position is equal to the content length
 | 
			
		||||
	end, err := fw.Seek(0, os.SEEK_END)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error seeking: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if end != int64(len(content)) {
 | 
			
		||||
		t.Fatalf("write did not advance offset: %d != %d", end, len(content))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Double the content, but use the WriteAt method
 | 
			
		||||
	doubled := append(content, content...)
 | 
			
		||||
	doubledgst, err := digest.FromReader(bytes.NewReader(doubled))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error digesting doubled content: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	n, err = fw.WriteAt(content, end)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error writing content at %d: %v", end, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if n != len(content) {
 | 
			
		||||
		t.Fatalf("writeat was short: %d != %d", n, len(content))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fr, err = newFileReader(driver, path)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error creating fileReader: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer fr.Close()
 | 
			
		||||
 | 
			
		||||
	verifier = digest.NewDigestVerifier(doubledgst)
 | 
			
		||||
	io.Copy(verifier, fr)
 | 
			
		||||
 | 
			
		||||
	if !verifier.Verified() {
 | 
			
		||||
		t.Fatalf("unable to verify write data")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Check that WriteAt didn't update the offset.
 | 
			
		||||
	end, err = fw.Seek(0, os.SEEK_END)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error seeking: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if end != int64(len(content)) {
 | 
			
		||||
		t.Fatalf("write did not advance offset: %d != %d", end, len(content))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Now, we copy from one path to another, running the data through the
 | 
			
		||||
	// fileReader to fileWriter, rather than the driver.Move command to ensure
 | 
			
		||||
	// everything is working correctly.
 | 
			
		||||
	fr, err = newFileReader(driver, path)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error creating fileReader: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer fr.Close()
 | 
			
		||||
 | 
			
		||||
	fw, err = newFileWriter(driver, "/copied")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error creating fileWriter: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer fw.Close()
 | 
			
		||||
 | 
			
		||||
	nn, err := io.Copy(fw, fr)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error copying data: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if nn != int64(len(doubled)) {
 | 
			
		||||
		t.Fatalf("unexpected copy length: %d != %d", nn, len(doubled))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fr, err = newFileReader(driver, "/copied")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error creating fileReader: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer fr.Close()
 | 
			
		||||
 | 
			
		||||
	verifier = digest.NewDigestVerifier(doubledgst)
 | 
			
		||||
	io.Copy(verifier, fr)
 | 
			
		||||
 | 
			
		||||
	if !verifier.Verified() {
 | 
			
		||||
		t.Fatalf("unable to verify write data")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
		Reference in New Issue