diff --git a/pkg/cmd/roachtest/tests/jobs.go b/pkg/cmd/roachtest/tests/jobs.go index b20bde07baf7..706ec55172d5 100644 --- a/pkg/cmd/roachtest/tests/jobs.go +++ b/pkg/cmd/roachtest/tests/jobs.go @@ -10,6 +10,7 @@ import ( gosql "database/sql" "fmt" "math/rand" + "sync" "time" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" @@ -28,13 +29,13 @@ import ( ) var ( - tableCount = 5000 - nodeCount = 4 + tableCount = 500 + nodeCount = 70 tableNamePrefix = "t" tableSchema = "(id INT PRIMARY KEY, s STRING)" - showJobsTimeout = time.Minute * 2 + showJobsTimeout = time.Minute * 200 pollerMinFrequencySeconds = 30 - roachtestTimeout = time.Minute * 45 + roachtestTimeout = time.Minute * 100 workloadDuration = roachtestTimeout - time.Minute*10 ) @@ -45,8 +46,8 @@ func registerJobs(r registry.Registry) { Name: "jobs/stress", Owner: registry.OwnerDisasterRecovery, Cluster: jobsSpec, + Benchmark: true, EncryptionSupport: registry.EncryptionMetamorphic, - Leases: registry.MetamorphicLeases, CompatibleClouds: registry.OnlyGCE, Suites: registry.Suites(registry.Nightly), Timeout: roachtestTimeout, @@ -75,7 +76,7 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { // Because this roachtest spins up and pauses/cancels 5k changefeed jobs // really quickly, run the adopt interval which by default only runs every 30s // and adopts 10 jobs at a time. - sqlDB.Exec(t, "SET CLUSTER SETTING jobs.registry.interval.adopt='5s'") + sqlDB.Exec(t, "SET CLUSTER SETTING jobs.registry.interval.adopt='15s'") rng, seed := randutil.NewLockedPseudoRand() t.L().Printf("Rand seed: %d", seed) @@ -97,13 +98,12 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { return nil }) - randomPoller := func(f func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error) func(ctx context.Context, _ *logger.Logger) error { + randomPoller := func(waitTime time.Duration, f func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error) func(ctx context.Context, _ *logger.Logger) error { return func(ctx context.Context, _ *logger.Logger) error { var pTimer timeutil.Timer defer pTimer.Stop() for { - waitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second pTimer.Reset(waitTime) select { case <-ctx.Done(): @@ -120,13 +120,23 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { } } } + //for i := 0; i < nodeCount; i++ { + // group.Go(randomPoller(time.Second, checkJobQueryLatency)) + //} - group.Go(randomPoller(checkJobQueryLatency)) + waitTime := time.Duration(rng.Intn(60))*time.Second + time.Minute + jobIDToTableName := make(map[jobspb.JobID]string) + var jobIDToTableNameMu sync.Mutex - group.Go(randomPoller(pauseResumeChangefeeds)) + pauseResumeChangefeedsWithMap := func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error { + return pauseResumeChangefeeds(ctx, t, c, rng, jobIDToTableName, &jobIDToTableNameMu) + } + group.Go(randomPoller(waitTime, pauseResumeChangefeedsWithMap)) + + group.Go(randomPoller(time.Minute*5, splitScatterMergeJobsTable)) group.Go(func(ctx context.Context, _ *logger.Logger) error { - createTablesWithChangefeeds(ctx, t, c, rng) + createTablesWithChangefeeds(ctx, t, c, rng, jobIDToTableName, &jobIDToTableNameMu) return nil }) @@ -135,10 +145,16 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { require.NoError(t, group.WaitE()) checkJobSystemHealth(ctx, t, c, rng) + require.NoError(t, checkJobQueryLatency(ctx, t, c, rng)) } func createTablesWithChangefeeds( - ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand, + ctx context.Context, + t test.Test, + c cluster.Cluster, + rng *rand.Rand, + jobIDToTableName map[jobspb.JobID]string, + jobIDToTableNameMu *sync.Mutex, ) { t.L().Printf("Creating %d tables with changefeeds", tableCount) @@ -155,7 +171,14 @@ func createTablesWithChangefeeds( tableName := tableNamePrefix + fmt.Sprintf("%d", i) sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s %s`, tableName, tableSchema)) sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName)) - sqlDB.Exec(t, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='2m', protect_data_from_gc_on_pause", tableName)) + + var jobID jobspb.JobID + sqlDB.QueryRow(t, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='10m', protect_data_from_gc_on_pause", tableName)).Scan(&jobID) + + jobIDToTableNameMu.Lock() + jobIDToTableName[jobID] = tableName + jobIDToTableNameMu.Unlock() + if i%(tableCount/5) == 0 { t.L().Printf("Created %d tables so far", i) } @@ -169,17 +192,17 @@ func checkJobQueryLatency( ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand, ) error { conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1) - defer conn.Close() - if err := checkQueryLatency(ctx, "SHOW JOBS", conn, t.L(), showJobsTimeout); err != nil { - return err - } - var randomChangefeedJobID int - if err := conn.QueryRowContext(ctx, "SELECT job_id FROM [SHOW JOBS] ORDER BY random() LIMIT 1").Scan(&randomChangefeedJobID); err != nil { + toLog := false + if rng.Intn(10) == 0 { + toLog = true + } + defer conn.Close() + if err := checkQueryLatency(ctx, "SHOW JOBS", conn, t.L(), showJobsTimeout, toLog); err != nil { return err } - if err := checkQueryLatency(ctx, redact.Sprintf("SHOW JOB %d", randomChangefeedJobID), conn, t.L(), showJobsTimeout/10); err != nil { + if err := checkQueryLatency(ctx, "SHOW CHANGEFEED JOBS", conn, t.L(), showJobsTimeout, toLog); err != nil { return err } return nil @@ -191,6 +214,7 @@ func checkQueryLatency( conn *gosql.DB, l *logger.Logger, timeout time.Duration, + toLog bool, ) error { queryBegin := timeutil.Now() var err error @@ -207,48 +231,87 @@ func checkQueryLatency( }) return errors.CombineErrors(err, explainErr) } - l.Printf("%s ran in %.2f seconds", query, timeutil.Since(queryBegin).Seconds()) + if toLog { + l.Printf("%s ran in %.2f seconds", query, timeutil.Since(queryBegin).Seconds()) + } return nil } func pauseResumeChangefeeds( - ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand, + ctx context.Context, + t test.Test, + c cluster.Cluster, + rng *rand.Rand, + jobIDToTableName map[jobspb.JobID]string, + jobIDToTableNameMu *sync.Mutex, ) error { conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1) defer conn.Close() - rows, err := conn.QueryContext(ctx, "SELECT job_id FROM [SHOW CHANGEFEED JOBS]") + rows, err := conn.QueryContext(ctx, "SELECT job_id, status FROM [SHOW JOBS] where job_type='CHANGEFEED'") if err != nil { return err } defer rows.Close() - var jobs []jobspb.JobID + // find cancelled changefeeds + var canceledJobs []jobspb.JobID for rows.Next() { var jobID jobspb.JobID - if err := rows.Scan(&jobID); err != nil { + var status string + if err := rows.Scan(&jobID, &status); err != nil { return err } - jobs = append(jobs, jobID) + if status == "canceled" { + canceledJobs = append(canceledJobs, jobID) + } } rows.Close() - if len(jobs) < tableCount/10 { - return nil - } - jobAction := func(cmd string, count int) { - errCount := 0 - for i := 0; i < count; i++ { - jobIdx := rng.Intn(len(jobs)) - _, err := conn.Exec(cmd, jobs[jobIdx]) - if err != nil { - errCount++ - } + recreatedCount := 0 + seenTables := make(map[string]struct{}) + + for i := 0; i < len(canceledJobs); i++ { + jobID := canceledJobs[i] + + // Try to recreate the changefeed + jobIDToTableNameMu.Lock() + tableName, exists := jobIDToTableName[jobID] + jobIDToTableNameMu.Unlock() + if !exists { + t.L().Printf("No table name found for job %d, skipping recreation", jobID) + continue + } + if _, seen := seenTables[tableName]; seen { + continue + } + var newJobID jobspb.JobID + err := conn.QueryRowContext(ctx, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='5m', protect_data_from_gc_on_pause", tableName)).Scan(&newJobID) + if err == nil { + jobIDToTableNameMu.Lock() + jobIDToTableName[newJobID] = tableName + jobIDToTableNameMu.Unlock() + recreatedCount++ + seenTables[tableName] = struct{}{} + } else { + t.L().Printf("Failed to recreate changefeed for table %s (job %d): %v", tableName, jobID, err) } - t.L().Printf("Failed to run %s on %d of %d jobs", cmd, errCount, count) } - jobAction("PAUSE JOB $1", len(jobs)/10) - jobAction("RESUME JOB $1", len(jobs)/10) + t.L().Printf("recreated %d changefeeds, of %d total canceled jobs", recreatedCount, len(canceledJobs)) + return nil +} + +func splitScatterMergeJobsTable( + ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand, +) error { + conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1) + defer conn.Close() + + _, err := conn.ExecContext(ctx, "ALTER TABLE system.jobs SPLIT AT (select id from system.jobs order by random() limit 3)") + if err != nil { + t.L().Printf("Error splitting %s", err) + } + return nil }