185 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			185 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
| package base
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"reflect"
 | |
| 	"strconv"
 | |
| 	"sync"
 | |
| 
 | |
| 	storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
 | |
| )
 | |
| 
 | |
| type regulator struct {
 | |
| 	storagedriver.StorageDriver
 | |
| 	*sync.Cond
 | |
| 
 | |
| 	available uint64
 | |
| }
 | |
| 
 | |
| // GetLimitFromParameter takes an interface type as decoded from the YAML
 | |
| // configuration and returns a uint64 representing the maximum number of
 | |
| // concurrent calls given a minimum limit and default.
 | |
| //
 | |
| // If the parameter supplied is of an invalid type this returns an error.
 | |
| func GetLimitFromParameter(param interface{}, min, def uint64) (uint64, error) {
 | |
| 	limit := def
 | |
| 
 | |
| 	switch v := param.(type) {
 | |
| 	case string:
 | |
| 		var err error
 | |
| 		if limit, err = strconv.ParseUint(v, 0, 64); err != nil {
 | |
| 			return limit, fmt.Errorf("parameter must be an integer, '%v' invalid", param)
 | |
| 		}
 | |
| 	case uint64:
 | |
| 		limit = v
 | |
| 	case int, int32, int64:
 | |
| 		val := reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Int()
 | |
| 		// if param is negative casting to uint64 will wrap around and
 | |
| 		// give you the hugest thread limit ever. Let's be sensible, here
 | |
| 		if val > 0 {
 | |
| 			limit = uint64(val)
 | |
| 		} else {
 | |
| 			limit = min
 | |
| 		}
 | |
| 	case uint, uint32:
 | |
| 		limit = reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Uint()
 | |
| 	case nil:
 | |
| 		// use the default
 | |
| 	default:
 | |
| 		return 0, fmt.Errorf("invalid value '%#v'", param)
 | |
| 	}
 | |
| 
 | |
| 	if limit < min {
 | |
| 		return min, nil
 | |
| 	}
 | |
| 
 | |
| 	return limit, nil
 | |
| }
 | |
| 
 | |
| // NewRegulator wraps the given driver and is used to regulate concurrent calls
 | |
| // to the given storage driver to a maximum of the given limit. This is useful
 | |
| // for storage drivers that would otherwise create an unbounded number of OS
 | |
| // threads if allowed to be called unregulated.
 | |
| func NewRegulator(driver storagedriver.StorageDriver, limit uint64) storagedriver.StorageDriver {
 | |
| 	return ®ulator{
 | |
| 		StorageDriver: driver,
 | |
| 		Cond:          sync.NewCond(&sync.Mutex{}),
 | |
| 		available:     limit,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (r *regulator) enter() {
 | |
| 	r.L.Lock()
 | |
| 	for r.available == 0 {
 | |
| 		r.Wait()
 | |
| 	}
 | |
| 	r.available--
 | |
| 	r.L.Unlock()
 | |
| }
 | |
| 
 | |
| func (r *regulator) exit() {
 | |
| 	r.L.Lock()
 | |
| 	r.Signal()
 | |
| 	r.available++
 | |
| 	r.L.Unlock()
 | |
| }
 | |
| 
 | |
| // Name returns the human-readable "name" of the driver, useful in error
 | |
| // messages and logging. By convention, this will just be the registration
 | |
| // name, but drivers may provide other information here.
 | |
| func (r *regulator) Name() string {
 | |
| 	r.enter()
 | |
| 	defer r.exit()
 | |
| 
 | |
| 	return r.StorageDriver.Name()
 | |
| }
 | |
| 
 | |
| // GetContent retrieves the content stored at "path" as a []byte.
 | |
| // This should primarily be used for small objects.
 | |
| func (r *regulator) GetContent(ctx context.Context, path string) ([]byte, error) {
 | |
| 	r.enter()
 | |
| 	defer r.exit()
 | |
| 
 | |
| 	return r.StorageDriver.GetContent(ctx, path)
 | |
| }
 | |
| 
 | |
| // PutContent stores the []byte content at a location designated by "path".
 | |
| // This should primarily be used for small objects.
 | |
| func (r *regulator) PutContent(ctx context.Context, path string, content []byte) error {
 | |
| 	r.enter()
 | |
| 	defer r.exit()
 | |
| 
 | |
| 	return r.StorageDriver.PutContent(ctx, path, content)
 | |
| }
 | |
| 
 | |
| // Reader retrieves an io.ReadCloser for the content stored at "path"
 | |
| // with a given byte offset.
 | |
| // May be used to resume reading a stream by providing a nonzero offset.
 | |
| func (r *regulator) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
 | |
| 	r.enter()
 | |
| 	defer r.exit()
 | |
| 
 | |
| 	return r.StorageDriver.Reader(ctx, path, offset)
 | |
| }
 | |
| 
 | |
| // Writer stores the contents of the provided io.ReadCloser at a
 | |
| // location designated by the given path.
 | |
| // May be used to resume writing a stream by providing a nonzero offset.
 | |
| // The offset must be no larger than the CurrentSize for this path.
 | |
| func (r *regulator) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
 | |
| 	r.enter()
 | |
| 	defer r.exit()
 | |
| 
 | |
| 	return r.StorageDriver.Writer(ctx, path, append)
 | |
| }
 | |
| 
 | |
| // Stat retrieves the FileInfo for the given path, including the current
 | |
| // size in bytes and the creation time.
 | |
| func (r *regulator) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
 | |
| 	r.enter()
 | |
| 	defer r.exit()
 | |
| 
 | |
| 	return r.StorageDriver.Stat(ctx, path)
 | |
| }
 | |
| 
 | |
| // List returns a list of the objects that are direct descendants of the
 | |
| // given path.
 | |
| func (r *regulator) List(ctx context.Context, path string) ([]string, error) {
 | |
| 	r.enter()
 | |
| 	defer r.exit()
 | |
| 
 | |
| 	return r.StorageDriver.List(ctx, path)
 | |
| }
 | |
| 
 | |
| // Move moves an object stored at sourcePath to destPath, removing the
 | |
| // original object.
 | |
| // Note: This may be no more efficient than a copy followed by a delete for
 | |
| // many implementations.
 | |
| func (r *regulator) Move(ctx context.Context, sourcePath string, destPath string) error {
 | |
| 	r.enter()
 | |
| 	defer r.exit()
 | |
| 
 | |
| 	return r.StorageDriver.Move(ctx, sourcePath, destPath)
 | |
| }
 | |
| 
 | |
| // Delete recursively deletes all objects stored at "path" and its subpaths.
 | |
| func (r *regulator) Delete(ctx context.Context, path string) error {
 | |
| 	r.enter()
 | |
| 	defer r.exit()
 | |
| 
 | |
| 	return r.StorageDriver.Delete(ctx, path)
 | |
| }
 | |
| 
 | |
| // URLFor returns a URL which may be used to retrieve the content stored at
 | |
| // the given path, possibly using the given options.
 | |
| // May return an ErrUnsupportedMethod in certain StorageDriver
 | |
| // implementations.
 | |
| func (r *regulator) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
 | |
| 	r.enter()
 | |
| 	defer r.exit()
 | |
| 
 | |
| 	return r.StorageDriver.URLFor(ctx, path, options)
 | |
| }
 |