From b7f911cbcdb8604a3ae903dad284b54566b81a68 Mon Sep 17 00:00:00 2001 From: "Laisky.Cai" Date: Thu, 10 Apr 2025 01:52:46 +0000 Subject: [PATCH 1/2] feat: add `--bfs` option to `mc mirror` for layer-by-layer traversal in S3 client operations closes #4873 --- cmd/client-s3.go | 17 +++++ cmd/client-url.go | 12 ++-- cmd/client.go | 2 + cmd/difference.go | 172 ++++++++++++++++++++++++++++++++++++++++++++- cmd/mirror-main.go | 7 +- cmd/mirror-url.go | 1 + 6 files changed, 205 insertions(+), 6 deletions(-) diff --git a/cmd/client-s3.go b/cmd/client-s3.go index ff95a79300..96a50d0980 100644 --- a/cmd/client-s3.go +++ b/cmd/client-s3.go @@ -1839,6 +1839,10 @@ func (c *S3Client) listVersionsRoutine(ctx context.Context, b, o string, opts Li buckets = append(buckets, b) } + if opts.Prefix != "" { + o = opts.Prefix + } + for _, b := range buckets { var skipKey string for objectVersion := range c.api.ListObjects(ctx, b, minio.ListObjectsOptions{ @@ -2104,6 +2108,10 @@ func (c *S3Client) listIncompleteInRoutine(ctx context.Context, contentCh chan * func (c *S3Client) listIncompleteRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) { // get bucket and object from URL. b, o := c.url2BucketAndObject() + if opts.Prefix != "" { + o = opts.Prefix + } + switch { case b == "" && o == "": buckets, err := c.api.ListBuckets(ctx) @@ -2243,6 +2251,7 @@ func (c *S3Client) objectInfo2ClientContent(bucket string, entry minio.ObjectInf } url.Path = c.buildAbsPath(bucket, entry.Key) content.URL = url + content.ObjectKey = entry.Key content.BucketName = bucket content.Size = entry.Size content.ETag = entry.ETag @@ -2321,6 +2330,10 @@ func (c *S3Client) bucketStat(ctx context.Context, opts BucketStatOptions) (*Cli func (c *S3Client) listInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) { // get bucket and object from URL. b, o := c.url2BucketAndObject() + if opts.Prefix != "" { + o = opts.Prefix + } + if opts.ListZip && (b == "" || o == "") { contentCh <- &ClientContent{ Err: probe.NewError(errors.New("listing zip files must provide bucket and object")), @@ -2385,6 +2398,10 @@ func sortBucketsNameWithSlash(bucketsInfo []minio.BucketInfo) { func (c *S3Client) listRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) { // get bucket and object from URL. b, o := c.url2BucketAndObject() + if opts.Prefix != "" { + o = opts.Prefix + } + switch { case b == "" && o == "": buckets, err := c.api.ListBuckets(ctx) diff --git a/cmd/client-url.go b/cmd/client-url.go index 5a56851be8..5b5450bab6 100644 --- a/cmd/client-url.go +++ b/cmd/client-url.go @@ -190,10 +190,14 @@ func (u ClientURL) String() string { } // urlJoinPath Join a path to existing URL. -func urlJoinPath(url1, url2 string) string { - u1 := newClientURL(url1) - u2 := newClientURL(url2) - return joinURLs(u1, u2).String() +func urlJoinPath(base, element string) string { + if strings.HasSuffix(base, "/") && strings.HasPrefix(element, "/") { + return base + element[1:] + } + if !strings.HasSuffix(base, "/") && !strings.HasPrefix(element, "/") { + return base + "/" + element + } + return base + element } // url2Stat returns stat info for URL - supports bucket, object and a prefixe with or without a trailing slash diff --git a/cmd/client.go b/cmd/client.go index cb81f57348..dda58c4fe2 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -108,6 +108,7 @@ type ListOptions struct { TimeRef time.Time ShowDir DirOpt Count int + Prefix string // Add prefix support } // CopyOptions holds options for copying operation @@ -213,6 +214,7 @@ type Client interface { // ClientContent - Content container for content metadata type ClientContent struct { URL ClientURL + ObjectKey string BucketName string // only valid and set for client-type objectStorage Time time.Time Size int64 diff --git a/cmd/difference.go b/cmd/difference.go index 1a967e1522..5655f4a50c 100644 --- a/cmd/difference.go +++ b/cmd/difference.go @@ -82,6 +82,171 @@ func getSourceModTimeKey(metadata map[string]string) string { return "" } +// layerDifference performs a breadth-first search (BFS) comparison between source and target. +// Unlike the standard recursive listing approach, this function traverses the object hierarchy +// layer by layer (directory by directory), which prevents overwhelming the server with +// large recursive listing operations that could cause timeouts or connection failures. +// +// This approach is especially useful for buckets containing millions of objects where +// a standard recursive listing might cause server-side resource exhaustion. By exploring +// the hierarchy level by level and comparing objects at each layer, this function provides +// a more scalable solution for large object stores. +// +// The BFS approach: +// 1. Starts with the root prefix ("") for both source and target +// 2. Lists objects at the current level/prefix (non-recursively) +// 3. Compares objects found at this level +// 4. Queues any directories found for exploration in the next iteration +// 5. Continues until all directories in both source and target are explored +// +// This is enabled with the --bfs parameter to avoid the limitations of recursive listing. +func layerDifference(ctx context.Context, sourceClnt, targetClnt Client, opts mirrorOptions) chan diffMessage { + diffCh := make(chan diffMessage, 10000) + + go func() { + defer close(diffCh) + + // Channels to feed items found by BFS into the difference engine + srcClientCh := make(chan *ClientContent, 1000) + tgtClientCh := make(chan *ClientContent, 1000) + + // Goroutine to perform BFS on the source + go func() { + defer close(srcClientCh) + // Queue for *relative object prefixes* to explore + queue := []string{""} // "" represents the root prefix + + for len(queue) > 0 { + // Dequeue the next relative prefix + prefix := queue[0] + queue = queue[1:] + + // List items at the current prefix level using the relative prefix + listCtx, listCancel := context.WithCancel(ctx) + contentsCh := sourceClnt.List(listCtx, ListOptions{ + Recursive: false, // List only the current level + WithMetadata: opts.isMetadata, + ShowDir: DirLast, // Ensure directories are listed + Prefix: prefix, // Pass the relative prefix + }) + + for content := range contentsCh { + select { + case <-ctx.Done(): + listCancel() + return + default: + if content != nil && content.Err != nil { + srcClientCh <- content + listCancel() + continue + } + if content == nil { + continue + } + + // Send the valid content (file or directory) for comparison + srcClientCh <- content + + // If it's a directory, queue its *relative object key* for the next level + if content.Type.IsDir() { + relativeKey := content.ObjectKey // Get the relative key + // Prevent infinite loops: don't re-queue the prefix we just listed, + // especially the root ("") which might list itself as "/" depending on backend. + // Also check if ObjectKey is populated. + if relativeKey != "" && relativeKey != prefix { + // Ensure the key ends with a separator if it's a directory prefix + // The S3 ListObjects usually returns directory keys ending with '/' + if !strings.HasSuffix(relativeKey, string(content.URL.Separator)) { + // This case might indicate a non-standard directory representation, handle cautiously + // For standard S3, common prefixes already end in '/' + // If needed, append separator: relativeKey += string(content.URL.Separator) + } + // Add the relative key (prefix) to the queue + queue = append(queue, relativeKey) + } + } + } + } + listCancel() + } + }() + + // Goroutine to perform BFS on the target (symmetric to the source) + go func() { + defer close(tgtClientCh) + // Queue for *relative object prefixes* + queue := []string{""} + + for len(queue) > 0 { + prefix := queue[0] + queue = queue[1:] + + listCtx, listCancel := context.WithCancel(ctx) + contentsCh := targetClnt.List(listCtx, ListOptions{ + Recursive: false, + WithMetadata: opts.isMetadata, + ShowDir: DirLast, + Prefix: prefix, // Pass the relative prefix + }) + + for content := range contentsCh { + select { + case <-ctx.Done(): + listCancel() + return + default: + if content != nil && content.Err != nil { + tgtClientCh <- content + listCancel() + continue + } + if content == nil { + continue + } + + tgtClientCh <- content + + // If it's a directory, queue its *relative object key* + if content.Type.IsDir() { + relativeKey := content.ObjectKey + if relativeKey != "" && relativeKey != prefix { + // Ensure trailing slash if needed (usually present from S3 List) + if !strings.HasSuffix(relativeKey, string(content.URL.Separator)) { + // Handle non-standard directory representation if necessary + } + queue = append(queue, relativeKey) + } + } + } + } + listCancel() + } + }() + + // Comparison logic remains the same + err := differenceInternal( + sourceClnt.GetURL().String(), + srcClientCh, + targetClnt.GetURL().String(), + tgtClientCh, + opts, + false, // returnSimilar is false + diffCh, + ) + + if err != nil { + select { + case <-ctx.Done(): + default: + diffCh <- diffMessage{Error: err} + } + } + }() + + return diffCh +} + // activeActiveModTimeUpdated tries to calculate if the object copy in the target // is older than the one in the source by comparing the modtime of the data. func activeActiveModTimeUpdated(src, dst *ClientContent) bool { @@ -167,7 +332,12 @@ func bucketObjectDifference(ctx context.Context, sourceClnt, targetClnt Client) }) } -func objectDifference(ctx context.Context, sourceClnt, targetClnt Client, opts mirrorOptions) (diffCh chan diffMessage) { +func objectDifference(ctx context.Context, sourceClnt, targetClnt Client, opts mirrorOptions) chan diffMessage { + if opts.bfs { + // Use layer-by-layer difference for regular objects + return layerDifference(ctx, sourceClnt, targetClnt, opts) + } + sourceURL := sourceClnt.GetURL().String() sourceCh := sourceClnt.List(ctx, ListOptions{Recursive: true, WithMetadata: opts.isMetadata, ShowDir: DirNone}) diff --git a/cmd/mirror-main.go b/cmd/mirror-main.go index 62f6958d4e..d4c1505f4a 100644 --- a/cmd/mirror-main.go +++ b/cmd/mirror-main.go @@ -144,6 +144,10 @@ var ( Name: "skip-errors", Usage: "skip any errors when mirroring", }, + cli.BoolFlag{ + Name: "bfs", + Usage: "using BFS for layer-by-layer traversal of files, suitable for large number of files", + }, checksumFlag, } ) @@ -212,7 +216,7 @@ EXAMPLES: {{.Prompt}} {{.HelpName}} --older-than 30d s3/test ~/test 13. Mirror server encrypted objects from Amazon S3 cloud storage to a bucket on Amazon S3 cloud storage - {{.Prompt}} {{.HelpName}} --enc-c "minio/archive=MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5MDA" --enc-c "s3/archive=MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5BBB" s3/archive/ minio/archive/ + {{.Prompt}} {{.HelpName}} --enc-c "minio/archive=MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5MDA" --enc-c "s3/archive=MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5BBB" s3/archive/ minio/archive/ 14. Update 'Cache-Control' header on all existing objects recursively. {{.Prompt}} {{.HelpName}} --attr "Cache-Control=max-age=90000,min-fresh=9000" myminio/video-files myminio/video-files @@ -1024,6 +1028,7 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc userMetadata: userMetadata, encKeyDB: encKeyDB, activeActive: isWatch, + bfs: cli.Bool("bfs"), } // If we are not using active/active and we are not removing diff --git a/cmd/mirror-url.go b/cmd/mirror-url.go index 1c71fb8917..5f465fcdde 100644 --- a/cmd/mirror-url.go +++ b/cmd/mirror-url.go @@ -278,6 +278,7 @@ type mirrorOptions struct { userMetadata map[string]string checksum minio.ChecksumType sourceListingOnly bool + bfs bool } // Prepares urls that need to be copied or removed based on requested options. From 3a83f12dc705a09169b7302799f11c462b041016 Mon Sep 17 00:00:00 2001 From: "Laisky.Cai" Date: Sun, 13 Apr 2025 14:05:40 +0000 Subject: [PATCH 2/2] refactor: replace channel-based BFS with iterator-based implementation for layer difference --- cmd/client-fs.go | 1 - cmd/difference.go | 401 ++++++++++++++++++++++++++++++---------------- 2 files changed, 267 insertions(+), 135 deletions(-) diff --git a/cmd/client-fs.go b/cmd/client-fs.go index 031d24392c..749b1e886f 100644 --- a/cmd/client-fs.go +++ b/cmd/client-fs.go @@ -33,7 +33,6 @@ import ( "time" "github.com/google/uuid" - "github.com/pkg/xattr" "github.com/rjeczalik/notify" diff --git a/cmd/difference.go b/cmd/difference.go index 5655f4a50c..49aaf7b7ff 100644 --- a/cmd/difference.go +++ b/cmd/difference.go @@ -19,6 +19,7 @@ package cmd import ( "context" + "iter" "strings" "time" "unicode/utf8" @@ -82,163 +83,295 @@ func getSourceModTimeKey(metadata map[string]string) string { return "" } -// layerDifference performs a breadth-first search (BFS) comparison between source and target. -// Unlike the standard recursive listing approach, this function traverses the object hierarchy -// layer by layer (directory by directory), which prevents overwhelming the server with -// large recursive listing operations that could cause timeouts or connection failures. -// -// This approach is especially useful for buckets containing millions of objects where -// a standard recursive listing might cause server-side resource exhaustion. By exploring -// the hierarchy level by level and comparing objects at each layer, this function provides -// a more scalable solution for large object stores. -// -// The BFS approach: -// 1. Starts with the root prefix ("") for both source and target -// 2. Lists objects at the current level/prefix (non-recursively) -// 3. Compares objects found at this level -// 4. Queues any directories found for exploration in the next iteration -// 5. Continues until all directories in both source and target are explored -// -// This is enabled with the --bfs parameter to avoid the limitations of recursive listing. -func layerDifference(ctx context.Context, sourceClnt, targetClnt Client, opts mirrorOptions) chan diffMessage { - diffCh := make(chan diffMessage, 10000) +// listBFS performs a breadth-first listing using iter.Seq. +func listBFS(ctx context.Context, clnt Client, opts mirrorOptions) iter.Seq[*ClientContent] { + return func(yield func(*ClientContent) bool) { + // Queue for *relative object prefixes* to explore + queue := []string{""} // "" represents the root prefix - go func() { - defer close(diffCh) + for len(queue) > 0 { + // Check for context cancellation at the start of each level processing + select { + case <-ctx.Done(): + yield(&ClientContent{Err: probe.NewError(ctx.Err())}) + return + default: + } - // Channels to feed items found by BFS into the difference engine - srcClientCh := make(chan *ClientContent, 1000) - tgtClientCh := make(chan *ClientContent, 1000) - - // Goroutine to perform BFS on the source - go func() { - defer close(srcClientCh) - // Queue for *relative object prefixes* to explore - queue := []string{""} // "" represents the root prefix - - for len(queue) > 0 { - // Dequeue the next relative prefix - prefix := queue[0] - queue = queue[1:] - - // List items at the current prefix level using the relative prefix - listCtx, listCancel := context.WithCancel(ctx) - contentsCh := sourceClnt.List(listCtx, ListOptions{ - Recursive: false, // List only the current level - WithMetadata: opts.isMetadata, - ShowDir: DirLast, // Ensure directories are listed - Prefix: prefix, // Pass the relative prefix - }) - - for content := range contentsCh { - select { - case <-ctx.Done(): - listCancel() + // Dequeue the next relative prefix + prefix := queue[0] + queue = queue[1:] + + // List items at the current prefix level using the relative prefix + listCtx, listCancel := context.WithCancel(ctx) + contentsCh := clnt.List(listCtx, ListOptions{ + Recursive: false, // List only the current level + WithMetadata: opts.isMetadata, + ShowDir: DirLast, // Ensure directories are listed + Prefix: prefix, // Pass the relative prefix + }) + + for content := range contentsCh { + select { + case <-ctx.Done(): + listCancel() + yield(&ClientContent{Err: probe.NewError(ctx.Err())}) + return + default: + if content == nil { + continue // Should not happen, but defensive check + } + + // Yield the content (or error) + if !yield(content) { + listCancel() // Stop listing if yield returns false return - default: - if content != nil && content.Err != nil { - srcClientCh <- content - listCancel() - continue - } - if content == nil { - continue - } + } - // Send the valid content (file or directory) for comparison - srcClientCh <- content - - // If it's a directory, queue its *relative object key* for the next level - if content.Type.IsDir() { - relativeKey := content.ObjectKey // Get the relative key - // Prevent infinite loops: don't re-queue the prefix we just listed, - // especially the root ("") which might list itself as "/" depending on backend. - // Also check if ObjectKey is populated. - if relativeKey != "" && relativeKey != prefix { - // Ensure the key ends with a separator if it's a directory prefix - // The S3 ListObjects usually returns directory keys ending with '/' - if !strings.HasSuffix(relativeKey, string(content.URL.Separator)) { - // This case might indicate a non-standard directory representation, handle cautiously - // For standard S3, common prefixes already end in '/' - // If needed, append separator: relativeKey += string(content.URL.Separator) - } - // Add the relative key (prefix) to the queue - queue = append(queue, relativeKey) + // If yield processed an error, stop this prefix processing + if content.Err != nil { + listCancel() + break // Move to the next prefix in the queue + } + + // If it's a directory, queue its *relative object key* for the next level + if content.Type.IsDir() { + relativeKey := content.ObjectKey // Get the relative key + // Prevent infinite loops: don't re-queue the prefix we just listed, + // especially the root ("") which might list itself as "/" depending on backend. + // Also check if ObjectKey is populated. + if relativeKey != "" && relativeKey != prefix { + // Ensure the key ends with a separator if it's a directory prefix + // The S3 ListObjects usually returns directory keys ending with '/' + if !strings.HasSuffix(relativeKey, string(content.URL.Separator)) { + // This case might indicate a non-standard directory representation, handle cautiously + // For standard S3, common prefixes already end in '/' + // If needed, append separator: relativeKey += string(content.URL.Separator) } + // Add the relative key (prefix) to the queue + queue = append(queue, relativeKey) } } } - listCancel() } - }() - - // Goroutine to perform BFS on the target (symmetric to the source) - go func() { - defer close(tgtClientCh) - // Queue for *relative object prefixes* - queue := []string{""} - - for len(queue) > 0 { - prefix := queue[0] - queue = queue[1:] - - listCtx, listCancel := context.WithCancel(ctx) - contentsCh := targetClnt.List(listCtx, ListOptions{ - Recursive: false, - WithMetadata: opts.isMetadata, - ShowDir: DirLast, - Prefix: prefix, // Pass the relative prefix - }) - - for content := range contentsCh { - select { - case <-ctx.Done(): - listCancel() - return - default: - if content != nil && content.Err != nil { - tgtClientCh <- content - listCancel() - continue - } - if content == nil { - continue - } + listCancel() // Ensure list context is cancelled after processing its results + } + } +} - tgtClientCh <- content - - // If it's a directory, queue its *relative object key* - if content.Type.IsDir() { - relativeKey := content.ObjectKey - if relativeKey != "" && relativeKey != prefix { - // Ensure trailing slash if needed (usually present from S3 List) - if !strings.HasSuffix(relativeKey, string(content.URL.Separator)) { - // Handle non-standard directory representation if necessary - } - queue = append(queue, relativeKey) - } - } - } +// differenceIterInternal compares two sequences provided by iter.Seq. +func differenceIterInternal( + ctx context.Context, // Pass context for cancellation checks + sourceURL string, + srcSeq iter.Seq[*ClientContent], + targetURL string, + tgtSeq iter.Seq[*ClientContent], + opts mirrorOptions, + returnSimilar bool, + diffCh chan<- diffMessage, +) *probe.Error { + srcNext, srcStop := iter.Pull(srcSeq) + defer srcStop() + tgtNext, tgtStop := iter.Pull(tgtSeq) + defer tgtStop() + + srcCtnt, srcOk := srcNext() + tgtCtnt, tgtOk := tgtNext() + + var srcEOF, tgtEOF bool + + for { + // Check for context cancellation in the loop + select { + case <-ctx.Done(): + return probe.NewError(ctx.Err()) + default: + } + + srcEOF = !srcOk + tgtEOF = !tgtOk + + // No objects from source AND target: Finish + if opts.sourceListingOnly { + if srcEOF { + break + } + } else { + if srcEOF && tgtEOF { + break + } + } + + if !srcEOF && srcCtnt.Err != nil { + return srcCtnt.Err.Trace(sourceURL, targetURL) + } + + if !tgtEOF && tgtCtnt.Err != nil { + return tgtCtnt.Err.Trace(sourceURL, targetURL) + } + + // If source doesn't have objects anymore, comparison becomes obvious + if srcEOF { + diffCh <- diffMessage{ + SecondURL: tgtCtnt.URL.String(), + Diff: differInSecond, + secondContent: tgtCtnt, + } + tgtCtnt, tgtOk = tgtNext() + continue + } + + // The same for target + if tgtEOF { + diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + Diff: differInFirst, + firstContent: srcCtnt, + } + srcCtnt, srcOk = srcNext() + continue + } + + srcSuffix := strings.TrimPrefix(srcCtnt.URL.String(), sourceURL) + tgtSuffix := strings.TrimPrefix(tgtCtnt.URL.String(), targetURL) + + current := urlJoinPath(targetURL, srcSuffix) + expected := urlJoinPath(targetURL, tgtSuffix) + + if !utf8.ValidString(srcSuffix) { + // Error. Keys must be valid UTF-8. + diffCh <- diffMessage{Error: errInvalidSource(current).Trace()} + srcCtnt, srcOk = srcNext() + continue + } + if !utf8.ValidString(tgtSuffix) { + // Error. Keys must be valid UTF-8. + diffCh <- diffMessage{Error: errInvalidTarget(expected).Trace()} + tgtCtnt, tgtOk = tgtNext() + continue + } + + // Normalize to avoid situations where multiple byte representations are possible. + normalizedCurrent := norm.NFC.String(current) + normalizedExpected := norm.NFC.String(expected) + + if normalizedExpected > normalizedCurrent { + diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + Diff: differInFirst, + firstContent: srcCtnt, + } + srcCtnt, srcOk = srcNext() + continue + } + if normalizedExpected == normalizedCurrent { + srcType, tgtType := srcCtnt.Type, tgtCtnt.Type + srcSize, tgtSize := srcCtnt.Size, tgtCtnt.Size + if srcType.IsRegular() && !tgtType.IsRegular() || + !srcType.IsRegular() && tgtType.IsRegular() { + // Type differs. Source is never a directory. + diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + SecondURL: tgtCtnt.URL.String(), + Diff: differInType, + firstContent: srcCtnt, + secondContent: tgtCtnt, } - listCancel() + // Consume both as they matched by name + srcCtnt, srcOk = srcNext() + tgtCtnt, tgtOk = tgtNext() + continue // Added continue } - }() + if srcSize != tgtSize { + // Regular files differing in size. + diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + SecondURL: tgtCtnt.URL.String(), + Diff: differInSize, + firstContent: srcCtnt, + secondContent: tgtCtnt, + } + } else if activeActiveModTimeUpdated(srcCtnt, tgtCtnt) { + diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + SecondURL: tgtCtnt.URL.String(), + Diff: differInAASourceMTime, + firstContent: srcCtnt, + secondContent: tgtCtnt, + } + } else if opts.isMetadata && + !metadataEqual(srcCtnt.UserMetadata, tgtCtnt.UserMetadata) && + !metadataEqual(srcCtnt.Metadata, tgtCtnt.Metadata) { - // Comparison logic remains the same - err := differenceInternal( + // Regular files user requesting additional metadata to same file. + diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + SecondURL: tgtCtnt.URL.String(), + Diff: differInMetadata, + firstContent: srcCtnt, + secondContent: tgtCtnt, + } + } else if returnSimilar { // Check returnSimilar only if no other diff found + // No differ + diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + SecondURL: tgtCtnt.URL.String(), + Diff: differInNone, + firstContent: srcCtnt, + secondContent: tgtCtnt, + } + } + srcCtnt, srcOk = srcNext() + tgtCtnt, tgtOk = tgtNext() + continue + } + // Differ in second (normalizedExpected < normalizedCurrent) + diffCh <- diffMessage{ + SecondURL: tgtCtnt.URL.String(), + Diff: differInSecond, + secondContent: tgtCtnt, + } + tgtCtnt, tgtOk = tgtNext() + continue + } + + return nil +} + +// layerDifferenceIter performs a breadth-first search (BFS) comparison using iter.Seq. +func layerDifferenceIter(ctx context.Context, sourceClnt, targetClnt Client, opts mirrorOptions) chan diffMessage { + diffCh := make(chan diffMessage, 10000) + + go func() { + defer close(diffCh) + + // Create iterators using the BFS lister + srcSeq := listBFS(ctx, sourceClnt, opts) + tgtSeq := listBFS(ctx, targetClnt, opts) + + // Perform the comparison using the iterators + err := differenceIterInternal( + ctx, // Pass context sourceClnt.GetURL().String(), - srcClientCh, + srcSeq, targetClnt.GetURL().String(), - tgtClientCh, + tgtSeq, opts, false, // returnSimilar is false diffCh, ) if err != nil { + // Check if the error is due to context cancellation before sending select { case <-ctx.Done(): + // Context was cancelled, avoid sending error if it's context.Canceled or context.DeadlineExceeded + if !(err.ToGoError() == context.Canceled || err.ToGoError() == context.DeadlineExceeded) { + // Send non-cancellation errors even if context is done, as they might be important + diffCh <- diffMessage{Error: err} + } default: + // Context not cancelled, send the error diffCh <- diffMessage{Error: err} } } @@ -335,7 +468,7 @@ func bucketObjectDifference(ctx context.Context, sourceClnt, targetClnt Client) func objectDifference(ctx context.Context, sourceClnt, targetClnt Client, opts mirrorOptions) chan diffMessage { if opts.bfs { // Use layer-by-layer difference for regular objects - return layerDifference(ctx, sourceClnt, targetClnt, opts) + return layerDifferenceIter(ctx, sourceClnt, targetClnt, opts) } sourceURL := sourceClnt.GetURL().String()