Update to track refactor updates
Added use of cache blob statter Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)master
							parent
							
								
									568df315ff
								
							
						
					
					
						commit
						296a8415b9
					
				| 
						 | 
				
			
			@ -1,159 +0,0 @@
 | 
			
		|||
package client
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"os"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution"
 | 
			
		||||
	"github.com/docker/distribution/context"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type httpBlob struct {
 | 
			
		||||
	*repository
 | 
			
		||||
 | 
			
		||||
	desc distribution.Descriptor
 | 
			
		||||
 | 
			
		||||
	rc     io.ReadCloser // remote read closer
 | 
			
		||||
	brd    *bufio.Reader // internal buffered io
 | 
			
		||||
	offset int64
 | 
			
		||||
	err    error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (hb *httpBlob) Read(p []byte) (n int, err error) {
 | 
			
		||||
	if hb.err != nil {
 | 
			
		||||
		return 0, hb.err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	rd, err := hb.reader()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	n, err = rd.Read(p)
 | 
			
		||||
	hb.offset += int64(n)
 | 
			
		||||
 | 
			
		||||
	// Simulate io.EOF error if we reach filesize.
 | 
			
		||||
	if err == nil && hb.offset >= hb.desc.Length {
 | 
			
		||||
		err = io.EOF
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return n, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (hb *httpBlob) Seek(offset int64, whence int) (int64, error) {
 | 
			
		||||
	if hb.err != nil {
 | 
			
		||||
		return 0, hb.err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var err error
 | 
			
		||||
	newOffset := hb.offset
 | 
			
		||||
 | 
			
		||||
	switch whence {
 | 
			
		||||
	case os.SEEK_CUR:
 | 
			
		||||
		newOffset += int64(offset)
 | 
			
		||||
	case os.SEEK_END:
 | 
			
		||||
		newOffset = hb.desc.Length + int64(offset)
 | 
			
		||||
	case os.SEEK_SET:
 | 
			
		||||
		newOffset = int64(offset)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if newOffset < 0 {
 | 
			
		||||
		err = fmt.Errorf("cannot seek to negative position")
 | 
			
		||||
	} else {
 | 
			
		||||
		if hb.offset != newOffset {
 | 
			
		||||
			hb.reset()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// No problems, set the offset.
 | 
			
		||||
		hb.offset = newOffset
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return hb.offset, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (hb *httpBlob) Close() error {
 | 
			
		||||
	if hb.err != nil {
 | 
			
		||||
		return hb.err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// close and release reader chain
 | 
			
		||||
	if hb.rc != nil {
 | 
			
		||||
		hb.rc.Close()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	hb.rc = nil
 | 
			
		||||
	hb.brd = nil
 | 
			
		||||
 | 
			
		||||
	hb.err = fmt.Errorf("httpBlob: closed")
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (hb *httpBlob) reset() {
 | 
			
		||||
	if hb.err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	if hb.rc != nil {
 | 
			
		||||
		hb.rc.Close()
 | 
			
		||||
		hb.rc = nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (hb *httpBlob) reader() (io.Reader, error) {
 | 
			
		||||
	if hb.err != nil {
 | 
			
		||||
		return nil, hb.err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if hb.rc != nil {
 | 
			
		||||
		return hb.brd, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// If the offset is great than or equal to size, return a empty, noop reader.
 | 
			
		||||
	if hb.offset >= hb.desc.Length {
 | 
			
		||||
		return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	blobURL, err := hb.ub.BuildBlobURL(hb.name, hb.desc.Digest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	req, err := http.NewRequest("GET", blobURL, nil)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if hb.offset > 0 {
 | 
			
		||||
		// TODO(stevvooe): Get this working correctly.
 | 
			
		||||
 | 
			
		||||
		// If we are at different offset, issue a range request from there.
 | 
			
		||||
		req.Header.Add("Range", fmt.Sprintf("1-"))
 | 
			
		||||
		context.GetLogger(hb.context).Infof("Range: %s", req.Header.Get("Range"))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	resp, err := hb.client.Do(req)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	switch {
 | 
			
		||||
	case resp.StatusCode == 200:
 | 
			
		||||
		hb.rc = resp.Body
 | 
			
		||||
	default:
 | 
			
		||||
		defer resp.Body.Close()
 | 
			
		||||
		return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if hb.brd == nil {
 | 
			
		||||
		hb.brd = bufio.NewReader(hb.rc)
 | 
			
		||||
	} else {
 | 
			
		||||
		hb.brd.Reset(hb.rc)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return hb.brd, nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -151,7 +151,7 @@ func (hbu *httpBlobUpload) Commit(ctx context.Context, desc distribution.Descrip
 | 
			
		|||
	return hbu.repo.Blobs(ctx).Stat(ctx, desc.Digest)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (hbu *httpBlobUpload) Rollback(ctx context.Context) error {
 | 
			
		||||
func (hbu *httpBlobUpload) Cancel(ctx context.Context) error {
 | 
			
		||||
	panic("not implemented")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,164 @@
 | 
			
		|||
package client
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"os"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewHTTPReadSeeker(client *http.Client, url string, size int64) distribution.ReadSeekCloser {
 | 
			
		||||
	return &httpReadSeeker{
 | 
			
		||||
		client: client,
 | 
			
		||||
		url:    url,
 | 
			
		||||
		size:   size,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type httpReadSeeker struct {
 | 
			
		||||
	client *http.Client
 | 
			
		||||
	url    string
 | 
			
		||||
 | 
			
		||||
	size int64
 | 
			
		||||
 | 
			
		||||
	rc     io.ReadCloser // remote read closer
 | 
			
		||||
	brd    *bufio.Reader // internal buffered io
 | 
			
		||||
	offset int64
 | 
			
		||||
	err    error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) {
 | 
			
		||||
	if hrs.err != nil {
 | 
			
		||||
		return 0, hrs.err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	rd, err := hrs.reader()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	n, err = rd.Read(p)
 | 
			
		||||
	hrs.offset += int64(n)
 | 
			
		||||
 | 
			
		||||
	// Simulate io.EOF error if we reach filesize.
 | 
			
		||||
	if err == nil && hrs.offset >= hrs.size {
 | 
			
		||||
		err = io.EOF
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return n, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) {
 | 
			
		||||
	if hrs.err != nil {
 | 
			
		||||
		return 0, hrs.err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var err error
 | 
			
		||||
	newOffset := hrs.offset
 | 
			
		||||
 | 
			
		||||
	switch whence {
 | 
			
		||||
	case os.SEEK_CUR:
 | 
			
		||||
		newOffset += int64(offset)
 | 
			
		||||
	case os.SEEK_END:
 | 
			
		||||
		newOffset = hrs.size + int64(offset)
 | 
			
		||||
	case os.SEEK_SET:
 | 
			
		||||
		newOffset = int64(offset)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if newOffset < 0 {
 | 
			
		||||
		err = errors.New("cannot seek to negative position")
 | 
			
		||||
	} else {
 | 
			
		||||
		if hrs.offset != newOffset {
 | 
			
		||||
			hrs.reset()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// No problems, set the offset.
 | 
			
		||||
		hrs.offset = newOffset
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return hrs.offset, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (hrs *httpReadSeeker) Close() error {
 | 
			
		||||
	if hrs.err != nil {
 | 
			
		||||
		return hrs.err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// close and release reader chain
 | 
			
		||||
	if hrs.rc != nil {
 | 
			
		||||
		hrs.rc.Close()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	hrs.rc = nil
 | 
			
		||||
	hrs.brd = nil
 | 
			
		||||
 | 
			
		||||
	hrs.err = errors.New("httpLayer: closed")
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (hrs *httpReadSeeker) reset() {
 | 
			
		||||
	if hrs.err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	if hrs.rc != nil {
 | 
			
		||||
		hrs.rc.Close()
 | 
			
		||||
		hrs.rc = nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (hrs *httpReadSeeker) reader() (io.Reader, error) {
 | 
			
		||||
	if hrs.err != nil {
 | 
			
		||||
		return nil, hrs.err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if hrs.rc != nil {
 | 
			
		||||
		return hrs.brd, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// If the offset is great than or equal to size, return a empty, noop reader.
 | 
			
		||||
	if hrs.offset >= hrs.size {
 | 
			
		||||
		return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	req, err := http.NewRequest("GET", hrs.url, nil)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if hrs.offset > 0 {
 | 
			
		||||
		// TODO(stevvooe): Get this working correctly.
 | 
			
		||||
 | 
			
		||||
		// If we are at different offset, issue a range request from there.
 | 
			
		||||
		req.Header.Add("Range", "1-")
 | 
			
		||||
		// TODO: get context in here
 | 
			
		||||
		// context.GetLogger(hrs.context).Infof("Range: %s", req.Header.Get("Range"))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	resp, err := hrs.client.Do(req)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	switch {
 | 
			
		||||
	case resp.StatusCode == 200:
 | 
			
		||||
		hrs.rc = resp.Body
 | 
			
		||||
	default:
 | 
			
		||||
		defer resp.Body.Close()
 | 
			
		||||
		return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if hrs.brd == nil {
 | 
			
		||||
		hrs.brd = bufio.NewReader(hrs.rc)
 | 
			
		||||
	} else {
 | 
			
		||||
		hrs.brd.Reset(hrs.rc)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return hrs.brd, nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -18,6 +18,7 @@ import (
 | 
			
		|||
	"github.com/docker/distribution"
 | 
			
		||||
	"github.com/docker/distribution/context"
 | 
			
		||||
	"github.com/docker/distribution/registry/api/v2"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/cache"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// NewRepository creates a new Repository for the given repository name and endpoint
 | 
			
		||||
| 
						 | 
				
			
			@ -56,9 +57,13 @@ func (r *repository) Name() string {
 | 
			
		|||
	return r.name
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *repository) Blobs(ctx context.Context) distribution.BlobService {
 | 
			
		||||
func (r *repository) Blobs(ctx context.Context) distribution.BlobStore {
 | 
			
		||||
	statter := &blobStatter{
 | 
			
		||||
		repository: r,
 | 
			
		||||
	}
 | 
			
		||||
	return &blobs{
 | 
			
		||||
		repository: r,
 | 
			
		||||
		statter:    cache.NewCachedBlobStatter(cache.NewInMemoryBlobDescriptorCacheProvider(), statter),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -232,6 +237,8 @@ func (ms *manifests) Delete(dgst digest.Digest) error {
 | 
			
		|||
 | 
			
		||||
type blobs struct {
 | 
			
		||||
	*repository
 | 
			
		||||
 | 
			
		||||
	statter distribution.BlobStatter
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func sanitizeLocation(location, source string) (string, error) {
 | 
			
		||||
| 
						 | 
				
			
			@ -255,12 +262,17 @@ func sanitizeLocation(location, source string) (string, error) {
 | 
			
		|||
	return location, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ls *blobs) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
 | 
			
		||||
	return ls.statter.Stat(ctx, dgst)
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ls *blobs) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
 | 
			
		||||
	desc, err := ls.Stat(ctx, dgst)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	reader, err := ls.Open(ctx, desc)
 | 
			
		||||
	reader, err := ls.Open(ctx, desc.Digest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -269,19 +281,26 @@ func (ls *blobs) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
 | 
			
		|||
	return ioutil.ReadAll(reader)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ls *blobs) Open(ctx context.Context, desc distribution.Descriptor) (distribution.ReadSeekCloser, error) {
 | 
			
		||||
	return &httpBlob{
 | 
			
		||||
		repository: ls.repository,
 | 
			
		||||
		desc:       desc,
 | 
			
		||||
	}, nil
 | 
			
		||||
func (ls *blobs) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
 | 
			
		||||
	stat, err := ls.statter.Stat(ctx, dgst)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	blobURL, err := ls.ub.BuildBlobURL(ls.Name(), stat.Digest)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return NewHTTPReadSeeker(ls.repository.client, blobURL, stat.Length), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ls *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, desc distribution.Descriptor) error {
 | 
			
		||||
func (ls *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ls *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
 | 
			
		||||
	writer, err := ls.Writer(ctx)
 | 
			
		||||
	writer, err := ls.Create(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return distribution.Descriptor{}, err
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -303,7 +322,7 @@ func (ls *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut
 | 
			
		|||
	return writer.Commit(ctx, desc)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ls *blobs) Writer(ctx context.Context) (distribution.BlobWriter, error) {
 | 
			
		||||
func (ls *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) {
 | 
			
		||||
	u, err := ls.ub.BuildBlobUploadURL(ls.name)
 | 
			
		||||
 | 
			
		||||
	resp, err := ls.client.Post(u, "", nil)
 | 
			
		||||
| 
						 | 
				
			
			@ -337,7 +356,11 @@ func (ls *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter
 | 
			
		|||
	panic("not implemented")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ls *blobs) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
 | 
			
		||||
type blobStatter struct {
 | 
			
		||||
	*repository
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ls *blobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
 | 
			
		||||
	u, err := ls.ub.BuildBlobURL(ls.name, dgst)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return distribution.Descriptor{}, err
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -237,7 +237,7 @@ func TestBlobUploadChunked(t *testing.T) {
 | 
			
		|||
	}
 | 
			
		||||
	l := r.Blobs(ctx)
 | 
			
		||||
 | 
			
		||||
	upload, err := l.Writer(ctx)
 | 
			
		||||
	upload, err := l.Create(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -348,7 +348,7 @@ func TestBlobUploadMonolithic(t *testing.T) {
 | 
			
		|||
	}
 | 
			
		||||
	l := r.Blobs(ctx)
 | 
			
		||||
 | 
			
		||||
	upload, err := l.Writer(ctx)
 | 
			
		||||
	upload, err := l.Create(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue