Buffer writing the scheduler entry state to disk by periodically checking for
changes to the entries index and saving it to the filesystem. Signed-off-by: Richard Scothern <richard.scothern@gmail.com>master
							parent
							
								
									9e8aaf7b40
								
							
						
					
					
						commit
						33428c37e1
					
				| 
						 | 
					@ -16,6 +16,7 @@ type expiryFunc func(string) error
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	entryTypeBlob = iota
 | 
						entryTypeBlob = iota
 | 
				
			||||||
	entryTypeManifest
 | 
						entryTypeManifest
 | 
				
			||||||
 | 
						indexSaveFrequency = 5 * time.Second
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// schedulerEntry represents an entry in the scheduler
 | 
					// schedulerEntry represents an entry in the scheduler
 | 
				
			||||||
| 
						 | 
					@ -36,6 +37,8 @@ func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpi
 | 
				
			||||||
		pathToStateFile: path,
 | 
							pathToStateFile: path,
 | 
				
			||||||
		ctx:             ctx,
 | 
							ctx:             ctx,
 | 
				
			||||||
		stopped:         true,
 | 
							stopped:         true,
 | 
				
			||||||
 | 
							doneChan:        make(chan struct{}),
 | 
				
			||||||
 | 
							saveTimer:       time.NewTicker(indexSaveFrequency),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -54,6 +57,10 @@ type TTLExpirationScheduler struct {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	onBlobExpire     expiryFunc
 | 
						onBlobExpire     expiryFunc
 | 
				
			||||||
	onManifestExpire expiryFunc
 | 
						onManifestExpire expiryFunc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						indexDirty bool
 | 
				
			||||||
 | 
						saveTimer  *time.Ticker
 | 
				
			||||||
 | 
						doneChan   chan struct{}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// OnBlobExpire is called when a scheduled blob's TTL expires
 | 
					// OnBlobExpire is called when a scheduled blob's TTL expires
 | 
				
			||||||
| 
						 | 
					@ -119,6 +126,31 @@ func (ttles *TTLExpirationScheduler) Start() error {
 | 
				
			||||||
		entry.timer = ttles.startTimer(entry, entry.Expiry.Sub(time.Now()))
 | 
							entry.timer = ttles.startTimer(entry, entry.Expiry.Sub(time.Now()))
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Start a ticker to periodically save the entries index
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							for {
 | 
				
			||||||
 | 
								select {
 | 
				
			||||||
 | 
								case <-ttles.saveTimer.C:
 | 
				
			||||||
 | 
									if !ttles.indexDirty {
 | 
				
			||||||
 | 
										continue
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									ttles.Lock()
 | 
				
			||||||
 | 
									err := ttles.writeState()
 | 
				
			||||||
 | 
									if err != nil {
 | 
				
			||||||
 | 
										context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
 | 
				
			||||||
 | 
									} else {
 | 
				
			||||||
 | 
										ttles.indexDirty = false
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									ttles.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								case <-ttles.doneChan:
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -134,10 +166,7 @@ func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType in
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	ttles.entries[key] = entry
 | 
						ttles.entries[key] = entry
 | 
				
			||||||
	entry.timer = ttles.startTimer(entry, ttl)
 | 
						entry.timer = ttles.startTimer(entry, ttl)
 | 
				
			||||||
 | 
						ttles.indexDirty = true
 | 
				
			||||||
	if err := ttles.writeState(); err != nil {
 | 
					 | 
				
			||||||
		context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.Duration) *time.Timer {
 | 
					func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.Duration) *time.Timer {
 | 
				
			||||||
| 
						 | 
					@ -163,9 +192,7 @@ func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		delete(ttles.entries, entry.Key)
 | 
							delete(ttles.entries, entry.Key)
 | 
				
			||||||
		if err := ttles.writeState(); err != nil {
 | 
							ttles.indexDirty = true
 | 
				
			||||||
			context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -181,6 +208,9 @@ func (ttles *TTLExpirationScheduler) Stop() {
 | 
				
			||||||
	for _, entry := range ttles.entries {
 | 
						for _, entry := range ttles.entries {
 | 
				
			||||||
		entry.timer.Stop()
 | 
							entry.timer.Stop()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						close(ttles.doneChan)
 | 
				
			||||||
 | 
						ttles.saveTimer.Stop()
 | 
				
			||||||
	ttles.stopped = true
 | 
						ttles.stopped = true
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -194,6 +224,7 @@ func (ttles *TTLExpirationScheduler) writeState() error {
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue