Merge pull request #3635 from milosgajdos/make-s3-driver-delete-faster
Delete S3 keys incrementally in batchesmaster
						commit
						e3509fc1de
					
				|  | @ -15,6 +15,7 @@ import ( | ||||||
| 	"bytes" | 	"bytes" | ||||||
| 	"context" | 	"context" | ||||||
| 	"crypto/tls" | 	"crypto/tls" | ||||||
|  | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"io" | 	"io" | ||||||
| 	"io/ioutil" | 	"io/ioutil" | ||||||
|  | @ -922,54 +923,71 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func min(a, b int) int { |  | ||||||
| 	if a < b { |  | ||||||
| 		return a |  | ||||||
| 	} |  | ||||||
| 	return b |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // Delete recursively deletes all objects stored at "path" and its subpaths.
 | // Delete recursively deletes all objects stored at "path" and its subpaths.
 | ||||||
| // We must be careful since S3 does not guarantee read after delete consistency
 | // We must be careful since S3 does not guarantee read after delete consistency
 | ||||||
| func (d *driver) Delete(ctx context.Context, path string) error { | func (d *driver) Delete(ctx context.Context, path string) error { | ||||||
| 	s3Objects := make([]*s3.ObjectIdentifier, 0, listMax) | 	s3Objects := make([]*s3.ObjectIdentifier, 0, listMax) | ||||||
| 
 | 	s3Path := d.s3Path(path) | ||||||
| 	// manually add the given path if it's a file
 |  | ||||||
| 	stat, err := d.Stat(ctx, path) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 	if stat != nil && !stat.IsDir() { |  | ||||||
| 		path := d.s3Path(path) |  | ||||||
| 		s3Objects = append(s3Objects, &s3.ObjectIdentifier{ |  | ||||||
| 			Key: &path, |  | ||||||
| 		}) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// list objects under the given path as a subpath (suffix with slash "/")
 |  | ||||||
| 	s3Path := d.s3Path(path) + "/" |  | ||||||
| 	listObjectsInput := &s3.ListObjectsV2Input{ | 	listObjectsInput := &s3.ListObjectsV2Input{ | ||||||
| 		Bucket: aws.String(d.Bucket), | 		Bucket: aws.String(d.Bucket), | ||||||
| 		Prefix: aws.String(s3Path), | 		Prefix: aws.String(s3Path), | ||||||
| 	} | 	} | ||||||
| ListLoop: | 
 | ||||||
| 	for { | 	for { | ||||||
| 		// list all the objects
 | 		// list all the objects
 | ||||||
| 		resp, err := d.S3.ListObjectsV2(listObjectsInput) | 		resp, err := d.S3.ListObjectsV2(listObjectsInput) | ||||||
| 
 | 
 | ||||||
| 		// resp.Contents can only be empty on the first call
 | 		// resp.Contents can only be empty on the first call
 | ||||||
| 		// if there were no more results to return after the first call, resp.IsTruncated would have been false
 | 		// if there were no more results to return after the first call, resp.IsTruncated would have been false
 | ||||||
| 		// and the loop would be exited without recalling ListObjects
 | 		// and the loop would exit without recalling ListObjects
 | ||||||
| 		if err != nil || len(resp.Contents) == 0 { | 		if err != nil || len(resp.Contents) == 0 { | ||||||
| 			break ListLoop | 			return storagedriver.PathNotFoundError{Path: path} | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		for _, key := range resp.Contents { | 		for _, key := range resp.Contents { | ||||||
|  | 			// Skip if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab").
 | ||||||
|  | 			if len(*key.Key) > len(s3Path) && (*key.Key)[len(s3Path)] != '/' { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
| 			s3Objects = append(s3Objects, &s3.ObjectIdentifier{ | 			s3Objects = append(s3Objects, &s3.ObjectIdentifier{ | ||||||
| 				Key: key.Key, | 				Key: key.Key, | ||||||
| 			}) | 			}) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | 		// Delete objects only if the list is not empty, otherwise S3 API returns a cryptic error
 | ||||||
|  | 		if len(s3Objects) > 0 { | ||||||
|  | 			// NOTE: according to AWS docs https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
 | ||||||
|  | 			// by default the response returns up to 1,000 key names. The response _might_ contain fewer keys but it will never contain more.
 | ||||||
|  | 			// 10000 keys is coincidentally (?) also the max number of keys that can be deleted in a single Delete operation, so we'll just smack
 | ||||||
|  | 			// Delete here straight away and reset the object slice when successful.
 | ||||||
|  | 			resp, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{ | ||||||
|  | 				Bucket: aws.String(d.Bucket), | ||||||
|  | 				Delete: &s3.Delete{ | ||||||
|  | 					Objects: s3Objects, | ||||||
|  | 					Quiet:   aws.Bool(false), | ||||||
|  | 				}, | ||||||
|  | 			}) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return err | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			if len(resp.Errors) > 0 { | ||||||
|  | 				// NOTE: AWS SDK s3.Error does not implement error interface which
 | ||||||
|  | 				// is pretty intensely sad, so we have to do away with this for now.
 | ||||||
|  | 				errs := make([]error, 0, len(resp.Errors)) | ||||||
|  | 				for _, err := range resp.Errors { | ||||||
|  | 					errs = append(errs, errors.New(err.String())) | ||||||
|  | 				} | ||||||
|  | 				return storagedriver.Errors{ | ||||||
|  | 					DriverName: driverName, | ||||||
|  | 					Errs:       errs, | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		// NOTE: we don't want to reallocate
 | ||||||
|  | 		// the slice so we simply "reset" it
 | ||||||
|  | 		s3Objects = s3Objects[:0] | ||||||
|  | 
 | ||||||
| 		// resp.Contents must have at least one element or we would have returned not found
 | 		// resp.Contents must have at least one element or we would have returned not found
 | ||||||
| 		listObjectsInput.StartAfter = resp.Contents[len(resp.Contents)-1].Key | 		listObjectsInput.StartAfter = resp.Contents[len(resp.Contents)-1].Key | ||||||
| 
 | 
 | ||||||
|  | @ -980,24 +998,6 @@ ListLoop: | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	total := len(s3Objects) |  | ||||||
| 	if total == 0 { |  | ||||||
| 		return storagedriver.PathNotFoundError{Path: path} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// need to chunk objects into groups of 1000 per s3 restrictions
 |  | ||||||
| 	for i := 0; i < total; i += 1000 { |  | ||||||
| 		_, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{ |  | ||||||
| 			Bucket: aws.String(d.Bucket), |  | ||||||
| 			Delete: &s3.Delete{ |  | ||||||
| 				Objects: s3Objects[i:min(i+1000, total)], |  | ||||||
| 				Quiet:   aws.Bool(false), |  | ||||||
| 			}, |  | ||||||
| 		}) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return err |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -772,6 +772,10 @@ func TestMoveWithMultipartCopy(t *testing.T) { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func TestListObjectsV2(t *testing.T) { | func TestListObjectsV2(t *testing.T) { | ||||||
|  | 	if skipS3() != "" { | ||||||
|  | 		t.Skip(skipS3()) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	rootDir, err := ioutil.TempDir("", "driver-") | 	rootDir, err := ioutil.TempDir("", "driver-") | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("unexpected error creating temporary directory: %v", err) | 		t.Fatalf("unexpected error creating temporary directory: %v", err) | ||||||
|  |  | ||||||
|  | @ -169,3 +169,27 @@ type Error struct { | ||||||
| func (err Error) Error() string { | func (err Error) Error() string { | ||||||
| 	return fmt.Sprintf("%s: %s", err.DriverName, err.Enclosed) | 	return fmt.Sprintf("%s: %s", err.DriverName, err.Enclosed) | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // Errors provides the envelope for multiple errors
 | ||||||
|  | // for use within the storagedriver implementations.
 | ||||||
|  | type Errors struct { | ||||||
|  | 	DriverName string | ||||||
|  | 	Errs       []error | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | var _ error = Errors{} | ||||||
|  | 
 | ||||||
|  | func (e Errors) Error() string { | ||||||
|  | 	switch len(e.Errs) { | ||||||
|  | 	case 0: | ||||||
|  | 		return "<nil>" | ||||||
|  | 	case 1: | ||||||
|  | 		return e.Errs[0].Error() | ||||||
|  | 	default: | ||||||
|  | 		msg := "errors:\n" | ||||||
|  | 		for _, err := range e.Errs { | ||||||
|  | 			msg += err.Error() + "\n" | ||||||
|  | 		} | ||||||
|  | 		return msg | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue