152 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			152 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Go
		
	
	
package client
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
 | 
						|
	log "github.com/Sirupsen/logrus"
 | 
						|
 | 
						|
	"github.com/docker/distribution/manifest"
 | 
						|
)
 | 
						|
 | 
						|
// simultaneousLayerPullWindow is the size of the parallel layer pull window.
 | 
						|
// A layer may not be pulled until the layer preceding it by the length of the
 | 
						|
// pull window has been successfully pulled.
 | 
						|
const simultaneousLayerPullWindow = 4 // Pull keeps at most this many layer pulls outstanding at once
 | 
						|
 | 
						|
// Pull implements a client pull workflow for the image defined by the given
 | 
						|
// name and tag pair, using the given ObjectStore for local manifest and layer
 | 
						|
// storage
 | 
						|
func Pull(c Client, objectStore ObjectStore, name, tag string) error {
 | 
						|
	manifest, err := c.GetImageManifest(name, tag)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	log.WithField("manifest", manifest).Info("Pulled manifest")
 | 
						|
 | 
						|
	if len(manifest.FSLayers) != len(manifest.History) {
 | 
						|
		return fmt.Errorf("Length of history not equal to number of layers")
 | 
						|
	}
 | 
						|
	if len(manifest.FSLayers) == 0 {
 | 
						|
		return fmt.Errorf("Image has no layers")
 | 
						|
	}
 | 
						|
 | 
						|
	errChans := make([]chan error, len(manifest.FSLayers))
 | 
						|
	for i := range manifest.FSLayers {
 | 
						|
		errChans[i] = make(chan error)
 | 
						|
	}
 | 
						|
 | 
						|
	// To avoid leak of goroutines we must notify
 | 
						|
	// pullLayer goroutines about a cancelation,
 | 
						|
	// otherwise they will lock forever.
 | 
						|
	cancelCh := make(chan struct{})
 | 
						|
 | 
						|
	// Iterate over each layer in the manifest, simultaneously pulling no more
 | 
						|
	// than simultaneousLayerPullWindow layers at a time. If an error is
 | 
						|
	// received from a layer pull, we abort the push.
 | 
						|
	for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPullWindow; i++ {
 | 
						|
		dependentLayer := i - simultaneousLayerPullWindow
 | 
						|
		if dependentLayer >= 0 {
 | 
						|
			err := <-errChans[dependentLayer]
 | 
						|
			if err != nil {
 | 
						|
				log.WithField("error", err).Warn("Pull aborted")
 | 
						|
				close(cancelCh)
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		if i < len(manifest.FSLayers) {
 | 
						|
			go func(i int) {
 | 
						|
				select {
 | 
						|
				case errChans[i] <- pullLayer(c, objectStore, name, manifest.FSLayers[i]):
 | 
						|
				case <-cancelCh: // no chance to recv until cancelCh's closed
 | 
						|
				}
 | 
						|
			}(i)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	err = objectStore.WriteManifest(name, tag, manifest)
 | 
						|
	if err != nil {
 | 
						|
		log.WithFields(log.Fields{
 | 
						|
			"error":    err,
 | 
						|
			"manifest": manifest,
 | 
						|
		}).Warn("Unable to write image manifest")
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer manifest.FSLayer) error {
 | 
						|
	log.WithField("layer", fsLayer).Info("Pulling layer")
 | 
						|
 | 
						|
	layer, err := objectStore.Layer(fsLayer.BlobSum)
 | 
						|
	if err != nil {
 | 
						|
		log.WithFields(log.Fields{
 | 
						|
			"error": err,
 | 
						|
			"layer": fsLayer,
 | 
						|
		}).Warn("Unable to write local layer")
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	layerWriter, err := layer.Writer()
 | 
						|
	if err == ErrLayerAlreadyExists {
 | 
						|
		log.WithField("layer", fsLayer).Info("Layer already exists")
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	if err == ErrLayerLocked {
 | 
						|
		log.WithField("layer", fsLayer).Info("Layer download in progress, waiting")
 | 
						|
		layer.Wait()
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	if err != nil {
 | 
						|
		log.WithFields(log.Fields{
 | 
						|
			"error": err,
 | 
						|
			"layer": fsLayer,
 | 
						|
		}).Warn("Unable to write local layer")
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	defer layerWriter.Close()
 | 
						|
 | 
						|
	if layerWriter.CurrentSize() > 0 {
 | 
						|
		log.WithFields(log.Fields{
 | 
						|
			"layer":       fsLayer,
 | 
						|
			"currentSize": layerWriter.CurrentSize(),
 | 
						|
			"size":        layerWriter.Size(),
 | 
						|
		}).Info("Layer partially downloaded, resuming")
 | 
						|
	}
 | 
						|
 | 
						|
	layerReader, length, err := c.GetBlob(name, fsLayer.BlobSum, layerWriter.CurrentSize())
 | 
						|
	if err != nil {
 | 
						|
		log.WithFields(log.Fields{
 | 
						|
			"error": err,
 | 
						|
			"layer": fsLayer,
 | 
						|
		}).Warn("Unable to download layer")
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	defer layerReader.Close()
 | 
						|
 | 
						|
	layerWriter.SetSize(layerWriter.CurrentSize() + length)
 | 
						|
 | 
						|
	_, err = io.Copy(layerWriter, layerReader)
 | 
						|
	if err != nil {
 | 
						|
		log.WithFields(log.Fields{
 | 
						|
			"error": err,
 | 
						|
			"layer": fsLayer,
 | 
						|
		}).Warn("Unable to download layer")
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if layerWriter.CurrentSize() != layerWriter.Size() {
 | 
						|
		log.WithFields(log.Fields{
 | 
						|
			"size":        layerWriter.Size(),
 | 
						|
			"currentSize": layerWriter.CurrentSize(),
 | 
						|
			"layer":       fsLayer,
 | 
						|
		}).Warn("Layer invalid size")
 | 
						|
		return fmt.Errorf(
 | 
						|
			"Wrote incorrect number of bytes for layer %v. Expected %d, Wrote %d",
 | 
						|
			fsLayer, layerWriter.Size(), layerWriter.CurrentSize(),
 | 
						|
		)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 |