Skip to content

Commit 5a1c509

Browse files
committed
Structs for upsert and delete
1 parent 9a0bcfc commit 5a1c509

File tree

2 files changed

+141
-89
lines changed

2 files changed

+141
-89
lines changed

database/db.go

Lines changed: 6 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -731,67 +731,25 @@ func (db *DB) CreateIgnoreStreamed(
731731
)
732732
}
733733

734-
func WithOnSuccessUpsert(onSuccess ...OnSuccess[Entity]) ExecOption {
735-
return func(options *ExecOptions) {
736-
options.onSuccess = onSuccess
737-
}
738-
}
739-
740-
func WithStatement(stmt string, placeholders int) ExecOption {
741-
return func(options *ExecOptions) {
742-
options.stmt = stmt
743-
options.placeholders = placeholders
744-
}
745-
}
746-
747-
type ExecOption func(options *ExecOptions)
748-
749-
type ExecOptions struct {
750-
onSuccess []OnSuccess[Entity]
751-
stmt string
752-
placeholders int
753-
}
754-
755-
func NewExecOptions(execOpts ...ExecOption) *ExecOptions {
756-
execOptions := &ExecOptions{}
757-
758-
for _, option := range execOpts {
759-
option(execOptions)
760-
}
761-
762-
return execOptions
763-
}
764-
765734
// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
766735
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
767736
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
768737
// concurrency is controlled via Options.MaxConnectionsPerTable.
769738
// Entities for which the query ran successfully will be passed to onSuccess.
770739
func (db *DB) UpsertStreamed(
771-
ctx context.Context, entities <-chan Entity, execOpts ...ExecOption,
740+
ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
772741
) error {
773-
774-
execOptions := NewExecOptions(execOpts...)
775-
776742
first, forward, err := com.CopyFirst(ctx, entities)
777743
if err != nil {
778744
return errors.Wrap(err, "can't copy first entity")
779745
}
780746

781747
sem := db.GetSemaphoreForTable(TableName(first))
782-
var stmt string
783-
var placeholders int
784-
785-
if execOptions.stmt != "" {
786-
stmt = execOptions.stmt
787-
placeholders = execOptions.placeholders
788-
} else {
789-
stmt, placeholders = db.BuildUpsertStmt(first)
790-
}
748+
stmt, placeholders := db.BuildUpsertStmt(first)
791749

792750
return db.NamedBulkExec(
793751
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
794-
forward, SplitOnDupId[Entity], execOptions.onSuccess...,
752+
forward, SplitOnDupId[Entity], onSuccess...,
795753
)
796754
}
797755

@@ -810,58 +768,17 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan Entity) error
810768
return db.NamedBulkExecTx(ctx, stmt, db.Options.MaxRowsPerTransaction, sem, forward)
811769
}
812770

813-
func WithOnSuccessDelete(onSuccess ...OnSuccess[any]) DeleteOption {
814-
return func(options *DeleteOptions) {
815-
options.onSuccess = onSuccess
816-
}
817-
}
818-
819-
func ByColumn(column string) DeleteOption {
820-
return func(options *DeleteOptions) {
821-
options.column = column
822-
}
823-
}
824-
825-
type DeleteOption func(options *DeleteOptions)
826-
827-
type DeleteOptions struct {
828-
onSuccess []OnSuccess[any]
829-
column string
830-
}
831-
832-
func NewDeleteOptions(execOpts ...DeleteOption) *DeleteOptions {
833-
deleteOptions := &DeleteOptions{}
834-
835-
for _, option := range execOpts {
836-
option(deleteOptions)
837-
}
838-
839-
return deleteOptions
840-
}
841-
842771
// DeleteStreamed bulk deletes the specified ids via BulkExec.
843772
// The delete statement is created using BuildDeleteStmt with the passed entityType.
844773
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
845774
// concurrency is controlled via Options.MaxConnectionsPerTable.
846775
// IDs for which the query ran successfully will be passed to onSuccess.
847776
func (db *DB) DeleteStreamed(
848-
ctx context.Context, entityType Entity, ids <-chan interface{}, deleteOpts ...DeleteOption,
777+
ctx context.Context, entityType Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any],
849778
) error {
850-
851-
deleteOptions := NewDeleteOptions(deleteOpts...)
852-
853779
sem := db.GetSemaphoreForTable(TableName(entityType))
854-
855-
var stmt string
856-
857-
if deleteOptions.column != "" {
858-
stmt = fmt.Sprintf("DELETE FROM %s WHERE %s IN (?)", TableName(entityType), deleteOptions.column)
859-
} else {
860-
stmt = db.BuildDeleteStmt(entityType)
861-
}
862-
863780
return db.BulkExec(
864-
ctx, stmt, db.Options.MaxPlaceholdersPerStatement, sem, ids, deleteOptions.onSuccess...,
781+
ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, onSuccess...,
865782
)
866783
}
867784

@@ -877,7 +794,7 @@ func (db *DB) Delete(
877794
}
878795
close(idsCh)
879796

880-
return db.DeleteStreamed(ctx, entityType, idsCh, WithOnSuccessDelete(onSuccess...))
797+
return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...)
881798
}
882799

883800
// ExecTx executes the provided function within a database transaction.

database/optionally.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package database
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/icinga/icinga-go-library/com"
7+
"github.com/pkg/errors"
8+
)
9+
10+
// Upsert inserts new rows into a table or updates rows of a table if the primary key already exists.
11+
type Upsert interface {
12+
// Stream bulk upserts the specified entities via NamedBulkExec.
13+
// If not explicitly specified, the upsert statement is created using
14+
// BuildUpsertStmt with the first entity from the entities stream.
15+
Stream(ctx context.Context, entities <-chan Entity) error
16+
}
17+
18+
// UpsertOption is a functional option for NewUpsert.
19+
type UpsertOption func(u *upsert)
20+
21+
// WithOnUpsert adds callback(s) to bulk upserts. Entities for which the
22+
// operation was performed successfully are passed to the callbacks.
23+
func WithOnUpsert(onUpsert ...OnSuccess[Entity]) UpsertOption {
24+
return func(u *upsert) {
25+
u.onUpsert = onUpsert
26+
}
27+
}
28+
29+
// WithStatement uses the specified statement for bulk upserts instead of automatically creating one.
30+
func WithStatement(stmt string, placeholders int) UpsertOption {
31+
return func(u *upsert) {
32+
u.stmt = stmt
33+
u.placeholders = placeholders
34+
}
35+
}
36+
37+
// NewUpsert creates a new Upsert initalized with a database.
38+
func NewUpsert(db *DB, options ...UpsertOption) Upsert {
39+
u := &upsert{db: db}
40+
41+
for _, option := range options {
42+
option(u)
43+
}
44+
45+
return u
46+
}
47+
48+
type upsert struct {
49+
db *DB
50+
onUpsert []OnSuccess[Entity]
51+
stmt string
52+
placeholders int
53+
}
54+
55+
func (u *upsert) Stream(ctx context.Context, entities <-chan Entity) error {
56+
first, forward, err := com.CopyFirst(ctx, entities)
57+
if err != nil {
58+
return errors.Wrap(err, "can't copy first entity")
59+
}
60+
61+
sem := u.db.GetSemaphoreForTable(TableName(first))
62+
var stmt string
63+
var placeholders int
64+
65+
if u.stmt != "" {
66+
stmt = u.stmt
67+
placeholders = u.placeholders
68+
} else {
69+
stmt, placeholders = u.db.BuildUpsertStmt(first)
70+
}
71+
72+
return u.db.NamedBulkExec(
73+
ctx, stmt, u.db.BatchSizeByPlaceholders(placeholders), sem,
74+
forward, SplitOnDupId[Entity], u.onUpsert...,
75+
)
76+
}
77+
78+
// Delete deletes rows of a table.
79+
type Delete interface {
80+
// Stream bulk deletes rows from the table specified in from using the given args stream via BulkExec.
81+
// Unless explicitly specified, the DELETE statement is created using BuildDeleteStmt.
82+
Stream(ctx context.Context, from any, args <-chan any) error
83+
}
84+
85+
// DeleteOption is a functional option for NewDelete.
86+
type DeleteOption func(options *delete)
87+
88+
// WithOnDelete adds callback(s) to bulk deletes. Arguments for which the
89+
// operation was performed successfully are passed to the callbacks.
90+
func WithOnDelete(onDelete ...OnSuccess[any]) DeleteOption {
91+
return func(d *delete) {
92+
d.onDelete = onDelete
93+
}
94+
}
95+
96+
// ByColumn uses the given column for the WHERE clause that the rows must
97+
// satisfy in order to be deleted, instead of automatically using ID.
98+
func ByColumn(column string) DeleteOption {
99+
return func(d *delete) {
100+
d.column = column
101+
}
102+
}
103+
104+
// NewDelete creates a new Delete initalized with a database.
105+
func NewDelete(db *DB, options ...DeleteOption) Delete {
106+
d := &delete{db: db}
107+
108+
for _, option := range options {
109+
option(d)
110+
}
111+
112+
return d
113+
}
114+
115+
type delete struct {
116+
db *DB
117+
column string
118+
onDelete []OnSuccess[any]
119+
}
120+
121+
func (d *delete) Stream(ctx context.Context, from any, args <-chan any) error {
122+
var stmt string
123+
124+
if d.column != "" {
125+
stmt = fmt.Sprintf(`DELETE FROM "%s" WHERE %s IN (?)`, TableName(from), d.column)
126+
} else {
127+
stmt = d.db.BuildDeleteStmt(from)
128+
}
129+
130+
sem := d.db.GetSemaphoreForTable(TableName(from))
131+
132+
return d.db.BulkExec(
133+
ctx, stmt, d.db.Options.MaxPlaceholdersPerStatement, sem, args, d.onDelete...,
134+
)
135+
}

0 commit comments

Comments
 (0)