Merge pull request #1394 from RichardScothern/invalidate-bdc
Invalidate the blob store descriptor cachemaster
						commit
						ccd11e4434
					
				| 
						 | 
					@ -21,6 +21,7 @@ type proxyBlobStore struct {
 | 
				
			||||||
	localStore     distribution.BlobStore
 | 
						localStore     distribution.BlobStore
 | 
				
			||||||
	remoteStore    distribution.BlobService
 | 
						remoteStore    distribution.BlobService
 | 
				
			||||||
	scheduler      *scheduler.TTLExpirationScheduler
 | 
						scheduler      *scheduler.TTLExpirationScheduler
 | 
				
			||||||
 | 
						repositoryName reference.Named
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var _ distribution.BlobStore = &proxyBlobStore{}
 | 
					var _ distribution.BlobStore = &proxyBlobStore{}
 | 
				
			||||||
| 
						 | 
					@ -134,7 +135,14 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter,
 | 
				
			||||||
		if err := pbs.storeLocal(ctx, dgst); err != nil {
 | 
							if err := pbs.storeLocal(ctx, dgst); err != nil {
 | 
				
			||||||
			context.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error())
 | 
								context.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		pbs.scheduler.AddBlob(dgst, repositoryTTL)
 | 
					
 | 
				
			||||||
 | 
							blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								context.GetLogger(ctx).Errorf("Error creating reference: %s", err)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							pbs.scheduler.AddBlob(blobRef, repositoryTTL)
 | 
				
			||||||
	}(dgst)
 | 
						}(dgst)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	_, err = pbs.copyContent(ctx, dgst, w)
 | 
						_, err = pbs.copyContent(ctx, dgst, w)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -164,6 +164,7 @@ func makeTestEnv(t *testing.T, name string) *testEnv {
 | 
				
			||||||
	s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json")
 | 
						s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	proxyBlobStore := proxyBlobStore{
 | 
						proxyBlobStore := proxyBlobStore{
 | 
				
			||||||
 | 
							repositoryName: nameRef,
 | 
				
			||||||
		remoteStore:    truthBlobs,
 | 
							remoteStore:    truthBlobs,
 | 
				
			||||||
		localStore:     localBlobs,
 | 
							localStore:     localBlobs,
 | 
				
			||||||
		scheduler:      s,
 | 
							scheduler:      s,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -62,11 +62,17 @@ func (pms proxyManifestStore) Get(ctx context.Context, dgst digest.Digest, optio
 | 
				
			||||||
			return nil, err
 | 
								return nil, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// Schedule the repo for removal
 | 
							// Schedule the manifest blob for removal
 | 
				
			||||||
		pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL)
 | 
							repoBlob, err := reference.WithDigest(pms.repositoryName, dgst)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								context.GetLogger(ctx).Errorf("Error creating reference: %s", err)
 | 
				
			||||||
 | 
								return nil, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							pms.scheduler.AddManifest(repoBlob, repositoryTTL)
 | 
				
			||||||
		// Ensure the manifest blob is cleaned up
 | 
							// Ensure the manifest blob is cleaned up
 | 
				
			||||||
		pms.scheduler.AddBlob(dgst, repositoryTTL)
 | 
							//pms.scheduler.AddBlob(blobRef, repositoryTTL)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return manifest, err
 | 
						return manifest, err
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -119,6 +119,7 @@ func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestE
 | 
				
			||||||
			localManifests:  localManifests,
 | 
								localManifests:  localManifests,
 | 
				
			||||||
			remoteManifests: truthManifests,
 | 
								remoteManifests: truthManifests,
 | 
				
			||||||
			scheduler:       s,
 | 
								scheduler:       s,
 | 
				
			||||||
 | 
								repositoryName:  nameRef,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -4,6 +4,7 @@ import (
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"net/url"
 | 
						"net/url"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
	"github.com/docker/distribution"
 | 
						"github.com/docker/distribution"
 | 
				
			||||||
	"github.com/docker/distribution/configuration"
 | 
						"github.com/docker/distribution/configuration"
 | 
				
			||||||
	"github.com/docker/distribution/context"
 | 
						"github.com/docker/distribution/context"
 | 
				
			||||||
| 
						 | 
					@ -35,13 +36,56 @@ func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Name
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	v := storage.NewVacuum(ctx, driver)
 | 
						v := storage.NewVacuum(ctx, driver)
 | 
				
			||||||
 | 
					 | 
				
			||||||
	s := scheduler.New(ctx, driver, "/scheduler-state.json")
 | 
						s := scheduler.New(ctx, driver, "/scheduler-state.json")
 | 
				
			||||||
	s.OnBlobExpire(func(digest string) error {
 | 
						s.OnBlobExpire(func(ref reference.Reference) error {
 | 
				
			||||||
		return v.RemoveBlob(digest)
 | 
							var r reference.Canonical
 | 
				
			||||||
 | 
							var ok bool
 | 
				
			||||||
 | 
							if r, ok = ref.(reference.Canonical); !ok {
 | 
				
			||||||
 | 
								return fmt.Errorf("unexpected reference type : %T", ref)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							repo, err := registry.Repository(ctx, r)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							blobs := repo.Blobs(ctx)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// Clear the repository reference and descriptor caches
 | 
				
			||||||
 | 
							err = blobs.Delete(ctx, r.Digest())
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							err = v.RemoveBlob(r.Digest().String())
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	s.OnManifestExpire(func(repoName string) error {
 | 
					
 | 
				
			||||||
		return v.RemoveRepository(repoName)
 | 
						s.OnManifestExpire(func(ref reference.Reference) error {
 | 
				
			||||||
 | 
							var r reference.Canonical
 | 
				
			||||||
 | 
							var ok bool
 | 
				
			||||||
 | 
							if r, ok = ref.(reference.Canonical); !ok {
 | 
				
			||||||
 | 
								return fmt.Errorf("unexpected reference type : %T", ref)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							repo, err := registry.Repository(ctx, r)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							manifests, err := repo.Manifests(ctx)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							err = manifests.Delete(ctx, r.Digest())
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	err = s.Start()
 | 
						err = s.Start()
 | 
				
			||||||
| 
						 | 
					@ -100,8 +144,9 @@ func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named
 | 
				
			||||||
			localStore:     localRepo.Blobs(ctx),
 | 
								localStore:     localRepo.Blobs(ctx),
 | 
				
			||||||
			remoteStore:    remoteRepo.Blobs(ctx),
 | 
								remoteStore:    remoteRepo.Blobs(ctx),
 | 
				
			||||||
			scheduler:      pr.scheduler,
 | 
								scheduler:      pr.scheduler,
 | 
				
			||||||
 | 
								repositoryName: name,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		manifests: proxyManifestStore{
 | 
							manifests: &proxyManifestStore{
 | 
				
			||||||
			repositoryName:  name,
 | 
								repositoryName:  name,
 | 
				
			||||||
			localManifests:  localManifests, // Options?
 | 
								localManifests:  localManifests, // Options?
 | 
				
			||||||
			remoteManifests: remoteManifests,
 | 
								remoteManifests: remoteManifests,
 | 
				
			||||||
| 
						 | 
					@ -109,7 +154,7 @@ func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named
 | 
				
			||||||
			scheduler:       pr.scheduler,
 | 
								scheduler:       pr.scheduler,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		name: name,
 | 
							name: name,
 | 
				
			||||||
		tags: proxyTagService{
 | 
							tags: &proxyTagService{
 | 
				
			||||||
			localTags:  localRepo.Tags(ctx),
 | 
								localTags:  localRepo.Tags(ctx),
 | 
				
			||||||
			remoteTags: remoteRepo.Tags(ctx),
 | 
								remoteTags: remoteRepo.Tags(ctx),
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -7,13 +7,12 @@ import (
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/docker/distribution/context"
 | 
						"github.com/docker/distribution/context"
 | 
				
			||||||
	"github.com/docker/distribution/digest"
 | 
					 | 
				
			||||||
	"github.com/docker/distribution/reference"
 | 
						"github.com/docker/distribution/reference"
 | 
				
			||||||
	"github.com/docker/distribution/registry/storage/driver"
 | 
						"github.com/docker/distribution/registry/storage/driver"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// onTTLExpiryFunc is called when a repository's TTL expires
 | 
					// onTTLExpiryFunc is called when a repository's TTL expires
 | 
				
			||||||
type expiryFunc func(string) error
 | 
					type expiryFunc func(reference.Reference) error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	entryTypeBlob = iota
 | 
						entryTypeBlob = iota
 | 
				
			||||||
| 
						 | 
					@ -82,19 +81,20 @@ func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// AddBlob schedules a blob cleanup after ttl expires
 | 
					// AddBlob schedules a blob cleanup after ttl expires
 | 
				
			||||||
func (ttles *TTLExpirationScheduler) AddBlob(dgst digest.Digest, ttl time.Duration) error {
 | 
					func (ttles *TTLExpirationScheduler) AddBlob(blobRef reference.Canonical, ttl time.Duration) error {
 | 
				
			||||||
	ttles.Lock()
 | 
						ttles.Lock()
 | 
				
			||||||
	defer ttles.Unlock()
 | 
						defer ttles.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if ttles.stopped {
 | 
						if ttles.stopped {
 | 
				
			||||||
		return fmt.Errorf("scheduler not started")
 | 
							return fmt.Errorf("scheduler not started")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	ttles.add(dgst.String(), ttl, entryTypeBlob)
 | 
					
 | 
				
			||||||
 | 
						ttles.add(blobRef, ttl, entryTypeBlob)
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// AddManifest schedules a manifest cleanup after ttl expires
 | 
					// AddManifest schedules a manifest cleanup after ttl expires
 | 
				
			||||||
func (ttles *TTLExpirationScheduler) AddManifest(repoName reference.Named, ttl time.Duration) error {
 | 
					func (ttles *TTLExpirationScheduler) AddManifest(manifestRef reference.Canonical, ttl time.Duration) error {
 | 
				
			||||||
	ttles.Lock()
 | 
						ttles.Lock()
 | 
				
			||||||
	defer ttles.Unlock()
 | 
						defer ttles.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -102,7 +102,7 @@ func (ttles *TTLExpirationScheduler) AddManifest(repoName reference.Named, ttl t
 | 
				
			||||||
		return fmt.Errorf("scheduler not started")
 | 
							return fmt.Errorf("scheduler not started")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ttles.add(repoName.Name(), ttl, entryTypeManifest)
 | 
						ttles.add(manifestRef, ttl, entryTypeManifest)
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -156,17 +156,17 @@ func (ttles *TTLExpirationScheduler) Start() error {
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) {
 | 
					func (ttles *TTLExpirationScheduler) add(r reference.Reference, ttl time.Duration, eType int) {
 | 
				
			||||||
	entry := &schedulerEntry{
 | 
						entry := &schedulerEntry{
 | 
				
			||||||
		Key:       key,
 | 
							Key:       r.String(),
 | 
				
			||||||
		Expiry:    time.Now().Add(ttl),
 | 
							Expiry:    time.Now().Add(ttl),
 | 
				
			||||||
		EntryType: eType,
 | 
							EntryType: eType,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
 | 
						context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
 | 
				
			||||||
	if oldEntry, present := ttles.entries[key]; present && oldEntry.timer != nil {
 | 
						if oldEntry, present := ttles.entries[entry.Key]; present && oldEntry.timer != nil {
 | 
				
			||||||
		oldEntry.timer.Stop()
 | 
							oldEntry.timer.Stop()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	ttles.entries[key] = entry
 | 
						ttles.entries[entry.Key] = entry
 | 
				
			||||||
	entry.timer = ttles.startTimer(entry, ttl)
 | 
						entry.timer = ttles.startTimer(entry, ttl)
 | 
				
			||||||
	ttles.indexDirty = true
 | 
						ttles.indexDirty = true
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -184,14 +184,19 @@ func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.
 | 
				
			||||||
		case entryTypeManifest:
 | 
							case entryTypeManifest:
 | 
				
			||||||
			f = ttles.onManifestExpire
 | 
								f = ttles.onManifestExpire
 | 
				
			||||||
		default:
 | 
							default:
 | 
				
			||||||
			f = func(repoName string) error {
 | 
								f = func(reference.Reference) error {
 | 
				
			||||||
				return fmt.Errorf("Unexpected scheduler entry type")
 | 
									return fmt.Errorf("scheduler entry type")
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if err := f(entry.Key); err != nil {
 | 
							ref, err := reference.Parse(entry.Key)
 | 
				
			||||||
 | 
							if err == nil {
 | 
				
			||||||
 | 
								if err := f(ref); err != nil {
 | 
				
			||||||
				context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
 | 
									context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								context.GetLogger(ttles.ctx).Errorf("Error unpacking reference: %s", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		delete(ttles.entries, entry.Key)
 | 
							delete(ttles.entries, entry.Key)
 | 
				
			||||||
		ttles.indexDirty = true
 | 
							ttles.indexDirty = true
 | 
				
			||||||
| 
						 | 
					@ -249,6 +254,5 @@ func (ttles *TTLExpirationScheduler) readState() error {
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -6,28 +6,49 @@ import (
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/docker/distribution/context"
 | 
						"github.com/docker/distribution/context"
 | 
				
			||||||
 | 
						"github.com/docker/distribution/reference"
 | 
				
			||||||
	"github.com/docker/distribution/registry/storage/driver/inmemory"
 | 
						"github.com/docker/distribution/registry/storage/driver/inmemory"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func testRefs(t *testing.T) (reference.Reference, reference.Reference, reference.Reference) {
 | 
				
			||||||
 | 
						ref1, err := reference.Parse("testrepo@sha256:aaaaeaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("could not parse reference: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ref2, err := reference.Parse("testrepo@sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("could not parse reference: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ref3, err := reference.Parse("testrepo@sha256:cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("could not parse reference: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return ref1, ref2, ref3
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestSchedule(t *testing.T) {
 | 
					func TestSchedule(t *testing.T) {
 | 
				
			||||||
 | 
						ref1, ref2, ref3 := testRefs(t)
 | 
				
			||||||
	timeUnit := time.Millisecond
 | 
						timeUnit := time.Millisecond
 | 
				
			||||||
	remainingRepos := map[string]bool{
 | 
						remainingRepos := map[string]bool{
 | 
				
			||||||
		"testBlob1": true,
 | 
							ref1.String(): true,
 | 
				
			||||||
		"testBlob2": true,
 | 
							ref2.String(): true,
 | 
				
			||||||
		"ch00":      true,
 | 
							ref3.String(): true,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	s := New(context.Background(), inmemory.New(), "/ttl")
 | 
						s := New(context.Background(), inmemory.New(), "/ttl")
 | 
				
			||||||
	deleteFunc := func(repoName string) error {
 | 
						deleteFunc := func(repoName reference.Reference) error {
 | 
				
			||||||
		if len(remainingRepos) == 0 {
 | 
							if len(remainingRepos) == 0 {
 | 
				
			||||||
			t.Fatalf("Incorrect expiry count")
 | 
								t.Fatalf("Incorrect expiry count")
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		_, ok := remainingRepos[repoName]
 | 
							_, ok := remainingRepos[repoName.String()]
 | 
				
			||||||
		if !ok {
 | 
							if !ok {
 | 
				
			||||||
			t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
 | 
								t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		t.Log("removing", repoName)
 | 
							t.Log("removing", repoName)
 | 
				
			||||||
		delete(remainingRepos, repoName)
 | 
							delete(remainingRepos, repoName.String())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -37,11 +58,11 @@ func TestSchedule(t *testing.T) {
 | 
				
			||||||
		t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
 | 
							t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	s.add("testBlob1", 3*timeUnit, entryTypeBlob)
 | 
						s.add(ref1, 3*timeUnit, entryTypeBlob)
 | 
				
			||||||
	s.add("testBlob2", 1*timeUnit, entryTypeBlob)
 | 
						s.add(ref2, 1*timeUnit, entryTypeBlob)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	func() {
 | 
						func() {
 | 
				
			||||||
		s.add("ch00", 1*timeUnit, entryTypeBlob)
 | 
							s.add(ref3, 1*timeUnit, entryTypeBlob)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -53,33 +74,34 @@ func TestSchedule(t *testing.T) {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestRestoreOld(t *testing.T) {
 | 
					func TestRestoreOld(t *testing.T) {
 | 
				
			||||||
 | 
						ref1, ref2, _ := testRefs(t)
 | 
				
			||||||
	remainingRepos := map[string]bool{
 | 
						remainingRepos := map[string]bool{
 | 
				
			||||||
		"testBlob1": true,
 | 
							ref1.String(): true,
 | 
				
			||||||
		"oldRepo":   true,
 | 
							ref2.String(): true,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	deleteFunc := func(repoName string) error {
 | 
						deleteFunc := func(r reference.Reference) error {
 | 
				
			||||||
		if repoName == "oldRepo" && len(remainingRepos) == 3 {
 | 
							if r.String() == ref1.String() && len(remainingRepos) == 2 {
 | 
				
			||||||
			t.Errorf("oldRepo should be removed first")
 | 
								t.Errorf("ref1 should be removed first")
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		_, ok := remainingRepos[repoName]
 | 
							_, ok := remainingRepos[r.String()]
 | 
				
			||||||
		if !ok {
 | 
							if !ok {
 | 
				
			||||||
			t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
 | 
								t.Fatalf("Trying to remove nonexistant repo: %s", r)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		delete(remainingRepos, repoName)
 | 
							delete(remainingRepos, r.String())
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	timeUnit := time.Millisecond
 | 
						timeUnit := time.Millisecond
 | 
				
			||||||
	serialized, err := json.Marshal(&map[string]schedulerEntry{
 | 
						serialized, err := json.Marshal(&map[string]schedulerEntry{
 | 
				
			||||||
		"testBlob1": {
 | 
							ref1.String(): {
 | 
				
			||||||
			Expiry:    time.Now().Add(1 * timeUnit),
 | 
								Expiry:    time.Now().Add(1 * timeUnit),
 | 
				
			||||||
			Key:       "testBlob1",
 | 
								Key:       ref1.String(),
 | 
				
			||||||
			EntryType: 0,
 | 
								EntryType: 0,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		"oldRepo": {
 | 
							ref2.String(): {
 | 
				
			||||||
			Expiry:    time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first
 | 
								Expiry:    time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first
 | 
				
			||||||
			Key:       "oldRepo",
 | 
								Key:       ref2.String(),
 | 
				
			||||||
			EntryType: 0,
 | 
								EntryType: 0,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
| 
						 | 
					@ -108,13 +130,16 @@ func TestRestoreOld(t *testing.T) {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestStopRestore(t *testing.T) {
 | 
					func TestStopRestore(t *testing.T) {
 | 
				
			||||||
 | 
						ref1, ref2, _ := testRefs(t)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	timeUnit := time.Millisecond
 | 
						timeUnit := time.Millisecond
 | 
				
			||||||
	remainingRepos := map[string]bool{
 | 
						remainingRepos := map[string]bool{
 | 
				
			||||||
		"testBlob1": true,
 | 
							ref1.String(): true,
 | 
				
			||||||
		"testBlob2": true,
 | 
							ref2.String(): true,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	deleteFunc := func(repoName string) error {
 | 
					
 | 
				
			||||||
		delete(remainingRepos, repoName)
 | 
						deleteFunc := func(r reference.Reference) error {
 | 
				
			||||||
 | 
							delete(remainingRepos, r.String())
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -127,8 +152,8 @@ func TestStopRestore(t *testing.T) {
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Fatalf(err.Error())
 | 
							t.Fatalf(err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	s.add("testBlob1", 300*timeUnit, entryTypeBlob)
 | 
						s.add(ref1, 300*timeUnit, entryTypeBlob)
 | 
				
			||||||
	s.add("testBlob2", 100*timeUnit, entryTypeBlob)
 | 
						s.add(ref2, 100*timeUnit, entryTypeBlob)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Start and stop before all operations complete
 | 
						// Start and stop before all operations complete
 | 
				
			||||||
	// state will be written to fs
 | 
						// state will be written to fs
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -17,6 +17,7 @@ func CheckBlobDescriptorCache(t *testing.T, provider cache.BlobDescriptorCachePr
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	checkBlobDescriptorCacheEmptyRepository(t, ctx, provider)
 | 
						checkBlobDescriptorCacheEmptyRepository(t, ctx, provider)
 | 
				
			||||||
	checkBlobDescriptorCacheSetAndRead(t, ctx, provider)
 | 
						checkBlobDescriptorCacheSetAndRead(t, ctx, provider)
 | 
				
			||||||
 | 
						checkBlobDescriptorCacheClear(t, ctx, provider)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func checkBlobDescriptorCacheEmptyRepository(t *testing.T, ctx context.Context, provider cache.BlobDescriptorCacheProvider) {
 | 
					func checkBlobDescriptorCacheEmptyRepository(t *testing.T, ctx context.Context, provider cache.BlobDescriptorCacheProvider) {
 | 
				
			||||||
| 
						 | 
					@ -141,10 +142,10 @@ func checkBlobDescriptorCacheSetAndRead(t *testing.T, ctx context.Context, provi
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func checkBlobDescriptorClear(t *testing.T, ctx context.Context, provider cache.BlobDescriptorCacheProvider) {
 | 
					func checkBlobDescriptorCacheClear(t *testing.T, ctx context.Context, provider cache.BlobDescriptorCacheProvider) {
 | 
				
			||||||
	localDigest := digest.Digest("sha384:abc")
 | 
						localDigest := digest.Digest("sha384:def111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")
 | 
				
			||||||
	expected := distribution.Descriptor{
 | 
						expected := distribution.Descriptor{
 | 
				
			||||||
		Digest:    "sha256:abc",
 | 
							Digest:    "sha256:def1111111111111111111111111111111111111111111111111111111111111",
 | 
				
			||||||
		Size:      10,
 | 
							Size:      10,
 | 
				
			||||||
		MediaType: "application/octet-stream"}
 | 
							MediaType: "application/octet-stream"}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -168,12 +169,11 @@ func checkBlobDescriptorClear(t *testing.T, ctx context.Context, provider cache.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	err = cache.Clear(ctx, localDigest)
 | 
						err = cache.Clear(ctx, localDigest)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error deleting descriptor")
 | 
							t.Error(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	nonExistantDigest := digest.Digest("sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
 | 
						desc, err = cache.Stat(ctx, localDigest)
 | 
				
			||||||
	err = cache.Clear(ctx, nonExistantDigest)
 | 
					 | 
				
			||||||
	if err == nil {
 | 
						if err == nil {
 | 
				
			||||||
		t.Fatalf("expected error deleting unknown descriptor")
 | 
							t.Fatalf("expected error statting deleted blob: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue