From e3d7332d21d46f527572e75349e12a6936ddb905 Mon Sep 17 00:00:00 2001
From: Nye Liu
Date: Thu, 26 Jun 2025 21:57:27 -0700
Subject: [PATCH] feat(#3): Add namespace support

---
 README.md                               |  4 ++
 cmd/snapshot-monitor/main.go            |  2 +-
 deployment/config.yaml                  |  1 +
 deployment/docker/docker-compose.yml    |  1 +
 deployment/systemd/snapshot-monitor.env |  1 +
 pkg/config/config.go                    |  4 ++
 pkg/s3/s3.go                            | 69 ++++++++++++++++---------
 7 files changed, 58 insertions(+), 24 deletions(-)

diff --git a/README.md b/README.md
index c265598..44b0240 100644
--- a/README.md
+++ b/README.md
@@ -85,6 +85,7 @@ watch_dir: "/snapshots"
 # S3-compatible storage configuration
 s3_endpoint: "https://s3.example.com"
 s3_bucket: "solana-snapshots"
+s3_namespace: "mainnet" # Optional: namespace prefix for all S3 remote paths
 s3_access_key: "your-access-key"
 s3_secret_key: "your-secret-key"
 s3_public_endpoint: "https://snapshots.example.com" # Optional public endpoint for URLs in metadata
@@ -134,6 +135,7 @@ Alternatively, you can use environment variables:
 - `S3_ENDPOINT`: S3-compatible storage URL
 - `S3_PUBLIC_ENDPOINT`: Public endpoint URL for snapshot access
 - `S3_BUCKET`: S3 bucket name
+- `S3_NAMESPACE`: Optional namespace prefix for all S3 remote paths
 - `S3_ACCESS_KEY`: S3 access key
 - `S3_SECRET_KEY`: S3 secret key
 - `S3_UPLOAD_CONCURRENCY`: Number of parallel upload operations (will be supported in a future version)
@@ -179,6 +181,7 @@ Or with environment variables:
 export WATCH_DIR=/snapshots
 export S3_ENDPOINT=https://s3.example.com
 export S3_BUCKET=solana-snapshots
+export S3_NAMESPACE=mainnet
 export S3_ACCESS_KEY=your-access-key
 export S3_SECRET_KEY=your-secret-key
 ./snapshot-monitor
@@ -228,6 +231,7 @@ docker run -d \
   -e WATCH_DIR=/snapshots \
   -e S3_ENDPOINT=https://s3.example.com \
   -e S3_BUCKET=solana-snapshots \
+  -e S3_NAMESPACE=mainnet \
   -e S3_ACCESS_KEY=your-access-key \
   -e S3_SECRET_KEY=your-secret-key \
   ghcr.io/maestroi/agave-snapshot-uploader:latest
diff --git a/cmd/snapshot-monitor/main.go b/cmd/snapshot-monitor/main.go
index 256e24a..73417c4 100644
--- a/cmd/snapshot-monitor/main.go
+++ b/cmd/snapshot-monitor/main.go
@@ -77,7 +77,7 @@ func main() {
     }
 
     // Create S3 client
-    s3Client, err := s3.NewClient(cfg.S3Endpoint, cfg.S3Bucket, cfg.S3AccessKey, cfg.S3SecretKey)
+    s3Client, err := s3.NewClient(cfg.S3Endpoint, cfg.S3Bucket, cfg.S3Namespace, cfg.S3AccessKey, cfg.S3SecretKey)
     if err != nil {
         logger.Error("Failed to create S3 client", "error", err)
         os.Exit(1)
diff --git a/deployment/config.yaml b/deployment/config.yaml
index 46e8cf6..2feaf6c 100644
--- a/deployment/config.yaml
+++ b/deployment/config.yaml
@@ -6,6 +6,7 @@ watch_dir: "/snapshots"
 
 # S3-compatible storage configuration
 s3_endpoint: "https://s3.example.com"
 s3_bucket: "solana-snapshots"
+s3_namespace: "mainnet" # Optional: namespace prefix for all S3 remote paths
 s3_access_key: "your-access-key"
 s3_secret_key: "your-secret-key"
diff --git a/deployment/docker/docker-compose.yml b/deployment/docker/docker-compose.yml
index 404da27..52d749e 100644
--- a/deployment/docker/docker-compose.yml
+++ b/deployment/docker/docker-compose.yml
@@ -7,6 +7,7 @@ services:
       WATCH_DIR: "/snapshots"
       S3_ENDPOINT: "https://s3.example.com"
       S3_BUCKET: "solana-snapshots"
+      S3_NAMESPACE: "mainnet"
       S3_ACCESS_KEY: "your-access-key"
       S3_SECRET_KEY: "your-secret-key"
       S3_PUBLIC_ENDPOINT: "https://s3.example.com"
diff --git a/deployment/systemd/snapshot-monitor.env b/deployment/systemd/snapshot-monitor.env
index 44dd51f..41a602f 100644
--- a/deployment/systemd/snapshot-monitor.env
+++ b/deployment/systemd/snapshot-monitor.env
@@ -1,6 +1,7 @@
 WATCH_DIR=/snapshots
 S3_ENDPOINT=https://s3.example.com
 S3_BUCKET=solana-snapshots
+S3_NAMESPACE=mainnet
 S3_ACCESS_KEY=your-access-key
 S3_SECRET_KEY=your-secret-key
 SOLANA_VERSION=1.18.5
diff --git a/pkg/config/config.go b/pkg/config/config.go
index f771e50..01aa40c 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -15,6 +15,7 @@ type Config struct {
     S3Endpoint          string `yaml:"s3_endpoint" json:"s3_endpoint"`
     S3PublicEndpoint    string `yaml:"s3_public_endpoint" json:"s3_public_endpoint"`
     S3Bucket            string `yaml:"s3_bucket" json:"s3_bucket"`
+    S3Namespace         string `yaml:"s3_namespace" json:"s3_namespace"`
     S3AccessKey         string `yaml:"s3_access_key" json:"s3_access_key"`
     S3SecretKey         string `yaml:"s3_secret_key" json:"s3_secret_key"`
     S3UploadConcurrency int    `yaml:"s3_upload_concurrency" json:"s3_upload_concurrency"`
@@ -97,6 +98,9 @@ func LoadFromEnv() *Config {
     if val := os.Getenv("S3_BUCKET"); val != "" {
         cfg.S3Bucket = val
     }
+    if val := os.Getenv("S3_NAMESPACE"); val != "" {
+        cfg.S3Namespace = val
+    }
     if val := os.Getenv("S3_ACCESS_KEY"); val != "" {
         cfg.S3AccessKey = val
     }
diff --git a/pkg/s3/s3.go b/pkg/s3/s3.go
index 233bce8..71c26bc 100644
--- a/pkg/s3/s3.go
+++ b/pkg/s3/s3.go
@@ -39,9 +39,10 @@ const (
 
 // Client is a wrapper around AWS S3 client
 type Client struct {
-    client     *s3.Client
-    bucketName string
-    logger     *slog.Logger
+    client          *s3.Client
+    bucketName      string
+    bucketNamespace string
+    logger          *slog.Logger
 }
 
 // ProgressReader is a wrapper around an io.Reader that reports progress
@@ -145,7 +146,7 @@ func (l *filteringLogger) Logf(classification logging.Classification, format str
 }
 
 // NewClient creates a new S3 client
-func NewClient(endpoint, bucketName, accessKey, secretKey string) (*Client, error) {
+func NewClient(endpoint, bucketName, bucketNamespace, accessKey, secretKey string) (*Client, error) {
     // Create exactly the same config as the faster implementation
     cfg := aws.Config{
         Credentials: credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""),
@@ -162,9 +163,10 @@ func NewClient(endpoint, bucketName, accessKey, secretKey string) (*Client, erro
     client := s3.NewFromConfig(cfg)
 
     return &Client{
-        client:     client,
-        bucketName: bucketName,
-        logger:     slog.New(slog.NewTextHandler(os.Stdout, nil)),
+        client:          client,
+        bucketName:      bucketName,
+        bucketNamespace: bucketNamespace,
+        logger:          slog.New(slog.NewTextHandler(os.Stdout, nil)),
     }, nil
 }
 
@@ -175,13 +177,20 @@ func (c *Client) SetLogger(logger *slog.Logger) {
     }
 }
 
+// getRemotePath returns the path with namespace prefix if namespace is set
+func (c *Client) getRemotePath(path string) string {
+    if c.bucketNamespace == "" {
+        return path
+    }
+    return c.bucketNamespace + "/" + path
+}
+
 // PartReader is a reader that handles part uploads with proper checksum handling
 type PartReader struct {
     reader     io.Reader
     size       int64
     read       int64
     progressFn func(n int64)
-    buf        []byte
 }
 
 func NewPartReader(reader io.Reader, size int64, progressFn func(n int64)) *PartReader {
@@ -207,8 +216,10 @@ func (pr *PartReader) Size() int64 {
     return pr.size
 }
 
-// CleanupMultipartUploads cleans up any existing multipart uploads for the given key
-func (c *Client) CleanupMultipartUploads(ctx context.Context, key string) error {
+// CleanupMultipartUploads cleans up any existing multipart uploads for the given path
+func (c *Client) CleanupMultipartUploads(ctx context.Context, path string) error {
+    // Apply namespace to S3 path
+    key := c.getRemotePath(path)
     c.logger.Info("Starting cleanup of existing multipart uploads", "key", key)
 
     // List all multipart uploads
@@ -260,7 +271,7 @@ func (c *Client) CleanupMultipartUploads(ctx context.Context, key string) error
 }
 
 // UploadFile uploads a file to S3 using the fast implementation approach
-func (c *Client) UploadFile(ctx context.Context, localPath, s3Path string) error {
+func (c *Client) UploadFile(ctx context.Context, localPath, path string) error {
     // Open file exactly as in the faster implementation
     file, err := os.Open(localPath)
     if err != nil {
@@ -275,6 +286,9 @@ func (c *Client) UploadFile(ctx context.Context, localPath, s3Path string) error
     }
     fileSize := info.Size()
 
+    // Apply namespace to S3 path
+    s3Path := c.getRemotePath(path)
+
     c.logger.Info("Starting upload", "file", localPath, "key", s3Path, "size", fileSize)
 
     // Create multipart upload exactly as in the faster implementation
@@ -388,8 +402,10 @@ func (c *Client) UploadFileMultipart(ctx context.Context, localPath, s3Path stri
     return c.UploadFile(ctx, localPath, s3Path)
 }
 
-// cleanupExistingUploads aborts any existing multipart uploads for the given key
-func (c *Client) cleanupExistingUploads(ctx context.Context, key string) error {
+// cleanupExistingUploads aborts any existing multipart uploads for the given path
+func (c *Client) cleanupExistingUploads(ctx context.Context, path string) error {
+    // Apply namespace to S3 path
+    key := c.getRemotePath(path)
     listResp, err := c.client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{
         Bucket: aws.String(c.bucketName),
         Prefix: aws.String(key),
@@ -478,11 +494,13 @@ func formatFileSize(size int64) string {
 }
 
 // FileExists checks if a file exists in S3
-func (c *Client) FileExists(s3Path string) (bool, error) {
+func (c *Client) FileExists(path string) (bool, error) {
     // Create a context with timeout for the operation
     ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
     defer cancel()
 
+    // Apply namespace to S3 path
+    s3Path := c.getRemotePath(path)
     startTime := time.Now()
     _, err := c.client.HeadObject(ctx, &s3.HeadObjectInput{
         Bucket: aws.String(c.bucketName),
@@ -513,7 +531,7 @@ func (c *Client) FileExists(s3Path string) (bool, error) {
 }
 
 // DownloadFile downloads a file from S3
-func (c *Client) DownloadFile(s3Path, localPath string) error {
+func (c *Client) DownloadFile(path, localPath string) error {
     // Create the directory if it doesn't exist
     dir := filepath.Dir(localPath)
     if err := os.MkdirAll(dir, 0755); err != nil {
@@ -527,6 +545,8 @@ func (c *Client) DownloadFile(s3Path, localPath string) error {
     }
     defer file.Close()
 
+    // Apply namespace to S3 path
+    s3Path := c.getRemotePath(path)
     c.logger.Info("Downloading file", "s3_path", s3Path, "local_path", localPath)
 
     // Download the file
@@ -550,7 +570,9 @@ func (c *Client) DownloadFile(s3Path, localPath string) error {
 }
 
 // DeleteObject deletes an object from S3, ensuring metadata files for snapshots are preserved
-func (c *Client) DeleteObject(s3Path string) error {
+func (c *Client) DeleteObject(path string) error {
+    // Apply namespace to S3 path
+    s3Path := c.getRemotePath(path)
     // If we're deleting a metadata file, first check if the snapshot exists
     if IsMetadataKey(s3Path) {
         hasSnapshot, err := c.HasCorrespondingFile(context.Background(), s3Path)
@@ -603,7 +625,8 @@ func (c *Client) DeleteObjects(s3Paths []string) error {
                 continue
             }
         }
-        filteredPaths = append(filteredPaths, path)
+        // Apply namespace to S3 path
+        filteredPaths = append(filteredPaths, c.getRemotePath(path))
     }
     c.logger.Info("Filtered paths for deletion",
         "original_count", len(s3Paths),
@@ -693,7 +716,7 @@ func (c *Client) ListObjects(prefix string) ([]S3Object, error) {
     for {
         result, err := c.client.ListObjectsV2(context.Background(), &s3.ListObjectsV2Input{
             Bucket:            aws.String(c.bucketName),
-            Prefix:            aws.String(prefix),
+            Prefix:            aws.String(c.getRemotePath(prefix)),
             ContinuationToken: continuationToken,
         })
         if err != nil {
@@ -786,7 +809,7 @@ func GetUploadInfoFilePath(filePath string) string {
 func (c *Client) AbortMultipartUpload(s3Path, uploadID string) error {
     _, err := c.client.AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{
         Bucket:   aws.String(c.bucketName),
-        Key:      aws.String(s3Path),
+        Key:      aws.String(c.getRemotePath(s3Path)),
         UploadId: aws.String(uploadID),
     })
     if err != nil {
@@ -863,10 +886,10 @@ func (c *Client) HasCorrespondingFile(ctx context.Context, key string) (bool, er
     var checkKey string
     if IsMetadataKey(key) {
         // For metadata file, check if snapshot exists
-        checkKey = GetSnapshotKeyFromMetadata(key)
+        checkKey = c.getRemotePath(GetSnapshotKeyFromMetadata(key))
     } else if IsSnapshotKey(key) {
         // For snapshot file, check if metadata exists
-        checkKey = GetMetadataKey(key)
+        checkKey = c.getRemotePath(GetMetadataKey(key))
     } else {
         // Not a snapshot or metadata file
         return false, nil
@@ -914,7 +937,7 @@ func (c *Client) UpdateLatestMetadata(ctx context.Context, metadataKey string) e
         return fmt.Errorf("not a metadata file: %s", metadataKey)
     }
 
-    c.logger.Info("Updating latest.json with metadata", "source", metadataKey)
+    c.logger.Info("Updating latest.json with metadata", "source", c.getRemotePath(metadataKey))
 
     // Create a temporary file to download the metadata
     tmpFile, err := os.CreateTemp("", "metadata-*.json")
@@ -1157,7 +1180,7 @@ func (c *Client) UpdateLatestMetadata(ctx context.Context, metadataKey string) e
     // Upload consolidated metadata as latest.json with proper content type
     _, err = c.client.PutObject(ctx, &s3.PutObjectInput{
         Bucket:      aws.String(c.bucketName),
-        Key:         aws.String("latest.json"),
+        Key:         aws.String(c.getRemotePath("latest.json")),
        Body:        consolidatedFile,
        ContentType: aws.String("application/json"),
    })