Attempt to address intermittent s3 RequestTimeout error
Signed-off-by: Stephen J Day <stephen.day@docker.com>master
							parent
							
								
									ea7d59ce2a
								
							
						
					
					
						commit
						0f897aea8f
					
				| 
						 | 
				
			
			@ -28,6 +28,7 @@ import (
 | 
			
		|||
 | 
			
		||||
	"github.com/AdRoll/goamz/aws"
 | 
			
		||||
	"github.com/AdRoll/goamz/s3"
 | 
			
		||||
	"github.com/Sirupsen/logrus"
 | 
			
		||||
	storagedriver "github.com/docker/distribution/registry/storage/driver"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver/base"
 | 
			
		||||
	"github.com/docker/distribution/registry/storage/driver/factory"
 | 
			
		||||
| 
						 | 
				
			
			@ -394,18 +395,64 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
			
		|||
		go func(bytesRead int, from int64, buf []byte) {
 | 
			
		||||
			defer d.putbuf(buf) // this buffer gets dropped after this call
 | 
			
		||||
 | 
			
		||||
			// parts and partNumber are safe, because this function is the only one modifying them and we
 | 
			
		||||
			// force it to be executed serially.
 | 
			
		||||
			if bytesRead > 0 {
 | 
			
		||||
				part, putErr := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
 | 
			
		||||
				if putErr != nil {
 | 
			
		||||
					putErrChan <- putErr
 | 
			
		||||
			// DRAGONS(stevvooe): There are few things one might want to know
 | 
			
		||||
			// about this section. First, the putErrChan is expecting an error
 | 
			
		||||
			// and a nil or just a nil to come through the channel. This is
 | 
			
		||||
			// covered by the silly defer below. The other aspect is the s3
 | 
			
		||||
			// retry backoff to deal with RequestTimeout errors. Even though
 | 
			
		||||
			// the underlying s3 library should handle it, it doesn't seem to
 | 
			
		||||
			// be part of the shouldRetry function (see AdRoll/goamz/s3).
 | 
			
		||||
			defer func() {
 | 
			
		||||
				putErrChan <- nil // for some reason, we do this no matter what.
 | 
			
		||||
			}()
 | 
			
		||||
 | 
			
		||||
			if bytesRead <= 0 {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			var err error
 | 
			
		||||
			var part s3.Part
 | 
			
		||||
 | 
			
		||||
		loop:
 | 
			
		||||
			for retries := 0; retries < 5; retries++ {
 | 
			
		||||
				part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
 | 
			
		||||
				if err == nil {
 | 
			
		||||
					break // success!
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				parts = append(parts, part)
 | 
			
		||||
				partNumber++
 | 
			
		||||
				// NOTE(stevvooe): This retry code tries to only retry under
 | 
			
		||||
				// conditions where the s3 package does not. We may add s3
 | 
			
		||||
				// error codes to the below if we see others bubble up in the
 | 
			
		||||
				// application. Right now, the most troubling is
 | 
			
		||||
				// RequestTimeout, which seems to only triggered when a tcp
 | 
			
		||||
				// connection to s3 slows to a crawl. If the RequestTimeout
 | 
			
		||||
				// ends up getting added to the s3 library and we don't see
 | 
			
		||||
				// other errors, this retry loop can be removed.
 | 
			
		||||
				switch err := err.(type) {
 | 
			
		||||
				case *s3.Error:
 | 
			
		||||
					switch err.Code {
 | 
			
		||||
					case "RequestTimeout":
 | 
			
		||||
						// allow retries on only this error.
 | 
			
		||||
					default:
 | 
			
		||||
						break loop
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				backoff := 100 * time.Millisecond * time.Duration(retries+1)
 | 
			
		||||
				logrus.Errorf("error putting part, retrying after %v: %v", err, backoff.String())
 | 
			
		||||
				time.Sleep(backoff)
 | 
			
		||||
			}
 | 
			
		||||
			putErrChan <- nil
 | 
			
		||||
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				logrus.Errorf("error putting part, aborting: %v", err)
 | 
			
		||||
				putErrChan <- err
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// parts and partNumber are safe, because this function is the
 | 
			
		||||
			// only one modifying them and we force it to be executed
 | 
			
		||||
			// serially.
 | 
			
		||||
			parts = append(parts, part)
 | 
			
		||||
			partNumber++
 | 
			
		||||
		}(bytesRead, from, buf)
 | 
			
		||||
 | 
			
		||||
		buf = d.getbuf() // use a new buffer for the next call
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue