Sync flow: use metadata for last offset fetch and cancel groupCtx #3206
base: main
Conversation
Pull Request Overview
This PR refactors how the sync flow retrieves its last offset (using a metadata-only client rather than opening full DB connectors) and ensures the errgroup’s context is cancelled on sync errors so retries can proceed.
- Switch offset fetch to use the `external_metadata` client instead of opening source/destination connectors
- Wrap the errgroup in a cancellable context and call `cancelFunc()` when sync errors, to trigger retries
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| flow/activities/flowable_core.go | Replaced connector-based `GetLastOffset` calls with the metadata client |
| flow/activities/flowable.go | Introduced `context.WithCancel` for the errgroup and invoked cancellation on sync failure |
Comments suppressed due to low confidence (2)
flow/activities/flowable.go:387
- The `maintainReplConn` goroutine may not return on context cancellation, causing `errgroup.Wait()` to hang and preventing retries. Ensure `maintainReplConn` watches `groupCtx.Done()` and returns promptly when the context is cancelled.

  Referenced line: `cancelFunc()`
flow/activities/flowable_core.go:143
- [nitpick] Confirm whether the metadata client returned by `NewPostgresMetadataFromCatalog` holds resources that need explicit closing (e.g., a DB connection). If so, add a `defer pgMetadata.Close()` or equivalent to avoid resource leaks.

  Referenced line: `return pgMetadata.GetLastOffset(ctx, flowName)`
```go
	return dstConn.GetLastOffset(ctx, config.FlowJobName)
}
pgMetadata := connmetadata.NewPostgresMetadataFromCatalog(logger, a.CatalogPool)
```
This doesn't work correctly for pg-to-pg replication, where we store metadata on the destination peer instead of the catalog. I've brought up wanting to change this, but for now you need to check whether source & destination are both pg.

In most cases you can return `GetLastOffset` from `srcConn`, which'll usually do what you're doing here. The problem is that originally metadata was managed by the destination connector.
seems like we should be running the sync loop as another
…h unless Postgres to Postgres (#3367)

Decoupling from #3206 and addressing the comment there. This PR makes it so that we do not connect to the destination peer to get the last offset for sync flow.

Co-authored-by: Philip Dubé <philip.dube@clickhouse.com>
Consider the following scenario where you have a mirror to ClickHouse with 2 replicas:
In this case, the sync flow never retries: normalizeLoop is stuck normalizing batches, so it never gets the chance to see that the syncDone channel is closed (it is closed when the sync flow errors out, in this case because fetching the last offset errored).
And so we get stuck in the errgroup waiting on normalize to finish, which means we never retry the sync flow.
This PR cancels groupCtx when syncErr != nil, which unblocks:

i) maintainReplConn, which does not error out if groupCtx is cancelled
ii) normalizeLoop, which will exit anyway because of syncDone

The whole purpose of cancelling groupCtx when syncErr != nil is so that we hit the branch which leads to the syncflow activity retrying: