Parallelize signature fetch in signature store
To avoid compounded round trips leading to slow retrieval of manifests with a large number of signatures, the fetch of signatures has been parallelized. This simply spawns a goroutine for each path, coordinated with a sync.WaitGroup. Signed-off-by: Stephen J Day <stephen.day@docker.com>master
							parent
							
								
									0a2affa79f
								
							
						
					
					
						commit
						1eab4b79bc
					
				| 
						 | 
					@ -2,8 +2,10 @@ package storage
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"path"
 | 
						"path"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/docker/distribution"
 | 
						"github.com/docker/distribution"
 | 
				
			||||||
 | 
						"github.com/docker/distribution/context"
 | 
				
			||||||
	"github.com/docker/distribution/digest"
 | 
						"github.com/docker/distribution/digest"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -33,18 +35,42 @@ func (s *signatureStore) Get(dgst digest.Digest) ([][]byte, error) {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var signatures [][]byte
 | 
						var wg sync.WaitGroup
 | 
				
			||||||
	for _, sigPath := range signaturePaths {
 | 
						signatures := make([][]byte, len(signaturePaths)) // make space for everything
 | 
				
			||||||
 | 
						errCh := make(chan error, 1)                      // buffered chan so one proceeds
 | 
				
			||||||
 | 
						for i, sigPath := range signaturePaths {
 | 
				
			||||||
		// Append the link portion
 | 
							// Append the link portion
 | 
				
			||||||
		sigPath = path.Join(sigPath, "link")
 | 
							sigPath = path.Join(sigPath, "link")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// TODO(stevvooe): These fetches should be parallelized for performance.
 | 
							wg.Add(1)
 | 
				
			||||||
 | 
							go func(idx int, sigPath string) {
 | 
				
			||||||
 | 
								defer wg.Done()
 | 
				
			||||||
 | 
								context.GetLogger(s.ctx).
 | 
				
			||||||
 | 
									Debugf("fetching signature from %q", sigPath)
 | 
				
			||||||
			p, err := s.blobStore.linked(sigPath)
 | 
								p, err := s.blobStore.linked(sigPath)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
			return nil, err
 | 
									context.GetLogger(s.ctx).
 | 
				
			||||||
 | 
										Errorf("error fetching signature from %q: %v", sigPath, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									// try to send an error, if it hasn't already been sent.
 | 
				
			||||||
 | 
									select {
 | 
				
			||||||
 | 
									case errCh <- err:
 | 
				
			||||||
 | 
									default:
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		signatures = append(signatures, p)
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								signatures[idx] = p
 | 
				
			||||||
 | 
							}(i, sigPath)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						wg.Wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case err := <-errCh:
 | 
				
			||||||
 | 
							// just return the first error, similar to single threaded code.
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							// pass
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return signatures, nil
 | 
						return signatures, nil
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue