alexstoick

This uses a new helper, `internal.FetchConfigFromDB`, to fetch a fully hydrated `protos.FlowConnectionConfigs`. When passing this config to Temporal we strip the `tableMappings` array, since that array can push the payload over the 2MB limit.

This contains a subset of the changes which were originally proposed in: #3407
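
A minimal sketch of the stripping step described above. The two struct types are simplified, hypothetical stand-ins for the generated `protos` types, and `stripTableMappings` is an illustrative name, not a helper from this PR.

```go
package main

import "fmt"

// TableMapping and FlowConnectionConfigs are simplified stand-ins for the
// generated protos types (assumption: only the fields needed for the sketch).
type TableMapping struct {
	SourceTableIdentifier      string
	DestinationTableIdentifier string
}

type FlowConnectionConfigs struct {
	FlowJobName   string
	TableMappings []TableMapping
}

// stripTableMappings (hypothetical name) returns a shallow copy of cfg with
// the table mappings removed, leaving the caller's hydrated config untouched.
func stripTableMappings(cfg *FlowConnectionConfigs) *FlowConnectionConfigs {
	stripped := *cfg
	stripped.TableMappings = nil
	return &stripped
}

func main() {
	full := &FlowConnectionConfigs{
		FlowJobName: "demo_mirror",
		TableMappings: []TableMapping{
			{SourceTableIdentifier: "public.users", DestinationTableIdentifier: "users"},
		},
	}
	slim := stripTableMappings(full)
	// The hydrated config keeps its mappings; the copy sent onward has none.
	fmt.Println(len(full.TableMappings), len(slim.TableMappings))
}
```

With real generated protobuf messages a plain struct copy is not appropriate; something like `proto.Clone` followed by clearing the field would be the likely equivalent.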


ilidemi commented Oct 11, 2025

Some ideas on how to tighten this up:

  • Remove the field from FlowConnectionConfigs (mark it as reserved) to rip off the band-aid and have the type system working with us. For table additions, pass the mappings as a separate arg.
  • Add a `table_mappings` table to the catalog, write to it on flow creation in handler.go, and add a migration after pause like the one in cdc_flow.go lines 375-390 from "Remove TableNameSchemaMapping from temporal state" (#2090), which pulled off a similar thing. Instead of fetching the config from the DB, fetch the table mappings from the DB.
  • Preserve replayability, which can be done by adding a `version bigint` field that increments within a flow, referencing its value from FlowConnectionConfigs, and making the mappings table append-only.
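
The append-only, versioned idea in the last bullet can be sketched in memory as follows. This is only an illustration under stated assumptions: `mappingStore`, `Append`, and `Get` are hypothetical names, and a real implementation would read and write the catalog table rather than a map.

```go
package main

import (
	"errors"
	"fmt"
)

// TableMapping is a simplified stand-in for the protos type.
type TableMapping struct {
	Source, Destination string
}

// mappingStore is a hypothetical in-memory stand-in for an append-only
// table_mappings catalog table keyed by (flow_name, version).
type mappingStore struct {
	// versions[flow][i] holds the snapshot for version i+1.
	versions map[string][][]TableMapping
}

func newMappingStore() *mappingStore {
	return &mappingStore{versions: make(map[string][][]TableMapping)}
}

// Append stores a new snapshot and returns its version. Existing versions are
// never overwritten, so a replay that references an old version sees the same data.
func (s *mappingStore) Append(flow string, mappings []TableMapping) int64 {
	snapshot := append([]TableMapping(nil), mappings...)
	s.versions[flow] = append(s.versions[flow], snapshot)
	return int64(len(s.versions[flow]))
}

var errVersionNotFound = errors.New("table mappings version not found")

// Get returns the snapshot recorded under the given version.
func (s *mappingStore) Get(flow string, version int64) ([]TableMapping, error) {
	all := s.versions[flow]
	if version < 1 || version > int64(len(all)) {
		return nil, errVersionNotFound
	}
	return all[version-1], nil
}

func main() {
	store := newMappingStore()
	v1 := store.Append("demo_mirror", []TableMapping{{"public.users", "users"}})
	v2 := store.Append("demo_mirror", []TableMapping{
		{"public.users", "users"},
		{"public.orders", "orders"},
	})
	old, _ := store.Get("demo_mirror", v1)
	// The workflow would carry only the version; v1 still resolves to one table.
	fmt.Println(v1, v2, len(old))
}
```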

It would be nice to consider how not to trade off too much of the observability that Temporal provides: right now, for workflows with 1-10 tables, it's convenient to see the mappings right there in the execution. The PeerDB UI and raw catalog access are not easily available in the ClickPipes environment. Maybe log up to 100 tables (along with count and version) every time they're fetched?
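
One possible shape for that capped log line, as a sketch only. `summarizeTables` and the message format are assumptions for illustration and not part of the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// maxLoggedTables caps how many table names are included in one log line,
// following the "up to 100 tables" suggestion above.
const maxLoggedTables = 100

// summarizeTables (hypothetical helper) builds a log message with the mapping
// version, the total table count, and at most maxLoggedTables table names.
func summarizeTables(version int64, tables []string) string {
	shown := tables
	if len(shown) > maxLoggedTables {
		shown = shown[:maxLoggedTables]
	}
	return fmt.Sprintf("table mappings version=%d count=%d showing=%d tables=[%s]",
		version, len(tables), len(shown), strings.Join(shown, ", "))
}

func main() {
	fmt.Println(summarizeTables(3, []string{"public.users", "public.orders"}))
}
```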

@alexstoick

@ilidemi - Thanks for the feedback!

I hesitated to remove the field, as it does feel like a nuclear option, and I was worried about the migration path for running envs. I'll go ahead and implement your suggestion for the new table, taking a similar approach to #2090.

In my previous PR (#3407) I was able to fully remove the options argument being passed, which also reduced the size of the blob passed around. What are your thoughts on that? The options didn't serve any purpose once I was fetching the TableMappings and SrcTableIdNameMapping from the DB.


For adding/removing table mappings - I think we can either do:

  • a different activity
  • pass as an additional param

My only concern with changing the signature of this is again dealing with running systems and queued jobs - not sure how well Temporal behaves when you change the signature of the job!

ilidemi left a comment

Options argument - moving SrcTableIdNameMapping to DB too makes sense to me, and use a version instead (or -1/nil). Rest of fields change with config/state separation, and although config is not fully immutable, merging the two would require more thinking (if it's a good idea) and would be too much scope for this PR. If it's easy enough and useful to do, we can do it separately.

Changing activity signatures - the only fully supported upgrade route for PeerDB is to pause CDC mirrors (and wait for setup/snapshot/drop to complete), then upgrade, then resume. It is nice when a surgical change is possible for running snapshots but we're not shooting for that even with smaller changes. We can just treat them as functions (besides the codepath from the start of CDCFlowWorkflow to the pause loop) and not worry about the rest. Adding an argument is good.

@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS table_mappings (
flow_name varchar(255) not null,
version int32 not null default 1,

bigint, in case someone spams the API endpoint for updating tables so we don't even have to think through overflows

@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS table_mappings (
flow_name varchar(255) not null,

copypaste quirk?

Comment on lines +308 to +310
// TODO: thought - maybe we need to pass additionalTables and then in the
// CDCWorkflow we persist them to the DB and send an incremented `tableMappingVersion`
// to the new workflow?

Allowing up to ~10K additional tables at a time seems like a generous restriction. Although now it does make sense to have the API explicitly turn down config update requests that Temporal won't take.
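
A rough sketch of such an API-side guard, assuming the limit is the 2MB figure from the PR description taken as 2 * 1024 * 1024 bytes, and using JSON as a stand-in for the real protobuf serialization. `validateAdditionalTables` is a hypothetical name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// maxPayloadBytes mirrors the 2MB limit mentioned in the PR description
// (assumption: interpreted as 2 * 1024 * 1024 bytes).
const maxPayloadBytes = 2 * 1024 * 1024

// TableMapping is a simplified stand-in for the protos type.
type TableMapping struct {
	SourceTableIdentifier      string `json:"source_table_identifier"`
	DestinationTableIdentifier string `json:"destination_table_identifier"`
}

// validateAdditionalTables (hypothetical helper) rejects an update whose
// serialized size exceeds limit, so the API can fail fast before the request
// ever reaches Temporal. JSON stands in for the real wire encoding here.
func validateAdditionalTables(tables []TableMapping, limit int) error {
	payload, err := json.Marshal(tables)
	if err != nil {
		return fmt.Errorf("serializing additional tables: %w", err)
	}
	if len(payload) > limit {
		return fmt.Errorf("additional tables payload is %d bytes, over the %d byte limit",
			len(payload), limit)
	}
	return nil
}

func main() {
	small := []TableMapping{{"public.users", "users"}}
	if err := validateAdditionalTables(small, maxPayloadBytes); err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Println("accepted")
}
```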

ilidemi added a commit that referenced this pull request Oct 16, 2025
In #3589 we want to stop using a field in a way that's not error-prone
in the rest of the code (so, removing) but also doesn't break API
backcompat (so, not removing).

Trying to split FlowConnectionConfigs into two contracts that are made
sure to be in sync with a little codegen. Generated code has small TODOs
that would be removed by #3589 if we go this route.

Validated that renaming the type of a workflow arg works well with
history replay, only the payload is important.