Skip to content

Commit dfc2e11

Browse files
committed
exp
1 parent f87a3c8 commit dfc2e11

File tree

1 file changed

+88
-23
lines changed

1 file changed

+88
-23
lines changed

pkg/cmd/roachtest/tests/jobs.go

Lines changed: 88 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
gosql "database/sql"
1111
"fmt"
1212
"math/rand"
13+
"sync"
1314
"time"
1415

1516
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
@@ -29,12 +30,12 @@ import (
2930

3031
var (
3132
tableCount = 5000
32-
nodeCount = 4
33+
nodeCount = 60
3334
tableNamePrefix = "t"
3435
tableSchema = "(id INT PRIMARY KEY, s STRING)"
35-
showJobsTimeout = time.Minute * 2
36+
showJobsTimeout = time.Minute * 200
3637
pollerMinFrequencySeconds = 30
37-
roachtestTimeout = time.Minute * 45
38+
roachtestTimeout = time.Minute * 100
3839
workloadDuration = roachtestTimeout - time.Minute*10
3940
)
4041

@@ -97,13 +98,12 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {
9798
return nil
9899
})
99100

100-
randomPoller := func(f func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error) func(ctx context.Context, _ *logger.Logger) error {
101+
randomPoller := func(waitTime time.Duration, f func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error) func(ctx context.Context, _ *logger.Logger) error {
101102

102103
return func(ctx context.Context, _ *logger.Logger) error {
103104
var pTimer timeutil.Timer
104105
defer pTimer.Stop()
105106
for {
106-
waitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second
107107
pTimer.Reset(waitTime)
108108
select {
109109
case <-ctx.Done():
@@ -120,13 +120,24 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {
120120
}
121121
}
122122
}
123+
for i := 0; i < nodeCount; i++ {
124+
group.Go(randomPoller(time.Second, checkJobQueryLatency))
125+
}
123126

124-
group.Go(randomPoller(checkJobQueryLatency))
127+
waitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second
128+
jobIDToTableName := make(map[jobspb.JobID]string)
129+
var jobIDToTableNameMu sync.Mutex
130+
131+
pauseResumeChangefeedsWithMap := func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error {
132+
return pauseResumeChangefeeds(ctx, t, c, rng, jobIDToTableName, &jobIDToTableNameMu)
133+
}
134+
group.Go(randomPoller(waitTime, pauseResumeChangefeedsWithMap))
125135

126-
group.Go(randomPoller(pauseResumeChangefeeds))
136+
splitWaitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second
137+
group.Go(randomPoller(splitWaitTime, splitScatterMergeJobsTable))
127138

128139
group.Go(func(ctx context.Context, _ *logger.Logger) error {
129-
createTablesWithChangefeeds(ctx, t, c, rng)
140+
createTablesWithChangefeeds(ctx, t, c, rng, jobIDToTableName, &jobIDToTableNameMu)
130141
return nil
131142
})
132143

@@ -138,7 +149,12 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {
138149
}
139150

140151
func createTablesWithChangefeeds(
141-
ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand,
152+
ctx context.Context,
153+
t test.Test,
154+
c cluster.Cluster,
155+
rng *rand.Rand,
156+
jobIDToTableName map[jobspb.JobID]string,
157+
jobIDToTableNameMu *sync.Mutex,
142158
) {
143159
t.L().Printf("Creating %d tables with changefeeds", tableCount)
144160

@@ -155,7 +171,14 @@ func createTablesWithChangefeeds(
155171
tableName := tableNamePrefix + fmt.Sprintf("%d", i)
156172
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s %s`, tableName, tableSchema))
157173
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName))
158-
sqlDB.Exec(t, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='2m', protect_data_from_gc_on_pause", tableName))
174+
175+
var jobID jobspb.JobID
176+
sqlDB.QueryRow(t, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='10m', protect_data_from_gc_on_pause", tableName)).Scan(&jobID)
177+
178+
jobIDToTableNameMu.Lock()
179+
jobIDToTableName[jobID] = tableName
180+
jobIDToTableNameMu.Unlock()
181+
159182
if i%(tableCount/5) == 0 {
160183
t.L().Printf("Created %d tables so far", i)
161184
}
@@ -169,17 +192,17 @@ func checkJobQueryLatency(
169192
ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand,
170193
) error {
171194
conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1)
172-
defer conn.Close()
173-
if err := checkQueryLatency(ctx, "SHOW JOBS", conn, t.L(), showJobsTimeout); err != nil {
174-
return err
175-
}
176195

177-
var randomChangefeedJobID int
178-
if err := conn.QueryRowContext(ctx, "SELECT job_id FROM [SHOW JOBS] ORDER BY random() LIMIT 1").Scan(&randomChangefeedJobID); err != nil {
196+
toLog := false
197+
if rng.Intn(10) == 0 {
198+
toLog = true
199+
}
200+
defer conn.Close()
201+
if err := checkQueryLatency(ctx, "SHOW JOBS", conn, t.L(), showJobsTimeout, toLog); err != nil {
179202
return err
180203
}
181204

182-
if err := checkQueryLatency(ctx, redact.Sprintf("SHOW JOB %d", randomChangefeedJobID), conn, t.L(), showJobsTimeout/10); err != nil {
205+
if err := checkQueryLatency(ctx, "SHOW CHANGEFEED JOBS", conn, t.L(), showJobsTimeout, toLog); err != nil {
183206
return err
184207
}
185208
return nil
@@ -191,6 +214,7 @@ func checkQueryLatency(
191214
conn *gosql.DB,
192215
l *logger.Logger,
193216
timeout time.Duration,
217+
toLog bool,
194218
) error {
195219
queryBegin := timeutil.Now()
196220
var err error
@@ -207,17 +231,24 @@ func checkQueryLatency(
207231
})
208232
return errors.CombineErrors(err, explainErr)
209233
}
210-
l.Printf("%s ran in %.2f seconds", query, timeutil.Since(queryBegin).Seconds())
234+
if toLog {
235+
l.Printf("%s ran in %.2f seconds", query, timeutil.Since(queryBegin).Seconds())
236+
}
211237
return nil
212238
}
213239

214240
func pauseResumeChangefeeds(
215-
ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand,
241+
ctx context.Context,
242+
t test.Test,
243+
c cluster.Cluster,
244+
rng *rand.Rand,
245+
jobIDToTableName map[jobspb.JobID]string,
246+
jobIDToTableNameMu *sync.Mutex,
216247
) error {
217248
conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1)
218249
defer conn.Close()
219250

220-
rows, err := conn.QueryContext(ctx, "SELECT job_id FROM [SHOW CHANGEFEED JOBS]")
251+
rows, err := conn.QueryContext(ctx, "SELECT job_id FROM [SHOW JOBS] where job_type='CHANGEFEED'")
221252
if err != nil {
222253
return err
223254
}
@@ -233,25 +264,59 @@ func pauseResumeChangefeeds(
233264
}
234265
rows.Close()
235266

236-
if len(jobs) < tableCount/10 {
267+
if len(jobs) < tableCount/2 {
237268
return nil
238269
}
239270
jobAction := func(cmd string, count int) {
240271
errCount := 0
272+
recreatedCount := 0
241273
for i := 0; i < count; i++ {
242274
jobIdx := rng.Intn(len(jobs))
243-
_, err := conn.Exec(cmd, jobs[jobIdx])
275+
jobID := jobs[jobIdx]
276+
_, err := conn.Exec(cmd, jobID)
244277
if err != nil {
245278
errCount++
279+
// Try to recreate the changefeed
280+
jobIDToTableNameMu.Lock()
281+
tableName, exists := jobIDToTableName[jobID]
282+
jobIDToTableNameMu.Unlock()
283+
if !exists {
284+
t.L().Printf("No table name found for job %d, skipping recreation", jobID)
285+
continue
286+
}
287+
var newJobID jobspb.JobID
288+
err := conn.QueryRowContext(ctx, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='10m', protect_data_from_gc_on_pause", tableName)).Scan(&newJobID)
289+
if err == nil {
290+
jobIDToTableNameMu.Lock()
291+
jobIDToTableName[newJobID] = tableName
292+
jobIDToTableNameMu.Unlock()
293+
recreatedCount++
294+
} else {
295+
t.L().Printf("Failed to recreate changefeed for table %s (job %d): %v", tableName, jobID, err)
296+
}
246297
}
247298
}
248-
t.L().Printf("Failed to run %s on %d of %d jobs", cmd, errCount, count)
299+
t.L().Printf("Failed to run %s on %d of %d jobs, recreated %d changefeeds, of %d total cf jobs", cmd, errCount, count, recreatedCount, len(jobs))
249300
}
250301
jobAction("PAUSE JOB $1", len(jobs)/10)
251302
jobAction("RESUME JOB $1", len(jobs)/10)
252303
return nil
253304
}
254305

306+
func splitScatterMergeJobsTable(
307+
ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand,
308+
) error {
309+
conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1)
310+
defer conn.Close()
311+
312+
_, err := conn.ExecContext(ctx, "ALTER TABLE system.jobs SPLIT AT (select id from system.jobs order by random() limit 3)")
313+
if err != nil {
314+
t.L().Printf("Error splitting %s", err)
315+
}
316+
317+
return nil
318+
}
319+
255320
func checkJobSystemHealth(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) {
256321
conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1)
257322
defer conn.Close()

0 commit comments

Comments
 (0)