Skip to content

Commit bbdea73

Browse files
committed
chore: add CreatePool and CreateConnection functions for SpannerLib
Creates a spannerlib module and adds functions for CreatePool and CreateConnection.
1 parent 203fbb9 commit bbdea73

File tree

17 files changed

+2746
-0
lines changed

17 files changed

+2746
-0
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
on:
2+
push:
3+
branches: [ main ]
4+
pull_request:
5+
permissions:
6+
contents: read
7+
pull-requests: write
8+
name: Spanner Lib Tests
9+
jobs:
10+
test:
11+
strategy:
12+
matrix:
13+
go-version: [1.25.x]
14+
os: [ubuntu-latest, macos-latest, windows-latest]
15+
runs-on: ${{ matrix.os }}
16+
steps:
17+
- name: Install Go
18+
uses: actions/setup-go@v5
19+
with:
20+
go-version: ${{ matrix.go-version }}
21+
- name: Checkout code
22+
uses: actions/checkout@v4
23+
- name: Run unit tests
24+
working-directory: spannerlib
25+
run: go test ./... -race -short

spannerlib/.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
spannerlib.h
2+
spannerlib.so
3+
grpc_server

spannerlib/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Shared Library (Internal)
2+
3+
__This module can receive breaking changes without prior notice.__
4+
5+
This is an internal module that is used to expose the features in the database/sql driver
6+
to drivers in other programming languages.

spannerlib/api/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Shared Library (Internal API)
2+
3+
__This package can receive breaking changes without prior notice.__
4+
5+
This is the common internal API for the various external APIs for exposing the
6+
features in the database/sql driver to drivers in other programming languages.

spannerlib/api/connection.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package api
16+
17+
import (
18+
"context"
19+
"database/sql"
20+
"sync"
21+
"sync/atomic"
22+
)
23+
24+
// CloseConnection looks up the connection with the given poolId and connId and closes it.
25+
func CloseConnection(ctx context.Context, poolId, connId int64) error {
26+
pool, err := findPool(poolId)
27+
if err != nil {
28+
return err
29+
}
30+
c, ok := pool.connections.LoadAndDelete(connId)
31+
if !ok {
32+
// Closing an unknown connection or a connection that has previously been closed is a no-op.
33+
return nil
34+
}
35+
conn := c.(*Connection)
36+
return conn.close(ctx)
37+
}
38+
39+
type Connection struct {
40+
// results contains the open query results for this connection.
41+
results *sync.Map
42+
resultsIdx atomic.Int64
43+
44+
// backend is the database/sql connection of this connection.
45+
backend *sql.Conn
46+
}
47+
48+
func (conn *Connection) close(ctx context.Context) error {
49+
conn.closeResults(ctx)
50+
err := conn.backend.Close()
51+
if err != nil {
52+
return err
53+
}
54+
return nil
55+
}
56+
57+
func (conn *Connection) closeResults(ctx context.Context) {
58+
conn.results.Range(func(key, value interface{}) bool {
59+
// TODO: Implement
60+
return true
61+
})
62+
}

spannerlib/api/connection_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package api
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"reflect"
21+
"testing"
22+
23+
"cloud.google.com/go/spanner"
24+
"cloud.google.com/go/spanner/apiv1/spannerpb"
25+
"github.com/googleapis/go-sql-spanner/testutil"
26+
"google.golang.org/grpc/codes"
27+
)
28+
29+
func TestCreateAndCloseConnection(t *testing.T) {
30+
t.Parallel()
31+
32+
ctx := context.Background()
33+
server, teardown := setupMockServer(t)
34+
defer teardown()
35+
dsn := fmt.Sprintf("%s/projects/p/instances/i/databases/d?useplaintext=true", server.Address)
36+
37+
poolId, err := CreatePool(ctx, dsn)
38+
if err != nil {
39+
t.Fatalf("CreatePool returned unexpected error: %v", err)
40+
}
41+
connId, err := CreateConnection(ctx, poolId)
42+
if err != nil {
43+
t.Fatalf("CreateConnection returned unexpected error: %v", err)
44+
}
45+
if connId == 0 {
46+
t.Fatal("CreateConnection returned unexpected zero id")
47+
}
48+
p, ok := pools.Load(poolId)
49+
if !ok {
50+
t.Fatal("pool not found in map")
51+
}
52+
pool := p.(*Pool)
53+
if _, ok := pool.connections.Load(connId); !ok {
54+
t.Fatal("connection not in map")
55+
}
56+
57+
requests := server.TestSpanner.DrainRequestsFromServer()
58+
createSessionRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.CreateSessionRequest{}))
59+
if g, w := len(createSessionRequests), 1; g != w {
60+
t.Fatalf("num CreateSession requests mismatch\n Got: %d\nWant: %d", g, w)
61+
}
62+
63+
if err := CloseConnection(ctx, poolId, connId); err != nil {
64+
t.Fatalf("CloseConnection returned unexpected error: %v", err)
65+
}
66+
if _, ok := pool.connections.Load(connId); ok {
67+
t.Fatal("connection still in map")
68+
}
69+
70+
if err := ClosePool(ctx, poolId); err != nil {
71+
t.Fatalf("ClosePool returned unexpected error: %v", err)
72+
}
73+
}
74+
75+
func TestCreateConnectionWithUnknownPool(t *testing.T) {
76+
t.Parallel()
77+
78+
ctx := context.Background()
79+
// Try to create a connection for an unknown pool.
80+
_, err := CreateConnection(ctx, -1)
81+
if g, w := spanner.ErrCode(err), codes.NotFound; g != w {
82+
t.Fatalf("error code mismatch\n Got: %d\nWant: %d", g, w)
83+
}
84+
}
85+
86+
func TestCreateTwoConnections(t *testing.T) {
87+
t.Parallel()
88+
89+
ctx := context.Background()
90+
server, teardown := setupMockServer(t)
91+
defer teardown()
92+
dsn := fmt.Sprintf("%s/projects/p/instances/i/databases/d?useplaintext=true", server.Address)
93+
94+
poolId, err := CreatePool(ctx, dsn)
95+
if err != nil {
96+
t.Fatalf("CreatePool returned unexpected error: %v", err)
97+
}
98+
99+
for range 2 {
100+
connId, err := CreateConnection(ctx, poolId)
101+
if err != nil {
102+
t.Fatalf("CreateConnection returned unexpected error: %v", err)
103+
}
104+
//goland:noinspection GoDeferInLoop
105+
defer func() { _ = CloseConnection(ctx, poolId, connId) }()
106+
}
107+
108+
// Two connections in one pool should only create one multiplexed session.
109+
requests := server.TestSpanner.DrainRequestsFromServer()
110+
createSessionRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&spannerpb.CreateSessionRequest{}))
111+
if g, w := len(createSessionRequests), 1; g != w {
112+
t.Fatalf("num CreateSession requests mismatch\n Got: %d\nWant: %d", g, w)
113+
}
114+
115+
if err := ClosePool(ctx, poolId); err != nil {
116+
t.Fatalf("ClosePool returned unexpected error: %v", err)
117+
}
118+
}
119+
120+
func TestCloseConnectionTwice(t *testing.T) {
121+
t.Parallel()
122+
123+
ctx := context.Background()
124+
server, teardown := setupMockServer(t)
125+
defer teardown()
126+
dsn := fmt.Sprintf("%s/projects/p/instances/i/databases/d?useplaintext=true", server.Address)
127+
128+
poolId, err := CreatePool(ctx, dsn)
129+
if err != nil {
130+
t.Fatalf("CreatePool returned unexpected error: %v", err)
131+
}
132+
connId, err := CreateConnection(ctx, poolId)
133+
if err != nil {
134+
t.Fatalf("CreateConnection returned unexpected error: %v", err)
135+
}
136+
137+
for range 2 {
138+
if err := CloseConnection(ctx, poolId, connId); err != nil {
139+
t.Fatalf("CloseConnection returned unexpected error: %v", err)
140+
}
141+
}
142+
if err := ClosePool(ctx, poolId); err != nil {
143+
t.Fatalf("ClosePool returned unexpected error: %v", err)
144+
}
145+
}

spannerlib/api/pool.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package api
16+
17+
import (
18+
"context"
19+
"database/sql"
20+
"sync"
21+
"sync/atomic"
22+
23+
spannerdriver "github.com/googleapis/go-sql-spanner"
24+
"google.golang.org/grpc/codes"
25+
"google.golang.org/grpc/status"
26+
)
27+
28+
// pools contains all open pools in this runtime.
29+
var pools = sync.Map{}
30+
var poolsIdx = atomic.Int64{}
31+
32+
// Pool is the equivalent of a sql.DB. It contains a pool of connections to the same database.
33+
// All connections in a pool share the same Spanner client.
34+
type Pool struct {
35+
db *sql.DB
36+
connections *sync.Map
37+
connectionsIdx atomic.Int64
38+
}
39+
40+
// CreatePool creates a new Pool and stores it in the global map of pool.
41+
// The connectionString must be in the form
42+
// [host:port]/project/<project>/instances/<instance>/databases/<database>[?option1=value1[;option2=value2...]]
43+
//
44+
// Creating a pool is a relatively expensive operation, as each pool has its own Spanner client.
45+
func CreatePool(ctx context.Context, connectionString string) (int64, error) {
46+
config, err := spannerdriver.ExtractConnectorConfig(connectionString)
47+
if err != nil {
48+
return 0, err
49+
}
50+
connector, err := spannerdriver.CreateConnector(config)
51+
if err != nil {
52+
return 0, err
53+
}
54+
db := sql.OpenDB(connector)
55+
// Create a connection to force an error if the connection string is invalid.
56+
conn, err := db.Conn(ctx)
57+
if err != nil {
58+
return 0, err
59+
}
60+
_ = conn.Close()
61+
62+
id := poolsIdx.Add(1)
63+
pool := &Pool{
64+
db: db,
65+
connections: &sync.Map{},
66+
}
67+
pools.Store(id, pool)
68+
return id, nil
69+
}
70+
71+
// ClosePool closes the pool with the given id. All open connections in the pool are also closed, and the pool cannot
72+
// be used to create any new connections.
73+
func ClosePool(ctx context.Context, id int64) error {
74+
p, ok := pools.LoadAndDelete(id)
75+
if !ok {
76+
// Closing an unknown pool or a pool that has previously been closed is a no-op.
77+
return nil
78+
}
79+
pool := p.(*Pool)
80+
pool.connections.Range(func(key, value interface{}) bool {
81+
conn := value.(*Connection)
82+
_ = conn.close(ctx)
83+
return true
84+
})
85+
if err := pool.db.Close(); err != nil {
86+
return err
87+
}
88+
return nil
89+
}
90+
91+
// CreateConnection creates a new connection in the given pool. This is a relatively cheap operation, as a connection
92+
// does not have its own physical connection to Spanner. Instead, all connections in the same pool share the same
93+
// underlying Spanner client, which again contains a gRPC channel pool.
94+
func CreateConnection(ctx context.Context, poolId int64) (int64, error) {
95+
pool, err := findPool(poolId)
96+
if err != nil {
97+
return 0, err
98+
}
99+
sqlConn, err := pool.db.Conn(ctx)
100+
if err != nil {
101+
return 0, err
102+
}
103+
id := poolsIdx.Add(1)
104+
conn := &Connection{
105+
backend: sqlConn,
106+
results: &sync.Map{},
107+
}
108+
pool.connections.Store(id, conn)
109+
110+
return id, nil
111+
}
112+
113+
func findPool(poolId int64) (*Pool, error) {
114+
p, ok := pools.Load(poolId)
115+
if !ok {
116+
return nil, status.Errorf(codes.NotFound, "pool %v not found", poolId)
117+
}
118+
pool := p.(*Pool)
119+
return pool, nil
120+
}
121+
122+
func findConnection(poolId, connId int64) (*Connection, error) {
123+
pool, err := findPool(poolId)
124+
if err != nil {
125+
return nil, err
126+
}
127+
c, ok := pool.connections.Load(connId)
128+
if !ok {
129+
return nil, status.Errorf(codes.NotFound, "connection %v not found", connId)
130+
}
131+
conn := c.(*Connection)
132+
return conn, nil
133+
}

0 commit comments

Comments
 (0)