Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Features:

New in v3:
* Supports ILP over HTTP using the same client semantics
* Supports n-dimensional arrays of doubles for QuestDB servers 9.0.0 and up

Documentation is available [here](https://pkg.go.dev/github.com/questdb/go-questdb-client/v3).

Expand Down Expand Up @@ -99,6 +100,95 @@ HTTP is the recommended transport to use. To connect via TCP, set the configurat
// ...
```

## N-dimensional arrays

QuestDB server version 9.0.0 and newer supports n-dimensional arrays of double precision floating point numbers.
The Go client provides several methods to send arrays to QuestDB:

### 1D Arrays

```go
// Send a 1D array of doubles
values1D := []float64{1.1, 2.2, 3.3, 4.4}
err = sender.
Table("measurements").
Symbol("sensor", "temp_probe_1").
Float641DArrayColumn("readings", values1D).
AtNow(ctx)
```

### 2D Arrays

```go
// Send a 2D array of doubles (must be rectangular)
values2D := [][]float64{
{1.1, 2.2, 3.3},
{4.4, 5.5, 6.6},
{7.7, 8.8, 9.9},
}
err = sender.
Table("matrix_data").
Symbol("experiment", "test_001").
Float642DArrayColumn("matrix", values2D).
AtNow(ctx)
```

### 3D Arrays

```go
// Send a 3D array of doubles (must be regular cuboid shape)
values3D := [][][]float64{
{{1.0, 2.0}, {3.0, 4.0}},
{{5.0, 6.0}, {7.0, 8.0}},
}
err = sender.
Table("tensor_data").
Symbol("model", "neural_net_v1").
Float643DArrayColumn("weights", values3D).
AtNow(ctx)
```

### N-dimensional Arrays

For higher dimensions, use the `NewNDArray` function:

```go
// Create a 2x3x4 array
arr, err := qdb.NewNDArray[float64](2, 3, 4)
if err != nil {
log.Fatal(err)
}

// Fill with values
arr.Fill(1.5)

// Or set individual values
arr.Set([]uint{0, 1, 2}, 42.0)

err = sender.
Table("ndarray_data").
Symbol("dataset", "training_batch_1").
Float64NDArrayColumn("features", arr).
AtNow(ctx)
```

The array data is sent over a new protocol version (2) that is auto-negotiated
when using HTTP(s), or can be specified explicitly via the ``protocol_version=2``
parameter when using TCP(s).

We recommend using HTTP(s), but here is an TCP example, should you need it::

```go
sender, err := qdb.NewLineSender(ctx,
qdb.WithTcp(),
qdb.WithProtocolVersion(qdb.ProtocolVersion2))
```

When using ``protocol_version=2`` (with either TCP(s) or HTTP(s)), the sender
will now also serialize ``float64`` (double-precision) columns as binary.
You might see a performance uplift if this is a dominant data type in your
ingestion workload.

## Pooled Line Senders

**Warning: Experimental feature designed for use with HTTP senders ONLY**
Expand Down
240 changes: 238 additions & 2 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,67 @@ package questdb

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math"
"math/big"
"strconv"
"time"
"unsafe"
)

// errInvalidMsg indicates a failed attempt to construct an ILP
// message, e.g. duplicate calls to Table method or illegal
// chars found in table or column name.
var errInvalidMsg = errors.New("invalid message")

type binaryFlag byte

const (
arrayBinaryFlag binaryFlag = 14
float64BinaryFlag binaryFlag = 16
)

// isLittleEndian checks if the current machine uses little-endian byte order
func isLittleEndian() bool {
var i int32 = 0x01020304
return *(*byte)(unsafe.Pointer(&i)) == 0x04
}

// writeFloat64Data optimally writes float64 slice data to buffer
// Uses batch memory copy on little-endian machines for better performance
func (b *buffer) writeFloat64Data(data []float64) {
if isLittleEndian() && len(data) > 0 {
b.Write(unsafe.Slice((*byte)(unsafe.Pointer(&data[0])), len(data)*8))
} else {
bytes := make([]byte, 8)
for _, val := range data {
binary.LittleEndian.PutUint64(bytes[0:], math.Float64bits(val))
b.Write(bytes)
}
}
}

// writeUint32 optimally writes a single uint32 value
func (b *buffer) writeUint32(val uint32) {
if isLittleEndian() {
// On little-endian machines, we can directly write the uint32 as bytes
b.Write((*[4]byte)(unsafe.Pointer(&val))[:])
} else {
// On big-endian machines, use the standard conversion
data := make([]byte, 4)
binary.LittleEndian.PutUint32(data, val)
b.Write(data)
}
}

type arrayElemType byte

const (
arrayElemDouble arrayElemType = 10
)

// buffer is a wrapper on top of bytes.Buffer. It extends the
// original struct with methods for writing int64 and float64
// numbers without unnecessary allocations.
Expand Down Expand Up @@ -90,6 +138,12 @@ func (b *buffer) ClearLastErr() {
b.lastErr = nil
}

func (b *buffer) SetLastErr(err error) {
if b.lastErr == nil {
b.lastErr = err
}
}

func (b *buffer) writeInt(i int64) {
// We need up to 20 bytes to fit an int64, including a sign.
var a [20]byte
Expand Down Expand Up @@ -393,8 +447,8 @@ func (b *buffer) resetMsgFlags() {
b.hasFields = false
}

func (b *buffer) Messages() string {
return b.String()
func (b *buffer) Messages() []byte {
return b.Buffer.Bytes()
}

func (b *buffer) Table(name string) *buffer {
Expand Down Expand Up @@ -517,6 +571,188 @@ func (b *buffer) Float64Column(name string, val float64) *buffer {
return b
}

func (b *buffer) Float64ColumnBinaryFormat(name string, val float64) *buffer {
if !b.prepareForField() {
return b
}
b.lastErr = b.writeColumnName(name)
if b.lastErr != nil {
return b
}
b.WriteByte('=')
// binary format flag
b.WriteByte('=')
b.WriteByte(byte(float64BinaryFlag))
if isLittleEndian() {
b.Write((*[8]byte)(unsafe.Pointer(&val))[:])
} else {
data := make([]byte, 8)
binary.LittleEndian.PutUint64(data, math.Float64bits(val))
b.Write(data)
}
b.hasFields = true
return b
}

func (b *buffer) writeFloat64ArrayHeader(dims byte) {
b.WriteByte('=')
b.WriteByte('=')
b.WriteByte(byte(arrayBinaryFlag))
b.WriteByte(byte(arrayElemDouble))
b.WriteByte(dims)
}

func (b *buffer) Float641DArrayColumn(name string, values []float64) *buffer {
if !b.prepareForField() {
return b
}
b.lastErr = b.writeColumnName(name)
if b.lastErr != nil {
return b
}

dim1 := len(values)
b.writeFloat64ArrayHeader(1)

// Write shape
b.writeUint32(uint32(dim1))

// Write values
if len(values) > 0 {
b.writeFloat64Data(values)
}

b.hasFields = true
return b
}

func (b *buffer) Float642DArrayColumn(name string, values [][]float64) *buffer {
if !b.prepareForField() {
return b
}
b.lastErr = b.writeColumnName(name)
if b.lastErr != nil {
return b
}

// Validate array shape
dim1 := len(values)
var dim2 int
if dim1 > 0 {
dim2 = len(values[0])
for i, row := range values {
if len(row) != dim2 {
b.lastErr = fmt.Errorf("irregular 2D array shape: row %d has length %d, expected %d", i, len(row), dim2)
return b
}
}
}

b.writeFloat64ArrayHeader(2)

// Write shape
b.writeUint32(uint32(dim1))
b.writeUint32(uint32(dim2))

// Write values
for _, row := range values {
if len(row) > 0 {
b.writeFloat64Data(row)
}
}

b.hasFields = true
return b
}

func (b *buffer) Float643DArrayColumn(name string, values [][][]float64) *buffer {
if !b.prepareForField() {
return b
}
b.lastErr = b.writeColumnName(name)
if b.lastErr != nil {
return b
}

// Validate array shape
dim1 := len(values)
var dim2, dim3 int
if dim1 > 0 {
dim2 = len(values[0])
if dim2 > 0 {
dim3 = len(values[0][0])
}

for i, level1 := range values {
if len(level1) != dim2 {
b.lastErr = fmt.Errorf("irregular 3D array shape: level1[%d] has length %d, expected %d", i, len(level1), dim2)
return b
}
for j, level2 := range level1 {
if len(level2) != dim3 {
b.lastErr = fmt.Errorf("irregular 3D array shape: level2[%d][%d] has length %d, expected %d", i, j, len(level2), dim3)
return b
}
}
}
}

b.writeFloat64ArrayHeader(3)

// Write shape
b.writeUint32(uint32(dim1))
b.writeUint32(uint32(dim2))
b.writeUint32(uint32(dim3))

// Write values
for _, level1 := range values {
for _, level2 := range level1 {
if len(level2) > 0 {
b.writeFloat64Data(level2)
}
}
}

b.hasFields = true
return b
}

func (b *buffer) Float64NDArrayColumn(name string, value *NdArray[float64]) *buffer {
if !b.prepareForField() {
return b
}
b.lastErr = b.writeColumnName(name)
if b.lastErr != nil {
return b
}

// Validate the NdArray
if value == nil {
b.lastErr = fmt.Errorf("NDArray cannot be nil")
return b
}

shape := value.Shape()
numDims := value.NDims()

// Write nDims
b.writeFloat64ArrayHeader(byte(numDims))

// Write shape
for _, dim := range shape {
b.writeUint32(uint32(dim))
}

// Write data
data := value.GetData()
if len(data) > 0 {
b.writeFloat64Data(data)
}

b.hasFields = true
return b
}

func (b *buffer) StringColumn(name, val string) *buffer {
if !b.prepareForField() {
return b
Expand Down
Loading