Skip to content
Open
53 changes: 46 additions & 7 deletions io/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"path/filepath"
"strings"

"github.com/aws/aws-sdk-go-v2/service/s3"
"gocloud.dev/blob"
)

Expand Down Expand Up @@ -110,13 +111,32 @@
}

func (bfs *blobFileIO) Create(name string) (FileWriter, error) {
return bfs.NewWriter(bfs.ctx, name, true, nil)
// Configure writer options to prevent chunked encoding issues
opts := &blob.WriterOptions{
BeforeWrite: func(as func(any) bool) error {
// Try to access S3-specific upload input to disable chunked encoding
var uploadInput *s3.PutObjectInput
as(&uploadInput)
return nil

Check failure on line 120 in io/blob.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 120 in io/blob.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 120 in io/blob.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

return with no blank line before (nlreturn)
},
}

Check failure on line 123 in io/blob.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

File is not properly formatted (gofmt)

Check failure on line 123 in io/blob.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

File is not properly formatted (gofmt)

Check failure on line 123 in io/blob.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

File is not properly formatted (gofmt)
return bfs.NewWriter(bfs.ctx, name, true, opts)
}

func (bfs *blobFileIO) WriteFile(name string, content []byte) error {
name = bfs.preprocess(name)
// Configure writer options to prevent chunked encoding issues
opts := &blob.WriterOptions{
BeforeWrite: func(as func(any) bool) error {
// Try to access S3-specific upload input to disable chunked encoding
var uploadInput *s3.PutObjectInput
as(&uploadInput)
return nil

Check failure on line 135 in io/blob.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 135 in io/blob.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 135 in io/blob.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

return with no blank line before (nlreturn)
},
}

return bfs.Bucket.WriteAll(bfs.ctx, name, content, nil)
return bfs.Bucket.WriteAll(bfs.ctx, name, content, opts)
}

// NewWriter returns a Writer that writes to the blob stored at path.
Expand All @@ -138,10 +158,21 @@
if err != nil {
return nil, &fs.PathError{Op: "new writer", Path: path, Err: err}
}

return nil, &fs.PathError{Op: "new writer", Path: path, Err: fs.ErrInvalid}

Check failure on line 161 in io/blob.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 161 in io/blob.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 161 in io/blob.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

return with no blank line before (nlreturn)
}
}
// If no options provided, create default ones to prevent chunked encoding
if opts == nil {
opts = &blob.WriterOptions{
BeforeWrite: func(as func(any) bool) error {
// Try to access S3-specific upload input to disable chunked encoding
var uploadInput *s3.PutObjectInput
as(&uploadInput)
return nil
},
}
}

bw, err := io.Bucket.NewWriter(ctx, path, opts)
if err != nil {
return nil, err
Expand All @@ -164,7 +195,15 @@
b *blobFileIO
}

func (f *blobWriteFile) Name() string { return f.name }
func (f *blobWriteFile) Sys() interface{} { return f.b }
func (f *blobWriteFile) Close() error { return f.Writer.Close() }
func (f *blobWriteFile) Write(p []byte) (int, error) { return f.Writer.Write(p) }
func (f *blobWriteFile) Name() string { return f.name }
func (f *blobWriteFile) Sys() interface{} { return f.b }
func (f *blobWriteFile) Close() error {
return f.Writer.Close()
}

Check failure on line 202 in io/blob.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

File is not properly formatted (gofumpt)

Check failure on line 202 in io/blob.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

File is not properly formatted (gofumpt)

Check failure on line 202 in io/blob.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

File is not properly formatted (gofumpt)
func (f *blobWriteFile) Write(p []byte) (int, error) {
// Note: We cannot intercept chunked encoding here because it happens
// at the HTTP transport level, not at the data write level.
// The data we receive here is the original unencoded data.
// The chunked encoding is applied later by the AWS SDK.
return f.Writer.Write(p)
}
146 changes: 114 additions & 32 deletions io/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net/http"
"net/url"
"os"
"slices"
"strconv"

"github.com/apache/iceberg-go/utils"
Expand All @@ -33,6 +32,8 @@ import (
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/smithy-go/auth/bearer"
"github.com/aws/smithy-go/middleware"
smithyhttp "github.com/aws/smithy-go/transport/http"
"gocloud.dev/blob"
"gocloud.dev/blob/s3blob"
)
Expand All @@ -47,54 +48,94 @@ const (
S3ProxyURI = "s3.proxy-uri"
S3ConnectTimeout = "s3.connect-timeout"
S3SignerUri = "s3.signer.uri"
S3SignerEndpoint = "s3.signer.endpoint"
S3SignerAuthToken = "token"
S3RemoteSigningEnabled = "s3.remote-signing-enabled"
S3ForceVirtualAddressing = "s3.force-virtual-addressing"
)

var unsupportedS3Props = []string{
S3ConnectTimeout,
S3SignerUri,
}

// ParseAWSConfig parses S3 properties and returns a configuration.
func ParseAWSConfig(ctx context.Context, props map[string]string) (*aws.Config, error) {
// If any unsupported properties are set, return an error.
for k := range props {
if slices.Contains(unsupportedS3Props, k) {
return nil, fmt.Errorf("unsupported S3 property %q", k)
}
}

opts := []func(*config.LoadOptions) error{}

if tok, ok := props["token"]; ok {
if tok, ok := props[S3SignerAuthToken]; ok {
opts = append(opts, config.WithBearerAuthTokenProvider(
&bearer.StaticTokenProvider{Token: bearer.Token{Value: tok}}))
}

if region, ok := props[S3Region]; ok {
region := ""
if r, ok := props[S3Region]; ok {
region = r
opts = append(opts, config.WithRegion(region))
} else if r, ok := props["client.region"]; ok {
region = r
opts = append(opts, config.WithRegion(region))
} else if region, ok := props["client.region"]; ok {
} else if r, ok := props["rest.signing-region"]; ok {
region = r
opts = append(opts, config.WithRegion(region))
}

accessKey, secretAccessKey := props[S3AccessKeyID], props[S3SecretAccessKey]
token := props[S3SessionToken]
if accessKey != "" || secretAccessKey != "" || token != "" {
opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
props[S3AccessKeyID], props[S3SecretAccessKey], props[S3SessionToken])))
// Check if remote signing is configured and enabled
signerURI, hasSignerURI := props[S3SignerUri]
signerEndpoint := props[S3SignerEndpoint]
remoteSigningEnabled := true // Default to true for backward compatibility
if enabledStr, ok := props[S3RemoteSigningEnabled]; ok {
if enabled, err := strconv.ParseBool(enabledStr); err == nil {
remoteSigningEnabled = enabled
}
}

if proxy, ok := props[S3ProxyURI]; ok {
proxyURL, err := url.Parse(proxy)
if err != nil {
return nil, fmt.Errorf("invalid s3 proxy url '%s'", proxy)
if hasSignerURI && signerURI != "" && remoteSigningEnabled {
// For remote signing, we still need valid (but potentially dummy) credentials
// The actual signing will be handled by the transport layer
opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
"remote-signer", "remote-signer", "")))

// Create a custom HTTP client with remote signing transport
baseTransport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
}

opts = append(opts, config.WithHTTPClient(awshttp.NewBuildableClient().WithTransportOptions(
func(t *http.Transport) {
t.Proxy = http.ProxyURL(proxyURL)
},
)))
// Apply proxy if configured
if proxy, ok := props[S3ProxyURI]; ok {
proxyURL, err := url.Parse(proxy)
if err != nil {
return nil, fmt.Errorf("invalid s3 proxy url '%s'", proxy)
}
baseTransport.Proxy = http.ProxyURL(proxyURL)
}

// Get auth token if configured
authToken := props[S3SignerAuthToken]
timeoutStr := props[S3ConnectTimeout]

remoteSigningTransport := NewRemoteSigningTransport(baseTransport, signerURI, signerEndpoint, region, authToken, timeoutStr)
httpClient := &http.Client{
Transport: remoteSigningTransport,
}

opts = append(opts, config.WithHTTPClient(httpClient))
} else {
// Use regular credentials if no remote signer
accessKey, secretAccessKey := props[S3AccessKeyID], props[S3SecretAccessKey]
token := props[S3SessionToken]
if accessKey != "" || secretAccessKey != "" || token != "" {
opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
props[S3AccessKeyID], props[S3SecretAccessKey], props[S3SessionToken])))
}

if proxy, ok := props[S3ProxyURI]; ok {
proxyURL, err := url.Parse(proxy)
if err != nil {
return nil, fmt.Errorf("invalid s3 proxy url '%s'", proxy)
}

opts = append(opts, config.WithHTTPClient(awshttp.NewBuildableClient().WithTransportOptions(
func(t *http.Transport) {
t.Proxy = http.ProxyURL(proxyURL)
},
)))
}
}

awscfg := new(aws.Config)
Expand Down Expand Up @@ -133,16 +174,57 @@ func createS3Bucket(ctx context.Context, parsed *url.URL, props map[string]strin
}
}

// Check if remote signing is enabled
_, hasSignerURI := props[S3SignerUri]
remoteSigningEnabled := true // Default to true for backward compatibility
if enabledStr, ok := props[S3RemoteSigningEnabled]; ok {
if enabled, err := strconv.ParseBool(enabledStr); err == nil {
remoteSigningEnabled = enabled
}
}

client := s3.NewFromConfig(*awscfg, func(o *s3.Options) {
if endpoint != "" {
o.BaseEndpoint = aws.String(endpoint)
}
o.UsePathStyle = usePathStyle
o.DisableLogOutputChecksumValidationSkipped = true

// If remote signing is enabled, configure the client to avoid chunked encoding
if hasSignerURI && remoteSigningEnabled {
// Add middleware to prevent chunked encoding
o.APIOptions = append(o.APIOptions, func(stack *middleware.Stack) error {
return stack.Build.Add(
middleware.BuildMiddlewareFunc("PreventChunkedEncoding", func(
ctx context.Context, in middleware.BuildInput, next middleware.BuildHandler,
) (middleware.BuildOutput, middleware.Metadata, error) {
// Cast to smithy HTTP request
req, ok := in.Request.(*smithyhttp.Request)
if ok {
// Force Content-Length header to prevent chunked encoding
if req.ContentLength == 0 && req.Body != nil {
// Try to read the body to determine length
// Note: This is a workaround and may not work for all cases
}

// Remove any existing Content-Encoding header
req.Header.Del("Content-Encoding")
req.Header.Del("Transfer-Encoding")
}
return next.HandleBuild(ctx, in)
}),
middleware.After,
)
})
}
})

// Create a *blob.Bucket.
bucket, err := s3blob.OpenBucketV2(ctx, client, parsed.Host, nil)
// Create a *blob.Bucket with options
bucketOpts := &s3blob.Options{
// Note: UsePathStyle is configured on the S3 client above, not here
}

bucket, err := s3blob.OpenBucketV2(ctx, client, parsed.Host, bucketOpts)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading