145 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			145 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
| // +build !noresumabledigest
 | |
| 
 | |
| package storage
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding"
 | |
| 	"fmt"
 | |
| 	"hash"
 | |
| 	"path"
 | |
| 	"strconv"
 | |
| 
 | |
| 	storagedriver "github.com/docker/distribution/registry/storage/driver"
 | |
| 	"github.com/sirupsen/logrus"
 | |
| )
 | |
| 
 | |
| // resumeDigest attempts to restore the state of the internal hash function
 | |
| // by loading the most recent saved hash state equal to the current size of the blob.
 | |
| func (bw *blobWriter) resumeDigest(ctx context.Context) error {
 | |
| 	if !bw.resumableDigestEnabled {
 | |
| 		return errResumableDigestNotAvailable
 | |
| 	}
 | |
| 
 | |
| 	h, ok := bw.digester.Hash().(encoding.BinaryUnmarshaler)
 | |
| 	if !ok {
 | |
| 		return errResumableDigestNotAvailable
 | |
| 	}
 | |
| 
 | |
| 	offset := bw.fileWriter.Size()
 | |
| 	if offset == bw.written {
 | |
| 		// State of digester is already at the requested offset.
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// List hash states from storage backend.
 | |
| 	var hashStateMatch hashStateEntry
 | |
| 	hashStates, err := bw.getStoredHashStates(ctx)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
 | |
| 	}
 | |
| 
 | |
| 	// Find the highest stored hashState with offset equal to
 | |
| 	// the requested offset.
 | |
| 	for _, hashState := range hashStates {
 | |
| 		if hashState.offset == offset {
 | |
| 			hashStateMatch = hashState
 | |
| 			break // Found an exact offset match.
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if hashStateMatch.offset == 0 {
 | |
| 		// No need to load any state, just reset the hasher.
 | |
| 		h.(hash.Hash).Reset()
 | |
| 	} else {
 | |
| 		storedState, err := bw.driver.GetContent(ctx, hashStateMatch.path)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		if err = h.UnmarshalBinary(storedState); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		bw.written = hashStateMatch.offset
 | |
| 	}
 | |
| 
 | |
| 	// Mind the gap.
 | |
| 	if gapLen := offset - bw.written; gapLen > 0 {
 | |
| 		return errResumableDigestNotAvailable
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
// hashStateEntry describes one hash state saved for an upload.
type hashStateEntry struct {
	// offset is the byte offset parsed from the last element of path.
	offset int64
	// path is the storage driver path the saved state is read from.
	path string
}
 | |
| 
 | |
| // getStoredHashStates returns a slice of hashStateEntries for this upload.
 | |
| func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry, error) {
 | |
| 	uploadHashStatePathPrefix, err := pathFor(uploadHashStatePathSpec{
 | |
| 		name: bw.blobStore.repository.Named().String(),
 | |
| 		id:   bw.id,
 | |
| 		alg:  bw.digester.Digest().Algorithm(),
 | |
| 		list: true,
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	paths, err := bw.blobStore.driver.List(ctx, uploadHashStatePathPrefix)
 | |
| 	if err != nil {
 | |
| 		if _, ok := err.(storagedriver.PathNotFoundError); !ok {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		// Treat PathNotFoundError as no entries.
 | |
| 		paths = nil
 | |
| 	}
 | |
| 
 | |
| 	hashStateEntries := make([]hashStateEntry, 0, len(paths))
 | |
| 
 | |
| 	for _, p := range paths {
 | |
| 		pathSuffix := path.Base(p)
 | |
| 		// The suffix should be the offset.
 | |
| 		offset, err := strconv.ParseInt(pathSuffix, 0, 64)
 | |
| 		if err != nil {
 | |
| 			logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
 | |
| 		}
 | |
| 
 | |
| 		hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
 | |
| 	}
 | |
| 
 | |
| 	return hashStateEntries, nil
 | |
| }
 | |
| 
 | |
| func (bw *blobWriter) storeHashState(ctx context.Context) error {
 | |
| 	if !bw.resumableDigestEnabled {
 | |
| 		return errResumableDigestNotAvailable
 | |
| 	}
 | |
| 
 | |
| 	h, ok := bw.digester.Hash().(encoding.BinaryMarshaler)
 | |
| 	if !ok {
 | |
| 		return errResumableDigestNotAvailable
 | |
| 	}
 | |
| 
 | |
| 	state, err := h.MarshalBinary()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	uploadHashStatePath, err := pathFor(uploadHashStatePathSpec{
 | |
| 		name:   bw.blobStore.repository.Named().String(),
 | |
| 		id:     bw.id,
 | |
| 		alg:    bw.digester.Digest().Algorithm(),
 | |
| 		offset: bw.written,
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return bw.driver.PutContent(ctx, uploadHashStatePath, state)
 | |
| }
 |