Attempt to address intermittent s3 RequestTimeout error
Signed-off-by: Stephen J Day <stephen.day@docker.com>
parent f40bf864b0
commit e6715c5cec
@@ -28,6 +28,7 @@ import (
 
 	"github.com/AdRoll/goamz/aws"
 	"github.com/AdRoll/goamz/s3"
+	"github.com/Sirupsen/logrus"
 	storagedriver "github.com/docker/distribution/registry/storage/driver"
 	"github.com/docker/distribution/registry/storage/driver/base"
 	"github.com/docker/distribution/registry/storage/driver/factory"
@@ -394,18 +395,64 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 		go func(bytesRead int, from int64, buf []byte) {
 			defer d.putbuf(buf) // this buffer gets dropped after this call
 
-			// parts and partNumber are safe, because this function is the only one modifying them and we
-			// force it to be executed serially.
-			if bytesRead > 0 {
-				part, putErr := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
-				if putErr != nil {
-					putErrChan <- putErr
+			// DRAGONS(stevvooe): There are few things one might want to know
+			// about this section. First, the putErrChan is expecting an error
+			// and a nil or just a nil to come through the channel. This is
+			// covered by the silly defer below. The other aspect is the s3
+			// retry backoff to deal with RequestTimeout errors. Even though
+			// the underlying s3 library should handle it, it doesn't seem to
+			// be part of the shouldRetry function (see AdRoll/goamz/s3).
+			defer func() {
+				putErrChan <- nil // for some reason, we do this no matter what.
+			}()
+
+			if bytesRead <= 0 {
+				return
+			}
+
+			var err error
+			var part s3.Part
+
+		loop:
+			for retries := 0; retries < 5; retries++ {
+				part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
+				if err == nil {
+					break // success!
 				}
 
-				parts = append(parts, part)
-				partNumber++
+				// NOTE(stevvooe): This retry code tries to only retry under
+				// conditions where the s3 package does not. We may add s3
+				// error codes to the below if we see others bubble up in the
+				// application. Right now, the most troubling is
+				// RequestTimeout, which seems to only triggered when a tcp
+				// connection to s3 slows to a crawl. If the RequestTimeout
+				// ends up getting added to the s3 library and we don't see
+				// other errors, this retry loop can be removed.
+				switch err := err.(type) {
+				case *s3.Error:
+					switch err.Code {
+					case "RequestTimeout":
+						// allow retries on only this error.
+					default:
+						break loop
+					}
+				}
+
+				backoff := 100 * time.Millisecond * time.Duration(retries+1)
+				logrus.Errorf("error putting part, retrying after %v: %v", err, backoff.String())
+				time.Sleep(backoff)
 			}
-			putErrChan <- nil
+
+			if err != nil {
+				logrus.Errorf("error putting part, aborting: %v", err)
+				putErrChan <- err
+			}
+
+			// parts and partNumber are safe, because this function is the
+			// only one modifying them and we force it to be executed
+			// serially.
+			parts = append(parts, part)
+			partNumber++
 		}(bytesRead, from, buf)
 
 		buf = d.getbuf() // use a new buffer for the next call
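Pulled out of the WriteStream goroutine, the retry strategy this change adds looks roughly like the sketch below: call the put operation up to five times, retry only when the failure is an s3 "RequestTimeout" error, and back off linearly (100ms, 200ms, ...) between attempts. The putWithRetry helper, its op parameter, and the stub operation in main are illustrative only; the actual commit keeps the loop inline and calls multi.PutPart directly, using the AdRoll/goamz/s3 and Sirupsen/logrus packages already imported by the driver.

package main

import (
	"fmt"
	"time"

	"github.com/AdRoll/goamz/s3"
	"github.com/Sirupsen/logrus"
)

// putWithRetry is a hypothetical standalone version of the loop added in this
// commit: run op up to five times, retry only on an s3 "RequestTimeout"
// error, and back off linearly (100ms * attempt) between tries.
func putWithRetry(op func() error) error {
	var err error
	for retries := 0; retries < 5; retries++ {
		if err = op(); err == nil {
			return nil // success!
		}

		// Retry only the RequestTimeout code; every other error (and every
		// non-s3 error) is handed back to the caller immediately.
		s3Err, ok := err.(*s3.Error)
		if !ok || s3Err.Code != "RequestTimeout" {
			return err
		}

		backoff := 100 * time.Millisecond * time.Duration(retries+1)
		logrus.Errorf("error putting part, retrying after %v: %v", backoff, err)
		time.Sleep(backoff)
	}
	return err
}

func main() {
	// Fake operation that times out twice before succeeding, standing in for
	// multi.PutPart in the real driver.
	attempts := 0
	op := func() error {
		attempts++
		if attempts < 3 {
			return &s3.Error{StatusCode: 400, Code: "RequestTimeout"}
		}
		return nil
	}

	fmt.Println("result:", putWithRetry(op), "after", attempts, "attempts")
}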
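The DRAGONS comment is about leaving the existing channel contract intact: the goroutine always ends by sending a trailing nil on putErrChan, and sends the error first when one occurs, so the receive side never blocks forever. The receive side is not part of this hunk, so the sketch below only shows that shape with made-up names (errChan, worker); it is an assumption about usage, not the driver's actual reader.

package main

import (
	"errors"
	"fmt"
)

func main() {
	// errChan stands in for putErrChan: the worker sends either an error
	// followed by nil, or just nil on success.
	errChan := make(chan error)

	worker := func(fail bool) {
		// Mirrors the "silly defer" in the commit: every exit path ends with
		// a nil on the channel, no matter what happened before.
		defer func() { errChan <- nil }()

		if fail {
			errChan <- errors.New("putting part failed")
		}
	}

	go worker(true)

	// Drain the channel until the trailing nil shows up.
	for {
		err := <-errChan
		if err == nil {
			break
		}
		fmt.Println("worker reported:", err)
	}
}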