Skip to content

Commit 582f108

Browse files
committed
chore: add WriteMutations function for SpannerLib
Adds a WriteMutations function for SpannerLib. This function can be used to write mutations to Spanner in two ways: 1. In a transaction: The mutations are buffered in the current read/write transaction. The returned message is empty. 2. Outside a transaction: The mutations are written to Spanner directly in a new read/write transaction. The returned message contains the CommitResponse.
1 parent d68fc12 commit 582f108

File tree

10 files changed

+576
-0
lines changed

10 files changed

+576
-0
lines changed

conn.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,27 @@ func sum(affected []int64) int64 {
683683
return sum
684684
}
685685

686+
// WriteMutations is not part of the public API of the database/sql driver.
687+
// It is exported for internal reasons, and may receive breaking changes without prior notice.
688+
//
689+
// WriteMutations writes mutations using this connection. The mutations are either buffered in the current transaction,
690+
// or written directly to Spanner using a new read/write transaction if the connection does not have a transaction.
691+
//
692+
// The function returns an error if the connection currently has a read-only transaction.
693+
//
694+
// The returned CommitResponse is nil if the connection currently has a transaction, as the mutations will only be
695+
// applied to Spanner when the transaction commits.
696+
func (c *conn) WriteMutations(ctx context.Context, ms []*spanner.Mutation) (*spanner.CommitResponse, error) {
697+
if c.inTransaction() {
698+
return nil, c.BufferWrite(ms)
699+
}
700+
ts, err := c.Apply(ctx, ms)
701+
if err != nil {
702+
return nil, err
703+
}
704+
return &spanner.CommitResponse{CommitTs: ts}, nil
705+
}
706+
686707
func (c *conn) Apply(ctx context.Context, ms []*spanner.Mutation, opts ...spanner.ApplyOption) (commitTimestamp time.Time, err error) {
687708
if c.inTransaction() {
688709
return time.Time{}, spanner.ToSpannerError(

spannerlib/api/connection.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,22 @@ func CloseConnection(ctx context.Context, poolId, connId int64) error {
4747
return conn.close(ctx)
4848
}
4949

50+
// WriteMutations writes an array of mutations to Spanner. The mutations are buffered in
51+
// the current read/write transaction if the connection currently has a read/write transaction.
52+
// The mutations are applied to the database in a new read/write transaction that is automatically
53+
// committed if the connection currently does not have a transaction.
54+
//
55+
// The function returns an error if the connection is currently in a read-only transaction.
56+
//
57+
// The mutationsBytes must be an encoded BatchWriteRequest_MutationGroup protobuf object.
58+
func WriteMutations(ctx context.Context, poolId, connId int64, mutations *spannerpb.BatchWriteRequest_MutationGroup) (*spannerpb.CommitResponse, error) {
59+
conn, err := findConnection(poolId, connId)
60+
if err != nil {
61+
return nil, err
62+
}
63+
return conn.writeMutations(ctx, mutations)
64+
}
65+
5066
// BeginTransaction starts a new transaction on the given connection.
5167
// A connection can have at most one transaction at any time. This function therefore returns an error if the
5268
// connection has an active transaction.
@@ -104,6 +120,7 @@ type Connection struct {
104120
// spannerConn is an internal interface that contains the internal functions that are used by this API.
105121
// It is implemented by the spannerdriver.conn struct.
106122
type spannerConn interface {
123+
WriteMutations(ctx context.Context, ms []*spanner.Mutation) (*spanner.CommitResponse, error)
107124
BeginReadOnlyTransaction(ctx context.Context, options *spannerdriver.ReadOnlyTransactionOptions) (driver.Tx, error)
108125
BeginReadWriteTransaction(ctx context.Context, options *spannerdriver.ReadWriteTransactionOptions) (driver.Tx, error)
109126
Commit(ctx context.Context) (*spanner.CommitResponse, error)
@@ -127,6 +144,34 @@ func (conn *Connection) close(ctx context.Context) error {
127144
return nil
128145
}
129146

147+
func (conn *Connection) writeMutations(ctx context.Context, mutation *spannerpb.BatchWriteRequest_MutationGroup) (*spannerpb.CommitResponse, error) {
148+
mutations := make([]*spanner.Mutation, 0, len(mutation.Mutations))
149+
for _, m := range mutation.Mutations {
150+
spannerMutation, err := spanner.WrapMutation(m)
151+
if err != nil {
152+
return nil, err
153+
}
154+
mutations = append(mutations, spannerMutation)
155+
}
156+
var commitResponse *spanner.CommitResponse
157+
if err := conn.backend.Raw(func(driverConn any) (err error) {
158+
sc, _ := driverConn.(spannerConn)
159+
commitResponse, err = sc.WriteMutations(ctx, mutations)
160+
return err
161+
}); err != nil {
162+
return nil, err
163+
}
164+
165+
// The commit response is nil if the connection is currently in a transaction.
166+
if commitResponse == nil {
167+
return nil, nil
168+
}
169+
response := spannerpb.CommitResponse{
170+
CommitTimestamp: timestamppb.New(commitResponse.CommitTs),
171+
}
172+
return &response, nil
173+
}
174+
130175
func (conn *Connection) BeginTransaction(ctx context.Context, txOpts *spannerpb.TransactionOptions) error {
131176
var err error
132177
if txOpts.GetReadOnly() != nil {

spannerlib/api/connection_test.go

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"cloud.google.com/go/spanner/apiv1/spannerpb"
2525
"github.com/googleapis/go-sql-spanner/testutil"
2626
"google.golang.org/grpc/codes"
27+
"google.golang.org/protobuf/types/known/structpb"
2728
)
2829

2930
func TestCreateAndCloseConnection(t *testing.T) {
@@ -143,3 +144,159 @@ func TestCloseConnectionTwice(t *testing.T) {
143144
t.Fatalf("ClosePool returned unexpected error: %v", err)
144145
}
145146
}
147+
148+
func TestWriteMutations(t *testing.T) {
149+
t.Parallel()
150+
151+
ctx := context.Background()
152+
server, teardown := setupMockServer(t)
153+
defer teardown()
154+
dsn := fmt.Sprintf("%s/projects/p/instances/i/databases/d?useplaintext=true", server.Address)
155+
156+
poolId, err := CreatePool(ctx, dsn)
157+
if err != nil {
158+
t.Fatalf("CreatePool returned unexpected error: %v", err)
159+
}
160+
connId, err := CreateConnection(ctx, poolId)
161+
if err != nil {
162+
t.Fatalf("CreateConnection returned unexpected error: %v", err)
163+
}
164+
165+
mutations := &spannerpb.BatchWriteRequest_MutationGroup{Mutations: []*spannerpb.Mutation{
166+
{Operation: &spannerpb.Mutation_Insert{Insert: &spannerpb.Mutation_Write{
167+
Table: "my_table",
168+
Columns: []string{"id", "value"},
169+
Values: []*structpb.ListValue{
170+
{Values: []*structpb.Value{structpb.NewStringValue("1"), structpb.NewStringValue("One")}},
171+
{Values: []*structpb.Value{structpb.NewStringValue("2"), structpb.NewStringValue("Two")}},
172+
{Values: []*structpb.Value{structpb.NewStringValue("3"), structpb.NewStringValue("Three")}},
173+
},
174+
}}},
175+
{Operation: &spannerpb.Mutation_Update{Update: &spannerpb.Mutation_Write{
176+
Table: "my_table",
177+
Columns: []string{"id", "value"},
178+
Values: []*structpb.ListValue{
179+
{Values: []*structpb.Value{structpb.NewStringValue("0"), structpb.NewStringValue("Zero")}},
180+
},
181+
}}},
182+
}}
183+
resp, err := WriteMutations(ctx, poolId, connId, mutations)
184+
if err != nil {
185+
t.Fatalf("WriteMutations returned unexpected error: %v", err)
186+
}
187+
if resp.CommitTimestamp == nil {
188+
t.Fatalf("CommitTimestamp is nil")
189+
}
190+
requests := server.TestSpanner.DrainRequestsFromServer()
191+
beginRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.BeginTransactionRequest{}))
192+
if g, w := len(beginRequests), 1; g != w {
193+
t.Fatalf("num BeginTransaction requests mismatch\n Got: %d\nWant: %d", g, w)
194+
}
195+
commitRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.CommitRequest{}))
196+
if g, w := len(commitRequests), 1; g != w {
197+
t.Fatalf("num CommitRequests mismatch\n Got: %d\nWant: %d", g, w)
198+
}
199+
commitRequest := commitRequests[0].(*spannerpb.CommitRequest)
200+
if g, w := len(commitRequest.Mutations), 2; g != w {
201+
t.Fatalf("num mutations mismatch\n Got: %d\nWant: %d", g, w)
202+
}
203+
204+
// Write the same mutations in a transaction.
205+
if err := BeginTransaction(ctx, poolId, connId, &spannerpb.TransactionOptions{}); err != nil {
206+
t.Fatalf("BeginTransaction returned unexpected error: %v", err)
207+
}
208+
resp, err = WriteMutations(ctx, poolId, connId, mutations)
209+
if err != nil {
210+
t.Fatalf("WriteMutations returned unexpected error: %v", err)
211+
}
212+
if resp != nil {
213+
t.Fatalf("WriteMutations returned unexpected response: %v", resp)
214+
}
215+
resp, err = Commit(ctx, poolId, connId)
216+
if err != nil {
217+
t.Fatalf("Commit returned unexpected error: %v", err)
218+
}
219+
if resp == nil {
220+
t.Fatalf("Commit returned nil response")
221+
}
222+
if resp.CommitTimestamp == nil {
223+
t.Fatalf("CommitTimestamp is nil")
224+
}
225+
requests = server.TestSpanner.DrainRequestsFromServer()
226+
beginRequests = testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.BeginTransactionRequest{}))
227+
if g, w := len(beginRequests), 1; g != w {
228+
t.Fatalf("num BeginTransaction requests mismatch\n Got: %d\nWant: %d", g, w)
229+
}
230+
commitRequests = testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.CommitRequest{}))
231+
if g, w := len(commitRequests), 1; g != w {
232+
t.Fatalf("num CommitRequests mismatch\n Got: %d\nWant: %d", g, w)
233+
}
234+
commitRequest = commitRequests[0].(*spannerpb.CommitRequest)
235+
if g, w := len(commitRequest.Mutations), 2; g != w {
236+
t.Fatalf("num mutations mismatch\n Got: %d\nWant: %d", g, w)
237+
}
238+
239+
if err := ClosePool(ctx, poolId); err != nil {
240+
t.Fatalf("ClosePool returned unexpected error: %v", err)
241+
}
242+
}
243+
244+
func TestWriteMutationsInReadOnlyTx(t *testing.T) {
245+
t.Parallel()
246+
247+
ctx := context.Background()
248+
server, teardown := setupMockServer(t)
249+
defer teardown()
250+
dsn := fmt.Sprintf("%s/projects/p/instances/i/databases/d?useplaintext=true", server.Address)
251+
252+
poolId, err := CreatePool(ctx, dsn)
253+
if err != nil {
254+
t.Fatalf("CreatePool returned unexpected error: %v", err)
255+
}
256+
connId, err := CreateConnection(ctx, poolId)
257+
if err != nil {
258+
t.Fatalf("CreateConnection returned unexpected error: %v", err)
259+
}
260+
261+
// Start a read-only transaction and try to write mutations to that transaction. That should return an error.
262+
if err := BeginTransaction(ctx, poolId, connId, &spannerpb.TransactionOptions{
263+
Mode: &spannerpb.TransactionOptions_ReadOnly_{ReadOnly: &spannerpb.TransactionOptions_ReadOnly{}},
264+
}); err != nil {
265+
t.Fatalf("BeginTransaction returned unexpected error: %v", err)
266+
}
267+
268+
mutations := &spannerpb.BatchWriteRequest_MutationGroup{Mutations: []*spannerpb.Mutation{
269+
{Operation: &spannerpb.Mutation_Insert{Insert: &spannerpb.Mutation_Write{
270+
Table: "my_table",
271+
Columns: []string{"id", "value"},
272+
Values: []*structpb.ListValue{
273+
{Values: []*structpb.Value{structpb.NewStringValue("1"), structpb.NewStringValue("One")}},
274+
},
275+
}}},
276+
}}
277+
_, err = WriteMutations(ctx, poolId, connId, mutations)
278+
if g, w := spanner.ErrCode(err), codes.FailedPrecondition; g != w {
279+
t.Fatalf("WriteMutations error code mismatch\n Got: %d\nWant: %d", g, w)
280+
}
281+
282+
// Committing the read-only transaction should not lead to any commits on Spanner.
283+
_, err = Commit(ctx, poolId, connId)
284+
if err != nil {
285+
t.Fatalf("Commit returned unexpected error: %v", err)
286+
}
287+
requests := server.TestSpanner.DrainRequestsFromServer()
288+
// There should also not be any BeginTransaction requests on Spanner, as the transaction was never really started
289+
// by a query or other statement.
290+
beginRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.BeginTransactionRequest{}))
291+
if g, w := len(beginRequests), 0; g != w {
292+
t.Fatalf("num BeginTransaction requests mismatch\n Got: %d\nWant: %d", g, w)
293+
}
294+
commitRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.CommitRequest{}))
295+
if g, w := len(commitRequests), 0; g != w {
296+
t.Fatalf("num CommitRequests mismatch\n Got: %d\nWant: %d", g, w)
297+
}
298+
299+
if err := ClosePool(ctx, poolId); err != nil {
300+
t.Fatalf("ClosePool returned unexpected error: %v", err)
301+
}
302+
}

spannerlib/lib/connection.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,30 @@ func CloseConnection(ctx context.Context, poolId, connId int64) *Message {
3434
return &Message{}
3535
}
3636

37+
// WriteMutations writes an array of mutations to Spanner. The mutations are buffered in
38+
// the current read/write transaction if the connection currently has a read/write transaction.
39+
// The mutations are applied to the database in a new read/write transaction that is automatically
40+
// committed if the connection currently does not have a transaction.
41+
//
42+
// The function returns an error if the connection is currently in a read-only transaction.
43+
//
44+
// The mutationsBytes must be an encoded BatchWriteRequest_MutationGroup protobuf object.
45+
func WriteMutations(ctx context.Context, poolId, connId int64, mutationBytes []byte) *Message {
46+
mutations := spannerpb.BatchWriteRequest_MutationGroup{}
47+
if err := proto.Unmarshal(mutationBytes, &mutations); err != nil {
48+
return errMessage(err)
49+
}
50+
response, err := api.WriteMutations(ctx, poolId, connId, &mutations)
51+
if err != nil {
52+
return errMessage(err)
53+
}
54+
res, err := proto.Marshal(response)
55+
if err != nil {
56+
return errMessage(err)
57+
}
58+
return &Message{Res: res}
59+
}
60+
3761
// BeginTransaction starts a new transaction on the given connection. A connection can have at most one active
3862
// transaction at any time. This function therefore returns an error if the connection has an active transaction.
3963
func BeginTransaction(ctx context.Context, poolId, connId int64, txOptsBytes []byte) *Message {

spannerlib/lib/connection_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/googleapis/go-sql-spanner/testutil"
2424
"google.golang.org/grpc/codes"
2525
"google.golang.org/protobuf/proto"
26+
"google.golang.org/protobuf/types/known/structpb"
2627
)
2728

2829
func TestCreateAndCloseConnection(t *testing.T) {
@@ -262,3 +263,62 @@ func TestBeginAndRollback(t *testing.T) {
262263
t.Fatalf("ClosePool result mismatch\n Got: %v\nWant: %v", g, w)
263264
}
264265
}
266+
267+
func TestWriteMutations(t *testing.T) {
268+
t.Parallel()
269+
270+
ctx := context.Background()
271+
server, teardown := setupMockServer(t)
272+
defer teardown()
273+
dsn := fmt.Sprintf("%s/projects/p/instances/i/databases/d?useplaintext=true", server.Address)
274+
275+
poolMsg := CreatePool(ctx, dsn)
276+
if g, w := poolMsg.Code, int32(0); g != w {
277+
t.Fatalf("CreatePool result mismatch\n Got: %v\nWant: %v", g, w)
278+
}
279+
connMsg := CreateConnection(ctx, poolMsg.ObjectId)
280+
if g, w := connMsg.Code, int32(0); g != w {
281+
t.Fatalf("CreateConnection result mismatch\n Got: %v\nWant: %v", g, w)
282+
}
283+
mutations := &spannerpb.BatchWriteRequest_MutationGroup{Mutations: []*spannerpb.Mutation{
284+
{Operation: &spannerpb.Mutation_Insert{Insert: &spannerpb.Mutation_Write{
285+
Table: "my_table",
286+
Columns: []string{"id", "value"},
287+
Values: []*structpb.ListValue{
288+
{Values: []*structpb.Value{structpb.NewStringValue("1"), structpb.NewStringValue("One")}},
289+
{Values: []*structpb.Value{structpb.NewStringValue("2"), structpb.NewStringValue("Two")}},
290+
{Values: []*structpb.Value{structpb.NewStringValue("3"), structpb.NewStringValue("Three")}},
291+
},
292+
}}},
293+
}}
294+
mutationBytes, err := proto.Marshal(mutations)
295+
if err != nil {
296+
t.Fatal(err)
297+
}
298+
mutationsMsg := WriteMutations(ctx, poolMsg.ObjectId, connMsg.ObjectId, mutationBytes)
299+
if g, w := mutationsMsg.Code, int32(0); g != w {
300+
t.Fatalf("WriteMutations result mismatch\n Got: %v\nWant: %v", g, w)
301+
}
302+
if mutationsMsg.Length() == 0 {
303+
t.Fatal("WriteMutations returned no data")
304+
}
305+
306+
// Write mutations in a transaction.
307+
mutationsMsg = BeginTransaction(ctx, poolMsg.ObjectId, connMsg.ObjectId, mutationBytes)
308+
// The response should now be an empty message, as the mutations were only buffered in the transaction.
309+
if g, w := mutationsMsg.Code, int32(0); g != w {
310+
t.Fatalf("WriteMutations result mismatch\n Got: %v\nWant: %v", g, w)
311+
}
312+
if g, w := mutationsMsg.Length(), int32(0); g != w {
313+
t.Fatalf("WriteMutations data length mismatch\n Got: %v\nWant: %v", g, w)
314+
}
315+
316+
closeMsg := CloseConnection(ctx, poolMsg.ObjectId, connMsg.ObjectId)
317+
if g, w := closeMsg.Code, int32(0); g != w {
318+
t.Fatalf("CloseConnection result mismatch\n Got: %v\nWant: %v", g, w)
319+
}
320+
closeMsg = ClosePool(ctx, poolMsg.ObjectId)
321+
if g, w := closeMsg.Code, int32(0); g != w {
322+
t.Fatalf("ClosePool result mismatch\n Got: %v\nWant: %v", g, w)
323+
}
324+
}

spannerlib/shared/shared_lib.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,22 @@ func CloseConnection(poolId, connId int64) (int64, int32, int64, int32, unsafe.P
111111
return pin(msg)
112112
}
113113

114+
// WriteMutations writes an array of mutations to Spanner. The mutations are buffered in
115+
// the current read/write transaction if the connection currently has a read/write transaction.
116+
// The mutations are applied to the database in a new read/write transaction that is automatically
117+
// committed if the connection currently does not have a transaction.
118+
//
119+
// The function returns an error if the connection is currently in a read-only transaction.
120+
//
121+
// The mutationsBytes must be an encoded BatchWriteRequest_MutationGroup protobuf object.
122+
//
123+
//export WriteMutations
124+
func WriteMutations(poolId, connectionId int64, mutationsBytes []byte) (int64, int32, int64, int32, unsafe.Pointer) {
125+
ctx := context.Background()
126+
msg := lib.WriteMutations(ctx, poolId, connectionId, mutationsBytes)
127+
return pin(msg)
128+
}
129+
114130
// Execute executes a SQL statement on the given connection.
115131
// The return type is an identifier for a Rows object. This identifier can be used to
116132
// call the functions Metadata and Next to get respectively the metadata of the result

0 commit comments

Comments
 (0)