commit
						cbf1f0797c
					
				| 
						 | 
				
			
			@ -3,13 +3,14 @@ package scheduler
 | 
			
		|||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/context"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// onTTLExpiryFunc is called when a repositories' TTL expires
 | 
			
		||||
// onTTLExpiryFunc is called when a repository's TTL expires
 | 
			
		||||
type expiryFunc func(string) error
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
| 
						 | 
				
			
			@ -23,14 +24,14 @@ type schedulerEntry struct {
 | 
			
		|||
	Key       string    `json:"Key"`
 | 
			
		||||
	Expiry    time.Time `json:"ExpiryData"`
 | 
			
		||||
	EntryType int       `json:"EntryType"`
 | 
			
		||||
 | 
			
		||||
	timer *time.Timer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// New returns a new instance of the scheduler
 | 
			
		||||
func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler {
 | 
			
		||||
	return &TTLExpirationScheduler{
 | 
			
		||||
		entries:         make(map[string]schedulerEntry),
 | 
			
		||||
		addChan:         make(chan schedulerEntry),
 | 
			
		||||
		stopChan:        make(chan bool),
 | 
			
		||||
		entries:         make(map[string]*schedulerEntry),
 | 
			
		||||
		driver:          driver,
 | 
			
		||||
		pathToStateFile: path,
 | 
			
		||||
		ctx:             ctx,
 | 
			
		||||
| 
						 | 
				
			
			@ -41,9 +42,9 @@ func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpi
 | 
			
		|||
// TTLExpirationScheduler is a scheduler used to perform actions
 | 
			
		||||
// when TTLs expire
 | 
			
		||||
type TTLExpirationScheduler struct {
 | 
			
		||||
	entries  map[string]schedulerEntry
 | 
			
		||||
	addChan  chan schedulerEntry
 | 
			
		||||
	stopChan chan bool
 | 
			
		||||
	sync.Mutex
 | 
			
		||||
 | 
			
		||||
	entries map[string]*schedulerEntry
 | 
			
		||||
 | 
			
		||||
	driver          driver.StorageDriver
 | 
			
		||||
	ctx             context.Context
 | 
			
		||||
| 
						 | 
				
			
			@ -55,24 +56,27 @@ type TTLExpirationScheduler struct {
 | 
			
		|||
	onManifestExpire expiryFunc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// addChan allows more TTLs to be pushed to the scheduler
 | 
			
		||||
type addChan chan schedulerEntry
 | 
			
		||||
 | 
			
		||||
// stopChan allows the scheduler to be stopped - used for testing.
 | 
			
		||||
type stopChan chan bool
 | 
			
		||||
 | 
			
		||||
// OnBlobExpire is called when a scheduled blob's TTL expires
 | 
			
		||||
func (ttles *TTLExpirationScheduler) OnBlobExpire(f expiryFunc) {
 | 
			
		||||
	ttles.Lock()
 | 
			
		||||
	defer ttles.Unlock()
 | 
			
		||||
 | 
			
		||||
	ttles.onBlobExpire = f
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OnManifestExpire is called when a scheduled manifest's TTL expires
 | 
			
		||||
func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) {
 | 
			
		||||
	ttles.Lock()
 | 
			
		||||
	defer ttles.Unlock()
 | 
			
		||||
 | 
			
		||||
	ttles.onManifestExpire = f
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddBlob schedules a blob cleanup after ttl expires
 | 
			
		||||
func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) error {
 | 
			
		||||
	ttles.Lock()
 | 
			
		||||
	defer ttles.Unlock()
 | 
			
		||||
 | 
			
		||||
	if ttles.stopped {
 | 
			
		||||
		return fmt.Errorf("scheduler not started")
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -82,6 +86,9 @@ func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) err
 | 
			
		|||
 | 
			
		||||
// AddManifest schedules a manifest cleanup after ttl expires
 | 
			
		||||
func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Duration) error {
 | 
			
		||||
	ttles.Lock()
 | 
			
		||||
	defer ttles.Unlock()
 | 
			
		||||
 | 
			
		||||
	if ttles.stopped {
 | 
			
		||||
		return fmt.Errorf("scheduler not started")
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -92,23 +99,9 @@ func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Durat
 | 
			
		|||
 | 
			
		||||
// Start starts the scheduler
 | 
			
		||||
func (ttles *TTLExpirationScheduler) Start() error {
 | 
			
		||||
	return ttles.start()
 | 
			
		||||
}
 | 
			
		||||
	ttles.Lock()
 | 
			
		||||
	defer ttles.Unlock()
 | 
			
		||||
 | 
			
		||||
func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) {
 | 
			
		||||
	entry := schedulerEntry{
 | 
			
		||||
		Key:       key,
 | 
			
		||||
		Expiry:    time.Now().Add(ttl),
 | 
			
		||||
		EntryType: eType,
 | 
			
		||||
	}
 | 
			
		||||
	ttles.addChan <- entry
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ttles *TTLExpirationScheduler) stop() {
 | 
			
		||||
	ttles.stopChan <- true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ttles *TTLExpirationScheduler) start() error {
 | 
			
		||||
	err := ttles.readState()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
| 
						 | 
				
			
			@ -120,97 +113,75 @@ func (ttles *TTLExpirationScheduler) start() error {
 | 
			
		|||
 | 
			
		||||
	context.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...")
 | 
			
		||||
	ttles.stopped = false
 | 
			
		||||
	go ttles.mainloop()
 | 
			
		||||
 | 
			
		||||
	// Start timer for each deserialized entry
 | 
			
		||||
	for _, entry := range ttles.entries {
 | 
			
		||||
		entry.timer = ttles.startTimer(entry, entry.Expiry.Sub(time.Now()))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// mainloop uses a select statement to listen for events.  Most of its time
 | 
			
		||||
// is spent in waiting on a TTL to expire but can be interrupted when TTLs
 | 
			
		||||
// are added.
 | 
			
		||||
func (ttles *TTLExpirationScheduler) mainloop() {
 | 
			
		||||
	for {
 | 
			
		||||
		if ttles.stopped {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) {
 | 
			
		||||
	entry := &schedulerEntry{
 | 
			
		||||
		Key:       key,
 | 
			
		||||
		Expiry:    time.Now().Add(ttl),
 | 
			
		||||
		EntryType: eType,
 | 
			
		||||
	}
 | 
			
		||||
	context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
 | 
			
		||||
	if oldEntry, present := ttles.entries[key]; present && oldEntry.timer != nil {
 | 
			
		||||
		oldEntry.timer.Stop()
 | 
			
		||||
	}
 | 
			
		||||
	ttles.entries[key] = entry
 | 
			
		||||
	entry.timer = ttles.startTimer(entry, ttl)
 | 
			
		||||
 | 
			
		||||
		nextEntry, ttl := nextExpiringEntry(ttles.entries)
 | 
			
		||||
		if len(ttles.entries) == 0 {
 | 
			
		||||
			context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Nothing to do, sleeping...")
 | 
			
		||||
		} else {
 | 
			
		||||
			context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Sleeping for %s until cleanup of %s", ttl, nextEntry.Key)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		select {
 | 
			
		||||
		case <-time.After(ttl):
 | 
			
		||||
			var f expiryFunc
 | 
			
		||||
 | 
			
		||||
			switch nextEntry.EntryType {
 | 
			
		||||
			case entryTypeBlob:
 | 
			
		||||
				f = ttles.onBlobExpire
 | 
			
		||||
			case entryTypeManifest:
 | 
			
		||||
				f = ttles.onManifestExpire
 | 
			
		||||
			default:
 | 
			
		||||
				f = func(repoName string) error {
 | 
			
		||||
					return fmt.Errorf("Unexpected scheduler entry type")
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if err := f(nextEntry.Key); err != nil {
 | 
			
		||||
				context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", nextEntry.Key, err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			delete(ttles.entries, nextEntry.Key)
 | 
			
		||||
			if err := ttles.writeState(); err != nil {
 | 
			
		||||
				context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
 | 
			
		||||
			}
 | 
			
		||||
		case entry := <-ttles.addChan:
 | 
			
		||||
			context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
 | 
			
		||||
			ttles.entries[entry.Key] = entry
 | 
			
		||||
			if err := ttles.writeState(); err != nil {
 | 
			
		||||
				context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
 | 
			
		||||
			}
 | 
			
		||||
			break
 | 
			
		||||
 | 
			
		||||
		case <-ttles.stopChan:
 | 
			
		||||
			if err := ttles.writeState(); err != nil {
 | 
			
		||||
				context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
 | 
			
		||||
			}
 | 
			
		||||
			ttles.stopped = true
 | 
			
		||||
		}
 | 
			
		||||
	if err := ttles.writeState(); err != nil {
 | 
			
		||||
		context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func nextExpiringEntry(entries map[string]schedulerEntry) (*schedulerEntry, time.Duration) {
 | 
			
		||||
	if len(entries) == 0 {
 | 
			
		||||
		return nil, 24 * time.Hour
 | 
			
		||||
	}
 | 
			
		||||
func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.Duration) *time.Timer {
 | 
			
		||||
	return time.AfterFunc(ttl, func() {
 | 
			
		||||
		ttles.Lock()
 | 
			
		||||
		defer ttles.Unlock()
 | 
			
		||||
 | 
			
		||||
	// todo:(richardscothern) this is a primitive o(n) algorithm
 | 
			
		||||
	// but n will never be *that* big and it's all in memory.  Investigate
 | 
			
		||||
	// time.AfterFunc for heap based expiries
 | 
			
		||||
		var f expiryFunc
 | 
			
		||||
 | 
			
		||||
	first := true
 | 
			
		||||
	var nextEntry schedulerEntry
 | 
			
		||||
	for _, entry := range entries {
 | 
			
		||||
		if first {
 | 
			
		||||
			nextEntry = entry
 | 
			
		||||
			first = false
 | 
			
		||||
			continue
 | 
			
		||||
		switch entry.EntryType {
 | 
			
		||||
		case entryTypeBlob:
 | 
			
		||||
			f = ttles.onBlobExpire
 | 
			
		||||
		case entryTypeManifest:
 | 
			
		||||
			f = ttles.onManifestExpire
 | 
			
		||||
		default:
 | 
			
		||||
			f = func(repoName string) error {
 | 
			
		||||
				return fmt.Errorf("Unexpected scheduler entry type")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if entry.Expiry.Before(nextEntry.Expiry) {
 | 
			
		||||
			nextEntry = entry
 | 
			
		||||
 | 
			
		||||
		if err := f(entry.Key); err != nil {
 | 
			
		||||
			context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		delete(ttles.entries, entry.Key)
 | 
			
		||||
		if err := ttles.writeState(); err != nil {
 | 
			
		||||
			context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Stop stops the scheduler.
 | 
			
		||||
func (ttles *TTLExpirationScheduler) Stop() {
 | 
			
		||||
	ttles.Lock()
 | 
			
		||||
	defer ttles.Unlock()
 | 
			
		||||
 | 
			
		||||
	if err := ttles.writeState(); err != nil {
 | 
			
		||||
		context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Dates may be from the past if the scheduler has
 | 
			
		||||
	// been restarted, set their ttl to 0
 | 
			
		||||
	if nextEntry.Expiry.Before(time.Now()) {
 | 
			
		||||
		nextEntry.Expiry = time.Now()
 | 
			
		||||
		return &nextEntry, 0
 | 
			
		||||
	for _, entry := range ttles.entries {
 | 
			
		||||
		entry.timer.Stop()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &nextEntry, nextEntry.Expiry.Sub(time.Now())
 | 
			
		||||
	ttles.stopped = true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ttles *TTLExpirationScheduler) writeState() error {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,7 +2,6 @@ package scheduler
 | 
			
		|||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -27,13 +26,13 @@ func TestSchedule(t *testing.T) {
 | 
			
		|||
		if !ok {
 | 
			
		||||
			t.Fatalf("Trying to remove nonexistant repo: %s", repoName)
 | 
			
		||||
		}
 | 
			
		||||
		fmt.Println("removing", repoName)
 | 
			
		||||
		t.Log("removing", repoName)
 | 
			
		||||
		delete(remainingRepos, repoName)
 | 
			
		||||
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	s.onBlobExpire = deleteFunc
 | 
			
		||||
	err := s.start()
 | 
			
		||||
	err := s.Start()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -97,7 +96,7 @@ func TestRestoreOld(t *testing.T) {
 | 
			
		|||
	}
 | 
			
		||||
	s := New(context.Background(), fs, "/ttl")
 | 
			
		||||
	s.onBlobExpire = deleteFunc
 | 
			
		||||
	err = s.start()
 | 
			
		||||
	err = s.Start()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -124,7 +123,7 @@ func TestStopRestore(t *testing.T) {
 | 
			
		|||
	s := New(context.Background(), fs, pathToStateFile)
 | 
			
		||||
	s.onBlobExpire = deleteFunc
 | 
			
		||||
 | 
			
		||||
	err := s.start()
 | 
			
		||||
	err := s.Start()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf(err.Error())
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -133,13 +132,13 @@ func TestStopRestore(t *testing.T) {
 | 
			
		|||
 | 
			
		||||
	// Start and stop before all operations complete
 | 
			
		||||
	// state will be written to fs
 | 
			
		||||
	s.stop()
 | 
			
		||||
	s.Stop()
 | 
			
		||||
	time.Sleep(10 * time.Millisecond)
 | 
			
		||||
 | 
			
		||||
	// v2 will restore state from fs
 | 
			
		||||
	s2 := New(context.Background(), fs, pathToStateFile)
 | 
			
		||||
	s2.onBlobExpire = deleteFunc
 | 
			
		||||
	err = s2.start()
 | 
			
		||||
	err = s2.Start()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Error starting v2: %s", err.Error())
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -153,12 +152,11 @@ func TestStopRestore(t *testing.T) {
 | 
			
		|||
 | 
			
		||||
func TestDoubleStart(t *testing.T) {
 | 
			
		||||
	s := New(context.Background(), inmemory.New(), "/ttl")
 | 
			
		||||
	err := s.start()
 | 
			
		||||
	err := s.Start()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Unable to start scheduler")
 | 
			
		||||
	}
 | 
			
		||||
	fmt.Printf("%#v", s)
 | 
			
		||||
	err = s.start()
 | 
			
		||||
	err = s.Start()
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		t.Fatalf("Scheduler started twice without error")
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue