@@ -39,9 +39,10 @@ const (
 
 // Client is a wrapper around AWS S3 client
 type Client struct {
-    client     *s3.Client
-    bucketName string
-    logger     *slog.Logger
+    client          *s3.Client
+    bucketName      string
+    bucketNamespace string
+    logger          *slog.Logger
 }
 
 // ProgressReader is a wrapper around an io.Reader that reports progress
@@ -145,7 +146,7 @@ func (l *filteringLogger) Logf(classification logging.Classification, format str
 }
 
 // NewClient creates a new S3 client
-func NewClient(endpoint, bucketName, accessKey, secretKey string) (*Client, error) {
+func NewClient(endpoint, bucketName, bucketNamespace, accessKey, secretKey string) (*Client, error) {
     // Create exactly the same config as the faster implementation
     cfg := aws.Config{
         Credentials: credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""),
@@ -162,9 +163,10 @@ func NewClient(endpoint, bucketName, accessKey, secretKey string) (*Client, erro
     client := s3.NewFromConfig(cfg)
 
     return &Client{
-        client:     client,
-        bucketName: bucketName,
-        logger:     slog.New(slog.NewTextHandler(os.Stdout, nil)),
+        client:          client,
+        bucketName:      bucketName,
+        bucketNamespace: bucketNamespace,
+        logger:          slog.New(slog.NewTextHandler(os.Stdout, nil)),
     }, nil
 }
 
@@ -175,13 +177,20 @@ func (c *Client) SetLogger(logger *slog.Logger) {
     }
 }
 
+// getRemotePath returns the path with namespace prefix if namespace is set
+func (c *Client) getRemotePath(path string) string {
+    if c.bucketNamespace == "" {
+        return path
+    }
+    return c.bucketNamespace + "/" + path
+}
+
 // PartReader is a reader that handles part uploads with proper checksum handling
 type PartReader struct {
     reader     io.Reader
     size       int64
     read       int64
     progressFn func(n int64)
-    buf        []byte
 }
 
 func NewPartReader(reader io.Reader, size int64, progressFn func(n int64)) *PartReader {
@@ -207,8 +216,10 @@ func (pr *PartReader) Size() int64 {
     return pr.size
 }
 
-// CleanupMultipartUploads cleans up any existing multipart uploads for the given key
-func (c *Client) CleanupMultipartUploads(ctx context.Context, key string) error {
+// CleanupMultipartUploads cleans up any existing multipart uploads for the given path
+func (c *Client) CleanupMultipartUploads(ctx context.Context, path string) error {
+    // Apply namespace to S3 path
+    key := c.getRemotePath(path)
     c.logger.Info("Starting cleanup of existing multipart uploads", "key", key)
 
     // List all multipart uploads
@@ -260,7 +271,7 @@ func (c *Client) CleanupMultipartUploads(ctx context.Context, key string) error
 }
 
 // UploadFile uploads a file to S3 using the fast implementation approach
-func (c *Client) UploadFile(ctx context.Context, localPath, s3Path string) error {
+func (c *Client) UploadFile(ctx context.Context, localPath, path string) error {
     // Open file exactly as in the faster implementation
     file, err := os.Open(localPath)
     if err != nil {
@@ -275,6 +286,9 @@ func (c *Client) UploadFile(ctx context.Context, localPath, s3Path string) error
     }
     fileSize := info.Size()
 
+    // Apply namespace to S3 path
+    s3Path := c.getRemotePath(path)
+
     c.logger.Info("Starting upload", "file", localPath, "key", s3Path, "size", fileSize)
 
     // Create multipart upload exactly as in the faster implementation
@@ -388,8 +402,10 @@ func (c *Client) UploadFileMultipart(ctx context.Context, localPath, s3Path stri
     return c.UploadFile(ctx, localPath, s3Path)
 }
 
-// cleanupExistingUploads aborts any existing multipart uploads for the given key
-func (c *Client) cleanupExistingUploads(ctx context.Context, key string) error {
+// cleanupExistingUploads aborts any existing multipart uploads for the given path
+func (c *Client) cleanupExistingUploads(ctx context.Context, path string) error {
+    // Apply namespace to S3 path
+    key := c.getRemotePath(path)
     listResp, err := c.client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{
         Bucket: aws.String(c.bucketName),
         Prefix: aws.String(key),
@@ -478,11 +494,13 @@ func formatFileSize(size int64) string {
 }
 
 // FileExists checks if a file exists in S3
-func (c *Client) FileExists(s3Path string) (bool, error) {
+func (c *Client) FileExists(path string) (bool, error) {
     // Create a context with timeout for the operation
     ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
     defer cancel()
 
+    // Apply namespace to S3 path
+    s3Path := c.getRemotePath(path)
     startTime := time.Now()
     _, err := c.client.HeadObject(ctx, &s3.HeadObjectInput{
         Bucket: aws.String(c.bucketName),
@@ -513,7 +531,7 @@ func (c *Client) FileExists(s3Path string) (bool, error) {
 }
 
 // DownloadFile downloads a file from S3
-func (c *Client) DownloadFile(s3Path, localPath string) error {
+func (c *Client) DownloadFile(path, localPath string) error {
     // Create the directory if it doesn't exist
     dir := filepath.Dir(localPath)
     if err := os.MkdirAll(dir, 0755); err != nil {
@@ -527,6 +545,8 @@ func (c *Client) DownloadFile(s3Path, localPath string) error {
     }
     defer file.Close()
 
+    // Apply namespace to S3 path
+    s3Path := c.getRemotePath(path)
     c.logger.Info("Downloading file", "s3_path", s3Path, "local_path", localPath)
 
     // Download the file
@@ -550,7 +570,9 @@ func (c *Client) DownloadFile(s3Path, localPath string) error {
 }
 
 // DeleteObject deletes an object from S3, ensuring metadata files for snapshots are preserved
-func (c *Client) DeleteObject(s3Path string) error {
+func (c *Client) DeleteObject(path string) error {
+    // Apply namespace to S3 path
+    s3Path := c.getRemotePath(path)
     // If we're deleting a metadata file, first check if the snapshot exists
     if IsMetadataKey(s3Path) {
         hasSnapshot, err := c.HasCorrespondingFile(context.Background(), s3Path)
@@ -603,7 +625,8 @@ func (c *Client) DeleteObjects(s3Paths []string) error {
                 continue
             }
         }
-        filteredPaths = append(filteredPaths, path)
+        // Apply namespace to S3 path
+        filteredPaths = append(filteredPaths, c.getRemotePath(path))
     }
     c.logger.Info("Filtered paths for deletion",
         "original_count", len(s3Paths),
@@ -693,7 +716,7 @@ func (c *Client) ListObjects(prefix string) ([]S3Object, error) {
     for {
         result, err := c.client.ListObjectsV2(context.Background(), &s3.ListObjectsV2Input{
             Bucket:            aws.String(c.bucketName),
-            Prefix:            aws.String(prefix),
+            Prefix:            aws.String(c.getRemotePath(prefix)),
             ContinuationToken: continuationToken,
         })
         if err != nil {
@@ -786,7 +809,7 @@ func GetUploadInfoFilePath(filePath string) string {
 func (c *Client) AbortMultipartUpload(s3Path, uploadID string) error {
     _, err := c.client.AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{
         Bucket:   aws.String(c.bucketName),
-        Key:      aws.String(s3Path),
+        Key:      aws.String(c.getRemotePath(s3Path)),
         UploadId: aws.String(uploadID),
     })
     if err != nil {
@@ -863,10 +886,10 @@ func (c *Client) HasCorrespondingFile(ctx context.Context, key string) (bool, er
     var checkKey string
     if IsMetadataKey(key) {
         // For metadata file, check if snapshot exists
-        checkKey = GetSnapshotKeyFromMetadata(key)
+        checkKey = c.getRemotePath(GetSnapshotKeyFromMetadata(key))
     } else if IsSnapshotKey(key) {
         // For snapshot file, check if metadata exists
-        checkKey = GetMetadataKey(key)
+        checkKey = c.getRemotePath(GetMetadataKey(key))
     } else {
         // Not a snapshot or metadata file
         return false, nil
@@ -914,7 +937,7 @@ func (c *Client) UpdateLatestMetadata(ctx context.Context, metadataKey string) e
         return fmt.Errorf("not a metadata file: %s", metadataKey)
     }
 
-    c.logger.Info("Updating latest.json with metadata", "source", metadataKey)
+    c.logger.Info("Updating latest.json with metadata", "source", c.getRemotePath(metadataKey))
 
     // Create a temporary file to download the metadata
     tmpFile, err := os.CreateTemp("", "metadata-*.json")
@@ -1157,7 +1180,7 @@ func (c *Client) UpdateLatestMetadata(ctx context.Context, metadataKey string) e
     // Upload consolidated metadata as latest.json with proper content type
     _, err = c.client.PutObject(ctx, &s3.PutObjectInput{
         Bucket:      aws.String(c.bucketName),
-        Key:         aws.String("latest.json"),
+        Key:         aws.String(c.getRemotePath("latest.json")),
         Body:        consolidatedFile,
         ContentType: aws.String("application/json"),
     })
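
For context, a minimal usage sketch of the changed constructor signature. This is not part of the diff: the import path "example.com/backup/s3client" and all endpoint, bucket, credential, and snapshot-path values below are placeholders.

package main

import (
    "context"
    "log"

    s3client "example.com/backup/s3client" // placeholder import path
)

func main() {
    // The new bucketNamespace argument ("team-a" here) is prepended to every
    // remote key via getRemotePath; pass "" to keep the previous flat layout.
    client, err := s3client.NewClient("https://s3.example.com", "my-bucket", "team-a", "ACCESS_KEY", "SECRET_KEY")
    if err != nil {
        log.Fatal(err)
    }

    // With the namespace set, this object is stored under "team-a/backups/db.snap";
    // with an empty namespace it would be stored under "backups/db.snap".
    if err := client.UploadFile(context.Background(), "/tmp/db.snap", "backups/db.snap"); err != nil {
        log.Fatal(err)
    }
}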