Skip to content

Commit 4149009

Browse files
authored
Decouple FlowConnectionConfigs from API (#3605)
In #3589 we want to stop using a field in a way that's not error-prone in the rest of the code (so, removing) but also doesn't break API backcompat (so, not removing). Trying to split FlowConnectionConfigs into two contracts that are made sure to be in sync with a little codegen. Generated code has small TODOs that would be removed by #3589 if we go this route. Validated that renaming the type of a workflow arg works well with history replay, only the payload is important.
1 parent 7b0bef4 commit 4149009

36 files changed

+441
-149
lines changed

.github/actions/genprotos/action.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ runs:
99
uses: actions/cache@0057852bfaa89a56745cba8c7296529d2fc39830 # v4
1010
with:
1111
path: |
12-
./flow/generated/protos
13-
./flow/cmd/typed_handler.go
12+
./flow/generated
1413
./nexus/pt/src/gen
1514
./ui/grpc_generated
16-
key: ${{ runner.os }}-build-genprotos-${{ hashFiles('buf.gen.yaml', './protos/peers.proto', './protos/flow.proto', './protos/route.proto', './flow/cmd/gen-grpc-wrapper/main.go') }}
15+
key: ${{ runner.os }}-build-genprotos-${{ hashFiles('buf.gen.yaml', './protos/peers.proto', './protos/flow.proto', './protos/route.proto', './flow/cmd/codegen/*.go') }}
1716

1817
- if: steps.cache.outputs.cache-hit != 'true'
1918
uses: bufbuild/buf-action@8f4a1456a0ab6a1eb80ba68e53832e6fcfacc16c # v1

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ flow/peer-flow
1313
go.work
1414
go.work.sum
1515

16-
# generated protobuf files
16+
# generated protobuf files and go generate
1717
ui/grpc_generated
1818
flow/generated
1919
nexus/pt/src/gen

flow/.gitignore

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,3 @@ vendor/
1010

1111
# goenv local version. See https://github.com/syndbg/goenv/blob/master/COMMANDS.md#goenv-local for more info.
1212
.go-version
13-
14-
# generated files
15-
cmd/typed_handler.go

flow/.golangci.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ linters:
6060
settings:
6161
hugeParam:
6262
sizeThreshold: 1024
63+
ruleguard:
64+
rules: cmd/codegen/ruleguard.rules.go
6365
gosec:
6466
excludes:
6567
- G115

flow/activities/flowable.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (a *FlowableActivity) CreateNormalizedTable(
283283

284284
func (a *FlowableActivity) SyncFlow(
285285
ctx context.Context,
286-
config *protos.FlowConnectionConfigs,
286+
config *protos.FlowConnectionConfigsCore,
287287
options *protos.SyncFlowOptions,
288288
) error {
289289
var currentSyncFlowNum atomic.Int32
@@ -414,7 +414,7 @@ func (a *FlowableActivity) SyncFlow(
414414

415415
func (a *FlowableActivity) syncRecords(
416416
ctx context.Context,
417-
config *protos.FlowConnectionConfigs,
417+
config *protos.FlowConnectionConfigsCore,
418418
options *protos.SyncFlowOptions,
419419
srcConn connectors.CDCPullConnector,
420420
normRequests *concurrency.LastChan,
@@ -461,7 +461,7 @@ func (a *FlowableActivity) syncRecords(
461461

462462
func (a *FlowableActivity) syncPg(
463463
ctx context.Context,
464-
config *protos.FlowConnectionConfigs,
464+
config *protos.FlowConnectionConfigsCore,
465465
options *protos.SyncFlowOptions,
466466
srcConn connectors.CDCPullPgConnector,
467467
normRequests *concurrency.LastChan,
@@ -905,14 +905,14 @@ func (a *FlowableActivity) ScheduledTasks(ctx context.Context) error {
905905
}
906906

907907
type flowInformation struct {
908-
config *protos.FlowConnectionConfigs
908+
config *protos.FlowConnectionConfigsCore
909909
updatedAt time.Time
910910
workflowID string
911911
}
912912

913913
type metricsFlowMetadata struct {
914914
updatedAt time.Time
915-
config *protos.FlowConnectionConfigs
915+
config *protos.FlowConnectionConfigsCore
916916
name string
917917
workflowID string
918918
sourcePeerName string
@@ -1129,7 +1129,7 @@ func (a *FlowableActivity) getFlowsForMetrics(ctx context.Context) ([]metricsFlo
11291129

11301130
infos, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (metricsFlowMetadata, error) {
11311131
f := metricsFlowMetadata{
1132-
config: &protos.FlowConnectionConfigs{},
1132+
config: &protos.FlowConnectionConfigsCore{},
11331133
}
11341134
var configProto []byte
11351135
if err := rows.Scan(
@@ -1191,7 +1191,7 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
11911191
return flowInformation{}, err
11921192
}
11931193

1194-
var config protos.FlowConnectionConfigs
1194+
var config protos.FlowConnectionConfigsCore
11951195
if err := proto.Unmarshal(configProto, &config); err != nil {
11961196
return flowInformation{}, err
11971197
}
@@ -1543,7 +1543,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
15431543
}
15441544
}
15451545

1546-
func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigs,
1546+
func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigsCore,
15471547
additionalTableMappings []*protos.TableMapping,
15481548
) error {
15491549
ctx = context.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName)
@@ -1571,7 +1571,7 @@ func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *prot
15711571

15721572
func (a *FlowableActivity) RemoveTablesFromPublication(
15731573
ctx context.Context,
1574-
cfg *protos.FlowConnectionConfigs,
1574+
cfg *protos.FlowConnectionConfigsCore,
15751575
removedTablesMapping []*protos.TableMapping,
15761576
) error {
15771577
ctx = context.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName)
@@ -1599,7 +1599,7 @@ func (a *FlowableActivity) RemoveTablesFromPublication(
15991599

16001600
func (a *FlowableActivity) RemoveTablesFromRawTable(
16011601
ctx context.Context,
1602-
cfg *protos.FlowConnectionConfigs,
1602+
cfg *protos.FlowConnectionConfigsCore,
16031603
tablesToRemove []*protos.TableMapping,
16041604
) error {
16051605
ctx = context.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName)
@@ -1647,7 +1647,7 @@ func (a *FlowableActivity) RemoveTablesFromRawTable(
16471647

16481648
func (a *FlowableActivity) RemoveTablesFromCatalog(
16491649
ctx context.Context,
1650-
cfg *protos.FlowConnectionConfigs,
1650+
cfg *protos.FlowConnectionConfigsCore,
16511651
tablesToRemove []*protos.TableMapping,
16521652
) error {
16531653
removedTables := make([]string, 0, len(tablesToRemove))
@@ -1768,7 +1768,7 @@ func (a *FlowableActivity) GetFlowMetadata(
17681768
}, nil
17691769
}
17701770

1771-
func (a *FlowableActivity) UpdateCDCConfigInCatalogActivity(ctx context.Context, cfg *protos.FlowConnectionConfigs) error {
1771+
func (a *FlowableActivity) UpdateCDCConfigInCatalogActivity(ctx context.Context, cfg *protos.FlowConnectionConfigsCore) error {
17721772
return internal.UpdateCDCConfigInCatalog(ctx, a.CatalogPool, internal.LoggerFromCtx(ctx), cfg)
17731773
}
17741774

flow/activities/flowable_core.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowNa
7979

8080
func (a *FlowableActivity) applySchemaDeltas(
8181
ctx context.Context,
82-
config *protos.FlowConnectionConfigs,
82+
config *protos.FlowConnectionConfigsCore,
8383
options *protos.SyncFlowOptions,
8484
schemaDeltas []*protos.TableSchemaDelta,
8585
) error {
@@ -111,7 +111,7 @@ func (a *FlowableActivity) applySchemaDeltas(
111111
func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, Items model.Items](
112112
ctx context.Context,
113113
a *FlowableActivity,
114-
config *protos.FlowConnectionConfigs,
114+
config *protos.FlowConnectionConfigsCore,
115115
options *protos.SyncFlowOptions,
116116
srcConn TPull,
117117
normRequests *concurrency.LastChan,
@@ -624,7 +624,7 @@ func (a *FlowableActivity) maintainReplConn(
624624

625625
func (a *FlowableActivity) startNormalize(
626626
ctx context.Context,
627-
config *protos.FlowConnectionConfigs,
627+
config *protos.FlowConnectionConfigsCore,
628628
batchID int64,
629629
normalizeResponses *concurrency.LastChan,
630630
) error {
@@ -683,7 +683,7 @@ func (a *FlowableActivity) startNormalize(
683683
func (a *FlowableActivity) normalizeLoop(
684684
ctx context.Context,
685685
logger log.Logger,
686-
config *protos.FlowConnectionConfigs,
686+
config *protos.FlowConnectionConfigsCore,
687687
syncDone <-chan struct{},
688688
normalizeRequests *concurrency.LastChan,
689689
normalizeResponses *concurrency.LastChan,

flow/activities/snapshot_activity.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func (a *SnapshotActivity) GetPeerType(ctx context.Context, name string) (protos
172172

173173
func (a *SnapshotActivity) GetDefaultPartitionKeyForTables(
174174
ctx context.Context,
175-
input *protos.FlowConnectionConfigs,
175+
input *protos.FlowConnectionConfigsCore,
176176
) (*protos.GetDefaultPartitionKeyForTablesOutput, error) {
177177
connector, err := connectors.GetByNameAs[connectors.QRepPullConnectorCore](ctx, nil, a.CatalogPool, input.SourceName)
178178
if err != nil {

flow/cmd/api.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"google.golang.org/grpc/health/grpc_health_v1"
2424
"google.golang.org/grpc/reflection"
2525

26+
"github.com/PeerDB-io/peerdb/flow/generated/grpc_handler"
2627
"github.com/PeerDB-io/peerdb/flow/generated/protos"
2728
"github.com/PeerDB-io/peerdb/flow/internal"
2829
"github.com/PeerDB-io/peerdb/flow/middleware"
@@ -259,7 +260,7 @@ func APIMain(ctx context.Context, args *APIServerParams) error {
259260
return fmt.Errorf("unable to start scheduler workflow: %w", err)
260261
}
261262

262-
protos.RegisterFlowServiceServer(grpcServer, NewFlowServiceAdapter(flowHandler))
263+
protos.RegisterFlowServiceServer(grpcServer, grpc_handler.NewFlowServiceAdapter(flowHandler))
263264
grpc_health_v1.RegisterHealthServer(grpcServer, health.NewServer())
264265
reflection.Register(grpcServer)
265266

flow/cmd/api_error.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,13 @@ import (
88
"google.golang.org/grpc/codes"
99
"google.golang.org/grpc/status"
1010
"google.golang.org/protobuf/protoadapt"
11+
12+
"github.com/PeerDB-io/peerdb/flow/generated/grpc_handler"
1113
)
1214

1315
// APIError is a strongly-typed error that must be a gRPC status error.
1416
// All handler methods should return this type instead of the generic error interface.
15-
type APIError interface {
16-
error
17-
GRPCStatus() *status.Status
18-
Code() codes.Code
19-
}
17+
type APIError = grpc_handler.APIError
2018

2119
type apiError struct {
2220
status *status.Status

0 commit comments

Comments
 (0)