Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 77 additions & 4 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"errors"
"fmt"
"log/slog"
"maps"
"net"
"os"
"slices"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -125,6 +127,18 @@ func (a *FlowableActivity) EnsurePullability(
}
defer connectors.CloseConnector(ctx, srcConn)

// We can fetch from the DB, as we are in the activity
cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowJobName)
if err != nil {
return nil, err
}
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, cfg.TableMappingVersion)
if err != nil {
return nil, err
}

config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(tableMappings, cfg.Resync)))

output, err := srcConn.EnsurePullability(ctx, config)
if err != nil {
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to ensure pullability: %w", err))
Expand Down Expand Up @@ -174,11 +188,19 @@ func (a *FlowableActivity) SetupTableSchema(
}
defer connectors.CloseConnector(ctx, srcConn)

tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, config.TableMappings)
cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowName)
if err != nil {
return err
}
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion)
if err != nil {
return err
}
tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, tableMappings)
if err != nil {
return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err))
}
processed := internal.BuildProcessedSchemaMapping(config.TableMappings, tableNameSchemaMapping, logger)
processed := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, logger)

tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
Expand Down Expand Up @@ -243,9 +265,17 @@ func (a *FlowableActivity) CreateNormalizedTable(
return nil, err
}

cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowName)
if err != nil {
return nil, err
}
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion)
if err != nil {
return nil, err
}
numTablesToSetup.Store(int32(len(tableNameSchemaMapping)))
tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping))
for _, tableMapping := range config.TableMappings {
for _, tableMapping := range tableMappings {
tableIdentifier := tableMapping.DestinationTableIdentifier
tableSchema := tableNameSchemaMapping[tableIdentifier]
existing, err := conn.SetupNormalizedTable(
Expand Down Expand Up @@ -292,6 +322,21 @@ func (a *FlowableActivity) SyncFlow(
var normalizeWaiting atomic.Bool
var syncingBatchID atomic.Int64
var syncState atomic.Pointer[string]

cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, config.FlowJobName)
if err != nil {
return err
}

tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion)
if err != nil {
return err
}

// Override config with DB values to deal with the large fields.
config = cfg
options.TableMappings = tableMappings

syncState.Store(shared.Ptr("setup"))
shutdown := heartbeatRoutine(ctx, func() string {
// Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch
Expand Down Expand Up @@ -1040,7 +1085,8 @@ func (a *FlowableActivity) RecordMetricsCritical(ctx context.Context) error {
if isActive {
activeFlows = append(activeFlows, info)
}
a.OtelManager.Metrics.SyncedTablesGauge.Record(ctx, int64(len(info.config.TableMappings)))
//TODO: this will need a special query as we can extract this straight from the DB.
//a.OtelManager.Metrics.SyncedTablesGauge.Record(ctx, int64(len(info.config.TableMappings)))
a.OtelManager.Metrics.FlowStatusGauge.Record(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowStatusKey, info.status.String()),
attribute.Bool(otel_metrics.IsFlowActiveKey, isActive),
Expand Down Expand Up @@ -1789,3 +1835,30 @@ func (a *FlowableActivity) ReportStatusMetric(ctx context.Context, status protos
)))
return nil
}

func (a *FlowableActivity) MigrateTableMappingsToCatalog(
ctx context.Context,
flowJobName string, tableMappings []*protos.TableMapping, version uint32,
) error {
logger := internal.LoggerFromCtx(ctx)
tx, err := a.CatalogPool.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction to migrate table mappings to catalog: %w", err)
}
defer shared.RollbackTx(tx, logger)

tableMappingsBytes, err := internal.TableMappingsToBytes(tableMappings)
if err != nil {
return fmt.Errorf("unable to marshal table mappings: %w", err)
}

stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3)
ON CONFLICT (flow_name, version) DO UPDATE SET table_mapping = EXCLUDED.table_mapping`
_, err = tx.Exec(ctx, stmt, flowJobName, version, tableMappingsBytes)

if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit transaction to migrate table mappings to catalog: %w", err)
}

return nil
}
34 changes: 33 additions & 1 deletion flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,33 @@ func heartbeatRoutine(
)
}

//func (a *FlowableActivity) getTableMappings(ctx context.Context, flowName string, version uint) ([]*protos.TableMapping, error) {
//rows, err := a.CatalogPool.Query(ctx, "select table_mapping from table_mappings where flow_name = $1 AND version = $2", flowName, version)
//if err != nil {
//return nil, err
//}

//var tableMappingsBytes [][]byte
//var tableMappings []*protos.TableMapping = []*protos.TableMapping{}

//err = rows.Scan([]any{&tableMappingsBytes})
//if err != nil {
//return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err)
//}

//for _, tableMappingBytes := range tableMappingsBytes {

//var tableMapping *protos.TableMapping
//if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil {
//return nil, err
//}

//tableMappings = append(tableMappings, tableMapping)
//}

//return tableMappings, nil
//}

func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowName string) (map[string]*protos.TableSchema, error) {
rows, err := a.CatalogPool.Query(ctx, "select table_name, table_schema from table_schema_mapping where flow_name = $1", flowName)
if err != nil {
Expand Down Expand Up @@ -648,13 +675,18 @@ func (a *FlowableActivity) startNormalize(
return fmt.Errorf("failed to get table name schema mapping: %w", err)
}

tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, config.TableMappingVersion)
if err != nil {
return err
}

for {
logger.Info("normalizing batch", slog.Int64("syncBatchID", batchID))
res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
FlowJobName: config.FlowJobName,
Env: config.Env,
TableNameSchemaMapping: tableNameSchemaMapping,
TableMappings: config.TableMappings,
TableMappings: tableMappings,
SoftDeleteColName: config.SoftDeleteColName,
SyncedAtColName: config.SyncedAtColName,
SyncBatchID: batchID,
Expand Down
22 changes: 21 additions & 1 deletion flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ func (a *SnapshotActivity) SetupReplication(
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err))
}

configCtx := context.Background()
defer configCtx.Done()
cfg, err := internal.FetchConfigFromDB(configCtx, a.CatalogPool, config.FlowJobName)
if err != nil {
return nil, err
}
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion)
if err != nil {
return nil, err
}
config.TableNameMapping = internal.TableNameMapping(tableMappings, cfg.Resync)

logger.Info("waiting for slot to be created...")
slotInfo, err := conn.SetupReplication(ctx, config)

Expand Down Expand Up @@ -180,8 +192,16 @@ func (a *SnapshotActivity) GetDefaultPartitionKeyForTables(
}
defer connectors.CloseConnector(ctx, connector)

cfg, err := internal.FetchConfigFromDB(ctx, a.CatalogPool, input.FlowJobName)
if err != nil {
return nil, err
}
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion)
if err != nil {
return nil, err
}
output, err := connector.GetDefaultPartitionKeyForTables(ctx, &protos.GetDefaultPartitionKeyForTablesInput{
TableMappings: input.TableMappings,
TableMappings: tableMappings,
})
if err != nil {
return nil, a.Alerter.LogFlowError(ctx, input.FlowJobName, fmt.Errorf("failed to check if tables can parallel load: %w", err))
Expand Down
16 changes: 16 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
connectionConfigs.FlowJobName, err)
}

// Insert the table mappings into the DB
tableMappingsBytes, err := internal.TableMappingsToBytes(connectionConfigs.TableMappings)
if err != nil {
return fmt.Errorf("unable to marshal table mappings: %w", err)
}

stmt := `INSERT INTO table_mappings (flow_name, version, table_mapping) VALUES ($1, $2, $3)
ON CONFLICT (flow_name, version) DO UPDATE SET table_mapping = EXCLUDED.table_mapping`
version := 1
_, err = h.pool.Exec(ctx, stmt, connectionConfigs.FlowJobName, version, tableMappingsBytes)

return nil
}

Expand Down Expand Up @@ -458,12 +469,17 @@ func (h *FlowRequestHandler) FlowStateChange(
if err != nil {
return nil, NewInternalApiError(fmt.Errorf("unable to get flow config: %w", err))
}
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, config.TableMappingVersion)
if err != nil {
return nil, NewInternalApiError(fmt.Errorf("unable to get table mappings: %w", err))
}

config.Resync = true
config.DoInitialSnapshot = true
// validate mirror first because once the mirror is dropped, there's no going back
if _, err := h.ValidateCDCMirror(ctx, &protos.CreateCDCFlowRequest{
ConnectionConfigs: config,
TableMappings: tableMappings,
}); err != nil {
return nil, NewFailedPreconditionApiError(fmt.Errorf("invalid mirror: %w", err))
}
Expand Down
3 changes: 2 additions & 1 deletion flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ func (h *FlowRequestHandler) cdcFlowStatus(
if state.SyncFlowOptions != nil {
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
config.MaxBatchSize = state.SyncFlowOptions.BatchSize
config.TableMappings = state.SyncFlowOptions.TableMappings
//TODO: this will need to fetch from the DB?
//config.TableMappings = state.SyncFlowOptions.TableMappings
}

srcType, err := connectors.LoadPeerType(ctx, h.pool, config.SourceName)
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/clickhouse/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
cfg *protos.FlowConnectionConfigsCore,
tableNameSchemaMapping map[string]*protos.TableSchema,
) error {
tableMappings := cfg.TableMappings
if internal.PeerDBOnlyClickHouseAllowed() {
err := chvalidate.CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx, c.logger, c.database)
if err != nil {
Expand All @@ -34,7 +35,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
}

// this is for handling column exclusion, processed schema does that in a step
processedMapping := internal.BuildProcessedSchemaMapping(cfg.TableMappings, tableNameSchemaMapping, c.logger)
processedMapping := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, c.logger)
dstTableNames := slices.Collect(maps.Keys(processedMapping))

// In the case of resync, we don't need to check the content or structure of the original tables;
Expand Down Expand Up @@ -64,7 +65,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
return err
}

for _, tableMapping := range cfg.TableMappings {
for _, tableMapping := range tableMappings {
dstTableName := tableMapping.DestinationTableIdentifier
if _, ok := processedMapping[dstTableName]; !ok {
// if destination table is not a key, that means source table was not a key in the original schema mapping(?)
Expand Down
1 change: 0 additions & 1 deletion flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs(s Suite)

ret := &protos.FlowConnectionConfigs{
FlowJobName: c.FlowJobName,
TableMappings: tblMappings,
SourceName: s.Source().GeneratePeer(t).Name,
DestinationName: c.Destination,
SyncedAtColName: "_PEERDB_SYNCED_AT",
Expand Down
62 changes: 62 additions & 0 deletions flow/internal/flow_configuration_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@ import (
"github.com/PeerDB-io/peerdb/flow/shared"
)

func TableNameMapping(tableMappings []*protos.TableMapping, resync bool) map[string]string {
tblNameMapping := make(map[string]string, len(tableMappings))
if resync {
for _, mapping := range tableMappings {
if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL {
mapping.DestinationTableIdentifier += "_resync"
}
}
}
for _, v := range tableMappings {
tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
}

return tblNameMapping
}

func FetchConfigFromDB(ctx context.Context, catalogPool shared.CatalogPool, flowName string) (*protos.FlowConnectionConfigsCore, error) {
var configBytes sql.RawBytes
if err := catalogPool.QueryRow(ctx,
Expand All @@ -26,3 +42,49 @@ func FetchConfigFromDB(ctx context.Context, catalogPool shared.CatalogPool, flow

return &cfgFromDB, nil
}

func TableMappingsToBytes(tableMappings []*protos.TableMapping) ([][]byte, error) {
tableMappingsBytes := [][]byte{}
for _, tableMapping := range tableMappings {
tableMappingBytes, err := proto.Marshal(tableMapping)
if err != nil {
return nil, fmt.Errorf("failed to marshal table mapping to migrate to catalog: %w", err)
}
tableMappingsBytes = append(tableMappingsBytes, tableMappingBytes)
}
return tableMappingsBytes, nil
}

func FetchTableMappingsFromDB(ctx context.Context, flowJobName string, version uint32) ([]*protos.TableMapping, error) {
pool, err := GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
return nil, err
}
rows, err := pool.Query(ctx,
"select table_mappings from table_mappings where flow_name = $1 AND version = $2",
flowJobName, version,
)
if err != nil {
return nil, err
}

var tableMappingsBytes [][]byte
var tableMappings []*protos.TableMapping = []*protos.TableMapping{}

err = rows.Scan([]any{&tableMappingsBytes})
if err != nil {
return nil, fmt.Errorf("failed to deserialize table mapping schema proto: %w", err)
}

for _, tableMappingBytes := range tableMappingsBytes {

var tableMapping *protos.TableMapping
if err := proto.Unmarshal(tableMappingBytes, tableMapping); err != nil {
return nil, err
}

tableMappings = append(tableMappings, tableMapping)
}

return tableMappings, nil
}
Loading
Loading