Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
* Supported `sql.Null*` from `database/sql` as query params in `toValue` func
* Added `WithConcurrentResultSets` option for `db.Query().Query()`

## v3.118.0
* Added support for nullable `Date32`, `Datetime64`, `Timestamp64`, and `Interval64` types in the `optional` parameter builder
Expand Down
1 change: 1 addition & 0 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
if err != nil {
return xerrors.WithStackTrace(err)
}

defer func() {
_ = streamResult.Close(ctx)
}()
Expand Down
235 changes: 235 additions & 0 deletions internal/query/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,241 @@ func TestClient(t *testing.T) {
require.Nil(t, r3)
}
})
t.Run("ConcurrentResultSets", func(t *testing.T) {
ctrl := gomock.NewController(t)
r, err := clientQuery(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
Status: Ydb.StatusIds_SUCCESS,
TxMeta: &Ydb_Query.TransactionMeta{
Id: "456",
},
ResultSetIndex: 0,
ResultSet: &Ydb.ResultSet{
Columns: []*Ydb.Column{
{
Name: "a",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UINT64,
},
},
},
{
Name: "b",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UTF8,
},
},
},
},
Rows: []*Ydb.Value{
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 1,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "1",
},
}},
},
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 2,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "2",
},
}},
},
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 3,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "3",
},
}},
},
},
},
}, nil)
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
Status: Ydb.StatusIds_SUCCESS,
ResultSetIndex: 1,
ResultSet: &Ydb.ResultSet{
Columns: []*Ydb.Column{
{
Name: "c",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UINT64,
},
},
},
{
Name: "d",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UTF8,
},
},
},
{
Name: "e",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_BOOL,
},
},
},
},
Rows: []*Ydb.Value{
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 1,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "1",
},
}, {
Value: &Ydb.Value_BoolValue{
BoolValue: true,
},
}},
},
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 2,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "2",
},
}, {
Value: &Ydb.Value_BoolValue{
BoolValue: false,
},
}},
},
},
},
}, nil)
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
Status: Ydb.StatusIds_SUCCESS,
ResultSetIndex: 0,
ResultSet: &Ydb.ResultSet{
Rows: []*Ydb.Value{
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 4,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "4",
},
}},
},
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 5,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "5",
},
}},
},
},
},
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
client := NewMockQueryServiceClient(ctrl)
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)

return newTestSessionWithClient("123", client, true), nil
}), "", query.WithConcurrentResultSets(true))
require.NoError(t, err)
{
rs, err := r.NextResultSet(ctx)
require.NoError(t, err)
r1, err := rs.NextRow(ctx)
require.NoError(t, err)
var (
a uint64
b string
)
err = r1.Scan(&a, &b)
require.NoError(t, err)
require.EqualValues(t, 1, a)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Как бы улучшить читаемость? Например, перебирать в цикле?

require.EqualValues(t, "1", b)
r2, err := rs.NextRow(ctx)
require.NoError(t, err)
err = r2.Scan(&a, &b)
require.NoError(t, err)
require.EqualValues(t, 2, a)
require.EqualValues(t, "2", b)
r3, err := rs.NextRow(ctx)
require.NoError(t, err)
err = r3.Scan(&a, &b)
require.NoError(t, err)
require.EqualValues(t, 3, a)
require.EqualValues(t, "3", b)
r4, err := rs.NextRow(ctx)
require.NoError(t, err)
err = r4.Scan(&a, &b)
require.NoError(t, err)
require.EqualValues(t, 4, a)
require.EqualValues(t, "4", b)
r5, err := rs.NextRow(ctx)
require.NoError(t, err)
err = r5.Scan(&a, &b)
require.NoError(t, err)
require.EqualValues(t, 5, a)
require.EqualValues(t, "5", b)
r6, err := rs.NextRow(ctx)
require.ErrorIs(t, err, io.EOF)
require.Nil(t, r6)
}
{
rs, err := r.NextResultSet(ctx)
require.NoError(t, err)
r1, err := rs.NextRow(ctx)
require.NoError(t, err)
var (
a uint64
b string
c bool
)
err = r1.Scan(&a, &b, &c)
require.NoError(t, err)
require.EqualValues(t, 1, a)
require.EqualValues(t, "1", b)
require.EqualValues(t, true, c)
r2, err := rs.NextRow(ctx)
require.NoError(t, err)
err = r2.Scan(&a, &b, &c)
require.NoError(t, err)
require.EqualValues(t, 2, a)
require.EqualValues(t, "2", b)
require.EqualValues(t, false, c)
r3, err := rs.NextRow(ctx)
require.ErrorIs(t, err, io.EOF)
require.Nil(t, r3)
}
})
t.Run("AllowImplicitSessions", func(t *testing.T) {
_, err := mockClientForImplicitSessionTest(ctx, t).
Query(ctx, "SELECT 1")
Expand Down
3 changes: 2 additions & 1 deletion internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type executeSettings interface {
ResourcePool() string
ResponsePartLimitSizeBytes() int64
Label() string
ConcurrentResultSets() bool
}

type executeScriptConfig interface {
Expand Down Expand Up @@ -92,7 +93,7 @@ func executeQueryRequest(sessionID, q string, cfg executeSettings) (
},
Parameters: params,
StatsMode: Ydb_Query.StatsMode(cfg.StatsMode()),
ConcurrentResultSets: false,
ConcurrentResultSets: cfg.ConcurrentResultSets(),
PoolId: cfg.ResourcePool(),
ResponsePartLimitBytes: cfg.ResponsePartLimitSizeBytes(),
}
Expand Down
18 changes: 17 additions & 1 deletion internal/query/options/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type (
issueCallback func(issues []*Ydb_Issue.IssueMessage)
responsePartLimitBytes int64
label string
concurrentResultSets bool
}

// Execute is an interface for execute method options
Expand All @@ -72,9 +73,12 @@ type (
}
execModeOption = ExecMode
responsePartLimitBytes int64
issuesOption struct {

issuesOption struct {
callback func([]*Ydb_Issue.IssueMessage)
}

concurrentResultSets bool
)

func (poolID resourcePool) applyExecuteOption(s *executeSettings) {
Expand Down Expand Up @@ -132,6 +136,10 @@ func (opts issuesOption) applyExecuteOption(s *executeSettings) {
s.issueCallback = opts.callback
}

func (opt concurrentResultSets) applyExecuteOption(s *executeSettings) {
s.concurrentResultSets = bool(opt)
}

const (
ExecModeParse = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_PARSE)
ExecModeValidate = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_VALIDATE)
Expand Down Expand Up @@ -205,6 +213,10 @@ func (s *executeSettings) Label() string {
return s.label
}

func (s *executeSettings) ConcurrentResultSets() bool {
return s.concurrentResultSets
}

func WithParameters(params params.Parameters) parametersOption {
return parametersOption{
params: params,
Expand Down Expand Up @@ -237,6 +249,10 @@ func WithResponsePartLimitSizeBytes(size int64) responsePartLimitBytes {
return responsePartLimitBytes(size)
}

func WithConcurrentResultSets(isEnabled bool) concurrentResultSets {
return concurrentResultSets(isEnabled)
}

func (size responsePartLimitBytes) applyExecuteOption(s *executeSettings) {
s.responsePartLimitBytes = int64(size)
}
Expand Down
Loading
Loading