Skip to content

Commit 46d2298

Browse files
committed
Add gotData and endOfProcess handlers
1 parent 5ebfd0c commit 46d2298

File tree

6 files changed

+156
-4
lines changed

6 files changed

+156
-4
lines changed

conn_process.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ type onProcess struct {
3232
progress func(*Progress)
3333
profileInfo func(*ProfileInfo)
3434
profileEvents func([]ProfileEvent)
35+
gotData func()
36+
endOfProcess func()
3537
}
3638

3739
func (c *connect) firstBlock(ctx context.Context, on *onProcess) (*proto.Block, error) {
@@ -143,6 +145,9 @@ func (c *connect) process(ctx context.Context, on *onProcess) error {
143145
func (c *connect) processImpl(ctx context.Context, on *onProcess) error {
144146
c.readerMutex.Lock()
145147
defer c.readerMutex.Unlock()
148+
if on.endOfProcess != nil {
149+
defer on.endOfProcess()
150+
}
146151

147152
for {
148153
if c.reader == nil {
@@ -178,6 +183,9 @@ func (c *connect) handle(ctx context.Context, packet byte, on *onProcess) error
178183
}
179184
if block.Rows() != 0 && on.data != nil {
180185
on.data(block)
186+
if on.gotData != nil {
187+
on.gotData()
188+
}
181189
}
182190
case proto.ServerException:
183191
return c.exception()

context.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ import (
2222
"maps"
2323
"time"
2424

25-
"github.com/ClickHouse/clickhouse-go/v2/ext"
2625
"go.opentelemetry.io/otel/trace"
26+
27+
"github.com/ClickHouse/clickhouse-go/v2/ext"
2728
)
2829

2930
var _contextOptionKey = &QueryOptions{
@@ -59,6 +60,8 @@ type (
5960
progress func(*Progress)
6061
profileInfo func(*ProfileInfo)
6162
profileEvents func([]ProfileEvent)
63+
gotData func()
64+
endOfProcess func()
6265
}
6366
settings Settings
6467
parameters Parameters
@@ -147,6 +150,20 @@ func WithProfileEvents(fn func([]ProfileEvent)) QueryOption {
147150
}
148151
}
149152

153+
func WithGotData(fn func()) QueryOption {
154+
return func(o *QueryOptions) error {
155+
o.events.gotData = fn
156+
return nil
157+
}
158+
}
159+
160+
func WithEndOfProcess(fn func()) QueryOption {
161+
return func(o *QueryOptions) error {
162+
o.events.endOfProcess = fn
163+
return nil
164+
}
165+
}
166+
150167
func WithExternalTable(t ...*ext.Table) QueryOption {
151168
return func(o *QueryOptions) error {
152169
o.external = append(o.external, t...)
@@ -273,6 +290,16 @@ func (q *QueryOptions) onProcess() *onProcess {
273290
q.events.profileEvents(events)
274291
}
275292
},
293+
gotData: func() {
294+
if q.events.gotData != nil {
295+
q.events.gotData()
296+
}
297+
},
298+
endOfProcess: func() {
299+
if q.events.endOfProcess != nil {
300+
q.events.endOfProcess()
301+
}
302+
},
276303
}
277304
}
278305

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Licensed to ClickHouse, Inc. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. ClickHouse, Inc. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package clickhouse_api
19+
20+
import (
21+
"context"
22+
"fmt"
23+
24+
"github.com/ClickHouse/clickhouse-go/v2"
25+
)
26+
27+
func EndOfProcessAndGotData() error {
28+
conn, err := GetNativeConnection(clickhouse.Settings{
29+
"send_logs_level": "trace",
30+
}, nil, nil)
31+
if err != nil {
32+
return err
33+
}
34+
var totalBlocks int
35+
// use context to pass a call back for end of process and got data
36+
ctx := clickhouse.Context(context.Background(), clickhouse.WithEndOfProcess(func() {
37+
fmt.Println("process is finished")
38+
}), clickhouse.WithGotData(func() {
39+
totalBlocks++
40+
}))
41+
42+
rows, err := conn.Query(ctx, "SELECT number from numbers(1000000) LIMIT 1000000")
43+
if err != nil {
44+
return err
45+
}
46+
defer rows.Close()
47+
48+
for rows.Next() {
49+
}
50+
51+
fmt.Printf("Total data blocks: %d\n", totalBlocks)
52+
return rows.Err()
53+
}

examples/clickhouse_api/main_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ package clickhouse_api
2020
import (
2121
"context"
2222
"fmt"
23-
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
24-
"github.com/stretchr/testify/require"
2523
"os"
2624
"strconv"
2725
"testing"
26+
27+
"github.com/stretchr/testify/require"
28+
29+
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
2830
)
2931

3032
func TestMain(m *testing.M) {
@@ -237,3 +239,7 @@ func TestJSONStringExample(t *testing.T) {
237239
t.Skip("client cannot receive JSON strings")
238240
require.NoError(t, JSONStringExample())
239241
}
242+
243+
func TestEndOfProcessAndGotBlock(t *testing.T) {
244+
require.NoError(t, EndOfProcessAndGotData())
245+
}

examples/std/end_of_process.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Licensed to ClickHouse, Inc. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. ClickHouse, Inc. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package std
19+
20+
import (
21+
"context"
22+
"fmt"
23+
24+
"github.com/ClickHouse/clickhouse-go/v2"
25+
)
26+
27+
func EndOfProcessAndGotData() error {
28+
conn, err := GetStdOpenDBConnection(clickhouse.Native, clickhouse.Settings{
29+
"send_logs_level": "trace",
30+
}, nil, nil)
31+
if err != nil {
32+
return err
33+
}
34+
var totalBlocks int
35+
// use context to pass a call back for end of process and got data
36+
ctx := clickhouse.Context(context.Background(), clickhouse.WithEndOfProcess(func() {
37+
fmt.Println("process is finished")
38+
}), clickhouse.WithGotData(func() {
39+
totalBlocks++
40+
}))
41+
42+
rows, err := conn.QueryContext(ctx, "SELECT number from numbers(1000000) LIMIT 1000000")
43+
if err != nil {
44+
return err
45+
}
46+
defer rows.Close()
47+
48+
for rows.Next() {
49+
}
50+
51+
fmt.Printf("Total data blocks: %d\n", totalBlocks)
52+
return rows.Err()
53+
}

examples/std/main_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ import (
2626
"testing"
2727
"time"
2828

29-
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
3029
"github.com/stretchr/testify/require"
30+
31+
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
3132
)
3233

3334
func TestMain(m *testing.M) {
@@ -169,3 +170,7 @@ func TestJSONStringExample(t *testing.T) {
169170
t.Skip("client cannot receive JSON strings")
170171
require.NoError(t, JSONStringExample())
171172
}
173+
174+
func TestEndOfProcessAndGotBlock(t *testing.T) {
175+
require.NoError(t, EndOfProcessAndGotData())
176+
}

0 commit comments

Comments
 (0)