4 changes: 4 additions & 0 deletions README.md
@@ -85,6 +85,7 @@ watch_dir: "/snapshots"
 # S3-compatible storage configuration
 s3_endpoint: "https://s3.example.com"
 s3_bucket: "solana-snapshots"
+s3_namespace: "mainnet" # Optional: namespace prefix for all S3 remote paths
 s3_access_key: "your-access-key"
 s3_secret_key: "your-secret-key"
 s3_public_endpoint: "https://snapshots.example.com" # Optional public endpoint for URLs in metadata
@@ -134,6 +135,7 @@ Alternatively, you can use environment variables:
 - `S3_ENDPOINT`: S3-compatible storage URL
 - `S3_PUBLIC_ENDPOINT`: Public endpoint URL for snapshot access
 - `S3_BUCKET`: S3 bucket name
+- `S3_NAMESPACE`: Optional namespace prefix for all S3 remote paths
 - `S3_ACCESS_KEY`: S3 access key
 - `S3_SECRET_KEY`: S3 secret key
 - `S3_UPLOAD_CONCURRENCY`: Number of parallel upload operations (will be supported in a future version)
@@ -179,6 +181,7 @@ Or with environment variables:
 export WATCH_DIR=/snapshots
 export S3_ENDPOINT=https://s3.example.com
 export S3_BUCKET=solana-snapshots
+export S3_NAMESPACE=mainnet
 export S3_ACCESS_KEY=your-access-key
 export S3_SECRET_KEY=your-secret-key
 ./snapshot-monitor
@@ -228,6 +231,7 @@ docker run -d \
   -e WATCH_DIR=/snapshots \
   -e S3_ENDPOINT=https://s3.example.com \
   -e S3_BUCKET=solana-snapshots \
+  -e S3_NAMESPACE=mainnet \
   -e S3_ACCESS_KEY=your-access-key \
   -e S3_SECRET_KEY=your-secret-key \
   ghcr.io/maestroi/agave-snapshot-uploader:latest
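In effect, when `s3_namespace` is set, every remote key the uploader reads or writes gains that prefix (for example `mainnet/latest.json` instead of `latest.json` at the bucket root, and likewise for snapshot archives and their metadata), so a single bucket can host snapshots for several networks side by side. Leaving it unset preserves the previous flat layout.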
2 changes: 1 addition & 1 deletion cmd/snapshot-monitor/main.go
@@ -77,7 +77,7 @@ func main() {
 	}
 
 	// Create S3 client
-	s3Client, err := s3.NewClient(cfg.S3Endpoint, cfg.S3Bucket, cfg.S3AccessKey, cfg.S3SecretKey)
+	s3Client, err := s3.NewClient(cfg.S3Endpoint, cfg.S3Bucket, cfg.S3Namespace, cfg.S3AccessKey, cfg.S3SecretKey)
 	if err != nil {
 		logger.Error("Failed to create S3 client", "error", err)
 		os.Exit(1)
1 change: 1 addition & 0 deletions deployment/config.yaml
@@ -6,6 +6,7 @@ watch_dir: "/snapshots"
 # S3-compatible storage configuration
 s3_endpoint: "https://s3.example.com"
 s3_bucket: "solana-snapshots"
+s3_namespace: "mainnet" # Optional: namespace prefix for all S3 remote paths
 s3_access_key: "your-access-key"
 s3_secret_key: "your-secret-key"

1 change: 1 addition & 0 deletions deployment/docker/docker-compose.yml
@@ -7,6 +7,7 @@ services:
       WATCH_DIR: "/snapshots"
       S3_ENDPOINT: "https://s3.example.com"
       S3_BUCKET: "solana-snapshots"
+      S3_NAMESPACE: "mainnet"
       S3_ACCESS_KEY: "your-access-key"
       S3_SECRET_KEY: "your-secret-key"
       S3_PUBLIC_ENDPOINT: "https://s3.example.com"
1 change: 1 addition & 0 deletions deployment/systemd/snapshot-monitor.env
@@ -1,6 +1,7 @@
 WATCH_DIR=/snapshots
 S3_ENDPOINT=https://s3.example.com
 S3_BUCKET=solana-snapshots
+S3_NAMESPACE=mainnet
 S3_ACCESS_KEY=your-access-key
 S3_SECRET_KEY=your-secret-key
 SOLANA_VERSION=1.18.5
4 changes: 4 additions & 0 deletions pkg/config/config.go
@@ -15,6 +15,7 @@ type Config struct {
 	S3Endpoint          string `yaml:"s3_endpoint" json:"s3_endpoint"`
 	S3PublicEndpoint    string `yaml:"s3_public_endpoint" json:"s3_public_endpoint"`
 	S3Bucket            string `yaml:"s3_bucket" json:"s3_bucket"`
+	S3Namespace         string `yaml:"s3_namespace" json:"s3_namespace"`
 	S3AccessKey         string `yaml:"s3_access_key" json:"s3_access_key"`
 	S3SecretKey         string `yaml:"s3_secret_key" json:"s3_secret_key"`
 	S3UploadConcurrency int    `yaml:"s3_upload_concurrency" json:"s3_upload_concurrency"`
@@ -97,6 +98,9 @@ func LoadFromEnv() *Config {
if val := os.Getenv("S3_BUCKET"); val != "" {
cfg.S3Bucket = val
}
if val := os.Getenv("S3_NAMESPACE"); val != "" {
cfg.S3Namespace = val
}
if val := os.Getenv("S3_ACCESS_KEY"); val != "" {
cfg.S3AccessKey = val
}
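Note that `LoadFromEnv` only overrides a field when the corresponding variable is non-empty, so exporting `S3_NAMESPACE=""` is a no-op: the environment can replace a namespace set in the YAML config, but it cannot clear one.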
69 changes: 46 additions & 23 deletions pkg/s3/s3.go
@@ -39,9 +39,10 @@ const (

 // Client is a wrapper around AWS S3 client
 type Client struct {
-	client     *s3.Client
-	bucketName string
-	logger     *slog.Logger
+	client          *s3.Client
+	bucketName      string
+	bucketNamespace string
+	logger          *slog.Logger
 }
 
 // ProgressReader is a wrapper around an io.Reader that reports progress
@@ -145,7 +146,7 @@ func (l *filteringLogger) Logf(classification logging.Classification, format str
 }
 
 // NewClient creates a new S3 client
-func NewClient(endpoint, bucketName, accessKey, secretKey string) (*Client, error) {
+func NewClient(endpoint, bucketName, bucketNamespace, accessKey, secretKey string) (*Client, error) {
 	// Create exactly the same config as the faster implementation
 	cfg := aws.Config{
 		Credentials: credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""),
@@ -162,9 +163,10 @@ func NewClient(endpoint, bucketName, accessKey, secretKey string) (*Client, erro
 	client := s3.NewFromConfig(cfg)
 
 	return &Client{
-		client:     client,
-		bucketName: bucketName,
-		logger:     slog.New(slog.NewTextHandler(os.Stdout, nil)),
+		client:          client,
+		bucketName:      bucketName,
+		bucketNamespace: bucketNamespace,
+		logger:          slog.New(slog.NewTextHandler(os.Stdout, nil)),
 	}, nil
 }

@@ -175,13 +177,20 @@ func (c *Client) SetLogger(logger *slog.Logger) {
 	}
 }
 
+// getRemotePath returns the path with namespace prefix if namespace is set
+func (c *Client) getRemotePath(path string) string {
+	if c.bucketNamespace == "" {
+		return path
+	}
+	return c.bucketNamespace + "/" + path
+}
+
 // PartReader is a reader that handles part uploads with proper checksum handling
 type PartReader struct {
 	reader     io.Reader
 	size       int64
 	read       int64
 	progressFn func(n int64)
-	buf        []byte
 }
 
 func NewPartReader(reader io.Reader, size int64, progressFn func(n int64)) *PartReader {
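Because `getRemotePath` joins with a literal `/`, a namespace configured with a trailing slash (say `"mainnet/"`) would yield keys like `mainnet//latest.json`. A defensive variant could normalize the prefix first; the sketch below is hypothetical and not part of this PR (the name `getRemotePathNormalized` is invented for illustration, and it assumes `strings` is imported):

// getRemotePathNormalized is a hypothetical hardened variant of
// getRemotePath: it trims stray slashes from the configured namespace and
// the incoming path before joining them with a single separator.
func (c *Client) getRemotePathNormalized(path string) string {
	ns := strings.Trim(c.bucketNamespace, "/")
	if ns == "" {
		return path
	}
	return ns + "/" + strings.TrimPrefix(path, "/")
}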
@@ -207,8 +216,10 @@ func (pr *PartReader) Size() int64 {
 	return pr.size
 }
 
-// CleanupMultipartUploads cleans up any existing multipart uploads for the given key
-func (c *Client) CleanupMultipartUploads(ctx context.Context, key string) error {
+// CleanupMultipartUploads cleans up any existing multipart uploads for the given path
+func (c *Client) CleanupMultipartUploads(ctx context.Context, path string) error {
+	// Apply namespace to S3 path
+	key := c.getRemotePath(path)
 	c.logger.Info("Starting cleanup of existing multipart uploads", "key", key)
 
 	// List all multipart uploads
@@ -260,7 +271,7 @@ func (c *Client) CleanupMultipartUploads(ctx context.Context, key string) error
 }
 
 // UploadFile uploads a file to S3 using the fast implementation approach
-func (c *Client) UploadFile(ctx context.Context, localPath, s3Path string) error {
+func (c *Client) UploadFile(ctx context.Context, localPath, path string) error {
 	// Open file exactly as in the faster implementation
 	file, err := os.Open(localPath)
 	if err != nil {
@@ -275,6 +286,9 @@ func (c *Client) UploadFile(ctx context.Context, localPath, s3Path string) error
 	}
 	fileSize := info.Size()
 
+	// Apply namespace to S3 path
+	s3Path := c.getRemotePath(path)
+
 	c.logger.Info("Starting upload", "file", localPath, "key", s3Path, "size", fileSize)
 
 	// Create multipart upload exactly as in the faster implementation
@@ -388,8 +402,10 @@ func (c *Client) UploadFileMultipart(ctx context.Context, localPath, s3Path stri
 	return c.UploadFile(ctx, localPath, s3Path)
 }
 
-// cleanupExistingUploads aborts any existing multipart uploads for the given key
-func (c *Client) cleanupExistingUploads(ctx context.Context, key string) error {
+// cleanupExistingUploads aborts any existing multipart uploads for the given path
+func (c *Client) cleanupExistingUploads(ctx context.Context, path string) error {
+	// Apply namespace to S3 path
+	key := c.getRemotePath(path)
 	listResp, err := c.client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{
 		Bucket: aws.String(c.bucketName),
 		Prefix: aws.String(key),
@@ -478,11 +494,13 @@ func formatFileSize(size int64) string {
 }
 
 // FileExists checks if a file exists in S3
-func (c *Client) FileExists(s3Path string) (bool, error) {
+func (c *Client) FileExists(path string) (bool, error) {
 	// Create a context with timeout for the operation
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
 
+	// Apply namespace to S3 path
+	s3Path := c.getRemotePath(path)
 	startTime := time.Now()
 	_, err := c.client.HeadObject(ctx, &s3.HeadObjectInput{
 		Bucket: aws.String(c.bucketName),
@@ -513,7 +531,7 @@ func (c *Client) FileExists(s3Path string) (bool, error) {
 }
 
 // DownloadFile downloads a file from S3
-func (c *Client) DownloadFile(s3Path, localPath string) error {
+func (c *Client) DownloadFile(path, localPath string) error {
 	// Create the directory if it doesn't exist
 	dir := filepath.Dir(localPath)
 	if err := os.MkdirAll(dir, 0755); err != nil {
@@ -527,6 +545,8 @@ func (c *Client) DownloadFile(s3Path, localPath string) error {
 	}
 	defer file.Close()
 
+	// Apply namespace to S3 path
+	s3Path := c.getRemotePath(path)
 	c.logger.Info("Downloading file", "s3_path", s3Path, "local_path", localPath)
 
 	// Download the file
@@ -550,7 +570,9 @@ func (c *Client) DownloadFile(s3Path, localPath string) error {
 }
 
 // DeleteObject deletes an object from S3, ensuring metadata files for snapshots are preserved
-func (c *Client) DeleteObject(s3Path string) error {
+func (c *Client) DeleteObject(path string) error {
+	// Apply namespace to S3 path
+	s3Path := c.getRemotePath(path)
 	// If we're deleting a metadata file, first check if the snapshot exists
 	if IsMetadataKey(s3Path) {
 		hasSnapshot, err := c.HasCorrespondingFile(context.Background(), s3Path)
@@ -603,7 +625,8 @@ func (c *Client) DeleteObjects(s3Paths []string) error {
 				continue
 			}
 		}
-		filteredPaths = append(filteredPaths, path)
+		// Apply namespace to S3 path
+		filteredPaths = append(filteredPaths, c.getRemotePath(path))
 	}
 	c.logger.Info("Filtered paths for deletion",
 		"original_count", len(s3Paths),
@@ -693,7 +716,7 @@ func (c *Client) ListObjects(prefix string) ([]S3Object, error) {
 	for {
 		result, err := c.client.ListObjectsV2(context.Background(), &s3.ListObjectsV2Input{
 			Bucket:            aws.String(c.bucketName),
-			Prefix:            aws.String(prefix),
+			Prefix:            aws.String(c.getRemotePath(prefix)),
 			ContinuationToken: continuationToken,
 		})
 		if err != nil {
@@ -786,7 +809,7 @@ func GetUploadInfoFilePath(filePath string) string {
 func (c *Client) AbortMultipartUpload(s3Path, uploadID string) error {
 	_, err := c.client.AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{
 		Bucket:   aws.String(c.bucketName),
-		Key:      aws.String(s3Path),
+		Key:      aws.String(c.getRemotePath(s3Path)),
 		UploadId: aws.String(uploadID),
 	})
 	if err != nil {
@@ -863,10 +886,10 @@ func (c *Client) HasCorrespondingFile(ctx context.Context, key string) (bool, er
 	var checkKey string
 	if IsMetadataKey(key) {
 		// For metadata file, check if snapshot exists
-		checkKey = GetSnapshotKeyFromMetadata(key)
+		checkKey = c.getRemotePath(GetSnapshotKeyFromMetadata(key))
 	} else if IsSnapshotKey(key) {
 		// For snapshot file, check if metadata exists
-		checkKey = GetMetadataKey(key)
+		checkKey = c.getRemotePath(GetMetadataKey(key))
 	} else {
 		// Not a snapshot or metadata file
 		return false, nil
@@ -914,7 +937,7 @@ func (c *Client) UpdateLatestMetadata(ctx context.Context, metadataKey string) e
 		return fmt.Errorf("not a metadata file: %s", metadataKey)
 	}
 
-	c.logger.Info("Updating latest.json with metadata", "source", metadataKey)
+	c.logger.Info("Updating latest.json with metadata", "source", c.getRemotePath(metadataKey))
 
 	// Create a temporary file to download the metadata
 	tmpFile, err := os.CreateTemp("", "metadata-*.json")
@@ -1157,7 +1180,7 @@ func (c *Client) UpdateLatestMetadata(ctx context.Context, metadataKey string) e
 	// Upload consolidated metadata as latest.json with proper content type
 	_, err = c.client.PutObject(ctx, &s3.PutObjectInput{
 		Bucket:      aws.String(c.bucketName),
-		Key:         aws.String("latest.json"),
+		Key:         aws.String(c.getRemotePath("latest.json")),
 		Body:        consolidatedFile,
 		ContentType: aws.String("application/json"),
 	})
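With this change, every read and write in the client funnels through `getRemotePath`, so the prefix behavior is easy to pin down in a table-driven test. The sketch below is hypothetical and not part of this PR; it assumes it lives in the same `s3` package so the unexported `bucketNamespace` field and `getRemotePath` method are reachable:

package s3

import "testing"

// TestGetRemotePath illustrates how the namespace prefix composes remote keys.
func TestGetRemotePath(t *testing.T) {
	cases := []struct {
		namespace, path, want string
	}{
		{"mainnet", "snapshot-123.tar.zst", "mainnet/snapshot-123.tar.zst"},
		{"mainnet", "latest.json", "mainnet/latest.json"},
		{"", "latest.json", "latest.json"}, // no namespace: legacy flat layout
	}
	for _, tc := range cases {
		c := &Client{bucketNamespace: tc.namespace}
		if got := c.getRemotePath(tc.path); got != tc.want {
			t.Errorf("getRemotePath(%q) with namespace %q = %q, want %q",
				tc.path, tc.namespace, got, tc.want)
		}
	}
}

One interaction worth double-checking in review: `DeleteObject` prefixes its argument and then passes the already-namespaced key to `HasCorrespondingFile`, whose branches call `getRemotePath` again, which looks like it could produce a doubled prefix such as `mainnet/mainnet/<key>`.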