diff --git a/cmd/client-fs.go b/cmd/client-fs.go index 031d24392c..749b1e886f 100644 --- a/cmd/client-fs.go +++ b/cmd/client-fs.go @@ -33,7 +33,6 @@ import ( "time" "github.com/google/uuid" - "github.com/pkg/xattr" "github.com/rjeczalik/notify" diff --git a/cmd/client-s3.go b/cmd/client-s3.go index ff95a79300..96a50d0980 100644 --- a/cmd/client-s3.go +++ b/cmd/client-s3.go @@ -1839,6 +1839,10 @@ func (c *S3Client) listVersionsRoutine(ctx context.Context, b, o string, opts Li buckets = append(buckets, b) } + if opts.Prefix != "" { + o = opts.Prefix + } + for _, b := range buckets { var skipKey string for objectVersion := range c.api.ListObjects(ctx, b, minio.ListObjectsOptions{ @@ -2104,6 +2108,10 @@ func (c *S3Client) listIncompleteInRoutine(ctx context.Context, contentCh chan * func (c *S3Client) listIncompleteRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) { // get bucket and object from URL. b, o := c.url2BucketAndObject() + if opts.Prefix != "" { + o = opts.Prefix + } + switch { case b == "" && o == "": buckets, err := c.api.ListBuckets(ctx) @@ -2243,6 +2251,7 @@ func (c *S3Client) objectInfo2ClientContent(bucket string, entry minio.ObjectInf } url.Path = c.buildAbsPath(bucket, entry.Key) content.URL = url + content.ObjectKey = entry.Key content.BucketName = bucket content.Size = entry.Size content.ETag = entry.ETag @@ -2321,6 +2330,10 @@ func (c *S3Client) bucketStat(ctx context.Context, opts BucketStatOptions) (*Cli func (c *S3Client) listInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) { // get bucket and object from URL. 
// urlJoinPath joins element onto base with exactly one "/" at the seam.
// A trailing slash on base and a leading slash on element are collapsed
// into a single separator; slashes anywhere else in either argument are
// left untouched.
func urlJoinPath(base, element string) string {
	baseSlash := strings.HasSuffix(base, "/")
	elemSlash := strings.HasPrefix(element, "/")
	switch {
	case baseSlash && elemSlash:
		// Both sides contribute a separator: keep only one.
		return base + strings.TrimPrefix(element, "/")
	case !baseSlash && !elemSlash:
		// Neither side contributes one: insert it.
		return base + "/" + element
	default:
		// Exactly one side has the separator already.
		return base + element
	}
}
b/cmd/difference.go index 1a967e1522..49aaf7b7ff 100644 --- a/cmd/difference.go +++ b/cmd/difference.go @@ -19,6 +19,7 @@ package cmd import ( "context" + "iter" "strings" "time" "unicode/utf8" @@ -82,6 +83,303 @@ func getSourceModTimeKey(metadata map[string]string) string { return "" } +// listBFS performs a breadth-first listing using iter.Seq. +func listBFS(ctx context.Context, clnt Client, opts mirrorOptions) iter.Seq[*ClientContent] { + return func(yield func(*ClientContent) bool) { + // Queue for *relative object prefixes* to explore + queue := []string{""} // "" represents the root prefix + + for len(queue) > 0 { + // Check for context cancellation at the start of each level processing + select { + case <-ctx.Done(): + yield(&ClientContent{Err: probe.NewError(ctx.Err())}) + return + default: + } + + // Dequeue the next relative prefix + prefix := queue[0] + queue = queue[1:] + + // List items at the current prefix level using the relative prefix + listCtx, listCancel := context.WithCancel(ctx) + contentsCh := clnt.List(listCtx, ListOptions{ + Recursive: false, // List only the current level + WithMetadata: opts.isMetadata, + ShowDir: DirLast, // Ensure directories are listed + Prefix: prefix, // Pass the relative prefix + }) + + for content := range contentsCh { + select { + case <-ctx.Done(): + listCancel() + yield(&ClientContent{Err: probe.NewError(ctx.Err())}) + return + default: + if content == nil { + continue // Should not happen, but defensive check + } + + // Yield the content (or error) + if !yield(content) { + listCancel() // Stop listing if yield returns false + return + } + + // If yield processed an error, stop this prefix processing + if content.Err != nil { + listCancel() + break // Move to the next prefix in the queue + } + + // If it's a directory, queue its *relative object key* for the next level + if content.Type.IsDir() { + relativeKey := content.ObjectKey // Get the relative key + // Prevent infinite loops: don't re-queue the prefix we 
just listed, + // especially the root ("") which might list itself as "/" depending on backend. + // Also check if ObjectKey is populated. + if relativeKey != "" && relativeKey != prefix { + // Ensure the key ends with a separator if it's a directory prefix + // The S3 ListObjects usually returns directory keys ending with '/' + if !strings.HasSuffix(relativeKey, string(content.URL.Separator)) { + // This case might indicate a non-standard directory representation, handle cautiously + // For standard S3, common prefixes already end in '/' + // If needed, append separator: relativeKey += string(content.URL.Separator) + } + // Add the relative key (prefix) to the queue + queue = append(queue, relativeKey) + } + } + } + } + listCancel() // Ensure list context is cancelled after processing its results + } + } +} + +// differenceIterInternal compares two sequences provided by iter.Seq. +func differenceIterInternal( + ctx context.Context, // Pass context for cancellation checks + sourceURL string, + srcSeq iter.Seq[*ClientContent], + targetURL string, + tgtSeq iter.Seq[*ClientContent], + opts mirrorOptions, + returnSimilar bool, + diffCh chan<- diffMessage, +) *probe.Error { + srcNext, srcStop := iter.Pull(srcSeq) + defer srcStop() + tgtNext, tgtStop := iter.Pull(tgtSeq) + defer tgtStop() + + srcCtnt, srcOk := srcNext() + tgtCtnt, tgtOk := tgtNext() + + var srcEOF, tgtEOF bool + + for { + // Check for context cancellation in the loop + select { + case <-ctx.Done(): + return probe.NewError(ctx.Err()) + default: + } + + srcEOF = !srcOk + tgtEOF = !tgtOk + + // No objects from source AND target: Finish + if opts.sourceListingOnly { + if srcEOF { + break + } + } else { + if srcEOF && tgtEOF { + break + } + } + + if !srcEOF && srcCtnt.Err != nil { + return srcCtnt.Err.Trace(sourceURL, targetURL) + } + + if !tgtEOF && tgtCtnt.Err != nil { + return tgtCtnt.Err.Trace(sourceURL, targetURL) + } + + // If source doesn't have objects anymore, comparison becomes obvious + if srcEOF { 
+ diffCh <- diffMessage{ + SecondURL: tgtCtnt.URL.String(), + Diff: differInSecond, + secondContent: tgtCtnt, + } + tgtCtnt, tgtOk = tgtNext() + continue + } + + // The same for target + if tgtEOF { + diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + Diff: differInFirst, + firstContent: srcCtnt, + } + srcCtnt, srcOk = srcNext() + continue + } + + srcSuffix := strings.TrimPrefix(srcCtnt.URL.String(), sourceURL) + tgtSuffix := strings.TrimPrefix(tgtCtnt.URL.String(), targetURL) + + current := urlJoinPath(targetURL, srcSuffix) + expected := urlJoinPath(targetURL, tgtSuffix) + + if !utf8.ValidString(srcSuffix) { + // Error. Keys must be valid UTF-8. + diffCh <- diffMessage{Error: errInvalidSource(current).Trace()} + srcCtnt, srcOk = srcNext() + continue + } + if !utf8.ValidString(tgtSuffix) { + // Error. Keys must be valid UTF-8. + diffCh <- diffMessage{Error: errInvalidTarget(expected).Trace()} + tgtCtnt, tgtOk = tgtNext() + continue + } + + // Normalize to avoid situations where multiple byte representations are possible. + normalizedCurrent := norm.NFC.String(current) + normalizedExpected := norm.NFC.String(expected) + + if normalizedExpected > normalizedCurrent { + diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + Diff: differInFirst, + firstContent: srcCtnt, + } + srcCtnt, srcOk = srcNext() + continue + } + if normalizedExpected == normalizedCurrent { + srcType, tgtType := srcCtnt.Type, tgtCtnt.Type + srcSize, tgtSize := srcCtnt.Size, tgtCtnt.Size + if srcType.IsRegular() && !tgtType.IsRegular() || + !srcType.IsRegular() && tgtType.IsRegular() { + // Type differs. Source is never a directory. 
+ diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + SecondURL: tgtCtnt.URL.String(), + Diff: differInType, + firstContent: srcCtnt, + secondContent: tgtCtnt, + } + // Consume both as they matched by name + srcCtnt, srcOk = srcNext() + tgtCtnt, tgtOk = tgtNext() + continue // Added continue + } + if srcSize != tgtSize { + // Regular files differing in size. + diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + SecondURL: tgtCtnt.URL.String(), + Diff: differInSize, + firstContent: srcCtnt, + secondContent: tgtCtnt, + } + } else if activeActiveModTimeUpdated(srcCtnt, tgtCtnt) { + diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + SecondURL: tgtCtnt.URL.String(), + Diff: differInAASourceMTime, + firstContent: srcCtnt, + secondContent: tgtCtnt, + } + } else if opts.isMetadata && + !metadataEqual(srcCtnt.UserMetadata, tgtCtnt.UserMetadata) && + !metadataEqual(srcCtnt.Metadata, tgtCtnt.Metadata) { + + // Regular files user requesting additional metadata to same file. + diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + SecondURL: tgtCtnt.URL.String(), + Diff: differInMetadata, + firstContent: srcCtnt, + secondContent: tgtCtnt, + } + } else if returnSimilar { // Check returnSimilar only if no other diff found + // No differ + diffCh <- diffMessage{ + FirstURL: srcCtnt.URL.String(), + SecondURL: tgtCtnt.URL.String(), + Diff: differInNone, + firstContent: srcCtnt, + secondContent: tgtCtnt, + } + } + srcCtnt, srcOk = srcNext() + tgtCtnt, tgtOk = tgtNext() + continue + } + // Differ in second (normalizedExpected < normalizedCurrent) + diffCh <- diffMessage{ + SecondURL: tgtCtnt.URL.String(), + Diff: differInSecond, + secondContent: tgtCtnt, + } + tgtCtnt, tgtOk = tgtNext() + continue + } + + return nil +} + +// layerDifferenceIter performs a breadth-first search (BFS) comparison using iter.Seq. 
+func layerDifferenceIter(ctx context.Context, sourceClnt, targetClnt Client, opts mirrorOptions) chan diffMessage { + diffCh := make(chan diffMessage, 10000) + + go func() { + defer close(diffCh) + + // Create iterators using the BFS lister + srcSeq := listBFS(ctx, sourceClnt, opts) + tgtSeq := listBFS(ctx, targetClnt, opts) + + // Perform the comparison using the iterators + err := differenceIterInternal( + ctx, // Pass context + sourceClnt.GetURL().String(), + srcSeq, + targetClnt.GetURL().String(), + tgtSeq, + opts, + false, // returnSimilar is false + diffCh, + ) + + if err != nil { + // Check if the error is due to context cancellation before sending + select { + case <-ctx.Done(): + // Context was cancelled, avoid sending error if it's context.Canceled or context.DeadlineExceeded + if !(err.ToGoError() == context.Canceled || err.ToGoError() == context.DeadlineExceeded) { + // Send non-cancellation errors even if context is done, as they might be important + diffCh <- diffMessage{Error: err} + } + default: + // Context not cancelled, send the error + diffCh <- diffMessage{Error: err} + } + } + }() + + return diffCh +} + // activeActiveModTimeUpdated tries to calculate if the object copy in the target // is older than the one in the source by comparing the modtime of the data. 
func activeActiveModTimeUpdated(src, dst *ClientContent) bool { @@ -167,7 +465,12 @@ func bucketObjectDifference(ctx context.Context, sourceClnt, targetClnt Client) }) } -func objectDifference(ctx context.Context, sourceClnt, targetClnt Client, opts mirrorOptions) (diffCh chan diffMessage) { +func objectDifference(ctx context.Context, sourceClnt, targetClnt Client, opts mirrorOptions) chan diffMessage { + if opts.bfs { + // Use layer-by-layer difference for regular objects + return layerDifferenceIter(ctx, sourceClnt, targetClnt, opts) + } + sourceURL := sourceClnt.GetURL().String() sourceCh := sourceClnt.List(ctx, ListOptions{Recursive: true, WithMetadata: opts.isMetadata, ShowDir: DirNone}) diff --git a/cmd/mirror-main.go b/cmd/mirror-main.go index 358e41df5d..6f8f7492d8 100644 --- a/cmd/mirror-main.go +++ b/cmd/mirror-main.go @@ -144,6 +144,10 @@ var ( Name: "skip-errors", Usage: "skip any errors when mirroring", }, + cli.BoolFlag{ + Name: "bfs", + Usage: "using BFS for layer-by-layer traversal of files, suitable for large number of files", + }, checksumFlag, } ) @@ -212,7 +216,7 @@ EXAMPLES: {{.Prompt}} {{.HelpName}} --older-than 30d s3/test ~/test 13. Mirror server encrypted objects from Amazon S3 cloud storage to a bucket on Amazon S3 cloud storage - {{.Prompt}} {{.HelpName}} --enc-c "minio/archive=MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5MDA" --enc-c "s3/archive=MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5BBB" s3/archive/ minio/archive/ + {{.Prompt}} {{.HelpName}} --enc-c "minio/archive=MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5MDA" --enc-c "s3/archive=MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5BBB" s3/archive/ minio/archive/ 14. Update 'Cache-Control' header on all existing objects recursively. 
{{.Prompt}} {{.HelpName}} --attr "Cache-Control=max-age=90000,min-fresh=9000" myminio/video-files myminio/video-files @@ -1025,6 +1029,7 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc userMetadata: userMetadata, encKeyDB: encKeyDB, activeActive: isActiveActive, + bfs: cli.Bool("bfs"), } // If we are not using active/active and we are not removing diff --git a/cmd/mirror-url.go b/cmd/mirror-url.go index 1c71fb8917..5f465fcdde 100644 --- a/cmd/mirror-url.go +++ b/cmd/mirror-url.go @@ -278,6 +278,7 @@ type mirrorOptions struct { userMetadata map[string]string checksum minio.ChecksumType sourceListingOnly bool + bfs bool } // Prepares urls that need to be copied or removed based on requested options.