Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions awss3/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
"sync"
"time"

"context"

"github.com/araddon/gou"
"github.com/pborman/uuid"
"golang.org/x/net/context"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
Expand Down Expand Up @@ -446,27 +447,32 @@ func (f *FS) NewWriterWithContext(ctx context.Context, objectName string, metada
return nil, fmt.Errorf("options IfNotExists not supported for store type")
}

// Create an uploader with the session and default options
uploader := s3manager.NewUploader(f.sess)
rwc := csbufio.NewBackgroundWriteCloser(ctx,
func(ctx context.Context, rc io.ReadCloser) error {
defer rc.Close()

uploader := s3manager.NewUploader(f.sess)

// TODO: this needs to be managed, ie shutdown signals, close, handler err etc.

// Upload the file to S3.
_, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(f.bucket),
Key: aws.String(objectName),
Body: rc,
})
if err != nil {
err = fmt.Errorf("unable to write %q to s3 bucket %q: %w", objectName, f.bucket, err)

pr, pw := io.Pipe()
bw := csbufio.NewWriter(ctx, pw)
gou.Warnf("could not upload %v", err)

go func() {
// TODO: this needs to be managed, ie shutdown signals, close, handler err etc.
return err
}

// Upload the file to S3.
_, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(f.bucket),
Key: aws.String(objectName),
Body: pr,
return nil
})
if err != nil {
gou.Warnf("could not upload %v", err)
}
}()

return bw, nil
return rwc, nil
}

// Delete requested object path string.
Expand Down
81 changes: 17 additions & 64 deletions azure/store.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package azure

import (
"bufio"
"encoding/base64"
"encoding/binary"
"fmt"
Expand All @@ -11,12 +10,14 @@ import (
"strings"
"time"

"context"

az "github.com/Azure/azure-sdk-for-go/storage"
"github.com/araddon/gou"
"github.com/lytics/cloudstorage"
"github.com/pborman/uuid"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"

"github.com/lytics/cloudstorage"
"github.com/lytics/cloudstorage/csbufio"
)

const (
Expand Down Expand Up @@ -384,70 +385,22 @@ func (f *FS) NewWriterWithContext(ctx context.Context, name string, metadata map
return nil, fmt.Errorf("options IfNotExists not supported for store type")
}
name = strings.Replace(name, " ", "+", -1)
o := &object{name: name, metadata: metadata}
rwc := newAzureWriteCloser(ctx, f, o)
obj := &object{name: name, metadata: metadata}

return rwc, nil
}

// azureWriteCloser - manages data and go routines used to pipe data to azures, calling Close
// will flush data to azures and block until all inflight data has been written or
// we get an error.
type azureWriteCloser struct {
pr *io.PipeReader
pw *io.PipeWriter
wc *bufio.Writer
g *errgroup.Group
}

// azureWriteCloser is a io.WriteCloser that manages the azure connection pipe and when Close is called
// it blocks until all data is flushed to azure via a background go routine call to uploadMultiPart.
func newAzureWriteCloser(ctx context.Context, f *FS, obj *object) io.WriteCloser {
pr, pw := io.Pipe()
bw := bufio.NewWriter(pw)

g, _ := errgroup.WithContext(ctx)

g.Go(func() error {
// Upload the file to azure.
// Do a multipart upload
err := f.uploadMultiPart(obj, pr)
if err != nil {
gou.Warnf("could not upload %v", err)
return err
}
return nil
})
rwc := csbufio.NewBackgroundWriteCloser(ctx,
func(_ context.Context, rc io.ReadCloser) error {
err := f.uploadMultiPart(obj, rc)
if err != nil {
err = fmt.Errorf("unable to write %q to azure bucket %q: %w", obj.name, f.bucket, err)

return azureWriteCloser{
pr, pw, bw, g,
}
}
gou.Warnf("could not upload %v", err)

// Write writes data to our write buffer, which writes to the backing io pipe.
// If an error is encountered while writting we may not see it here, my guess is
// we wouldn't see it until someone calls close and the error is returned from the
// error group.
func (bc azureWriteCloser) Write(p []byte) (nn int, err error) {
return bc.wc.Write(p)
}
return err
}
return nil
})

// Close and block until we flush inflight data to azures
func (bc azureWriteCloser) Close() error {
//Flush buffered data to the backing pipe writer.
if err := bc.wc.Flush(); err != nil {
return err
}
//Close the pipe writer so that the pipe reader will return EOF,
// doing so will cause uploadMultiPart to complete and return.
if err := bc.pw.Close(); err != nil {
return err
}
//Use the error group's Wait method to block until uploadMultPart has completed
if err := bc.g.Wait(); err != nil {
return err
}
return nil
return rwc, nil
}

const (
Expand Down
102 changes: 102 additions & 0 deletions csbufio/background_write_closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package csbufio

import (
"bufio"
"context"
"errors"
"fmt"
"io"

"github.com/araddon/gou"
"golang.org/x/sync/errgroup"
)

// Compile-time assertion that *backgroundWriteCloser implements io.WriteCloser.
var _ io.WriteCloser = (*backgroundWriteCloser)(nil)

// backgroundWriteCloser buffers writes and pipes them to a job running in a
// background goroutine. Calling Close flushes the buffered data into the pipe,
// closes the pipe and blocks until the background job has returned or an
// error is reported.
type backgroundWriteCloser struct {
	// pipeWriter is the write half of the pipe feeding the background job;
	// Close closes it so the job's reader reaches EOF.
	pipeWriter io.Closer
	// buffioWriter buffers writes in front of the pipe.
	buffioWriter *bufio.Writer
	// backgroundJob tracks the goroutine running the job; Close waits on it
	// and returns the job's error, if any.
	backgroundJob *errgroup.Group
	// done is closed by the first call to Close. Write and Close check it to
	// detect an already closed writer.
	done chan struct{}
}

// NewBackgroundWriteCloser returns an io.WriteCloser whose writes are buffered
// and piped to job. job is started immediately in a background goroutine and
// receives ctx unchanged together with the read half of the pipe.
//
// Calling Close on the returned writer flushes the buffer, closes the write
// half of the pipe (so job reads EOF) and blocks until job has returned; an
// error returned by job is reported by Close.
//
// Once job returns, the read half of the pipe is closed on its behalf. Without
// that, a job that fails before draining the pipe and does not close the
// reader itself would leave Write and Close blocked forever on the pipe.
func NewBackgroundWriteCloser(ctx context.Context, job func(context.Context, io.ReadCloser) error) io.WriteCloser {
	pipeReader, pipeWriter := io.Pipe()
	buffioWriter := bufio.NewWriter(pipeWriter)

	var backgroundJob errgroup.Group

	backgroundJob.Go(func() error {
		err := job(ctx, pipeReader)

		// Unblock any writer still pushing data into the pipe. With a non-nil
		// err the writer observes that error; with nil it observes
		// io.ErrClosedPipe. CloseWithError always returns nil and does not
		// overwrite an earlier close performed by the job itself.
		_ = pipeReader.CloseWithError(err)

		if err != nil {
			gou.Warnf("could not upload %v", err)

			return err
		}

		return nil
	})

	return &backgroundWriteCloser{
		pipeWriter:    pipeWriter,
		buffioWriter:  buffioWriter,
		backgroundJob: &backgroundJob,
		done:          make(chan struct{}),
	}
}

// errAlreadyClosed is returned by Write once Close has been called.
var errAlreadyClosed = errors.New("writer already closed")

// Write appends p to the internal buffer, which in turn drains into the pipe
// read by the background job. It returns errAlreadyClosed when the writer has
// already been closed.
//
// A failure inside the background job is not necessarily visible here; it is
// reported by Close, which returns the error collected from the error group.
func (bc *backgroundWriteCloser) Write(p []byte) (nn int, err error) {
	select {
	case <-bc.done:
		// Close has already run: refuse further data.
		return 0, errAlreadyClosed
	default:
		return bc.buffioWriter.Write(p)
	}
}

// Close flushes buffered data into the pipe, closes the pipe and blocks until
// the background job has returned. Every failure met along the way is wrapped
// and reported; several failures are combined with errors.Join. Calling Close
// on an already closed writer is a no-op that returns nil.
func (bc *backgroundWriteCloser) Close() error {
	select {
	case <-bc.done:
		return nil
	default:
	}

	close(bc.done)

	// Shutdown steps, executed strictly in this order:
	//  1. flush buffered data to the backing pipe writer;
	//  2. close the pipe writer so the pipe reader returns EOF, which lets
	//     the background job complete and return;
	//  3. wait on the error group until the background job has completed.
	steps := []struct {
		format string
		run    func() error
	}{
		{"unable to flush buffer: %w", bc.buffioWriter.Flush},
		{"unable to close pipe: %w", bc.pipeWriter.Close},
		{"error from background job: %w", bc.backgroundJob.Wait},
	}

	var failures []error

	for _, step := range steps {
		if err := step.run(); err != nil {
			failures = append(failures, fmt.Errorf(step.format, err))
		}
	}

	// A lone failure is returned as is; errors.Join yields nil when there is
	// nothing to join.
	if len(failures) == 1 {
		return failures[0]
	}

	return errors.Join(failures...)
}
103 changes: 103 additions & 0 deletions csbufio/background_write_closer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package csbufio_test

import (
"context"
"io"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lytics/cloudstorage/csbufio"
)

// TestBackgroundWriteCloser checks that data written through the writer reaches
// the background job, and that an error returned by the job is surfaced by
// Close.
func TestBackgroundWriteCloser(t *testing.T) {
	t.Parallel()

	type testcase struct {
		label         string
		mustCancelCtx bool
		closeErrMsg   string
	}

	testcases := []testcase{
		{label: "test success on write and close"},
		{
			label:         "test success on write but close return error",
			mustCancelCtx: true,
			closeErrMsg:   "error from background job: context canceled",
		},
	}

	for _, tc := range testcases {
		tc := tc
		t.Run(tc.label, func(t *testing.T) {
			t.Parallel()

			ctx, cancel := context.WithCancel(context.Background())
			t.Cleanup(cancel)

			if tc.mustCancelCtx {
				cancel()
			}

			// received holds whatever the job read from the pipe; it is only
			// inspected after Close has waited for the job.
			var received []byte

			job := func(ctx context.Context, rc io.ReadCloser) error {
				defer rc.Close()

				data, err := io.ReadAll(rc)
				if err != nil {
					return err
				}

				received = data

				return ctx.Err()
			}

			wc := csbufio.NewBackgroundWriteCloser(ctx, job)

			_, err := wc.Write([]byte("foo"))
			require.NoError(t, err)

			err = wc.Close()
			if tc.closeErrMsg == "" {
				require.NoError(t, err)

				assert.Equal(t, []byte("foo"), received)

				return
			}

			require.EqualError(t, err, tc.closeErrMsg)
		})
	}
}

// TestBackgroundWriteCloserCheckWriteAfterClose checks that writing to a closed
// writer is rejected with the "writer already closed" error.
func TestBackgroundWriteCloserCheckWriteAfterClose(t *testing.T) {
	t.Parallel()

	// The job consumes nothing and succeeds immediately.
	noopJob := func(ctx context.Context, rc io.ReadCloser) error {
		defer rc.Close()

		return nil
	}

	wc := csbufio.NewBackgroundWriteCloser(context.Background(), noopJob)

	closeErr := wc.Close()
	require.NoError(t, closeErr)

	_, writeErr := wc.Write([]byte("foo"))
	require.EqualError(t, writeErr, "writer already closed")
}

// TestBackgroundWriteCloserCheckMultipleCloses checks that calling Close a
// second time on the same writer returns no error.
func TestBackgroundWriteCloserCheckMultipleCloses(t *testing.T) {
	t.Parallel()

	// The job consumes nothing and succeeds immediately.
	noopJob := func(ctx context.Context, rc io.ReadCloser) error {
		defer rc.Close()

		return nil
	}

	wc := csbufio.NewBackgroundWriteCloser(context.Background(), noopJob)

	firstErr := wc.Close()
	require.NoError(t, firstErr)

	secondErr := wc.Close()
	require.NoError(t, secondErr, "close an already closed wc should not trigger an error")
}
2 changes: 1 addition & 1 deletion csbufio/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestReaderContextDone(t *testing.T) {
n, err := rc.Read(p)
require.ErrorIs(t, err, context.Canceled)
require.Equal(t, 0, n)
require.Len(t, p, 0)
require.Empty(t, p)

err = rc.Close()
require.ErrorIs(t, err, context.Canceled)
Expand Down
2 changes: 1 addition & 1 deletion csbufio/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestWriterContextDone(t *testing.T) {

b, err := io.ReadAll(pr)
require.NoError(t, err, "error reading")
require.Equal(t, 0, len(b), "")
require.Empty(t, b, "")

err = wc.Close()
require.ErrorIs(t, err, context.Canceled)
Expand Down
Loading