Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
* Supported `sql.Null*` from `database/sql` as query params in `toValue` func
* Added `WithConcurrentResultSets` option for `db.Query().Query()`

## v3.118.0
* Added support for nullable `Date32`, `Datetime64`, `Timestamp64`, and `Interval64` types in the `optional` parameter builder
Expand Down
1 change: 1 addition & 0 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
if err != nil {
return xerrors.WithStackTrace(err)
}

defer func() {
_ = streamResult.Close(ctx)
}()
Expand Down
235 changes: 235 additions & 0 deletions internal/query/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,241 @@ func TestClient(t *testing.T) {
require.Nil(t, r3)
}
})
t.Run("ConcurrentResultSets", func(t *testing.T) {
ctrl := gomock.NewController(t)
r, err := clientQuery(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
Status: Ydb.StatusIds_SUCCESS,
TxMeta: &Ydb_Query.TransactionMeta{
Id: "456",
},
ResultSetIndex: 0,
ResultSet: &Ydb.ResultSet{
Columns: []*Ydb.Column{
{
Name: "a",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UINT64,
},
},
},
{
Name: "b",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UTF8,
},
},
},
},
Rows: []*Ydb.Value{
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 1,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "1",
},
}},
},
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 2,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "2",
},
}},
},
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 3,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "3",
},
}},
},
},
},
}, nil)
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
Status: Ydb.StatusIds_SUCCESS,
ResultSetIndex: 1,
ResultSet: &Ydb.ResultSet{
Columns: []*Ydb.Column{
{
Name: "c",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UINT64,
},
},
},
{
Name: "d",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UTF8,
},
},
},
{
Name: "e",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_BOOL,
},
},
},
},
Rows: []*Ydb.Value{
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 1,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "1",
},
}, {
Value: &Ydb.Value_BoolValue{
BoolValue: true,
},
}},
},
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 2,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "2",
},
}, {
Value: &Ydb.Value_BoolValue{
BoolValue: false,
},
}},
},
},
},
}, nil)
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
Status: Ydb.StatusIds_SUCCESS,
ResultSetIndex: 0,
ResultSet: &Ydb.ResultSet{
Rows: []*Ydb.Value{
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 4,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "4",
},
}},
},
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 5,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "5",
},
}},
},
},
},
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
client := NewMockQueryServiceClient(ctrl)
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)

return newTestSessionWithClient("123", client, true), nil
}), "", query.WithConcurrentResultSets(true))
require.NoError(t, err)
{
rs, err := r.NextResultSet(ctx)
require.NoError(t, err)
r1, err := rs.NextRow(ctx)
require.NoError(t, err)
var (
a uint64
b string
)
err = r1.Scan(&a, &b)
require.NoError(t, err)
require.EqualValues(t, 1, a)
require.EqualValues(t, "1", b)
r2, err := rs.NextRow(ctx)
require.NoError(t, err)
err = r2.Scan(&a, &b)
require.NoError(t, err)
require.EqualValues(t, 2, a)
require.EqualValues(t, "2", b)
r3, err := rs.NextRow(ctx)
require.NoError(t, err)
err = r3.Scan(&a, &b)
require.NoError(t, err)
require.EqualValues(t, 3, a)
require.EqualValues(t, "3", b)
r4, err := rs.NextRow(ctx)
require.NoError(t, err)
err = r4.Scan(&a, &b)
require.NoError(t, err)
require.EqualValues(t, 4, a)
require.EqualValues(t, "4", b)
r5, err := rs.NextRow(ctx)
require.NoError(t, err)
err = r5.Scan(&a, &b)
require.NoError(t, err)
require.EqualValues(t, 5, a)
require.EqualValues(t, "5", b)
r6, err := rs.NextRow(ctx)
require.ErrorIs(t, err, io.EOF)
require.Nil(t, r6)
}
{
rs, err := r.NextResultSet(ctx)
require.NoError(t, err)
r1, err := rs.NextRow(ctx)
require.NoError(t, err)
var (
a uint64
b string
c bool
)
err = r1.Scan(&a, &b, &c)
require.NoError(t, err)
require.EqualValues(t, 1, a)
require.EqualValues(t, "1", b)
require.EqualValues(t, true, c)
r2, err := rs.NextRow(ctx)
require.NoError(t, err)
err = r2.Scan(&a, &b, &c)
require.NoError(t, err)
require.EqualValues(t, 2, a)
require.EqualValues(t, "2", b)
require.EqualValues(t, false, c)
r3, err := rs.NextRow(ctx)
require.ErrorIs(t, err, io.EOF)
require.Nil(t, r3)
}
})
t.Run("AllowImplicitSessions", func(t *testing.T) {
_, err := mockClientForImplicitSessionTest(ctx, t).
Query(ctx, "SELECT 1")
Expand Down
3 changes: 2 additions & 1 deletion internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type executeSettings interface {
ResourcePool() string
ResponsePartLimitSizeBytes() int64
Label() string
ConcurrentResultSets() bool
}

type executeScriptConfig interface {
Expand Down Expand Up @@ -92,7 +93,7 @@ func executeQueryRequest(sessionID, q string, cfg executeSettings) (
},
Parameters: params,
StatsMode: Ydb_Query.StatsMode(cfg.StatsMode()),
ConcurrentResultSets: false,
ConcurrentResultSets: cfg.ConcurrentResultSets(),
PoolId: cfg.ResourcePool(),
ResponsePartLimitBytes: cfg.ResponsePartLimitSizeBytes(),
}
Expand Down
18 changes: 17 additions & 1 deletion internal/query/options/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type (
issueCallback func(issues []*Ydb_Issue.IssueMessage)
responsePartLimitBytes int64
label string
concurrentResultSets bool
}

// Execute is an interface for execute method options
Expand All @@ -72,9 +73,12 @@ type (
}
execModeOption = ExecMode
responsePartLimitBytes int64
issuesOption struct {

issuesOption struct {
callback func([]*Ydb_Issue.IssueMessage)
}

concurrentResultSets bool
)

func (poolID resourcePool) applyExecuteOption(s *executeSettings) {
Expand Down Expand Up @@ -132,6 +136,10 @@ func (opts issuesOption) applyExecuteOption(s *executeSettings) {
s.issueCallback = opts.callback
}

func (opt concurrentResultSets) applyExecuteOption(s *executeSettings) {
s.concurrentResultSets = bool(opt)
}

const (
ExecModeParse = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_PARSE)
ExecModeValidate = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_VALIDATE)
Expand Down Expand Up @@ -205,6 +213,10 @@ func (s *executeSettings) Label() string {
return s.label
}

func (s *executeSettings) ConcurrentResultSets() bool {
return s.concurrentResultSets
}

func WithParameters(params params.Parameters) parametersOption {
return parametersOption{
params: params,
Expand Down Expand Up @@ -237,6 +249,10 @@ func WithResponsePartLimitSizeBytes(size int64) responsePartLimitBytes {
return responsePartLimitBytes(size)
}

func WithConcurrentResultSets(isEnabled bool) concurrentResultSets {
return concurrentResultSets(isEnabled)
}

func (size responsePartLimitBytes) applyExecuteOption(s *executeSettings) {
s.responsePartLimitBytes = int64(size)
}
Expand Down
Loading
Loading