143 changes: 103 additions & 40 deletions pkg/cmd/roachtest/tests/jobs.go
@@ -10,6 +10,7 @@ import (
gosql "database/sql"
"fmt"
"math/rand"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
@@ -28,13 +29,13 @@ import (
)

var (
tableCount = 5000
nodeCount = 4
tableCount = 500
nodeCount = 70
tableNamePrefix = "t"
tableSchema = "(id INT PRIMARY KEY, s STRING)"
showJobsTimeout = time.Minute * 2
showJobsTimeout = time.Minute * 200
pollerMinFrequencySeconds = 30
roachtestTimeout = time.Minute * 45
roachtestTimeout = time.Minute * 100
workloadDuration = roachtestTimeout - time.Minute*10
)

@@ -45,8 +46,8 @@ func registerJobs(r registry.Registry) {
Name: "jobs/stress",
Owner: registry.OwnerDisasterRecovery,
Cluster: jobsSpec,
Benchmark: true,
EncryptionSupport: registry.EncryptionMetamorphic,
Leases: registry.MetamorphicLeases,
CompatibleClouds: registry.OnlyGCE,
Suites: registry.Suites(registry.Nightly),
Timeout: roachtestTimeout,
@@ -75,7 +76,7 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {
// Because this roachtest spins up and pauses/cancels a large number of
// changefeed jobs in quick succession, shorten the adopt interval, which by
// default only runs every 30s and adopts 10 jobs at a time.
sqlDB.Exec(t, "SET CLUSTER SETTING jobs.registry.interval.adopt='5s'")
sqlDB.Exec(t, "SET CLUSTER SETTING jobs.registry.interval.adopt='15s'")

rng, seed := randutil.NewLockedPseudoRand()
t.L().Printf("Rand seed: %d", seed)
@@ -97,13 +98,12 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {
return nil
})

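// randomPoller wraps a check function in a polling loop that re-runs f every
// waitTime until the surrounding context is canceled.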
randomPoller := func(f func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error) func(ctx context.Context, _ *logger.Logger) error {
randomPoller := func(waitTime time.Duration, f func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error) func(ctx context.Context, _ *logger.Logger) error {

return func(ctx context.Context, _ *logger.Logger) error {
var pTimer timeutil.Timer
defer pTimer.Stop()
for {
waitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second
pTimer.Reset(waitTime)
select {
case <-ctx.Done():
@@ -120,13 +120,23 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {
}
}
}
//for i := 0; i < nodeCount; i++ {
// group.Go(randomPoller(time.Second, checkJobQueryLatency))
//}

group.Go(randomPoller(checkJobQueryLatency))
waitTime := time.Duration(rng.Intn(60))*time.Second + time.Minute
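// jobIDToTableName maps each changefeed job to the table it watches so that
// canceled changefeeds can be recreated on the same table. jobIDToTableNameMu
// guards the map because the table-creation goroutine and the pause/resume
// poller both access it.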
jobIDToTableName := make(map[jobspb.JobID]string)
var jobIDToTableNameMu sync.Mutex

group.Go(randomPoller(pauseResumeChangefeeds))
pauseResumeChangefeedsWithMap := func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error {
return pauseResumeChangefeeds(ctx, t, c, rng, jobIDToTableName, &jobIDToTableNameMu)
}
group.Go(randomPoller(waitTime, pauseResumeChangefeedsWithMap))

group.Go(randomPoller(time.Minute*5, splitScatterMergeJobsTable))

group.Go(func(ctx context.Context, _ *logger.Logger) error {
createTablesWithChangefeeds(ctx, t, c, rng)
createTablesWithChangefeeds(ctx, t, c, rng, jobIDToTableName, &jobIDToTableNameMu)
return nil
})

@@ -135,10 +145,16 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {

require.NoError(t, group.WaitE())
checkJobSystemHealth(ctx, t, c, rng)
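// Run one final latency check after the workload and pollers have finished.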
require.NoError(t, checkJobQueryLatency(ctx, t, c, rng))
}

func createTablesWithChangefeeds(
ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand,
ctx context.Context,
t test.Test,
c cluster.Cluster,
rng *rand.Rand,
jobIDToTableName map[jobspb.JobID]string,
jobIDToTableNameMu *sync.Mutex,
) {
t.L().Printf("Creating %d tables with changefeeds", tableCount)

@@ -155,7 +171,14 @@ func createTablesWithChangefeeds(
tableName := tableNamePrefix + fmt.Sprintf("%d", i)
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s %s`, tableName, tableSchema))
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName))
sqlDB.Exec(t, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='2m', protect_data_from_gc_on_pause", tableName))

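// The changefeed protects its targets from GC while paused
// (protect_data_from_gc_on_pause), and gc_protect_expires_after bounds how
// long that protection can hold back garbage collection.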
var jobID jobspb.JobID
sqlDB.QueryRow(t, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='10m', protect_data_from_gc_on_pause", tableName)).Scan(&jobID)

jobIDToTableNameMu.Lock()
jobIDToTableName[jobID] = tableName
jobIDToTableNameMu.Unlock()

if i%(tableCount/5) == 0 {
t.L().Printf("Created %d tables so far", i)
}
@@ -169,17 +192,17 @@ func checkJobQueryLatency(
ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand,
) error {
conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1)
defer conn.Close()
if err := checkQueryLatency(ctx, "SHOW JOBS", conn, t.L(), showJobsTimeout); err != nil {
return err
}

var randomChangefeedJobID int
if err := conn.QueryRowContext(ctx, "SELECT job_id FROM [SHOW JOBS] ORDER BY random() LIMIT 1").Scan(&randomChangefeedJobID); err != nil {
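// Only log query timings for roughly one in ten checks to keep the logs
// manageable.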
toLog := false
if rng.Intn(10) == 0 {
toLog = true
}
defer conn.Close()
if err := checkQueryLatency(ctx, "SHOW JOBS", conn, t.L(), showJobsTimeout, toLog); err != nil {
return err
}

if err := checkQueryLatency(ctx, redact.Sprintf("SHOW JOB %d", randomChangefeedJobID), conn, t.L(), showJobsTimeout/10); err != nil {
if err := checkQueryLatency(ctx, "SHOW CHANGEFEED JOBS", conn, t.L(), showJobsTimeout, toLog); err != nil {
return err
}
return nil
@@ -191,6 +214,7 @@ func checkQueryLatency(
conn *gosql.DB,
l *logger.Logger,
timeout time.Duration,
toLog bool,
) error {
queryBegin := timeutil.Now()
var err error
@@ -207,48 +231,87 @@ func checkQueryLatency(
})
return errors.CombineErrors(err, explainErr)
}
l.Printf("%s ran in %.2f seconds", query, timeutil.Since(queryBegin).Seconds())
if toLog {
l.Printf("%s ran in %.2f seconds", query, timeutil.Since(queryBegin).Seconds())
}
return nil
}

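// pauseResumeChangefeeds looks for canceled changefeed jobs and starts a
// replacement changefeed on each affected table, at most once per table per
// invocation.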
func pauseResumeChangefeeds(
ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand,
ctx context.Context,
t test.Test,
c cluster.Cluster,
rng *rand.Rand,
jobIDToTableName map[jobspb.JobID]string,
jobIDToTableNameMu *sync.Mutex,
) error {
conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1)
defer conn.Close()

rows, err := conn.QueryContext(ctx, "SELECT job_id FROM [SHOW CHANGEFEED JOBS]")
rows, err := conn.QueryContext(ctx, "SELECT job_id, status FROM [SHOW JOBS] WHERE job_type='CHANGEFEED'")
if err != nil {
return err
}
defer rows.Close()

var jobs []jobspb.JobID
// Find canceled changefeeds so they can be recreated below.
var canceledJobs []jobspb.JobID
for rows.Next() {
var jobID jobspb.JobID
if err := rows.Scan(&jobID); err != nil {
var status string
if err := rows.Scan(&jobID, &status); err != nil {
return err
}
jobs = append(jobs, jobID)
if status == "canceled" {
canceledJobs = append(canceledJobs, jobID)
}
}
rows.Close()

if len(jobs) < tableCount/10 {
return nil
}
jobAction := func(cmd string, count int) {
errCount := 0
for i := 0; i < count; i++ {
jobIdx := rng.Intn(len(jobs))
_, err := conn.Exec(cmd, jobs[jobIdx])
if err != nil {
errCount++
}
recreatedCount := 0
seenTables := make(map[string]struct{})

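// For each canceled changefeed, look up the table it was watching and start a
// replacement changefeed, skipping tables already handled in this pass.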
for i := 0; i < len(canceledJobs); i++ {
jobID := canceledJobs[i]

// Try to recreate the changefeed
jobIDToTableNameMu.Lock()
tableName, exists := jobIDToTableName[jobID]
jobIDToTableNameMu.Unlock()
if !exists {
t.L().Printf("No table name found for job %d, skipping recreation", jobID)
continue
}
if _, seen := seenTables[tableName]; seen {
continue
}
var newJobID jobspb.JobID
err := conn.QueryRowContext(ctx, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='5m', protect_data_from_gc_on_pause", tableName)).Scan(&newJobID)
if err == nil {
jobIDToTableNameMu.Lock()
jobIDToTableName[newJobID] = tableName
jobIDToTableNameMu.Unlock()
recreatedCount++
seenTables[tableName] = struct{}{}
} else {
t.L().Printf("Failed to recreate changefeed for table %s (job %d): %v", tableName, jobID, err)
}
t.L().Printf("Failed to run %s on %d of %d jobs", cmd, errCount, count)
}
jobAction("PAUSE JOB $1", len(jobs)/10)
jobAction("RESUME JOB $1", len(jobs)/10)
t.L().Printf("recreated %d changefeeds, of %d total canceled jobs", recreatedCount, len(canceledJobs))
return nil
}

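// splitScatterMergeJobsTable splits system.jobs at three randomly chosen job
// IDs; despite its name, the current implementation only performs splits, and
// split errors are logged rather than failing the test.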
func splitScatterMergeJobsTable(
ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand,
) error {
conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1)
defer conn.Close()

_, err := conn.ExecContext(ctx, "ALTER TABLE system.jobs SPLIT AT (select id from system.jobs order by random() limit 3)")
if err != nil {
t.L().Printf("Error splitting %s", err)
}

return nil
}
