Parallelize signature fetch in signature store
To avoid compounded round trips leading to slow retrieval of manifests with a large number of signatures, the fetch of signatures has been parallelized. This simply spawns a goroutine for each path, coordinated with a sync.WaitGroup. Signed-off-by: Stephen J Day <stephen.day@docker.com>master
							parent
							
								
									e56124d343
								
							
						
					
					
						commit
						def60f3426
					
				| 
						 | 
				
			
			@ -2,8 +2,10 @@ package storage
 | 
			
		|||
 | 
			
		||||
import (
 | 
			
		||||
	"path"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution"
 | 
			
		||||
	"github.com/docker/distribution/context"
 | 
			
		||||
	"github.com/docker/distribution/digest"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -33,18 +35,42 @@ func (s *signatureStore) Get(dgst digest.Digest) ([][]byte, error) {
 | 
			
		|||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var signatures [][]byte
 | 
			
		||||
	for _, sigPath := range signaturePaths {
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	signatures := make([][]byte, len(signaturePaths)) // make space for everything
 | 
			
		||||
	errCh := make(chan error, 1)                      // buffered chan so one proceeds
 | 
			
		||||
	for i, sigPath := range signaturePaths {
 | 
			
		||||
		// Append the link portion
 | 
			
		||||
		sigPath = path.Join(sigPath, "link")
 | 
			
		||||
 | 
			
		||||
		// TODO(stevvooe): These fetches should be parallelized for performance.
 | 
			
		||||
		p, err := s.blobStore.linked(sigPath)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		go func(idx int, sigPath string) {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
			context.GetLogger(s.ctx).
 | 
			
		||||
				Debugf("fetching signature from %q", sigPath)
 | 
			
		||||
			p, err := s.blobStore.linked(sigPath)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				context.GetLogger(s.ctx).
 | 
			
		||||
					Errorf("error fetching signature from %q: %v", sigPath, err)
 | 
			
		||||
 | 
			
		||||
		signatures = append(signatures, p)
 | 
			
		||||
				// try to send an error, if it hasn't already been sent.
 | 
			
		||||
				select {
 | 
			
		||||
				case errCh <- err:
 | 
			
		||||
				default:
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			signatures[idx] = p
 | 
			
		||||
		}(i, sigPath)
 | 
			
		||||
	}
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
 | 
			
		||||
	select {
 | 
			
		||||
	case err := <-errCh:
 | 
			
		||||
		// just return the first error, similar to single threaded code.
 | 
			
		||||
		return nil, err
 | 
			
		||||
	default:
 | 
			
		||||
		// pass
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return signatures, nil
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue