Send manifest and blob delete events to the notifications subsystem.
Signed-off-by: Richard Scothern <richard.scothern@docker.com>master
							parent
							
								
									fd7ccc0bdf
								
							
						
					
					
						commit
						f37b2ee16e
					
				| 
						 | 
					@ -6,6 +6,7 @@ import (
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/docker/distribution"
 | 
						"github.com/docker/distribution"
 | 
				
			||||||
	"github.com/docker/distribution/context"
 | 
						"github.com/docker/distribution/context"
 | 
				
			||||||
 | 
						"github.com/docker/distribution/digest"
 | 
				
			||||||
	"github.com/docker/distribution/reference"
 | 
						"github.com/docker/distribution/reference"
 | 
				
			||||||
	"github.com/docker/distribution/uuid"
 | 
						"github.com/docker/distribution/uuid"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
| 
						 | 
					@ -60,8 +61,8 @@ func (b *bridge) ManifestPulled(repo reference.Named, sm distribution.Manifest)
 | 
				
			||||||
	return b.createManifestEventAndWrite(EventActionPull, repo, sm)
 | 
						return b.createManifestEventAndWrite(EventActionPull, repo, sm)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (b *bridge) ManifestDeleted(repo reference.Named, sm distribution.Manifest) error {
 | 
					func (b *bridge) ManifestDeleted(repo reference.Named, dgst digest.Digest) error {
 | 
				
			||||||
	return b.createManifestEventAndWrite(EventActionDelete, repo, sm)
 | 
						return b.createManifestDeleteEventAndWrite(EventActionDelete, repo, dgst)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (b *bridge) BlobPushed(repo reference.Named, desc distribution.Descriptor) error {
 | 
					func (b *bridge) BlobPushed(repo reference.Named, desc distribution.Descriptor) error {
 | 
				
			||||||
| 
						 | 
					@ -81,8 +82,8 @@ func (b *bridge) BlobMounted(repo reference.Named, desc distribution.Descriptor,
 | 
				
			||||||
	return b.sink.Write(*event)
 | 
						return b.sink.Write(*event)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (b *bridge) BlobDeleted(repo reference.Named, desc distribution.Descriptor) error {
 | 
					func (b *bridge) BlobDeleted(repo reference.Named, dgst digest.Digest) error {
 | 
				
			||||||
	return b.createBlobEventAndWrite(EventActionDelete, repo, desc)
 | 
						return b.createBlobDeleteEventAndWrite(EventActionDelete, repo, dgst)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (b *bridge) createManifestEventAndWrite(action string, repo reference.Named, sm distribution.Manifest) error {
 | 
					func (b *bridge) createManifestEventAndWrite(action string, repo reference.Named, sm distribution.Manifest) error {
 | 
				
			||||||
| 
						 | 
					@ -94,6 +95,14 @@ func (b *bridge) createManifestEventAndWrite(action string, repo reference.Named
 | 
				
			||||||
	return b.sink.Write(*manifestEvent)
 | 
						return b.sink.Write(*manifestEvent)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (b *bridge) createManifestDeleteEventAndWrite(action string, repo reference.Named, dgst digest.Digest) error {
 | 
				
			||||||
 | 
						event := b.createEvent(action)
 | 
				
			||||||
 | 
						event.Target.Repository = repo.Name()
 | 
				
			||||||
 | 
						event.Target.Digest = dgst
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return b.sink.Write(*event)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (b *bridge) createManifestEvent(action string, repo reference.Named, sm distribution.Manifest) (*Event, error) {
 | 
					func (b *bridge) createManifestEvent(action string, repo reference.Named, sm distribution.Manifest) (*Event, error) {
 | 
				
			||||||
	event := b.createEvent(action)
 | 
						event := b.createEvent(action)
 | 
				
			||||||
	event.Target.Repository = repo.Name()
 | 
						event.Target.Repository = repo.Name()
 | 
				
			||||||
| 
						 | 
					@ -127,6 +136,14 @@ func (b *bridge) createManifestEvent(action string, repo reference.Named, sm dis
 | 
				
			||||||
	return event, nil
 | 
						return event, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (b *bridge) createBlobDeleteEventAndWrite(action string, repo reference.Named, dgst digest.Digest) error {
 | 
				
			||||||
 | 
						event := b.createEvent(action)
 | 
				
			||||||
 | 
						event.Target.Digest = dgst
 | 
				
			||||||
 | 
						event.Target.Repository = repo.Name()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return b.sink.Write(*event)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (b *bridge) createBlobEventAndWrite(action string, repo reference.Named, desc distribution.Descriptor) error {
 | 
					func (b *bridge) createBlobEventAndWrite(action string, repo reference.Named, desc distribution.Descriptor) error {
 | 
				
			||||||
	event, err := b.createBlobEvent(action, repo, desc)
 | 
						event, err := b.createBlobEvent(action, repo, desc)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -63,13 +63,12 @@ func TestEventBridgeManifestPushed(t *testing.T) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestEventBridgeManifestDeleted(t *testing.T) {
 | 
					func TestEventBridgeManifestDeleted(t *testing.T) {
 | 
				
			||||||
	l := createTestEnv(t, testSinkFn(func(events ...Event) error {
 | 
						l := createTestEnv(t, testSinkFn(func(events ...Event) error {
 | 
				
			||||||
		checkCommonManifest(t, EventActionDelete, events...)
 | 
							checkDeleted(t, EventActionDelete, events...)
 | 
				
			||||||
 | 
					 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}))
 | 
						}))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	repoRef, _ := reference.ParseNamed(repo)
 | 
						repoRef, _ := reference.ParseNamed(repo)
 | 
				
			||||||
	if err := l.ManifestDeleted(repoRef, sm); err != nil {
 | 
						if err := l.ManifestDeleted(repoRef, dgst); err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error notifying manifest pull: %v", err)
 | 
							t.Fatalf("unexpected error notifying manifest pull: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -91,6 +90,35 @@ func createTestEnv(t *testing.T, fn testSinkFn) Listener {
 | 
				
			||||||
	return NewBridge(ub, source, actor, request, fn)
 | 
						return NewBridge(ub, source, actor, request, fn)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func checkDeleted(t *testing.T, action string, events ...Event) {
 | 
				
			||||||
 | 
						if len(events) != 1 {
 | 
				
			||||||
 | 
							t.Fatalf("unexpected number of events: %v != 1", len(events))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						event := events[0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if event.Source != source {
 | 
				
			||||||
 | 
							t.Fatalf("source not equal: %#v != %#v", event.Source, source)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if event.Request != request {
 | 
				
			||||||
 | 
							t.Fatalf("request not equal: %#v != %#v", event.Request, request)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if event.Actor != actor {
 | 
				
			||||||
 | 
							t.Fatalf("request not equal: %#v != %#v", event.Actor, actor)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if event.Target.Digest != dgst {
 | 
				
			||||||
 | 
							t.Fatalf("unexpected digest on event target: %q != %q", event.Target.Digest, dgst)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if event.Target.Repository != repo {
 | 
				
			||||||
 | 
							t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func checkCommonManifest(t *testing.T, action string, events ...Event) {
 | 
					func checkCommonManifest(t *testing.T, action string, events ...Event) {
 | 
				
			||||||
	checkCommon(t, events...)
 | 
						checkCommon(t, events...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -14,11 +14,7 @@ import (
 | 
				
			||||||
type ManifestListener interface {
 | 
					type ManifestListener interface {
 | 
				
			||||||
	ManifestPushed(repo reference.Named, sm distribution.Manifest) error
 | 
						ManifestPushed(repo reference.Named, sm distribution.Manifest) error
 | 
				
			||||||
	ManifestPulled(repo reference.Named, sm distribution.Manifest) error
 | 
						ManifestPulled(repo reference.Named, sm distribution.Manifest) error
 | 
				
			||||||
 | 
						ManifestDeleted(repo reference.Named, dgst digest.Digest) error
 | 
				
			||||||
	// TODO(stevvooe): Please note that delete support is still a little shaky
 | 
					 | 
				
			||||||
	// and we'll need to propagate these in the future.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	ManifestDeleted(repo reference.Named, sm distribution.Manifest) error
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// BlobListener describes a listener that can respond to layer related events.
 | 
					// BlobListener describes a listener that can respond to layer related events.
 | 
				
			||||||
| 
						 | 
					@ -26,11 +22,7 @@ type BlobListener interface {
 | 
				
			||||||
	BlobPushed(repo reference.Named, desc distribution.Descriptor) error
 | 
						BlobPushed(repo reference.Named, desc distribution.Descriptor) error
 | 
				
			||||||
	BlobPulled(repo reference.Named, desc distribution.Descriptor) error
 | 
						BlobPulled(repo reference.Named, desc distribution.Descriptor) error
 | 
				
			||||||
	BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error
 | 
						BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error
 | 
				
			||||||
 | 
						BlobDeleted(repo reference.Named, desc digest.Digest) error
 | 
				
			||||||
	// TODO(stevvooe): Please note that delete support is still a little shaky
 | 
					 | 
				
			||||||
	// and we'll need to propagate these in the future.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	BlobDeleted(repo reference.Named, desc distribution.Descriptor) error
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Listener combines all repository events into a single interface.
 | 
					// Listener combines all repository events into a single interface.
 | 
				
			||||||
| 
						 | 
					@ -75,6 +67,17 @@ type manifestServiceListener struct {
 | 
				
			||||||
	parent *repositoryListener
 | 
						parent *repositoryListener
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (msl *manifestServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
 | 
				
			||||||
 | 
						err := msl.ManifestService.Delete(ctx, dgst)
 | 
				
			||||||
 | 
						if err == nil {
 | 
				
			||||||
 | 
							if err := msl.parent.listener.ManifestDeleted(msl.parent.Repository.Named(), dgst); err != nil {
 | 
				
			||||||
 | 
								logrus.Errorf("error dispatching manifest delete to listener: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (msl *manifestServiceListener) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
 | 
					func (msl *manifestServiceListener) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
 | 
				
			||||||
	sm, err := msl.ManifestService.Get(ctx, dgst)
 | 
						sm, err := msl.ManifestService.Get(ctx, dgst)
 | 
				
			||||||
	if err == nil {
 | 
						if err == nil {
 | 
				
			||||||
| 
						 | 
					@ -173,6 +176,17 @@ func (bsl *blobServiceListener) Create(ctx context.Context, options ...distribut
 | 
				
			||||||
	return bsl.decorateWriter(wr), err
 | 
						return bsl.decorateWriter(wr), err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (bsl *blobServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
 | 
				
			||||||
 | 
						err := bsl.BlobStore.Delete(ctx, dgst)
 | 
				
			||||||
 | 
						if err == nil {
 | 
				
			||||||
 | 
							if err := bsl.parent.listener.BlobDeleted(bsl.parent.Repository.Named(), dgst); err != nil {
 | 
				
			||||||
 | 
								context.GetLogger(ctx).Errorf("error dispatching layer delete to listener: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
 | 
					func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
 | 
				
			||||||
	wr, err := bsl.BlobStore.Resume(ctx, id)
 | 
						wr, err := bsl.BlobStore.Resume(ctx, id)
 | 
				
			||||||
	return bsl.decorateWriter(wr), err
 | 
						return bsl.decorateWriter(wr), err
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -41,10 +41,10 @@ func TestListener(t *testing.T) {
 | 
				
			||||||
	expectedOps := map[string]int{
 | 
						expectedOps := map[string]int{
 | 
				
			||||||
		"manifest:push":   1,
 | 
							"manifest:push":   1,
 | 
				
			||||||
		"manifest:pull":   1,
 | 
							"manifest:pull":   1,
 | 
				
			||||||
		// "manifest:delete": 0, // deletes not supported for now
 | 
							"manifest:delete": 1,
 | 
				
			||||||
		"layer:push":      2,
 | 
							"layer:push":      2,
 | 
				
			||||||
		"layer:pull":      2,
 | 
							"layer:pull":      2,
 | 
				
			||||||
		// "layer:delete":    0, // deletes not supported for now
 | 
							"layer:delete":    2, // deletes not supported for now
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if !reflect.DeepEqual(tl.ops, expectedOps) {
 | 
						if !reflect.DeepEqual(tl.ops, expectedOps) {
 | 
				
			||||||
| 
						 | 
					@ -68,7 +68,7 @@ func (tl *testListener) ManifestPulled(repo reference.Named, m distribution.Mani
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (tl *testListener) ManifestDeleted(repo reference.Named, m distribution.Manifest) error {
 | 
					func (tl *testListener) ManifestDeleted(repo reference.Named, d digest.Digest) error {
 | 
				
			||||||
	tl.ops["manifest:delete"]++
 | 
						tl.ops["manifest:delete"]++
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -88,7 +88,7 @@ func (tl *testListener) BlobMounted(repo reference.Named, desc distribution.Desc
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (tl *testListener) BlobDeleted(repo reference.Named, desc distribution.Descriptor) error {
 | 
					func (tl *testListener) BlobDeleted(repo reference.Named, d digest.Digest) error {
 | 
				
			||||||
	tl.ops["layer:delete"]++
 | 
						tl.ops["layer:delete"]++
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -113,6 +113,7 @@ func checkExerciseRepository(t *testing.T, repository distribution.Repository) {
 | 
				
			||||||
		Tag:  tag,
 | 
							Tag:  tag,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var blobDigests []digest.Digest
 | 
				
			||||||
	blobs := repository.Blobs(ctx)
 | 
						blobs := repository.Blobs(ctx)
 | 
				
			||||||
	for i := 0; i < 2; i++ {
 | 
						for i := 0; i < 2; i++ {
 | 
				
			||||||
		rs, ds, err := testutil.CreateRandomTarFile()
 | 
							rs, ds, err := testutil.CreateRandomTarFile()
 | 
				
			||||||
| 
						 | 
					@ -120,6 +121,7 @@ func checkExerciseRepository(t *testing.T, repository distribution.Repository) {
 | 
				
			||||||
			t.Fatalf("error creating test layer: %v", err)
 | 
								t.Fatalf("error creating test layer: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		dgst := digest.Digest(ds)
 | 
							dgst := digest.Digest(ds)
 | 
				
			||||||
 | 
							blobDigests = append(blobDigests, dgst)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		wr, err := blobs.Create(ctx)
 | 
							wr, err := blobs.Create(ctx)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
| 
						 | 
					@ -183,4 +185,16 @@ func checkExerciseRepository(t *testing.T, repository distribution.Repository) {
 | 
				
			||||||
		t.Fatalf("unexpected error fetching manifest: %v", err)
 | 
							t.Fatalf("unexpected error fetching manifest: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						err = manifests.Delete(ctx, dgst)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("unexpected error deleting blob: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, d := range blobDigests {
 | 
				
			||||||
 | 
							err = blobs.Delete(ctx, d)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								t.Fatalf("unexpected error deleting blob: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue