Skip to content

Commit 901d354

Browse files
jrauh01lippserd
authored andcommitted
Add functional options to 'DeleteStreamed' and adjust for 'UpsertStreamed'
1 parent 0a57f8b commit 901d354

File tree

1 file changed

+89
-6
lines changed

1 file changed

+89
-6
lines changed

database/db.go

Lines changed: 89 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -732,25 +732,67 @@ func (db *DB) CreateIgnoreStreamed(
732732
)
733733
}
734734

735+
func WithOnSuccessUpsert(onSuccess ...OnSuccess[Entity]) ExecOption {
736+
return func(options *ExecOptions) {
737+
options.onSuccess = onSuccess
738+
}
739+
}
740+
741+
func WithStatement(stmt string, placeholders int) ExecOption {
742+
return func(options *ExecOptions) {
743+
options.stmt = stmt
744+
options.placeholders = placeholders
745+
}
746+
}
747+
748+
type ExecOption func(options *ExecOptions)
749+
750+
type ExecOptions struct {
751+
onSuccess []OnSuccess[Entity]
752+
stmt string
753+
placeholders int
754+
}
755+
756+
func NewExecOptions(execOpts ...ExecOption) *ExecOptions {
757+
execOptions := &ExecOptions{}
758+
759+
for _, option := range execOpts {
760+
option(execOptions)
761+
}
762+
763+
return execOptions
764+
}
765+
735766
// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
736767
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
737768
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
738769
// concurrency is controlled via Options.MaxConnectionsPerTable.
739770
// Entities for which the query ran successfully will be passed to onSuccess.
740771
func (db *DB) UpsertStreamed(
741-
ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
772+
ctx context.Context, entities <-chan Entity, execOpts ...ExecOption,
742773
) error {
774+
775+
execOptions := NewExecOptions(execOpts...)
776+
743777
first, forward, err := com.CopyFirst(ctx, entities)
744778
if err != nil {
745779
return errors.Wrap(err, "can't copy first entity")
746780
}
747781

748782
sem := db.GetSemaphoreForTable(TableName(first))
749-
stmt, placeholders := db.BuildUpsertStmt(first)
783+
var stmt string
784+
var placeholders int
785+
786+
if execOptions.stmt != "" {
787+
stmt = execOptions.stmt
788+
placeholders = execOptions.placeholders
789+
} else {
790+
stmt, placeholders = db.BuildUpsertStmt(first)
791+
}
750792

751793
return db.NamedBulkExec(
752794
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
753-
forward, SplitOnDupId[Entity], onSuccess...,
795+
forward, SplitOnDupId[Entity], execOptions.onSuccess...,
754796
)
755797
}
756798

@@ -769,17 +811,58 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan Entity) error
769811
return db.NamedBulkExecTx(ctx, stmt, db.Options.MaxRowsPerTransaction, sem, forward)
770812
}
771813

814+
func WithOnSuccessDelete(onSuccess ...OnSuccess[any]) DeleteOption {
815+
return func(options *DeleteOptions) {
816+
options.onSuccess = onSuccess
817+
}
818+
}
819+
820+
func ByColumn(column string) DeleteOption {
821+
return func(options *DeleteOptions) {
822+
options.column = column
823+
}
824+
}
825+
826+
type DeleteOption func(options *DeleteOptions)
827+
828+
type DeleteOptions struct {
829+
onSuccess []OnSuccess[any]
830+
column string
831+
}
832+
833+
func NewDeleteOptions(execOpts ...DeleteOption) *DeleteOptions {
834+
deleteOptions := &DeleteOptions{}
835+
836+
for _, option := range execOpts {
837+
option(deleteOptions)
838+
}
839+
840+
return deleteOptions
841+
}
842+
772843
// DeleteStreamed bulk deletes the specified ids via BulkExec.
773844
// The delete statement is created using BuildDeleteStmt with the passed entityType.
774845
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
775846
// concurrency is controlled via Options.MaxConnectionsPerTable.
776847
// IDs for which the query ran successfully will be passed to onSuccess.
777848
func (db *DB) DeleteStreamed(
778-
ctx context.Context, entityType Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any],
849+
ctx context.Context, entityType Entity, ids <-chan interface{}, deleteOpts ...DeleteOption,
779850
) error {
851+
852+
deleteOptions := NewDeleteOptions(deleteOpts...)
853+
780854
sem := db.GetSemaphoreForTable(TableName(entityType))
855+
856+
var stmt string
857+
858+
if deleteOptions.column != "" {
859+
stmt = fmt.Sprintf("DELETE FROM %s WHERE %s IN (?)", TableName(entityType), deleteOptions.column)
860+
} else {
861+
stmt = db.BuildDeleteStmt(entityType)
862+
}
863+
781864
return db.BulkExec(
782-
ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, onSuccess...,
865+
ctx, stmt, db.Options.MaxPlaceholdersPerStatement, sem, ids, deleteOptions.onSuccess...,
783866
)
784867
}
785868

@@ -795,7 +878,7 @@ func (db *DB) Delete(
795878
}
796879
close(idsCh)
797880

798-
return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...)
881+
return db.DeleteStreamed(ctx, entityType, idsCh, WithOnSuccessDelete(onSuccess...))
799882
}
800883

801884
// ExecTx executes the provided function within a database transaction.

0 commit comments

Comments
 (0)