@@ -239,37 +239,6 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
239
239
. await
240
240
. context ( "database migrations" ) ?;
241
241
242
- // spawning a background task that will schedule the jobs
243
- // every JOB_SCHEDULING_CADENCE_IN_SECS
244
- task:: spawn ( async move {
245
- if env:: var_os ( "TRIAGEBOT_TEST_DISABLE_JOBS" ) . is_some ( ) {
246
- return ;
247
- }
248
- loop {
249
- let res = task:: spawn ( async move {
250
- let pool = db:: ClientPool :: new ( ) ;
251
- let mut interval =
252
- time:: interval ( time:: Duration :: from_secs ( JOB_SCHEDULING_CADENCE_IN_SECS ) ) ;
253
-
254
- loop {
255
- interval. tick ( ) . await ;
256
- db:: schedule_jobs ( & * pool. get ( ) . await , jobs ( ) )
257
- . await
258
- . context ( "database schedule jobs" )
259
- . unwrap ( ) ;
260
- }
261
- } ) ;
262
-
263
- match res. await {
264
- Err ( err) if err. is_panic ( ) => {
265
- /* handle panic in above task, re-launching */
266
- tracing:: trace!( "schedule_jobs task died (error={})" , err) ;
267
- }
268
- _ => unreachable ! ( ) ,
269
- }
270
- }
271
- } ) ;
272
-
273
242
let gh = github:: GithubClient :: new_from_env ( ) ;
274
243
let oc = octocrab:: OctocrabBuilder :: new ( )
275
244
. personal_token ( github:: default_token_from_env ( ) )
@@ -282,35 +251,10 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
282
251
octocrab : oc,
283
252
} ) ;
284
253
285
- // spawning a background task that will run the scheduled jobs
286
- // every JOB_PROCESSING_CADENCE_IN_SECS
287
- let ctx2 = ctx. clone ( ) ;
288
- task:: spawn ( async move {
289
- loop {
290
- let ctx = ctx2. clone ( ) ;
291
- let res = task:: spawn ( async move {
292
- let pool = db:: ClientPool :: new ( ) ;
293
- let mut interval =
294
- time:: interval ( time:: Duration :: from_secs ( JOB_PROCESSING_CADENCE_IN_SECS ) ) ;
295
-
296
- loop {
297
- interval. tick ( ) . await ;
298
- db:: run_scheduled_jobs ( & ctx, & * pool. get ( ) . await )
299
- . await
300
- . context ( "run database scheduled jobs" )
301
- . unwrap ( ) ;
302
- }
303
- } ) ;
304
-
305
- match res. await {
306
- Err ( err) if err. is_panic ( ) => {
307
- /* handle panic in above task, re-launching */
308
- tracing:: trace!( "run_scheduled_jobs task died (error={})" , err) ;
309
- }
310
- _ => unreachable ! ( ) ,
311
- }
312
- }
313
- } ) ;
254
+ if !is_scheduled_jobs_disabled ( ) {
255
+ spawn_job_scheduler ( ) ;
256
+ spawn_job_runner ( ctx. clone ( ) ) ;
257
+ }
314
258
315
259
let agenda = tower:: ServiceBuilder :: new ( )
316
260
. buffer ( 10 )
@@ -353,6 +297,85 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
353
297
Ok ( ( ) )
354
298
}
355
299
300
+ /// Spawns a background tokio task which runs continuously to queue up jobs
301
+ /// to be run by the job runner.
302
+ ///
303
+ /// The scheduler wakes up every `JOB_SCHEDULING_CADENCE_IN_SECS` seconds to
304
+ /// check if there are any jobs ready to run. Jobs get inserted into the the
305
+ /// database which acts as a queue.
306
+ fn spawn_job_scheduler ( ) {
307
+ task:: spawn ( async move {
308
+ loop {
309
+ let res = task:: spawn ( async move {
310
+ let pool = db:: ClientPool :: new ( ) ;
311
+ let mut interval =
312
+ time:: interval ( time:: Duration :: from_secs ( JOB_SCHEDULING_CADENCE_IN_SECS ) ) ;
313
+
314
+ loop {
315
+ interval. tick ( ) . await ;
316
+ db:: schedule_jobs ( & * pool. get ( ) . await , jobs ( ) )
317
+ . await
318
+ . context ( "database schedule jobs" )
319
+ . unwrap ( ) ;
320
+ }
321
+ } ) ;
322
+
323
+ match res. await {
324
+ Err ( err) if err. is_panic ( ) => {
325
+ /* handle panic in above task, re-launching */
326
+ tracing:: trace!( "schedule_jobs task died (error={})" , err) ;
327
+ }
328
+ _ => unreachable ! ( ) ,
329
+ }
330
+ }
331
+ } ) ;
332
+ }
333
+
334
+ /// Spawns a background tokio task which runs continuously to run scheduled
335
+ /// jobs.
336
+ ///
337
+ /// The runner wakes up every `JOB_PROCESSING_CADENCE_IN_SECS` seconds to
338
+ /// check if any jobs have been put into the queue by the scheduler. They
339
+ /// will get popped off the queue and run if any are found.
340
+ fn spawn_job_runner ( ctx : Arc < Context > ) {
341
+ task:: spawn ( async move {
342
+ loop {
343
+ let ctx = ctx. clone ( ) ;
344
+ let res = task:: spawn ( async move {
345
+ let pool = db:: ClientPool :: new ( ) ;
346
+ let mut interval =
347
+ time:: interval ( time:: Duration :: from_secs ( JOB_PROCESSING_CADENCE_IN_SECS ) ) ;
348
+
349
+ loop {
350
+ interval. tick ( ) . await ;
351
+ db:: run_scheduled_jobs ( & ctx, & * pool. get ( ) . await )
352
+ . await
353
+ . context ( "run database scheduled jobs" )
354
+ . unwrap ( ) ;
355
+ }
356
+ } ) ;
357
+
358
+ match res. await {
359
+ Err ( err) if err. is_panic ( ) => {
360
+ /* handle panic in above task, re-launching */
361
+ tracing:: trace!( "run_scheduled_jobs task died (error={})" , err) ;
362
+ }
363
+ _ => unreachable ! ( ) ,
364
+ }
365
+ }
366
+ } ) ;
367
+ }
368
+
369
+ /// Determines whether or not background scheduled jobs should be disabled for
370
+ /// the purpose of testing.
371
+ ///
372
+ /// This helps avoid having random jobs run while testing other things.
373
+ fn is_scheduled_jobs_disabled ( ) -> bool {
374
+ // TRIAGEBOT_TEST_DISABLE_JOBS is set automatically by the test runner,
375
+ // and shouldn't be needed to be set manually.
376
+ env:: var_os ( "TRIAGEBOT_TEST_DISABLE_JOBS" ) . is_some ( ) || triagebot:: test_record:: is_recording ( )
377
+ }
378
+
356
379
#[ tokio:: main( flavor = "current_thread" ) ]
357
380
async fn main ( ) {
358
381
dotenv:: dotenv ( ) . ok ( ) ;
0 commit comments