diff --git a/docker-compose.yml b/docker-compose.yml index 99cdec4d0f..f6e1b5b20c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -114,7 +114,12 @@ services: flow-api: container_name: flow_api - image: ghcr.io/peerdb-io/flow-api:stable-v0.35.5 + build: + context: . + dockerfile: stacks/flow.Dockerfile + target: flow-api + args: + PEERDB_VERSION_SHA_SHORT: ${PEERDB_VERSION_SHA_SHORT:-} restart: unless-stopped ports: - 8112:8112 @@ -130,7 +135,10 @@ services: flow-snapshot-worker: container_name: flow-snapshot-worker - image: ghcr.io/peerdb-io/flow-snapshot-worker:stable-v0.35.5 + build: + context: . + dockerfile: stacks/flow.Dockerfile + target: flow-snapshot-worker restart: unless-stopped environment: <<: [*catalog-config, *flow-worker-env, *minio-config] @@ -140,7 +148,10 @@ services: flow-worker: container_name: flow-worker - image: ghcr.io/peerdb-io/flow-worker:stable-v0.35.5 + build: + context: . + dockerfile: stacks/flow.Dockerfile + target: flow-worker restart: unless-stopped environment: <<: [*catalog-config, *flow-worker-env, *minio-config] diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index afd6779d77..291e8f8dac 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" "log/slog" + "maps" "net" "os" + "slices" "strconv" "sync/atomic" "time" @@ -125,6 +127,18 @@ func (a *FlowableActivity) EnsurePullability( } defer srcClose(ctx) + // We can fetch from the DB, as we are in the activity + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowJobName) + if err != nil { + return nil, err + } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return nil, err + } + + config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(tableMappings, cfg.Resync))) + output, err := srcConn.EnsurePullability(ctx, config) if err != nil { return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to ensure pullability: %w", err)) @@ -174,11 +188,19 @@ func (a *FlowableActivity) SetupTableSchema( } defer srcClose(ctx) - tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, config.TableMappings) + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowName) + if err != nil { + return err + } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return err + } + tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, tableMappings) if err != nil { return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err)) } - processed := internal.BuildProcessedSchemaMapping(config.TableMappings, tableNameSchemaMapping, logger) + processed := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, logger) tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{}) if err != nil { @@ -243,9 +265,17 @@ func (a *FlowableActivity) CreateNormalizedTable( return nil, err } + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowName) + if err != nil { + return nil, err + } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return nil, err + } numTablesToSetup.Store(int32(len(tableNameSchemaMapping))) tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping)) - for _, tableMapping := range config.TableMappings { + for _, tableMapping := range tableMappings { tableIdentifier := tableMapping.DestinationTableIdentifier tableSchema := tableNameSchemaMapping[tableIdentifier] existing, err := conn.SetupNormalizedTable( @@ -292,6 +322,21 @@ func (a *FlowableActivity) SyncFlow( var normalizeWaiting atomic.Bool var syncingBatchID atomic.Int64 var syncState atomic.Pointer[string] + + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowJobName) + if err != nil { + return err + } + + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return err + } + + // Override config with DB values to deal with the large fields. + config = cfg + options.TableMappings = tableMappings + syncState.Store(shared.Ptr("setup")) shutdown := heartbeatRoutine(ctx, func() string { // Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch @@ -1061,7 +1106,8 @@ func (a *FlowableActivity) RecordMetricsCritical(ctx context.Context) error { if isActive { activeFlows = append(activeFlows, info) } - a.OtelManager.Metrics.SyncedTablesGauge.Record(ctx, int64(len(info.config.TableMappings))) + //TODO: this will need a special query as we can extract this straight from the DB. + //a.OtelManager.Metrics.SyncedTablesGauge.Record(ctx, int64(len(info.config.TableMappings))) a.OtelManager.Metrics.FlowStatusGauge.Record(ctx, 1, metric.WithAttributeSet(attribute.NewSet( attribute.String(otel_metrics.FlowStatusKey, info.status.String()), attribute.Bool(otel_metrics.IsFlowActiveKey, isActive), @@ -1880,3 +1926,30 @@ func (a *FlowableActivity) MigratePostgresTableOIDs( return nil }) } + +func (a *FlowableActivity) MigrateTableMappingsToCatalog( + ctx context.Context, + flowJobName string, tableMappings []*protos.TableMapping, version uint32, +) error { + logger := internal.LoggerFromCtx(ctx) + tx, err := a.CatalogPool.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction to migrate table mappings to catalog: %w", err) + } + defer shared.RollbackTx(tx, logger) + + tableMappingsBytes, err := internal.TableMappingsToBytes(tableMappings) + if err != nil { + return fmt.Errorf("unable to marshal table mappings: %w", err) + } + + stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3) + ON CONFLICT (flow_name, version) DO UPDATE SET table_mapping = EXCLUDED.table_mapping` + _, err = tx.Exec(ctx, stmt, flowJobName, version, tableMappingsBytes) + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction to migrate table mappings to catalog: %w", err) + } + + return nil +} diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 2da7e035a0..b5b80719c3 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -55,6 +55,33 @@ func heartbeatRoutine( ) } +//func (a *FlowableActivity) getTableMappings(ctx context.Context, flowName string, version uint) ([]*protos.TableMapping, error) { +//rows, err := a.CatalogPool.Query(ctx, "select table_mapping from table_mappings where flow_name = $1 AND version = $2", flowName, version) +//if err != nil { +//return nil, err +//} + +//var tableMappingsBytes [][]byte +//var tableMappings []*protos.TableMapping = []*protos.TableMapping{} + +//err = rows.Scan([]any{&tableMappingsBytes}) +//if err != nil { +//return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err) +//} + +//for _, tableMappingBytes := range tableMappingsBytes { + +//var tableMapping *protos.TableMapping +//if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil { +//return nil, err +//} + +//tableMappings = append(tableMappings, tableMapping) +//} + +//return tableMappings, nil +//} + func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowName string) (map[string]*protos.TableSchema, error) { rows, err := a.CatalogPool.Query(ctx, "select table_name, table_schema from table_schema_mapping where flow_name = $1", flowName) if err != nil { @@ -649,13 +676,18 @@ func (a *FlowableActivity) startNormalize( return fmt.Errorf("failed to get table name schema mapping: %w", err) } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, config.TableMappingVersion) + if err != nil { + return err + } + for { logger.Info("normalizing batches", slog.Int64("syncBatchID", batchID)) res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{ FlowJobName: config.FlowJobName, Env: config.Env, TableNameSchemaMapping: tableNameSchemaMapping, - TableMappings: config.TableMappings, + TableMappings: tableMappings, SoftDeleteColName: config.SoftDeleteColName, SyncedAtColName: config.SyncedAtColName, SyncBatchID: batchID, diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 39975b8f14..52a46a467d 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -66,6 +66,18 @@ func (a *SnapshotActivity) SetupReplication( return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err)) } + configCtx := context.Background() + defer configCtx.Done() + cfg, err := internal.FetchConfigFromDB(configCtx, a.CatalogPool, config.FlowJobName) + if err != nil { + return nil, err + } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return nil, err + } + config.TableNameMapping = internal.TableNameMapping(tableMappings, cfg.Resync) + logger.Info("waiting for slot to be created...") slotInfo, err := conn.SetupReplication(ctx, config) @@ -182,8 +194,16 @@ func (a *SnapshotActivity) GetDefaultPartitionKeyForTables( } defer connClose(ctx) + cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, input.FlowJobName) + if err != nil { + return nil, err + } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return nil, err + } output, err := conn.GetDefaultPartitionKeyForTables(ctx, &protos.GetDefaultPartitionKeyForTablesInput{ - TableMappings: input.TableMappings, + TableMappings: tableMappings, }) if err != nil { return nil, a.Alerter.LogFlowError(ctx, input.FlowJobName, fmt.Errorf("failed to check if tables can parallel load: %w", err)) diff --git a/flow/cmd/codegen/flow_config_converter.go b/flow/cmd/codegen/flow_config_converter.go index 68a9881cc5..22fdcd99d6 100644 --- a/flow/cmd/codegen/flow_config_converter.go +++ b/flow/cmd/codegen/flow_config_converter.go @@ -24,7 +24,7 @@ import ( ) // FlowConnectionConfigsToCore converts API FlowConnectionConfigs to internal FlowConnectionConfigsCore -func FlowConnectionConfigsToCore(api *protos.FlowConnectionConfigs, tableMappingsVersion int64) *protos.FlowConnectionConfigsCore { +func FlowConnectionConfigsToCore(api *protos.FlowConnectionConfigs) *protos.FlowConnectionConfigsCore { if api == nil { return nil } @@ -32,8 +32,6 @@ func FlowConnectionConfigsToCore(api *protos.FlowConnectionConfigs, tableMapping return &protos.FlowConnectionConfigsCore{ {{range .Fields}} {{.GoName}}: api.{{.GoName}}, {{end}} - TableMappings: api.TableMappings, // TODO: remove - // TableMappingsVersion: tableMappingsVersion, // TODO: uncomment } } @@ -49,7 +47,6 @@ func FlowConnectionConfigsCoreToAPI( return &protos.FlowConnectionConfigs{ {{range .Fields}} {{.GoName}}: core.{{.GoName}}, {{end}} - TableMappings: tableMappings, // TODO: remove } } ` diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 8a255ea5cc..e77c970eef 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -177,8 +177,23 @@ func (h *FlowRequestHandler) CreateCDCFlow( } // No running workflow, do the validations and start a new one + // Insert the table mappings into the DB + tableMappingsBytes, err := internal.TableMappingsToBytes(req.ConnectionConfigs.TableMappings) + if err != nil { + return nil, NewInternalApiError(fmt.Errorf("unable to marshal table mappings: %w", err)) + } + + stmt := `INSERT INTO table_mappings (flow_name, version, table_mappings) VALUES ($1, $2, $3) + ON CONFLICT (flow_name, version) DO UPDATE SET table_mappings = EXCLUDED.table_mappings` + slog.Info("YOOOO QUERY ", slog.String("stmt", stmt)) + version := 0 + _, err = h.pool.Exec(ctx, stmt, cfg.FlowJobName, version, tableMappingsBytes) + if err != nil { + return nil, NewInternalApiError(fmt.Errorf("unable to insert table mappings: %w", err)) + } + // Use idempotent validation that skips mirror existence check - connectionConfigsCore := pconv.FlowConnectionConfigsToCore(req.ConnectionConfigs, 0) + connectionConfigsCore := pconv.FlowConnectionConfigsToCore(req.ConnectionConfigs) if _, err := h.validateCDCMirrorImpl(ctx, connectionConfigsCore, true); err != nil { slog.ErrorContext(ctx, "validate mirror error", slog.Any("error", err)) return nil, NewInternalApiError(fmt.Errorf("invalid mirror: %w", err)) @@ -297,7 +312,7 @@ func (h *FlowRequestHandler) dropFlow( if dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.DropFlowWorkflow, &protos.DropFlowInput{ FlowJobName: flowJobName, DropFlowStats: deleteStats, - FlowConnectionConfigs: pconv.FlowConnectionConfigsToCore(cdcConfig, 0), + FlowConnectionConfigs: pconv.FlowConnectionConfigsToCore(cdcConfig), SkipDestinationDrop: true, SkipSourceDrop: true, }); err != nil { @@ -355,7 +370,7 @@ func (h *FlowRequestHandler) shutdownFlow( dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.DropFlowWorkflow, &protos.DropFlowInput{ FlowJobName: flowJobName, DropFlowStats: deleteStats, - FlowConnectionConfigs: pconv.FlowConnectionConfigsToCore(cdcConfig, 0), + FlowConnectionConfigs: pconv.FlowConnectionConfigsToCore(cdcConfig), SkipDestinationDrop: skipDestinationDrop, // NOTE: Resync is false here during snapshot-only resync }) @@ -457,12 +472,17 @@ func (h *FlowRequestHandler) FlowStateChange( if err != nil { return nil, NewInternalApiError(fmt.Errorf("unable to get flow config: %w", err)) } + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, config.TableMappingVersion) + if err != nil { + return nil, NewInternalApiError(fmt.Errorf("unable to get table mappings: %w", err)) + } config.Resync = true config.DoInitialSnapshot = true // validate mirror first because once the mirror is dropped, there's no going back if _, err := h.ValidateCDCMirror(ctx, &protos.CreateCDCFlowRequest{ ConnectionConfigs: config, + TableMappings: tableMappings, }); err != nil { return nil, NewFailedPreconditionApiError(fmt.Errorf("invalid mirror: %w", err)) } @@ -630,7 +650,7 @@ func (h *FlowRequestHandler) resyncCompletedSnapshot( } workflowID := getWorkflowID(config.FlowJobName) - configCore := pconv.FlowConnectionConfigsToCore(config, 0) + configCore := pconv.FlowConnectionConfigsToCore(config) if _, err := h.createCDCFlow(ctx, configCore, workflowID); err != nil { return err } diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 58069a98c0..368eb10718 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -163,7 +163,8 @@ func (h *FlowRequestHandler) cdcFlowStatus( if state.SyncFlowOptions != nil { config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds config.MaxBatchSize = state.SyncFlowOptions.BatchSize - config.TableMappings = state.SyncFlowOptions.TableMappings + //TODO: this will need to fetch from the DB? + //config.TableMappings = state.SyncFlowOptions.TableMappings } srcType, err := connectors.LoadPeerType(ctx, h.pool, config.SourceName) diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index 8e612ba543..f2f9fdc1c6 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -19,7 +19,7 @@ var CustomColumnTypeRegex = regexp.MustCompile(`^$|^[a-zA-Z][a-zA-Z0-9(),]*$`) func (h *FlowRequestHandler) ValidateCDCMirror( ctx context.Context, req *protos.CreateCDCFlowRequest, ) (*protos.ValidateCDCMirrorResponse, APIError) { - flowConnectionConfigsCore := proto_conversions.FlowConnectionConfigsToCore(req.ConnectionConfigs, 0) + flowConnectionConfigsCore := proto_conversions.FlowConnectionConfigsToCore(req.ConnectionConfigs) return h.validateCDCMirrorImpl(ctx, flowConnectionConfigsCore, false) } @@ -65,7 +65,15 @@ func (h *FlowRequestHandler) validateCDCMirrorImpl( errors.New("invalid config: initial_snapshot_only is true but do_initial_snapshot is false")) } - for _, tm := range connectionConfigs.TableMappings { + // fetch connection configs from DB + slog.Info("!!!!! YO") + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, connectionConfigs.FlowJobName, connectionConfigs.TableMappingVersion) + slog.Info("!!!!! YO2 ") + if err != nil { + return nil, NewInternalApiError(err) + } + + for _, tm := range tableMappings { for _, col := range tm.Columns { if !CustomColumnTypeRegex.MatchString(col.DestinationType) { return nil, NewInvalidArgumentApiError(errors.New("invalid custom column type " + col.DestinationType)) @@ -104,7 +112,7 @@ func (h *FlowRequestHandler) validateCDCMirrorImpl( if !connectionConfigs.Resync { var getTableSchemaError error tableSchemaMap, getTableSchemaError = srcConn.GetTableSchema(ctx, connectionConfigs.Env, connectionConfigs.Version, - connectionConfigs.System, connectionConfigs.TableMappings) + connectionConfigs.System, tableMappings) if getTableSchemaError != nil { return nil, NewFailedPreconditionApiError(fmt.Errorf("failed to get source table schema: %w", getTableSchemaError)) } diff --git a/flow/connectors/bigquery/qrep_object_pull.go b/flow/connectors/bigquery/qrep_object_pull.go index 0aab08c1d5..e9bcecb5d0 100644 --- a/flow/connectors/bigquery/qrep_object_pull.go +++ b/flow/connectors/bigquery/qrep_object_pull.go @@ -319,8 +319,13 @@ func (c *BigQueryConnector) ExportTxSnapshot( return nil, nil, fmt.Errorf("failed to fetch flow config from db: %w", err) } - jobs := make([]*bigquery.Job, 0, len(cfg.TableMappings)) - for _, tm := range cfg.TableMappings { + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return nil, nil, fmt.Errorf("failed to fetch table mappings: %w", err) + } + + jobs := make([]*bigquery.Job, 0, len(tableMappings)) + for _, tm := range tableMappings { uri := fmt.Sprintf("%s/%s/*.avro", cfg.SnapshotStagingPath, url.PathEscape(tm.SourceTableIdentifier)) gcsRef := bigquery.NewGCSReference(uri) gcsRef.DestinationFormat = bigquery.Avro diff --git a/flow/connectors/bigquery/source.go b/flow/connectors/bigquery/source.go index c435033710..5edfb835b0 100644 --- a/flow/connectors/bigquery/source.go +++ b/flow/connectors/bigquery/source.go @@ -9,6 +9,7 @@ import ( "google.golang.org/api/iterator" "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/internal" ) func (c *BigQueryConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigsCore) error { @@ -16,7 +17,12 @@ func (c *BigQueryConnector) ValidateMirrorSource(ctx context.Context, cfg *proto return errors.New("BigQuery source connector only supports initial snapshot flows. CDC is not supported") } - for _, tableMapping := range cfg.TableMappings { + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return fmt.Errorf("failed to fetch table mappings: %w", err) + } + + for _, tableMapping := range tableMappings { dstDatasetTable, err := c.convertToDatasetTable(tableMapping.SourceTableIdentifier) if err != nil { return err diff --git a/flow/connectors/clickhouse/validate.go b/flow/connectors/clickhouse/validate.go index e527a23bdd..ef8974c6f4 100644 --- a/flow/connectors/clickhouse/validate.go +++ b/flow/connectors/clickhouse/validate.go @@ -17,6 +17,10 @@ func (c *ClickHouseConnector) ValidateMirrorDestination( cfg *protos.FlowConnectionConfigsCore, tableNameSchemaMapping map[string]*protos.TableSchema, ) error { + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return fmt.Errorf("failed to fetch table mappings: %w", err) + } if internal.PeerDBOnlyClickHouseAllowed() { err := chvalidate.CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx, c.logger, c.database) if err != nil { @@ -34,7 +38,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination( } // this is for handling column exclusion, processed schema does that in a step - processedMapping := internal.BuildProcessedSchemaMapping(cfg.TableMappings, tableNameSchemaMapping, c.logger) + processedMapping := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, c.logger) dstTableNames := slices.Collect(maps.Keys(processedMapping)) // In the case of resync, we don't need to check the content or structure of the original tables; @@ -64,7 +68,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination( return err } - for _, tableMapping := range cfg.TableMappings { + for _, tableMapping := range tableMappings { dstTableName := tableMapping.DestinationTableIdentifier if _, ok := processedMapping[dstTableName]; !ok { // if destination table is not a key, that means source table was not a key in the original schema mapping(?) diff --git a/flow/connectors/mysql/validate.go b/flow/connectors/mysql/validate.go index 6cf7262012..acf33d3d54 100644 --- a/flow/connectors/mysql/validate.go +++ b/flow/connectors/mysql/validate.go @@ -10,6 +10,7 @@ import ( "github.com/PeerDB-io/peerdb/flow/connectors/utils" "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/internal" mysql_validation "github.com/PeerDB-io/peerdb/flow/pkg/mysql" ) @@ -78,8 +79,13 @@ func (c *MySqlConnector) CheckBinlogSettings(ctx context.Context, requireRowMeta } func (c *MySqlConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigsCore) error { - sourceTables := make([]*utils.SchemaTable, 0, len(cfg.TableMappings)) - for _, tableMapping := range cfg.TableMappings { + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return fmt.Errorf("failed to fetch table mappings: %w", err) + } + + sourceTables := make([]*utils.SchemaTable, 0, len(tableMappings)) + for _, tableMapping := range tableMappings { parsedTable, parseErr := utils.ParseSchemaTable(tableMapping.SourceTableIdentifier) if parseErr != nil { return fmt.Errorf("invalid source table identifier: %w", parseErr) @@ -112,7 +118,7 @@ func (c *MySqlConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.F } requireRowMetadata := false - for _, tm := range cfg.TableMappings { + for _, tm := range tableMappings { if len(tm.Exclude) > 0 { requireRowMetadata = true break diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index ca97a4031e..176f5eeebd 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -10,6 +10,7 @@ import ( "github.com/PeerDB-io/peerdb/flow/connectors/utils" "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/internal" "github.com/PeerDB-io/peerdb/flow/shared" ) @@ -227,8 +228,13 @@ func (c *PostgresConnector) ValidateMirrorSource(ctx context.Context, cfg *proto } } - sourceTables := make([]*utils.SchemaTable, 0, len(cfg.TableMappings)) - for _, tableMapping := range cfg.TableMappings { + tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return fmt.Errorf("failed to fetch table mappings: %w", err) + } + + sourceTables := make([]*utils.SchemaTable, 0, len(tableMappings)) + for _, tableMapping := range tableMappings { parsedTable, parseErr := utils.ParseSchemaTable(tableMapping.SourceTableIdentifier) if parseErr != nil { return fmt.Errorf("invalid source table identifier: %w", parseErr) @@ -249,7 +255,7 @@ func (c *PostgresConnector) ValidateMirrorSource(ctx context.Context, cfg *proto } } - if err := c.CheckSourceTables(ctx, sourceTables, cfg.TableMappings, pubName, noCDC); err != nil { + if err := c.CheckSourceTables(ctx, sourceTables, tableMappings, pubName, noCDC); err != nil { return fmt.Errorf("provided source tables invalidated: %w", err) } diff --git a/flow/e2e/clickhouse_test.go b/flow/e2e/clickhouse_test.go index 08a3241623..ff7d5aced4 100644 --- a/flow/e2e/clickhouse_test.go +++ b/flow/e2e/clickhouse_test.go @@ -501,7 +501,7 @@ func (s ClickHouseSuite) WeirdTable(tableName string) { env.Cancel(s.t.Context()) RequireEnvCanceled(s.t, env) - env = ExecuteDropFlow(s.t.Context(), tc, flowConnConfig, 0) + env = ExecuteDropFlow(s.t.Context(), tc, flowConnConfig, nil) EnvWaitForFinished(s.t, env, 3*time.Minute) // now test weird names with rename based resync @@ -520,7 +520,7 @@ func (s ClickHouseSuite) WeirdTable(tableName string) { env.Cancel(s.t.Context()) RequireEnvCanceled(s.t, env) - env = ExecuteDropFlow(s.t.Context(), tc, flowConnConfig, 0) + env = ExecuteDropFlow(s.t.Context(), tc, flowConnConfig, nil) EnvWaitForFinished(s.t, env, 3*time.Minute) // now test weird names with exchange based resync ch, err = connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig()) @@ -2450,7 +2450,7 @@ func (s ClickHouseSuite) Test_NullEngine() { env.Cancel(s.t.Context()) RequireEnvCanceled(s.t, env) - env = ExecuteDropFlow(s.t.Context(), tc, flowConnConfig, 0) + env = ExecuteDropFlow(s.t.Context(), tc, flowConnConfig, nil) EnvWaitForFinished(s.t, env, 3*time.Minute) require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf("ALTER TABLE %s DROP COLUMN val", srcFullName))) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index cc579fbc7e..049842aec2 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -72,7 +72,6 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs(s Suite) ret := &protos.FlowConnectionConfigs{ FlowJobName: c.FlowJobName, - TableMappings: tblMappings, SourceName: s.Source().GeneratePeer(t).Name, DestinationName: c.Destination, SyncedAtColName: "_PEERDB_SYNCED_AT", diff --git a/flow/e2e/postgres_test.go b/flow/e2e/postgres_test.go index 702784e018..c93b35b4f8 100644 --- a/flow/e2e/postgres_test.go +++ b/flow/e2e/postgres_test.go @@ -1276,7 +1276,7 @@ func (s PeerFlowE2ETestSuitePG) TestResync(tableName string) { env.Cancel(s.t.Context()) RequireEnvCanceled(s.t, env) - env = ExecuteDropFlow(s.t.Context(), tc, flowConnConfig, 0) + env = ExecuteDropFlow(s.t.Context(), tc, flowConnConfig, nil) EnvWaitForFinished(s.t, env, 3*time.Minute) flowConnConfig.Resync = true diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 104629815b..ddea924581 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -666,11 +666,11 @@ func ExecutePeerflow(t *testing.T, tc client.Client, config *protos.FlowConnecti } } -func ExecuteDropFlow(ctx context.Context, tc client.Client, config *protos.FlowConnectionConfigs, tableMappingsVersion int64) WorkflowRun { +func ExecuteDropFlow(ctx context.Context, tc client.Client, config *protos.FlowConnectionConfigs, tableMappings []*protos.TableMapping) WorkflowRun { return ExecuteWorkflow(ctx, tc, shared.PeerFlowTaskQueue, peerflow.DropFlowWorkflow, &protos.DropFlowInput{ FlowJobName: config.FlowJobName, DropFlowStats: false, - FlowConnectionConfigs: proto_conversions.FlowConnectionConfigsToCore(config, tableMappingsVersion), + FlowConnectionConfigs: proto_conversions.FlowConnectionConfigsToCore(config), }) } diff --git a/flow/internal/flow_configuration_helpers.go b/flow/internal/flow_configuration_helpers.go index ee1b331da0..fc66515d09 100644 --- a/flow/internal/flow_configuration_helpers.go +++ b/flow/internal/flow_configuration_helpers.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "log/slog" "google.golang.org/protobuf/proto" @@ -11,6 +12,22 @@ import ( "github.com/PeerDB-io/peerdb/flow/shared" ) +func TableNameMapping(tableMappings []*protos.TableMapping, resync bool) map[string]string { + tblNameMapping := make(map[string]string, len(tableMappings)) + if resync { + for _, mapping := range tableMappings { + if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL { + mapping.DestinationTableIdentifier += "_resync" + } + } + } + for _, v := range tableMappings { + tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier + } + + return tblNameMapping +} + func FetchConfigFromDB(ctx context.Context, catalogPool shared.CatalogPool, flowName string) (*protos.FlowConnectionConfigsCore, error) { var configBytes sql.RawBytes if err := catalogPool.QueryRow(ctx, @@ -26,3 +43,52 @@ func FetchConfigFromDB(ctx context.Context, catalogPool shared.CatalogPool, flow return &cfgFromDB, nil } + +func TableMappingsToBytes(tableMappings []*protos.TableMapping) ([][]byte, error) { + tableMappingsBytes := [][]byte{} + for _, tableMapping := range tableMappings { + tableMappingBytes, err := proto.Marshal(tableMapping) + if err != nil { + return nil, fmt.Errorf("failed to marshal table mapping to migrate to catalog: %w", err) + } + tableMappingsBytes = append(tableMappingsBytes, tableMappingBytes) + } + return tableMappingsBytes, nil +} + +func FetchTableMappingsFromDB(ctx context.Context, flowJobName string, version uint32) ([]*protos.TableMapping, error) { + pool, err := GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + return nil, err + } + + row := pool.QueryRow(ctx, + `select table_mappings + from table_mappings + where flow_name = $1 + and version = $2`, + flowJobName, version, + ) + + slog.Info("!!!!! stmt", slog.String("flow_name", flowJobName), slog.Int("version", int(version))) + var tableMappingsBytes [][]byte + if err := row.Scan(&tableMappingsBytes); err != nil { + return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err) + } + + //var tableMappings []*protos.TableMapping + + var tableMappings []*protos.TableMapping = []*protos.TableMapping{} + slog.Info("!!!!! YO3 ", slog.Int("len", len(tableMappingsBytes))) + + for _, tableMappingBytes := range tableMappingsBytes { + var tableMapping protos.TableMapping + slog.Info("!!!!! YO - in loop ") + if err := proto.Unmarshal(tableMappingBytes, &tableMapping); err != nil { + return nil, err + } + + tableMappings = append(tableMappings, &tableMapping) + } + return tableMappings, nil +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 2a0ab794fb..482031fdb3 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -45,10 +45,6 @@ type CDCFlowWorkflowState struct { // returns a new empty PeerFlowState func NewCDCFlowWorkflowState(ctx workflow.Context, logger log.Logger, cfg *protos.FlowConnectionConfigsCore) *CDCFlowWorkflowState { - tableMappings := make([]*protos.TableMapping, 0, len(cfg.TableMappings)) - for _, tableMapping := range cfg.TableMappings { - tableMappings = append(tableMappings, proto.CloneOf(tableMapping)) - } state := CDCFlowWorkflowState{ ActiveSignal: model.NoopSignal, CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, @@ -56,7 +52,6 @@ func NewCDCFlowWorkflowState(ctx workflow.Context, logger log.Logger, cfg *proto SyncFlowOptions: &protos.SyncFlowOptions{ BatchSize: cfg.MaxBatchSize, IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, - TableMappings: tableMappings, }, SnapshotNumRowsPerPartition: cfg.SnapshotNumRowsPerPartition, SnapshotNumPartitionsOverride: cfg.SnapshotNumPartitionsOverride, @@ -123,7 +118,7 @@ func updateFlowConfigWithLatestSettings( cloneCfg := proto.CloneOf(cfg) cloneCfg.MaxBatchSize = state.SyncFlowOptions.BatchSize cloneCfg.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds - cloneCfg.TableMappings = state.SyncFlowOptions.TableMappings + cloneCfg.TableMappingVersion = state.SyncFlowOptions.TableMappingVersion if state.SnapshotNumRowsPerPartition > 0 { cloneCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition } @@ -221,6 +216,24 @@ func processCDCFlowConfigUpdate( } } + // MIGRATION: Move `tableMapping` to the DB + if len(state.SyncFlowOptions.TableMappings) > 0 { + migrateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + if err := workflow.ExecuteActivity( + migrateCtx, + flowable.MigrateTableMappingsToCatalog, + cfg.FlowJobName, + state.SyncFlowOptions.TableMappings, + 1, //hardcoding version as 1 for migration purposes. + ).Get(migrateCtx, nil); err != nil { + return fmt.Errorf("failed to migrate TableMappings: %w", err) + } + state.SyncFlowOptions.TableMappings = nil + state.SyncFlowOptions.TableMappingVersion = 1 + } + syncStateToConfigProtoInCatalog(ctx, cfg, state) return nil } @@ -309,7 +322,11 @@ func processTableAdditions( additionalTablesCfg := proto.CloneOf(cfg) additionalTablesCfg.DoInitialSnapshot = !flowConfigUpdate.SkipInitialSnapshotForTableAdditions additionalTablesCfg.InitialSnapshotOnly = true - additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables + // TODO: thought - maybe we need to pass additionalTables and then in the + // CDCWorkflow we persist them to the DB and send an incremented `tableMappingVersion` + // to the new workflow? + //additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables + additionalTablesCfg.Resync = false if state.SnapshotNumRowsPerPartition > 0 { additionalTablesCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition @@ -643,22 +660,6 @@ func CDCFlowWorkflow( // for safety, rely on the idempotency of SetupFlow instead // also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here. if state.CurrentFlowStatus != protos.FlowStatus_STATUS_RUNNING { - originalTableMappings := make([]*protos.TableMapping, 0, len(cfg.TableMappings)) - for _, tableMapping := range cfg.TableMappings { - originalTableMappings = append(originalTableMappings, proto.CloneOf(tableMapping)) - } - // if resync is true, alter the table name schema mapping to temporarily add - // a suffix to the table names. - if cfg.Resync { - for _, mapping := range state.SyncFlowOptions.TableMappings { - if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL { - mapping.DestinationTableIdentifier += "_resync" - } - } - // because we have renamed the tables. - cfg.TableMappings = state.SyncFlowOptions.TableMappings - } - // start the SetupFlow workflow as a child workflow, and wait for it to complete // it should return the table schema for the source peer setupFlowID := GetChildWorkflowID("setup-flow", cfg.FlowJobName, originalRunID) @@ -682,7 +683,6 @@ func CDCFlowWorkflow( state.ActiveSignal = model.ResyncSignal cfg.Resync = true cfg.DoInitialSnapshot = true - cfg.TableMappings = originalTableMappings // this is the only place where we can have a resync during a resync // so we need to NOT sync the tableMappings to catalog to preserve original names uploadConfigToCatalog(ctx, cfg) @@ -706,6 +706,9 @@ func CDCFlowWorkflow( WaitForCancellation: true, } setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts) + // Resync will rely rely on the `cfg.Resync` flag to rename the tables + // during the snapshot process. This is how we're able to also remove the need + // to sync the config back into the DB / not rely on the `state.TableMappings`. setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg) var setupFlowOutput *protos.SetupFlowOutput @@ -853,6 +856,7 @@ func CDCFlowWorkflow( WaitForCancellation: true, RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1}, })) + state.SyncFlowOptions.TableMappings = []*protos.TableMapping{} syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions) mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop") diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index f4c79eb67c..25589bdcb0 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -36,15 +36,16 @@ type SetupFlowExecution struct { tableNameMapping map[string]string cdcFlowName string executionID string + resync bool } // NewSetupFlowExecution creates a new instance of SetupFlowExecution. -func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string) *SetupFlowExecution { +func NewSetupFlowExecution(ctx workflow.Context, cdcFlowName string, resync bool) *SetupFlowExecution { return &SetupFlowExecution{ - Logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cdcFlowName)), - tableNameMapping: tableNameMapping, - cdcFlowName: cdcFlowName, - executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, + Logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cdcFlowName)), + cdcFlowName: cdcFlowName, + executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, + resync: resync, } } @@ -119,10 +120,9 @@ func (s *SetupFlowExecution) ensurePullability( // create EnsurePullabilityInput for the srcTableName ensurePullabilityInput := &protos.EnsurePullabilityBatchInput{ - PeerName: config.SourceName, - FlowJobName: s.cdcFlowName, - SourceTableIdentifiers: slices.Sorted(maps.Keys(s.tableNameMapping)), - CheckConstraints: checkConstraints, + PeerName: config.SourceName, + FlowJobName: s.cdcFlowName, + CheckConstraints: checkConstraints, } future := workflow.ExecuteActivity(ctx, flowable.EnsurePullability, ensurePullabilityInput) @@ -162,9 +162,8 @@ func (s *SetupFlowExecution) createRawTable( // attempt to create the tables. createRawTblInput := &protos.CreateRawTableInput{ - PeerName: config.DestinationName, - FlowJobName: s.cdcFlowName, - TableNameMapping: s.tableNameMapping, + PeerName: config.DestinationName, + FlowJobName: s.cdcFlowName, } rawTblFuture := workflow.ExecuteActivity(ctx, flowable.CreateRawTable, createRawTblInput) @@ -191,12 +190,11 @@ func (s *SetupFlowExecution) setupNormalizedTables( }) tableSchemaInput := &protos.SetupTableSchemaBatchInput{ - PeerName: flowConnectionConfigs.SourceName, - TableMappings: flowConnectionConfigs.TableMappings, - FlowName: s.cdcFlowName, - System: flowConnectionConfigs.System, - Env: flowConnectionConfigs.Env, - Version: flowConnectionConfigs.Version, + PeerName: flowConnectionConfigs.SourceName, + FlowName: s.cdcFlowName, + System: flowConnectionConfigs.System, + Env: flowConnectionConfigs.Env, + Version: flowConnectionConfigs.Version, } if err := workflow.ExecuteActivity(ctx, flowable.SetupTableSchema, tableSchemaInput).Get(ctx, nil); err != nil { @@ -207,7 +205,6 @@ func (s *SetupFlowExecution) setupNormalizedTables( s.Info("setting up normalized tables on destination peer", slog.String("destination", flowConnectionConfigs.DestinationName)) setupConfig := &protos.SetupNormalizedTableBatchInput{ PeerName: flowConnectionConfigs.DestinationName, - TableMappings: flowConnectionConfigs.TableMappings, SoftDeleteColName: flowConnectionConfigs.SoftDeleteColName, SyncedAtColName: flowConnectionConfigs.SyncedAtColName, FlowName: flowConnectionConfigs.FlowJobName, @@ -261,13 +258,8 @@ func (s *SetupFlowExecution) executeSetupFlow( // SetupFlowWorkflow is the workflow that sets up the flow. func SetupFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigsCore) (*protos.SetupFlowOutput, error) { - tblNameMapping := make(map[string]string, len(config.TableMappings)) - for _, v := range config.TableMappings { - tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier - } - // create the setup flow execution - setupFlowExecution := NewSetupFlowExecution(ctx, tblNameMapping, config.FlowJobName) + setupFlowExecution := NewSetupFlowExecution(ctx, config.FlowJobName, config.Resync) // execute the setup flow setupFlowOutput, err := setupFlowExecution.executeSetupFlow(ctx, config) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 1f17c42595..7f80ceabfa 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -1,6 +1,7 @@ package peerflow import ( + "context" "fmt" "log/slog" "slices" @@ -55,15 +56,10 @@ func (s *SnapshotFlowExecution) setupReplication( }, }) - tblNameMapping := make(map[string]string, len(s.config.TableMappings)) - for _, v := range s.config.TableMappings { - tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier - } - setupReplicationInput := &protos.SetupReplicationInput{ - PeerName: s.config.SourceName, - FlowJobName: flowName, - TableNameMapping: tblNameMapping, + PeerName: s.config.SourceName, + FlowJobName: flowName, + // TableNameMapping: tblNameMapping, DoInitialSnapshot: s.config.DoInitialSnapshot, ExistingPublicationName: s.config.PublicationName, ExistingReplicationSlotName: s.config.ReplicationSlotName, @@ -285,7 +281,27 @@ func (s *SnapshotFlowExecution) cloneTables( return err } - for _, v := range s.config.TableMappings { + configCtx := context.Background() + defer configCtx.Done() + pool, err := internal.GetCatalogConnectionPoolFromEnv(configCtx) + if err != nil { + return err + } + defer pool.Pool.Close() + + cfg, err := internal.FetchConfigFromDB(configCtx, pool, s.config.FlowJobName) + if err != nil { + return err + } + + tableMappingsCtx := context.Background() + defer tableMappingsCtx.Done() + tableMappings, err := internal.FetchTableMappingsFromDB(tableMappingsCtx, cfg.FlowJobName, cfg.TableMappingVersion) + if err != nil { + return err + } + + for _, v := range tableMappings { source := v.SourceTableIdentifier destination := v.DestinationTableIdentifier s.logger.Info( diff --git a/nexus/catalog/migrations/V49__table_mappings_table.sql b/nexus/catalog/migrations/V49__table_mappings_table.sql new file mode 100644 index 0000000000..5a4e78cc64 --- /dev/null +++ b/nexus/catalog/migrations/V49__table_mappings_table.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS table_mappings ( + flow_name varchar(255) not null, + version bigint not null default 1, + table_mappings bytea[] not null, + primary key (flow_name, version) +); diff --git a/protos/flow.proto b/protos/flow.proto index 5036615daf..d8164b7040 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -41,6 +41,7 @@ message TableMapping { string sharding_key = 7; string policy_name = 8; string partition_by_expr = 9; + uint32 version = 10; } message SetupInput { @@ -91,6 +92,7 @@ message FlowConnectionConfigs { map env = 24; uint32 version = 25; + uint32 table_mapping_version = 27; } // FlowConnectionConfigsCore is used internally in the codebase, it is safe to remove (mark reserved) fields from it @@ -103,7 +105,7 @@ message FlowConnectionConfigsCore { // config for the CDC flow itself // currently, TableMappings, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals - repeated TableMapping table_mappings = 4; + //repeated TableMapping table_mappings = 4; uint32 max_batch_size = 5; uint64 idle_timeout_seconds = 6; string cdc_staging_path = 7; @@ -137,6 +139,7 @@ message FlowConnectionConfigsCore { map env = 24; uint32 version = 25; + uint32 table_mapping_version = 27; } message RenameTableOption { @@ -181,6 +184,7 @@ message SyncFlowOptions { map src_table_id_name_mapping = 4; repeated TableMapping table_mappings = 6; int32 number_of_syncs = 7; + uint32 table_mapping_version = 8; } message EnsurePullabilityBatchInput { diff --git a/protos/route.proto b/protos/route.proto index 6b77bebff8..5ec2f436a5 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -12,6 +12,7 @@ package peerdb_route; message CreateCDCFlowRequest { peerdb_flow.FlowConnectionConfigs connection_configs = 1; bool attach_to_existing = 2; + repeated peerdb_flow.TableMapping table_mappings = 3; } message CreateCDCFlowResponse { string workflow_id = 1; }