Merge pull request #3480 from CollinShoop/optimize-s3-walk
Optimize storagedriver/s3 Walk (up to ~500x) + small bugfixmaster
						commit
						1563384b69
					
				|  | @ -360,7 +360,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Walk traverses a filesystem defined within driver, starting
 | // Walk traverses a filesystem defined within driver, starting
 | ||||||
| // from the given path, calling f on each file
 | // from the given path, calling f on each file and directory
 | ||||||
| func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { | func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { | ||||||
| 	return storagedriver.WalkFallback(ctx, d, path, f) | 	return storagedriver.WalkFallback(ctx, d, path, f) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -290,7 +290,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Walk traverses a filesystem defined within driver, starting
 | // Walk traverses a filesystem defined within driver, starting
 | ||||||
| // from the given path, calling f on each file
 | // from the given path, calling f on each file and directory
 | ||||||
| func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { | func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { | ||||||
| 	return storagedriver.WalkFallback(ctx, d, path, f) | 	return storagedriver.WalkFallback(ctx, d, path, f) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -245,7 +245,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Walk traverses a filesystem defined within driver, starting
 | // Walk traverses a filesystem defined within driver, starting
 | ||||||
| // from the given path, calling f on each file
 | // from the given path, calling f on each file and directory
 | ||||||
| func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { | func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { | ||||||
| 	return storagedriver.WalkFallback(ctx, d, path, f) | 	return storagedriver.WalkFallback(ctx, d, path, f) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -20,6 +20,7 @@ import ( | ||||||
| 	"io/ioutil" | 	"io/ioutil" | ||||||
| 	"math" | 	"math" | ||||||
| 	"net/http" | 	"net/http" | ||||||
|  | 	"path/filepath" | ||||||
| 	"reflect" | 	"reflect" | ||||||
| 	"sort" | 	"sort" | ||||||
| 	"strconv" | 	"strconv" | ||||||
|  | @ -941,110 +942,84 @@ func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn) | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type walkInfoContainer struct { |  | ||||||
| 	storagedriver.FileInfoFields |  | ||||||
| 	prefix *string |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // Path provides the full path of the target of this file info.
 |  | ||||||
| func (wi walkInfoContainer) Path() string { |  | ||||||
| 	return wi.FileInfoFields.Path |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // Size returns current length in bytes of the file. The return value can
 |  | ||||||
| // be used to write to the end of the file at path. The value is
 |  | ||||||
| // meaningless if IsDir returns true.
 |  | ||||||
| func (wi walkInfoContainer) Size() int64 { |  | ||||||
| 	return wi.FileInfoFields.Size |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // ModTime returns the modification time for the file. For backends that
 |  | ||||||
| // don't have a modification time, the creation time should be returned.
 |  | ||||||
| func (wi walkInfoContainer) ModTime() time.Time { |  | ||||||
| 	return wi.FileInfoFields.ModTime |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // IsDir returns true if the path is a directory.
 |  | ||||||
| func (wi walkInfoContainer) IsDir() bool { |  | ||||||
| 	return wi.FileInfoFields.IsDir |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error { | func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error { | ||||||
| 	var retError error | 	var ( | ||||||
|  | 		retError error | ||||||
|  | 		// the most recent directory walked for de-duping
 | ||||||
|  | 		prevDir string | ||||||
|  | 		// the most recent skip directory to avoid walking over undesirable files
 | ||||||
|  | 		prevSkipDir string | ||||||
|  | 	) | ||||||
|  | 	prevDir = prefix + path | ||||||
| 
 | 
 | ||||||
| 	listObjectsInput := &s3.ListObjectsV2Input{ | 	listObjectsInput := &s3.ListObjectsV2Input{ | ||||||
| 		Bucket:    aws.String(d.Bucket), | 		Bucket:  aws.String(d.Bucket), | ||||||
| 		Prefix:    aws.String(path), | 		Prefix:  aws.String(path), | ||||||
| 		Delimiter: aws.String("/"), | 		MaxKeys: aws.Int64(listMax), | ||||||
| 		MaxKeys:   aws.Int64(listMax), |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	ctx, done := dcontext.WithTrace(parentCtx) | 	ctx, done := dcontext.WithTrace(parentCtx) | ||||||
| 	defer done("s3aws.ListObjectsV2Pages(%s)", path) | 	defer done("s3aws.ListObjectsV2Pages(%s)", path) | ||||||
|  | 
 | ||||||
|  | 	// When the "delimiter" argument is omitted, the S3 list API will list all objects in the bucket
 | ||||||
|  | 	// recursively, omitting directory paths. Objects are listed in sorted, depth-first order so we
 | ||||||
|  | 	// can infer all the directories by comparing each object path to the last one we saw.
 | ||||||
|  | 	// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html
 | ||||||
|  | 
 | ||||||
|  | 	// With files returned in sorted depth-first order, directories are inferred in the same order.
 | ||||||
|  | 	// ErrSkipDir is handled by explicitly skipping over any files under the skipped directory. This may be sub-optimal
 | ||||||
|  | 	// for extreme edge cases but for the general use case in a registry, this is orders of magnitude
 | ||||||
|  | 	// faster than a more explicit recursive implementation.
 | ||||||
| 	listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool { | 	listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool { | ||||||
| 
 | 		walkInfos := make([]storagedriver.FileInfoInternal, 0, len(objects.Contents)) | ||||||
| 		var count int64 |  | ||||||
| 		// KeyCount was introduced with version 2 of the GET Bucket operation in S3.
 |  | ||||||
| 		// Some S3 implementations don't support V2 now, so we fall back to manual
 |  | ||||||
| 		// calculation of the key count if required
 |  | ||||||
| 		if objects.KeyCount != nil { |  | ||||||
| 			count = *objects.KeyCount |  | ||||||
| 			*objectCount += *objects.KeyCount |  | ||||||
| 		} else { |  | ||||||
| 			count = int64(len(objects.Contents) + len(objects.CommonPrefixes)) |  | ||||||
| 			*objectCount += count |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		walkInfos := make([]walkInfoContainer, 0, count) |  | ||||||
| 
 |  | ||||||
| 		for _, dir := range objects.CommonPrefixes { |  | ||||||
| 			commonPrefix := *dir.Prefix |  | ||||||
| 			walkInfos = append(walkInfos, walkInfoContainer{ |  | ||||||
| 				prefix: dir.Prefix, |  | ||||||
| 				FileInfoFields: storagedriver.FileInfoFields{ |  | ||||||
| 					IsDir: true, |  | ||||||
| 					Path:  strings.Replace(commonPrefix[:len(commonPrefix)-1], d.s3Path(""), prefix, 1), |  | ||||||
| 				}, |  | ||||||
| 			}) |  | ||||||
| 		} |  | ||||||
| 
 | 
 | ||||||
| 		for _, file := range objects.Contents { | 		for _, file := range objects.Contents { | ||||||
| 			// empty prefixes are listed as objects inside its own prefix.
 | 			filePath := strings.Replace(*file.Key, d.s3Path(""), prefix, 1) | ||||||
| 			// https://docs.aws.amazon.com/AmazonS3/latest/user-guide/using-folders.html
 | 
 | ||||||
| 			if strings.HasSuffix(*file.Key, "/") { | 			// get a list of all inferred directories between the previous directory and this file
 | ||||||
| 				continue | 			dirs := directoryDiff(prevDir, filePath) | ||||||
|  | 			if len(dirs) > 0 { | ||||||
|  | 				for _, dir := range dirs { | ||||||
|  | 					walkInfos = append(walkInfos, storagedriver.FileInfoInternal{ | ||||||
|  | 						FileInfoFields: storagedriver.FileInfoFields{ | ||||||
|  | 							IsDir: true, | ||||||
|  | 							Path:  dir, | ||||||
|  | 						}, | ||||||
|  | 					}) | ||||||
|  | 					prevDir = dir | ||||||
|  | 				} | ||||||
| 			} | 			} | ||||||
| 			walkInfos = append(walkInfos, walkInfoContainer{ | 
 | ||||||
|  | 			walkInfos = append(walkInfos, storagedriver.FileInfoInternal{ | ||||||
| 				FileInfoFields: storagedriver.FileInfoFields{ | 				FileInfoFields: storagedriver.FileInfoFields{ | ||||||
| 					IsDir:   false, | 					IsDir:   false, | ||||||
| 					Size:    *file.Size, | 					Size:    *file.Size, | ||||||
| 					ModTime: *file.LastModified, | 					ModTime: *file.LastModified, | ||||||
| 					Path:    strings.Replace(*file.Key, d.s3Path(""), prefix, 1), | 					Path:    filePath, | ||||||
| 				}, | 				}, | ||||||
| 			}) | 			}) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		sort.SliceStable(walkInfos, func(i, j int) bool { return walkInfos[i].FileInfoFields.Path < walkInfos[j].FileInfoFields.Path }) |  | ||||||
| 
 |  | ||||||
| 		for _, walkInfo := range walkInfos { | 		for _, walkInfo := range walkInfos { | ||||||
| 			err := f(walkInfo) | 			// skip any results under the last skip directory
 | ||||||
| 
 | 			if prevSkipDir != "" && strings.HasPrefix(walkInfo.Path(), prevSkipDir) { | ||||||
| 			if err == storagedriver.ErrSkipDir { | 				continue | ||||||
| 				if walkInfo.IsDir() { |  | ||||||
| 					continue |  | ||||||
| 				} else { |  | ||||||
| 					break |  | ||||||
| 				} |  | ||||||
| 			} else if err != nil { |  | ||||||
| 				retError = err |  | ||||||
| 				return false |  | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			if walkInfo.IsDir() { | 			err := f(walkInfo) | ||||||
| 				if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, f); err != nil { | 			*objectCount++ | ||||||
| 					retError = err | 
 | ||||||
|  | 			if err != nil { | ||||||
|  | 				if err == storagedriver.ErrSkipDir { | ||||||
|  | 					if walkInfo.IsDir() { | ||||||
|  | 						prevSkipDir = walkInfo.Path() | ||||||
|  | 						continue | ||||||
|  | 					} | ||||||
|  | 					// is file, stop gracefully
 | ||||||
| 					return false | 					return false | ||||||
| 				} | 				} | ||||||
|  | 				retError = err | ||||||
|  | 				return false | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		return true | 		return true | ||||||
|  | @ -1061,6 +1036,44 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // directoryDiff finds all directories that are not in common between
 | ||||||
|  | // the previous and current paths in sorted order.
 | ||||||
|  | //
 | ||||||
|  | // Eg 1 directoryDiff("/path/to/folder", "/path/to/folder/folder/file")
 | ||||||
|  | //   => [ "/path/to/folder/folder" ],
 | ||||||
|  | // Eg 2 directoryDiff("/path/to/folder/folder1", "/path/to/folder/folder2/file")
 | ||||||
|  | //   => [ "/path/to/folder/folder2" ]
 | ||||||
|  | // Eg 3 directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/file")
 | ||||||
|  | //  => [ "/path/to/folder/folder2" ]
 | ||||||
|  | // Eg 4 directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/folder1/file")
 | ||||||
|  | //   => [ "/path/to/folder/folder2", "/path/to/folder/folder2/folder1" ]
 | ||||||
|  | // Eg 5 directoryDiff("/", "/path/to/folder/folder/file")
 | ||||||
|  | //   => [ "/path", "/path/to", "/path/to/folder", "/path/to/folder/folder" ],
 | ||||||
|  | func directoryDiff(prev, current string) []string { | ||||||
|  | 	var paths []string | ||||||
|  | 
 | ||||||
|  | 	if prev == "" || current == "" { | ||||||
|  | 		return paths | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	parent := current | ||||||
|  | 	for { | ||||||
|  | 		parent = filepath.Dir(parent) | ||||||
|  | 		if parent == "/" || parent == prev || strings.HasPrefix(prev, parent) { | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
|  | 		paths = append(paths, parent) | ||||||
|  | 	} | ||||||
|  | 	reverse(paths) | ||||||
|  | 	return paths | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func reverse(s []string) { | ||||||
|  | 	for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { | ||||||
|  | 		s[i], s[j] = s[j], s[i] | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (d *driver) s3Path(path string) string { | func (d *driver) s3Path(path string) string { | ||||||
| 	return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/") | 	return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/") | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -2,6 +2,7 @@ package s3 | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"bytes" | 	"bytes" | ||||||
|  | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"io/ioutil" | 	"io/ioutil" | ||||||
| 	"math/rand" | 	"math/rand" | ||||||
|  | @ -496,6 +497,165 @@ func TestDelete(t *testing.T) { | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func TestWalk(t *testing.T) { | ||||||
|  | 	if skipS3() != "" { | ||||||
|  | 		t.Skip(skipS3()) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	rootDir, err := ioutil.TempDir("", "driver-") | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("unexpected error creating temporary directory: %v", err) | ||||||
|  | 	} | ||||||
|  | 	defer os.Remove(rootDir) | ||||||
|  | 
 | ||||||
|  | 	driver, err := s3DriverConstructor(rootDir, s3.StorageClassStandard) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("unexpected error creating driver with standard storage: %v", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	var fileset = []string{ | ||||||
|  | 		"/file1", | ||||||
|  | 		"/folder1/file1", | ||||||
|  | 		"/folder2/file1", | ||||||
|  | 		"/folder3/subfolder1/subfolder1/file1", | ||||||
|  | 		"/folder3/subfolder2/subfolder1/file1", | ||||||
|  | 		"/folder4/file1", | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// create file structure matching fileset above
 | ||||||
|  | 	var created []string | ||||||
|  | 	for _, path := range fileset { | ||||||
|  | 		err := driver.PutContent(context.Background(), path, []byte("content "+path)) | ||||||
|  | 		if err != nil { | ||||||
|  | 			fmt.Printf("unable to create file %s: %s\n", path, err) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		created = append(created, path) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// cleanup
 | ||||||
|  | 	defer func() { | ||||||
|  | 		var lastErr error | ||||||
|  | 		for _, path := range created { | ||||||
|  | 			err := driver.Delete(context.Background(), path) | ||||||
|  | 			if err != nil { | ||||||
|  | 				_ = fmt.Errorf("cleanup failed for path %s: %s", path, err) | ||||||
|  | 				lastErr = err | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		if lastErr != nil { | ||||||
|  | 			t.Fatalf("cleanup failed: %s", err) | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	tcs := []struct { | ||||||
|  | 		name     string | ||||||
|  | 		fn       storagedriver.WalkFn | ||||||
|  | 		from     string | ||||||
|  | 		expected []string | ||||||
|  | 		err      bool | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			name: "walk all", | ||||||
|  | 			fn:   func(fileInfo storagedriver.FileInfo) error { return nil }, | ||||||
|  | 			expected: []string{ | ||||||
|  | 				"/file1", | ||||||
|  | 				"/folder1", | ||||||
|  | 				"/folder1/file1", | ||||||
|  | 				"/folder2", | ||||||
|  | 				"/folder2/file1", | ||||||
|  | 				"/folder3", | ||||||
|  | 				"/folder3/subfolder1", | ||||||
|  | 				"/folder3/subfolder1/subfolder1", | ||||||
|  | 				"/folder3/subfolder1/subfolder1/file1", | ||||||
|  | 				"/folder3/subfolder2", | ||||||
|  | 				"/folder3/subfolder2/subfolder1", | ||||||
|  | 				"/folder3/subfolder2/subfolder1/file1", | ||||||
|  | 				"/folder4", | ||||||
|  | 				"/folder4/file1", | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "skip directory", | ||||||
|  | 			fn: func(fileInfo storagedriver.FileInfo) error { | ||||||
|  | 				if fileInfo.Path() == "/folder3" { | ||||||
|  | 					return storagedriver.ErrSkipDir | ||||||
|  | 				} | ||||||
|  | 				if strings.Contains(fileInfo.Path(), "/folder3") { | ||||||
|  | 					t.Fatalf("skipped dir %s and should not walk %s", "/folder3", fileInfo.Path()) | ||||||
|  | 				} | ||||||
|  | 				return nil | ||||||
|  | 			}, | ||||||
|  | 			expected: []string{ | ||||||
|  | 				"/file1", | ||||||
|  | 				"/folder1", | ||||||
|  | 				"/folder1/file1", | ||||||
|  | 				"/folder2", | ||||||
|  | 				"/folder2/file1", | ||||||
|  | 				"/folder3", | ||||||
|  | 				// folder 3 contents skipped
 | ||||||
|  | 				"/folder4", | ||||||
|  | 				"/folder4/file1", | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "stop early", | ||||||
|  | 			fn: func(fileInfo storagedriver.FileInfo) error { | ||||||
|  | 				if fileInfo.Path() == "/folder1/file1" { | ||||||
|  | 					return storagedriver.ErrSkipDir | ||||||
|  | 				} | ||||||
|  | 				return nil | ||||||
|  | 			}, | ||||||
|  | 			expected: []string{ | ||||||
|  | 				"/file1", | ||||||
|  | 				"/folder1", | ||||||
|  | 				"/folder1/file1", | ||||||
|  | 				// stop early
 | ||||||
|  | 			}, | ||||||
|  | 			err: false, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "error", | ||||||
|  | 			fn: func(fileInfo storagedriver.FileInfo) error { | ||||||
|  | 				return errors.New("foo") | ||||||
|  | 			}, | ||||||
|  | 			expected: []string{ | ||||||
|  | 				"/file1", | ||||||
|  | 			}, | ||||||
|  | 			err: true, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "from folder", | ||||||
|  | 			fn:   func(fileInfo storagedriver.FileInfo) error { return nil }, | ||||||
|  | 			expected: []string{ | ||||||
|  | 				"/folder1", | ||||||
|  | 				"/folder1/file1", | ||||||
|  | 			}, | ||||||
|  | 			from: "/folder1", | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, tc := range tcs { | ||||||
|  | 		var walked []string | ||||||
|  | 		if tc.from == "" { | ||||||
|  | 			tc.from = "/" | ||||||
|  | 		} | ||||||
|  | 		t.Run(tc.name, func(t *testing.T) { | ||||||
|  | 			err := driver.Walk(context.Background(), tc.from, func(fileInfo storagedriver.FileInfo) error { | ||||||
|  | 				walked = append(walked, fileInfo.Path()) | ||||||
|  | 				return tc.fn(fileInfo) | ||||||
|  | 			}) | ||||||
|  | 			if tc.err && err == nil { | ||||||
|  | 				t.Fatalf("expected err") | ||||||
|  | 			} | ||||||
|  | 			if !tc.err && err != nil { | ||||||
|  | 				t.Fatalf(err.Error()) | ||||||
|  | 			} | ||||||
|  | 			compareWalked(t, tc.expected, walked) | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func TestOverThousandBlobs(t *testing.T) { | func TestOverThousandBlobs(t *testing.T) { | ||||||
| 	if skipS3() != "" { | 	if skipS3() != "" { | ||||||
| 		t.Skip(skipS3()) | 		t.Skip(skipS3()) | ||||||
|  | @ -582,3 +742,14 @@ func TestMoveWithMultipartCopy(t *testing.T) { | ||||||
| 		t.Fatalf("unexpected error getting content: %v", err) | 		t.Fatalf("unexpected error getting content: %v", err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func compareWalked(t *testing.T, expected, walked []string) { | ||||||
|  | 	if len(walked) != len(expected) { | ||||||
|  | 		t.Fatalf("Mismatch number of fileInfo walked %d expected %d; walked %s; expected %s;", len(walked), len(expected), walked, expected) | ||||||
|  | 	} | ||||||
|  | 	for i := range walked { | ||||||
|  | 		if walked[i] != expected[i] { | ||||||
|  | 			t.Fatalf("walked in unexpected order: expected %s; walked %s", expected, walked) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -658,7 +658,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Walk traverses a filesystem defined within driver, starting
 | // Walk traverses a filesystem defined within driver, starting
 | ||||||
| // from the given path, calling f on each file
 | // from the given path, calling f on each file and directory
 | ||||||
| func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { | func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { | ||||||
| 	return storagedriver.WalkFallback(ctx, d, path, f) | 	return storagedriver.WalkFallback(ctx, d, path, f) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -22,9 +22,14 @@ type WalkFn func(fileInfo FileInfo) error | ||||||
| // to a directory, the directory will not be entered and Walk
 | // to a directory, the directory will not be entered and Walk
 | ||||||
| // will continue the traversal.  If fileInfo refers to a normal file, processing stops
 | // will continue the traversal.  If fileInfo refers to a normal file, processing stops
 | ||||||
| func WalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) error { | func WalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) error { | ||||||
|  | 	_, err := doWalkFallback(ctx, driver, from, f) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func doWalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn) (bool, error) { | ||||||
| 	children, err := driver.List(ctx, from) | 	children, err := driver.List(ctx, from) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return false, err | ||||||
| 	} | 	} | ||||||
| 	sort.Stable(sort.StringSlice(children)) | 	sort.Stable(sort.StringSlice(children)) | ||||||
| 	for _, child := range children { | 	for _, child := range children { | ||||||
|  | @ -40,22 +45,22 @@ func WalkFallback(ctx context.Context, driver StorageDriver, from string, f Walk | ||||||
| 				logrus.WithField("path", child).Infof("ignoring deleted path") | 				logrus.WithField("path", child).Infof("ignoring deleted path") | ||||||
| 				continue | 				continue | ||||||
| 			default: | 			default: | ||||||
| 				return err | 				return false, err | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		err = f(fileInfo) | 		err = f(fileInfo) | ||||||
| 		if err == nil && fileInfo.IsDir() { | 		if err == nil && fileInfo.IsDir() { | ||||||
| 			if err := WalkFallback(ctx, driver, child, f); err != nil { | 			if ok, err := doWalkFallback(ctx, driver, child, f); err != nil || !ok { | ||||||
| 				return err | 				return ok, err | ||||||
| 			} | 			} | ||||||
| 		} else if err == ErrSkipDir { | 		} else if err == ErrSkipDir { | ||||||
| 			// Stop iteration if it's a file, otherwise noop if it's a directory
 | 			// noop for folders, will just skip
 | ||||||
| 			if !fileInfo.IsDir() { | 			if !fileInfo.IsDir() { | ||||||
| 				return nil | 				return false, nil // no error but stop iteration
 | ||||||
| 			} | 			} | ||||||
| 		} else if err != nil { | 		} else if err != nil { | ||||||
| 			return err | 			return false, err | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return true, nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -3,6 +3,7 @@ package driver | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"strings" | ||||||
| 	"testing" | 	"testing" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | @ -12,10 +13,10 @@ type changingFileSystem struct { | ||||||
| 	keptFiles map[string]bool | 	keptFiles map[string]bool | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (cfs *changingFileSystem) List(ctx context.Context, path string) ([]string, error) { | func (cfs *changingFileSystem) List(_ context.Context, _ string) ([]string, error) { | ||||||
| 	return cfs.fileset, nil | 	return cfs.fileset, nil | ||||||
| } | } | ||||||
| func (cfs *changingFileSystem) Stat(ctx context.Context, path string) (FileInfo, error) { | func (cfs *changingFileSystem) Stat(_ context.Context, path string) (FileInfo, error) { | ||||||
| 	kept, ok := cfs.keptFiles[path] | 	kept, ok := cfs.keptFiles[path] | ||||||
| 	if ok && kept { | 	if ok && kept { | ||||||
| 		return &FileInfoInternal{ | 		return &FileInfoInternal{ | ||||||
|  | @ -26,6 +27,32 @@ func (cfs *changingFileSystem) Stat(ctx context.Context, path string) (FileInfo, | ||||||
| 	} | 	} | ||||||
| 	return nil, PathNotFoundError{} | 	return nil, PathNotFoundError{} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | type fileSystem struct { | ||||||
|  | 	StorageDriver | ||||||
|  | 	// maps folder to list results
 | ||||||
|  | 	fileset map[string][]string | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (cfs *fileSystem) List(_ context.Context, path string) ([]string, error) { | ||||||
|  | 	return cfs.fileset[path], nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (cfs *fileSystem) Stat(_ context.Context, path string) (FileInfo, error) { | ||||||
|  | 	_, isDir := cfs.fileset[path] | ||||||
|  | 	return &FileInfoInternal{ | ||||||
|  | 		FileInfoFields: FileInfoFields{ | ||||||
|  | 			Path:  path, | ||||||
|  | 			IsDir: isDir, | ||||||
|  | 			Size:  int64(len(path)), | ||||||
|  | 		}, | ||||||
|  | 	}, nil | ||||||
|  | } | ||||||
|  | func (cfs *fileSystem) isDir(path string) bool { | ||||||
|  | 	_, isDir := cfs.fileset[path] | ||||||
|  | 	return isDir | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func TestWalkFileRemoved(t *testing.T) { | func TestWalkFileRemoved(t *testing.T) { | ||||||
| 	d := &changingFileSystem{ | 	d := &changingFileSystem{ | ||||||
| 		fileset: []string{"zoidberg", "bender"}, | 		fileset: []string{"zoidberg", "bender"}, | ||||||
|  | @ -45,3 +72,111 @@ func TestWalkFileRemoved(t *testing.T) { | ||||||
| 		t.Fatalf(err.Error()) | 		t.Fatalf(err.Error()) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func TestWalkFallback(t *testing.T) { | ||||||
|  | 	d := &fileSystem{ | ||||||
|  | 		fileset: map[string][]string{ | ||||||
|  | 			"/":        {"/file1", "/folder1", "/folder2"}, | ||||||
|  | 			"/folder1": {"/folder1/file1"}, | ||||||
|  | 			"/folder2": {"/folder2/file1"}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	noopFn := func(fileInfo FileInfo) error { return nil } | ||||||
|  | 
 | ||||||
|  | 	tcs := []struct { | ||||||
|  | 		name     string | ||||||
|  | 		fn       WalkFn | ||||||
|  | 		from     string | ||||||
|  | 		expected []string | ||||||
|  | 		err      bool | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			name: "walk all", | ||||||
|  | 			fn:   noopFn, | ||||||
|  | 			expected: []string{ | ||||||
|  | 				"/file1", | ||||||
|  | 				"/folder1", | ||||||
|  | 				"/folder1/file1", | ||||||
|  | 				"/folder2", | ||||||
|  | 				"/folder2/file1", | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "skip directory", | ||||||
|  | 			fn: func(fileInfo FileInfo) error { | ||||||
|  | 				if fileInfo.Path() == "/folder1" { | ||||||
|  | 					return ErrSkipDir | ||||||
|  | 				} | ||||||
|  | 				if strings.Contains(fileInfo.Path(), "/folder1") { | ||||||
|  | 					t.Fatalf("skipped dir %s and should not walk %s", "/folder1", fileInfo.Path()) | ||||||
|  | 				} | ||||||
|  | 				return nil | ||||||
|  | 			}, | ||||||
|  | 			expected: []string{ | ||||||
|  | 				"/file1", | ||||||
|  | 				"/folder1", // return ErrSkipDir, skip anything under /folder1
 | ||||||
|  | 				// skip /folder1/file1
 | ||||||
|  | 				"/folder2", | ||||||
|  | 				"/folder2/file1", | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "stop early", | ||||||
|  | 			fn: func(fileInfo FileInfo) error { | ||||||
|  | 				if fileInfo.Path() == "/folder1/file1" { | ||||||
|  | 					return ErrSkipDir | ||||||
|  | 				} | ||||||
|  | 				return nil | ||||||
|  | 			}, | ||||||
|  | 			expected: []string{ | ||||||
|  | 				"/file1", | ||||||
|  | 				"/folder1", | ||||||
|  | 				"/folder1/file1", | ||||||
|  | 				// stop early
 | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "from folder", | ||||||
|  | 			fn:   noopFn, | ||||||
|  | 			expected: []string{ | ||||||
|  | 				"/folder1/file1", | ||||||
|  | 			}, | ||||||
|  | 			from: "/folder1", | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, tc := range tcs { | ||||||
|  | 		var walked []string | ||||||
|  | 		if tc.from == "" { | ||||||
|  | 			tc.from = "/" | ||||||
|  | 		} | ||||||
|  | 		t.Run(tc.name, func(t *testing.T) { | ||||||
|  | 			err := WalkFallback(context.Background(), d, tc.from, func(fileInfo FileInfo) error { | ||||||
|  | 				walked = append(walked, fileInfo.Path()) | ||||||
|  | 				if fileInfo.IsDir() != d.isDir(fileInfo.Path()) { | ||||||
|  | 					t.Fatalf("fileInfo isDir not matching file system: expected %t actual %t", d.isDir(fileInfo.Path()), fileInfo.IsDir()) | ||||||
|  | 				} | ||||||
|  | 				return tc.fn(fileInfo) | ||||||
|  | 			}) | ||||||
|  | 			if tc.err && err == nil { | ||||||
|  | 				t.Fatalf("expected err") | ||||||
|  | 			} | ||||||
|  | 			if !tc.err && err != nil { | ||||||
|  | 				t.Fatalf(err.Error()) | ||||||
|  | 			} | ||||||
|  | 			compareWalked(t, tc.expected, walked) | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func compareWalked(t *testing.T, expected, walked []string) { | ||||||
|  | 	if len(walked) != len(expected) { | ||||||
|  | 		t.Fatalf("Mismatch number of fileInfo walked %d expected %d; walked %s; expected %s;", len(walked), len(expected), walked, expected) | ||||||
|  | 	} | ||||||
|  | 	for i := range walked { | ||||||
|  | 		if walked[i] != expected[i] { | ||||||
|  | 			t.Fatalf("expected walked to come in order expected: walked %s", walked) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue