Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1043,15 +1043,12 @@ func TestSendBatchStatementTimeout(t *testing.T) {
assert.NoError(t, err)

// get pg_sleep results
rows, err := br.Query()
assert.NoError(t, err)
rows, _ := br.Query()

// Consume rows and check error
for rows.Next() {
}
rows.Close()
err = rows.Err()
assert.ErrorContains(t, err, "(SQLSTATE 57014)")
rows.Close()

// The last error should be repeated when closing the batch
err = br.Close()
Expand Down Expand Up @@ -1131,6 +1128,43 @@ func TestSendBatchHandlesTimeoutBetweenParseAndDescribe(t *testing.T) {
})
}

// TestBatchNetworkUsage checks that running a batch of cached prepared
// statements produces an exact, known number of bytes in each direction on
// the wire. A change in either count signals a protocol-level regression
// (e.g. redundant Parse/Describe messages per queued query).
func TestBatchNetworkUsage(t *testing.T) {
	t.Parallel()

	config := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
	config.DefaultQueryExecMode = pgx.QueryExecModeCacheStatement

	// Wrap the underlying net.Conn so every byte read and written is counted.
	var counterConn *byteCounterConn
	config.AfterNetConnect = func(ctx context.Context, config *pgconn.Config, conn net.Conn) (net.Conn, error) {
		counterConn = &byteCounterConn{conn: conn}
		return counterConn, nil
	}

	conn := mustConnect(t, config)
	defer closeConn(t, conn)

	pgxtest.SkipCockroachDB(t, conn, "Server uses different number of bytes for same operations")

	// Discard the traffic generated by connection startup; only the batch
	// itself should be measured.
	counterConn.bytesWritten = 0
	counterConn.bytesRead = 0

	batch := &pgx.Batch{}
	for range 10 {
		batch.Queue(
			"select n, 'Adam', 'Smith ' || n, 'male', '1952-06-16'::date, 258, 72, '{foo,bar,baz}'::text[], '2001-01-28 01:02:03-05'::timestamptz from generate_series(100001, 100000 + $1) n",
			1,
		)
	}

	require.NoError(t, conn.SendBatch(context.Background(), batch).Close())

	// Exact byte counts for this batch under QueryExecModeCacheStatement.
	assert.Equal(t, 1736, counterConn.bytesRead)
	assert.Equal(t, 1408, counterConn.bytesWritten)

	ensureConnValid(t, conn)
}

func ExampleConn_SendBatch() {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
Expand Down
80 changes: 80 additions & 0 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,41 @@ func BenchmarkMinimalPgConnPreparedSelect(b *testing.B) {
}
}

// BenchmarkMinimalPgConnPreparedStatementDescriptionSelect measures executing
// a prepared statement through ExecStatement, which reuses the statement
// description captured at Prepare time instead of re-describing the statement
// on every execution.
func BenchmarkMinimalPgConnPreparedStatementDescriptionSelect(b *testing.B) {
	conn := mustConnect(b, mustParseConfig(b, os.Getenv("PGX_TEST_DATABASE")))
	defer closeConn(b, conn)

	pgConn := conn.PgConn()

	psd, err := pgConn.Prepare(context.Background(), "ps1", "select $1::int8", nil)
	if err != nil {
		b.Fatal(err)
	}

	// Zero-valued int8 in binary format; echoed back by the query unchanged.
	encodedBytes := make([]byte, 8)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// ExecStatement does not return an error; failures surface via
		// rr.Close() below. (The original code re-checked the stale err
		// from Prepare here, which was dead code.)
		rr := pgConn.ExecStatement(context.Background(), psd, [][]byte{encodedBytes}, []int16{1}, []int16{1})

		for rr.NextRow() {
			values := rr.Values()
			// Compare each value against the parameter we sent — the query
			// simply echoes it back.
			for j := range values {
				if !bytes.Equal(values[j], encodedBytes) {
					b.Fatalf("unexpected values: %s %s", values[j], encodedBytes)
				}
			}
		}
		_, err = rr.Close()
		if err != nil {
			b.Fatal(err)
		}
	}
}

func BenchmarkPointerPointerWithNullValues(b *testing.B) {
conn := mustConnect(b, mustParseConfig(b, os.Getenv("PGX_TEST_DATABASE")))
defer closeConn(b, conn)
Expand Down Expand Up @@ -1265,6 +1300,51 @@ func BenchmarkSelectRowsPgConnExecPrepared(b *testing.B) {
}
}

// BenchmarkSelectRowsPgConnExecStatement benchmarks reading multi-row result
// sets through PgConn.ExecStatement across several row counts and result
// format combinations.
func BenchmarkSelectRowsPgConnExecStatement(b *testing.B) {
	conn := mustConnectString(b, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(b, conn)

	rowCounts := getSelectRowsCounts(b)

	psd, err := conn.PgConn().Prepare(context.Background(), "ps1", "select n, 'Adam', 'Smith ' || n, 'male', '1952-06-16'::date, 258, 72, '{foo,bar,baz}'::text[], '2001-01-28 01:02:03-05'::timestamptz from generate_series(100001, 100000 + $1) n", nil)
	if err != nil {
		b.Fatal(err)
	}

	// Result format variants: all-text, and binary for every column that
	// supports it (the string columns stay text).
	type resultFormat struct {
		name string
		code int16
	}
	formats := []resultFormat{
		{"text", pgx.TextFormatCode},
		{"binary - mostly", pgx.BinaryFormatCode},
	}

	for _, rowCount := range rowCounts {
		b.Run(fmt.Sprintf("%d rows", rowCount), func(b *testing.B) {
			for _, format := range formats {
				b.Run(format.name, func(b *testing.B) {
					for i := 0; i < b.N; i++ {
						rr := conn.PgConn().ExecStatement(
							context.Background(),
							psd,
							[][]byte{[]byte(strconv.FormatInt(rowCount, 10))},
							nil,
							[]int16{format.code, pgx.TextFormatCode, pgx.TextFormatCode, pgx.TextFormatCode, format.code, format.code, format.code, format.code, format.code},
						)
						// Drain all rows; Values() exercises per-row decoding.
						for rr.NextRow() {
							rr.Values()
						}
						if _, err := rr.Close(); err != nil {
							b.Fatal(err)
						}
					}
				})
			}
		})
	}
}

type queryRecorder struct {
conn net.Conn
writeBuf []byte
Expand Down
6 changes: 3 additions & 3 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func (c *Conn) execPrepared(ctx context.Context, sd *pgconn.StatementDescription
return pgconn.CommandTag{}, err
}

result := c.pgConn.ExecPrepared(ctx, sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, c.eqb.ResultFormats).Read()
result := c.pgConn.ExecStatement(ctx, sd, c.eqb.ParamValues, c.eqb.ParamFormats, c.eqb.ResultFormats).Read()
c.eqb.reset() // Allow c.eqb internal memory to be GC'ed as soon as possible.
return result.CommandTag, result.Err
}
Expand Down Expand Up @@ -842,7 +842,7 @@ optionLoop:
if !explicitPreparedStatement && mode == QueryExecModeCacheDescribe {
rows.resultReader = c.pgConn.ExecParams(ctx, sql, c.eqb.ParamValues, sd.ParamOIDs, c.eqb.ParamFormats, resultFormats)
} else {
rows.resultReader = c.pgConn.ExecPrepared(ctx, sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, resultFormats)
rows.resultReader = c.pgConn.ExecStatement(ctx, sd, c.eqb.ParamValues, c.eqb.ParamFormats, resultFormats)
}
} else if mode == QueryExecModeExec {
err := c.eqb.Build(c.typeMap, nil, args)
Expand Down Expand Up @@ -1234,7 +1234,7 @@ func (c *Conn) sendBatchExtendedWithDescription(ctx context.Context, b *Batch, d
if bi.sd.Name == "" {
pipeline.SendQueryParams(bi.sd.SQL, c.eqb.ParamValues, bi.sd.ParamOIDs, c.eqb.ParamFormats, c.eqb.ResultFormats)
} else {
pipeline.SendQueryPrepared(bi.sd.Name, c.eqb.ParamValues, c.eqb.ParamFormats, c.eqb.ResultFormats)
pipeline.SendQueryStatement(bi.sd, c.eqb.ParamValues, c.eqb.ParamFormats, c.eqb.ResultFormats)
}
}

Expand Down
Loading