Merge pull request #463 from stevvooe/free-leaked-goroutine
registry/storage/driver/s3: use done channel to avoid goroutine leakmaster
						commit
						b5bae799e3
					
				| 
						 | 
					@ -310,6 +310,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
				
			||||||
	var putErrChan chan error
 | 
						var putErrChan chan error
 | 
				
			||||||
	parts := []s3.Part{}
 | 
						parts := []s3.Part{}
 | 
				
			||||||
	var part s3.Part
 | 
						var part s3.Part
 | 
				
			||||||
 | 
						done := make(chan struct{}) // stopgap to free up waiting goroutines
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	multi, err := d.Bucket.InitMulti(d.s3Path(path), d.getContentType(), getPermissions(), d.getOptions())
 | 
						multi, err := d.Bucket.InitMulti(d.s3Path(path), d.getContentType(), getPermissions(), d.getOptions())
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
| 
						 | 
					@ -344,6 +345,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		d.putbuf(buf) // needs to be here to pick up new buf value
 | 
							d.putbuf(buf) // needs to be here to pick up new buf value
 | 
				
			||||||
 | 
							close(done)   // free up any waiting goroutines
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Fills from 0 to total from current
 | 
						// Fills from 0 to total from current
 | 
				
			||||||
| 
						 | 
					@ -407,7 +409,11 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
				
			||||||
			// the underlying s3 library should handle it, it doesn't seem to
 | 
								// the underlying s3 library should handle it, it doesn't seem to
 | 
				
			||||||
			// be part of the shouldRetry function (see AdRoll/goamz/s3).
 | 
								// be part of the shouldRetry function (see AdRoll/goamz/s3).
 | 
				
			||||||
			defer func() {
 | 
								defer func() {
 | 
				
			||||||
				putErrChan <- nil // for some reason, we do this no matter what.
 | 
									select {
 | 
				
			||||||
 | 
									case putErrChan <- nil: // for some reason, we do this no matter what.
 | 
				
			||||||
 | 
									case <-done:
 | 
				
			||||||
 | 
										return // ensure we don't leak the goroutine
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}()
 | 
								}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if bytesRead <= 0 {
 | 
								if bytesRead <= 0 {
 | 
				
			||||||
| 
						 | 
					@ -449,7 +455,11 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				logrus.Errorf("error putting part, aborting: %v", err)
 | 
									logrus.Errorf("error putting part, aborting: %v", err)
 | 
				
			||||||
				putErrChan <- err
 | 
									select {
 | 
				
			||||||
 | 
									case putErrChan <- err:
 | 
				
			||||||
 | 
									case <-done:
 | 
				
			||||||
 | 
										return // don't leak the goroutine
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			// parts and partNumber are safe, because this function is the
 | 
								// parts and partNumber are safe, because this function is the
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue