From 5bbc9692f3b39210b2aca3cb1675484ca02e6c2d Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Wed, 19 Nov 2025 13:49:59 -0500 Subject: [PATCH 1/4] exp --- pkg/cmd/roachtest/tests/jobs.go | 110 +++++++++++++++++++++++++------- 1 file changed, 87 insertions(+), 23 deletions(-) diff --git a/pkg/cmd/roachtest/tests/jobs.go b/pkg/cmd/roachtest/tests/jobs.go index b20bde07baf7..bc18bb0eb8dc 100644 --- a/pkg/cmd/roachtest/tests/jobs.go +++ b/pkg/cmd/roachtest/tests/jobs.go @@ -10,6 +10,7 @@ import ( gosql "database/sql" "fmt" "math/rand" + "sync" "time" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" @@ -29,12 +30,12 @@ import ( var ( tableCount = 5000 - nodeCount = 4 + nodeCount = 70 tableNamePrefix = "t" tableSchema = "(id INT PRIMARY KEY, s STRING)" - showJobsTimeout = time.Minute * 2 + showJobsTimeout = time.Minute * 200 pollerMinFrequencySeconds = 30 - roachtestTimeout = time.Minute * 45 + roachtestTimeout = time.Minute * 100 workloadDuration = roachtestTimeout - time.Minute*10 ) @@ -97,13 +98,12 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { return nil }) - randomPoller := func(f func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error) func(ctx context.Context, _ *logger.Logger) error { + randomPoller := func(waitTime time.Duration, f func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error) func(ctx context.Context, _ *logger.Logger) error { return func(ctx context.Context, _ *logger.Logger) error { var pTimer timeutil.Timer defer pTimer.Stop() for { - waitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second pTimer.Reset(waitTime) select { case <-ctx.Done(): @@ -120,13 +120,23 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { } } } + for i := 0; i < nodeCount; i++ { + group.Go(randomPoller(time.Second, checkJobQueryLatency)) + } - group.Go(randomPoller(checkJobQueryLatency)) + waitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second + jobIDToTableName := make(map[jobspb.JobID]string) + var jobIDToTableNameMu sync.Mutex + + pauseResumeChangefeedsWithMap := func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error { + return pauseResumeChangefeeds(ctx, t, c, rng, jobIDToTableName, &jobIDToTableNameMu) + } + group.Go(randomPoller(waitTime, pauseResumeChangefeedsWithMap)) - group.Go(randomPoller(pauseResumeChangefeeds)) + group.Go(randomPoller(time.Minute*5, splitScatterMergeJobsTable)) group.Go(func(ctx context.Context, _ *logger.Logger) error { - createTablesWithChangefeeds(ctx, t, c, rng) + createTablesWithChangefeeds(ctx, t, c, rng, jobIDToTableName, &jobIDToTableNameMu) return nil }) @@ -138,7 +148,12 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { } func createTablesWithChangefeeds( - ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand, + ctx context.Context, + t test.Test, + c cluster.Cluster, + rng *rand.Rand, + jobIDToTableName map[jobspb.JobID]string, + jobIDToTableNameMu *sync.Mutex, ) { t.L().Printf("Creating %d tables with changefeeds", tableCount) @@ -155,7 +170,14 @@ func createTablesWithChangefeeds( tableName := tableNamePrefix + fmt.Sprintf("%d", i) sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s %s`, tableName, tableSchema)) sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName)) - sqlDB.Exec(t, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='2m', protect_data_from_gc_on_pause", tableName)) + + var jobID jobspb.JobID + sqlDB.QueryRow(t, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='10m', protect_data_from_gc_on_pause", tableName)).Scan(&jobID) + + jobIDToTableNameMu.Lock() + jobIDToTableName[jobID] = tableName + jobIDToTableNameMu.Unlock() + if i%(tableCount/5) == 0 { t.L().Printf("Created %d tables so far", i) } @@ -169,17 +191,17 @@ func checkJobQueryLatency( ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand, ) error { conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1) - defer conn.Close() - if err := checkQueryLatency(ctx, "SHOW JOBS", conn, t.L(), showJobsTimeout); err != nil { - return err - } - var randomChangefeedJobID int - if err := conn.QueryRowContext(ctx, "SELECT job_id FROM [SHOW JOBS] ORDER BY random() LIMIT 1").Scan(&randomChangefeedJobID); err != nil { + toLog := false + if rng.Intn(10) == 0 { + toLog = true + } + defer conn.Close() + if err := checkQueryLatency(ctx, "SHOW JOBS", conn, t.L(), showJobsTimeout, toLog); err != nil { return err } - if err := checkQueryLatency(ctx, redact.Sprintf("SHOW JOB %d", randomChangefeedJobID), conn, t.L(), showJobsTimeout/10); err != nil { + if err := checkQueryLatency(ctx, "SHOW CHANGEFEED JOBS", conn, t.L(), showJobsTimeout, toLog); err != nil { return err } return nil @@ -191,6 +213,7 @@ func checkQueryLatency( conn *gosql.DB, l *logger.Logger, timeout time.Duration, + toLog bool, ) error { queryBegin := timeutil.Now() var err error @@ -207,17 +230,24 @@ func checkQueryLatency( }) return errors.CombineErrors(err, explainErr) } - l.Printf("%s ran in %.2f seconds", query, timeutil.Since(queryBegin).Seconds()) + if toLog { + l.Printf("%s ran in %.2f seconds", query, timeutil.Since(queryBegin).Seconds()) + } return nil } func pauseResumeChangefeeds( - ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand, + ctx context.Context, + t test.Test, + c cluster.Cluster, + rng *rand.Rand, + jobIDToTableName map[jobspb.JobID]string, + jobIDToTableNameMu *sync.Mutex, ) error { conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1) defer conn.Close() - rows, err := conn.QueryContext(ctx, "SELECT job_id FROM [SHOW CHANGEFEED JOBS]") + rows, err := conn.QueryContext(ctx, "SELECT job_id FROM [SHOW JOBS] where job_type='CHANGEFEED'") if err != nil { return err } @@ -233,25 +263,59 @@ func pauseResumeChangefeeds( } rows.Close() - if len(jobs) < tableCount/10 { + if len(jobs) < tableCount/2 { return nil } jobAction := func(cmd string, count int) { errCount := 0 + recreatedCount := 0 for i := 0; i < count; i++ { jobIdx := rng.Intn(len(jobs)) - _, err := conn.Exec(cmd, jobs[jobIdx]) + jobID := jobs[jobIdx] + _, err := conn.Exec(cmd, jobID) if err != nil { errCount++ + // Try to recreate the changefeed + jobIDToTableNameMu.Lock() + tableName, exists := jobIDToTableName[jobID] + jobIDToTableNameMu.Unlock() + if !exists { + t.L().Printf("No table name found for job %d, skipping recreation", jobID) + continue + } + var newJobID jobspb.JobID + err := conn.QueryRowContext(ctx, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='10m', protect_data_from_gc_on_pause", tableName)).Scan(&newJobID) + if err == nil { + jobIDToTableNameMu.Lock() + jobIDToTableName[newJobID] = tableName + jobIDToTableNameMu.Unlock() + recreatedCount++ + } else { + t.L().Printf("Failed to recreate changefeed for table %s (job %d): %v", tableName, jobID, err) + } } } - t.L().Printf("Failed to run %s on %d of %d jobs", cmd, errCount, count) + t.L().Printf("Failed to run %s on %d of %d jobs, recreated %d changefeeds, of %d total cf jobs", cmd, errCount, count, recreatedCount, len(jobs)) } jobAction("PAUSE JOB $1", len(jobs)/10) jobAction("RESUME JOB $1", len(jobs)/10) return nil } +func splitScatterMergeJobsTable( + ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand, +) error { + conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1) + defer conn.Close() + + _, err := conn.ExecContext(ctx, "ALTER TABLE system.jobs SPLIT AT (select id from system.jobs order by random() limit 3)") + if err != nil { + t.L().Printf("Error splitting %s", err) + } + + return nil +} + func checkJobSystemHealth(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) { conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1) defer conn.Close() From 227371bb6b6cdcd4cad26f20fbf1483d93283a41 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Sat, 22 Nov 2025 14:33:25 -0500 Subject: [PATCH 2/4] remove show changefeeds poller --- pkg/cmd/roachtest/tests/jobs.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/cmd/roachtest/tests/jobs.go b/pkg/cmd/roachtest/tests/jobs.go index bc18bb0eb8dc..c235f4808ad7 100644 --- a/pkg/cmd/roachtest/tests/jobs.go +++ b/pkg/cmd/roachtest/tests/jobs.go @@ -46,8 +46,8 @@ func registerJobs(r registry.Registry) { Name: "jobs/stress", Owner: registry.OwnerDisasterRecovery, Cluster: jobsSpec, + Benchmark: true, EncryptionSupport: registry.EncryptionMetamorphic, - Leases: registry.MetamorphicLeases, CompatibleClouds: registry.OnlyGCE, Suites: registry.Suites(registry.Nightly), Timeout: roachtestTimeout, @@ -120,9 +120,9 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { } } } - for i := 0; i < nodeCount; i++ { - group.Go(randomPoller(time.Second, checkJobQueryLatency)) - } + //for i := 0; i < nodeCount; i++ { + // group.Go(randomPoller(time.Second, checkJobQueryLatency)) + //} waitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second jobIDToTableName := make(map[jobspb.JobID]string) @@ -145,6 +145,7 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { require.NoError(t, group.WaitE()) checkJobSystemHealth(ctx, t, c, rng) + require.NoError(t, checkJobQueryLatency(ctx, t, c, rng)) } func createTablesWithChangefeeds( From 94652e28addff2af258bd7884c785760d54b9e67 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Mon, 24 Nov 2025 11:52:33 -0500 Subject: [PATCH 3/4] remove auto pause/cancel --- pkg/cmd/roachtest/tests/jobs.go | 78 ++++++++++++++++----------------- 1 file changed, 37 insertions(+), 41 deletions(-) diff --git a/pkg/cmd/roachtest/tests/jobs.go b/pkg/cmd/roachtest/tests/jobs.go index c235f4808ad7..37fd1e80a3e9 100644 --- a/pkg/cmd/roachtest/tests/jobs.go +++ b/pkg/cmd/roachtest/tests/jobs.go @@ -29,7 +29,7 @@ import ( ) var ( - tableCount = 5000 + tableCount = 500 nodeCount = 70 tableNamePrefix = "t" tableSchema = "(id INT PRIMARY KEY, s STRING)" @@ -76,7 +76,7 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { // Because this roachtest spins up and pauses/cancels 5k changefeed jobs // really quickly, run the adopt interval which by default only runs every 30s // and adopts 10 jobs at a time. - sqlDB.Exec(t, "SET CLUSTER SETTING jobs.registry.interval.adopt='5s'") + sqlDB.Exec(t, "SET CLUSTER SETTING jobs.registry.interval.adopt='15s'") rng, seed := randutil.NewLockedPseudoRand() t.L().Printf("Rand seed: %d", seed) @@ -124,7 +124,7 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { // group.Go(randomPoller(time.Second, checkJobQueryLatency)) //} - waitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second + waitTime := time.Duration(rng.Intn(180)+1) * time.Second jobIDToTableName := make(map[jobspb.JobID]string) var jobIDToTableNameMu sync.Mutex @@ -248,58 +248,54 @@ func pauseResumeChangefeeds( conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1) defer conn.Close() - rows, err := conn.QueryContext(ctx, "SELECT job_id FROM [SHOW JOBS] where job_type='CHANGEFEED'") + rows, err := conn.QueryContext(ctx, "SELECT job_id, status FROM [SHOW JOBS] where job_type='CHANGEFEED'") if err != nil { return err } defer rows.Close() - - var jobs []jobspb.JobID + + // find cancelled changefeeds + var canceledJobs []jobspb.JobID for rows.Next() { var jobID jobspb.JobID - if err := rows.Scan(&jobID); err != nil { + var status string + if err := rows.Scan(&jobID,&status); err != nil { return err } - jobs = append(jobs, jobID) + if status == "canceled" { + canceledJobs = append(canceledJobs, jobID) + } } rows.Close() - - if len(jobs) < tableCount/2 { - return nil - } - jobAction := func(cmd string, count int) { - errCount := 0 - recreatedCount := 0 - for i := 0; i < count; i++ { - jobIdx := rng.Intn(len(jobs)) - jobID := jobs[jobIdx] - _, err := conn.Exec(cmd, jobID) - if err != nil { - errCount++ - // Try to recreate the changefeed + + // resume 10% of canceled changefeeds + count := len(canceledJobs) /10 + + recreatedCount := 0 + for i := 0; i < count; i++ { + jobIdx := rng.Intn(len(canceledJobs)) + jobID := canceledJobs[jobIdx] + + // Try to recreate the changefeed + jobIDToTableNameMu.Lock() + tableName, exists := jobIDToTableName[jobID] + jobIDToTableNameMu.Unlock() + if !exists { + t.L().Printf("No table name found for job %d, skipping recreation", jobID) + continue + } + var newJobID jobspb.JobID + err := conn.QueryRowContext(ctx, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='10m', protect_data_from_gc_on_pause", tableName)).Scan(&newJobID) + if err == nil { jobIDToTableNameMu.Lock() - tableName, exists := jobIDToTableName[jobID] + jobIDToTableName[newJobID] = tableName jobIDToTableNameMu.Unlock() - if !exists { - t.L().Printf("No table name found for job %d, skipping recreation", jobID) - continue - } - var newJobID jobspb.JobID - err := conn.QueryRowContext(ctx, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='10m', protect_data_from_gc_on_pause", tableName)).Scan(&newJobID) - if err == nil { - jobIDToTableNameMu.Lock() - jobIDToTableName[newJobID] = tableName - jobIDToTableNameMu.Unlock() - recreatedCount++ - } else { - t.L().Printf("Failed to recreate changefeed for table %s (job %d): %v", tableName, jobID, err) - } + recreatedCount++ + } else { + t.L().Printf("Failed to recreate changefeed for table %s (job %d): %v", tableName, jobID, err) } - } - t.L().Printf("Failed to run %s on %d of %d jobs, recreated %d changefeeds, of %d total cf jobs", cmd, errCount, count, recreatedCount, len(jobs)) } - jobAction("PAUSE JOB $1", len(jobs)/10) - jobAction("RESUME JOB $1", len(jobs)/10) + t.L().Printf("recreated %d changefeeds, of %d total canceled jobs", recreatedCount, len(canceledJobs)) return nil } From da57d1082c1531f06058d6b91f7d41a3c04a2d5a Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Mon, 24 Nov 2025 21:21:20 -0500 Subject: [PATCH 4/4] lower cancel ttl, recreate all canceled jobs --- pkg/cmd/roachtest/tests/jobs.go | 52 +++++++++++++++++---------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/pkg/cmd/roachtest/tests/jobs.go b/pkg/cmd/roachtest/tests/jobs.go index 37fd1e80a3e9..706ec55172d5 100644 --- a/pkg/cmd/roachtest/tests/jobs.go +++ b/pkg/cmd/roachtest/tests/jobs.go @@ -124,7 +124,7 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { // group.Go(randomPoller(time.Second, checkJobQueryLatency)) //} - waitTime := time.Duration(rng.Intn(180)+1) * time.Second + waitTime := time.Duration(rng.Intn(60))*time.Second + time.Minute jobIDToTableName := make(map[jobspb.JobID]string) var jobIDToTableNameMu sync.Mutex @@ -253,13 +253,13 @@ func pauseResumeChangefeeds( return err } defer rows.Close() - + // find cancelled changefeeds var canceledJobs []jobspb.JobID for rows.Next() { var jobID jobspb.JobID var status string - if err := rows.Scan(&jobID,&status); err != nil { + if err := rows.Scan(&jobID, &status); err != nil { return err } if status == "canceled" { @@ -267,33 +267,35 @@ func pauseResumeChangefeeds( } } rows.Close() - - // resume 10% of canceled changefeeds - count := len(canceledJobs) /10 recreatedCount := 0 - for i := 0; i < count; i++ { - jobIdx := rng.Intn(len(canceledJobs)) - jobID := canceledJobs[jobIdx] + seenTables := make(map[string]struct{}) + + for i := 0; i < len(canceledJobs); i++ { + jobID := canceledJobs[i] - // Try to recreate the changefeed + // Try to recreate the changefeed + jobIDToTableNameMu.Lock() + tableName, exists := jobIDToTableName[jobID] + jobIDToTableNameMu.Unlock() + if !exists { + t.L().Printf("No table name found for job %d, skipping recreation", jobID) + continue + } + if _, seen := seenTables[tableName]; seen { + continue + } + var newJobID jobspb.JobID + err := conn.QueryRowContext(ctx, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='5m', protect_data_from_gc_on_pause", tableName)).Scan(&newJobID) + if err == nil { jobIDToTableNameMu.Lock() - tableName, exists := jobIDToTableName[jobID] + jobIDToTableName[newJobID] = tableName jobIDToTableNameMu.Unlock() - if !exists { - t.L().Printf("No table name found for job %d, skipping recreation", jobID) - continue - } - var newJobID jobspb.JobID - err := conn.QueryRowContext(ctx, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='10m', protect_data_from_gc_on_pause", tableName)).Scan(&newJobID) - if err == nil { - jobIDToTableNameMu.Lock() - jobIDToTableName[newJobID] = tableName - jobIDToTableNameMu.Unlock() - recreatedCount++ - } else { - t.L().Printf("Failed to recreate changefeed for table %s (job %d): %v", tableName, jobID, err) - } + recreatedCount++ + seenTables[tableName] = struct{}{} + } else { + t.L().Printf("Failed to recreate changefeed for table %s (job %d): %v", tableName, jobID, err) + } } t.L().Printf("recreated %d changefeeds, of %d total canceled jobs", recreatedCount, len(canceledJobs)) return nil