Merge pull request #1122 from RichardScothern/scheduler-coalesce
Buffer writes to scheduler state file.master
						commit
						ef17db8d73
					
				|  | @ -16,6 +16,7 @@ type expiryFunc func(string) error | |||
| const ( | ||||
| 	entryTypeBlob = iota | ||||
| 	entryTypeManifest | ||||
| 	indexSaveFrequency = 5 * time.Second | ||||
| ) | ||||
| 
 | ||||
| // schedulerEntry represents an entry in the scheduler
 | ||||
|  | @ -36,6 +37,8 @@ func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpi | |||
| 		pathToStateFile: path, | ||||
| 		ctx:             ctx, | ||||
| 		stopped:         true, | ||||
| 		doneChan:        make(chan struct{}), | ||||
| 		saveTimer:       time.NewTicker(indexSaveFrequency), | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|  | @ -54,6 +57,10 @@ type TTLExpirationScheduler struct { | |||
| 
 | ||||
| 	onBlobExpire     expiryFunc | ||||
| 	onManifestExpire expiryFunc | ||||
| 
 | ||||
| 	indexDirty bool | ||||
| 	saveTimer  *time.Ticker | ||||
| 	doneChan   chan struct{} | ||||
| } | ||||
| 
 | ||||
| // OnBlobExpire is called when a scheduled blob's TTL expires
 | ||||
|  | @ -119,6 +126,31 @@ func (ttles *TTLExpirationScheduler) Start() error { | |||
| 		entry.timer = ttles.startTimer(entry, entry.Expiry.Sub(time.Now())) | ||||
| 	} | ||||
| 
 | ||||
| 	// Start a ticker to periodically save the entries index
 | ||||
| 
 | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-ttles.saveTimer.C: | ||||
| 				if !ttles.indexDirty { | ||||
| 					continue | ||||
| 				} | ||||
| 
 | ||||
| 				ttles.Lock() | ||||
| 				err := ttles.writeState() | ||||
| 				if err != nil { | ||||
| 					context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) | ||||
| 				} else { | ||||
| 					ttles.indexDirty = false | ||||
| 				} | ||||
| 				ttles.Unlock() | ||||
| 
 | ||||
| 			case <-ttles.doneChan: | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
|  | @ -134,10 +166,7 @@ func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType in | |||
| 	} | ||||
| 	ttles.entries[key] = entry | ||||
| 	entry.timer = ttles.startTimer(entry, ttl) | ||||
| 
 | ||||
| 	if err := ttles.writeState(); err != nil { | ||||
| 		context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) | ||||
| 	} | ||||
| 	ttles.indexDirty = true | ||||
| } | ||||
| 
 | ||||
| func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.Duration) *time.Timer { | ||||
|  | @ -163,9 +192,7 @@ func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time. | |||
| 		} | ||||
| 
 | ||||
| 		delete(ttles.entries, entry.Key) | ||||
| 		if err := ttles.writeState(); err != nil { | ||||
| 			context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) | ||||
| 		} | ||||
| 		ttles.indexDirty = true | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
|  | @ -181,6 +208,9 @@ func (ttles *TTLExpirationScheduler) Stop() { | |||
| 	for _, entry := range ttles.entries { | ||||
| 		entry.timer.Stop() | ||||
| 	} | ||||
| 
 | ||||
| 	close(ttles.doneChan) | ||||
| 	ttles.saveTimer.Stop() | ||||
| 	ttles.stopped = true | ||||
| } | ||||
| 
 | ||||
|  | @ -194,6 +224,7 @@ func (ttles *TTLExpirationScheduler) writeState() error { | |||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue