@@ -31,11 +31,9 @@ use databend_common_meta_app::principal::task::TaskState;
3131use databend_common_meta_app:: principal:: task_dependent_ident:: TaskDependentIdent ;
3232use databend_common_meta_app:: principal:: task_message_ident:: TaskMessageIdent ;
3333use databend_common_meta_app:: principal:: task_state_ident:: TaskStateIdent ;
34- use databend_common_meta_app:: principal:: DependentType ;
3534use databend_common_meta_app:: principal:: ScheduleType ;
3635use databend_common_meta_app:: principal:: Status ;
3736use databend_common_meta_app:: principal:: Task ;
38- use databend_common_meta_app:: principal:: TaskDependentKey ;
3937use databend_common_meta_app:: principal:: TaskIdent ;
4038use databend_common_meta_app:: schema:: CreateOption ;
4139use databend_common_meta_app:: tenant:: Tenant ;
@@ -271,9 +269,7 @@ impl TaskMgr {
271269 let mut update_ops = Vec :: new ( ) ;
272270 let mut check_ops = Vec :: with_capacity ( new_afters. len ( ) ) ;
273271
274- let after_dependent = TaskDependentKey :: new ( DependentType :: After , task_name. to_string ( ) ) ;
275- let after_dependent_ident =
276- TaskDependentIdent :: new_generic ( & self . tenant , after_dependent. clone ( ) ) ;
272+ let after_dependent_ident = TaskDependentIdent :: new_after ( & self . tenant , task_name) ;
277273
278274 let after_seq_deps = self . kv_api . get_pb ( & after_dependent_ident) . await ?;
279275 check_ops. push ( txn_cond_eq_seq (
@@ -285,10 +281,9 @@ impl TaskMgr {
285281 after_deps. 0 . extend ( new_afters. iter ( ) . cloned ( ) ) ;
286282 update_ops. push ( txn_put_pb ( & after_dependent_ident, & after_deps) ?) ;
287283
288- let before_dependent_idents = new_afters. iter ( ) . map ( |after| {
289- let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
290- TaskDependentIdent :: new_generic ( & self . tenant , before_dependent)
291- } ) ;
284+ let before_dependent_idents = new_afters
285+ . iter ( )
286+ . map ( |after| TaskDependentIdent :: new_before ( & self . tenant , after) ) ;
292287 for ( before_dependent_ident, before_seq_deps) in
293288 self . kv_api . get_pb_vec ( before_dependent_idents) . await ?
294289 {
@@ -324,10 +319,7 @@ impl TaskMgr {
324319 let mut update_ops = Vec :: new ( ) ;
325320 let mut check_ops = Vec :: with_capacity ( remove_afters. len ( ) ) ;
326321
327- let after_dependent = TaskDependentKey :: new ( DependentType :: After , task_name. to_string ( ) ) ;
328- let after_dependent_ident =
329- TaskDependentIdent :: new_generic ( & self . tenant , after_dependent. clone ( ) ) ;
330-
322+ let after_dependent_ident = TaskDependentIdent :: new_after ( & self . tenant , task_name) ;
331323 let after_seq_deps = self . kv_api . get_pb ( & after_dependent_ident) . await ?;
332324 check_ops. push ( txn_cond_eq_seq (
333325 & after_dependent_ident,
@@ -341,10 +333,9 @@ impl TaskMgr {
341333 update_ops. push ( txn_put_pb ( & after_dependent_ident, & deps) ?) ;
342334 }
343335
344- let before_dependent_idents = remove_afters. iter ( ) . map ( |after| {
345- let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
346- TaskDependentIdent :: new_generic ( & self . tenant , before_dependent)
347- } ) ;
336+ let before_dependent_idents = remove_afters
337+ . iter ( )
338+ . map ( |after| TaskDependentIdent :: new_before ( & self . tenant , after) ) ;
348339 for ( before_dependent_ident, before_seq_deps) in
349340 self . kv_api . get_pb_vec ( before_dependent_idents) . await ?
350341 {
@@ -381,57 +372,33 @@ impl TaskMgr {
381372 let mut check_ops = Vec :: new ( ) ;
382373 let mut update_ops = Vec :: new ( ) ;
383374
384- let task_after_ident = TaskDependentIdent :: new_generic (
385- & self . tenant ,
386- TaskDependentKey :: new ( DependentType :: After , task_name. to_string ( ) ) ,
387- ) ;
388- if let Some ( task_after_dependent) = self . kv_api . get ( & task_after_ident) . await ? {
389- let target_idents = task_after_dependent. 0 . into_iter ( ) . map ( |dependent_target| {
390- let target_key =
391- TaskDependentKey :: new ( DependentType :: Before , dependent_target. clone ( ) ) ;
392- TaskDependentIdent :: new_generic ( & self . tenant , target_key. clone ( ) )
393- } ) ;
394- for ( target_ident, seq_dep) in self . kv_api . get_pb_vec ( target_idents) . await ? {
395- check_ops. push ( txn_cond_eq_seq ( & target_ident, seq_dep. seq ( ) ) ) ;
375+ let task_after_ident = TaskDependentIdent :: new_after ( & self . tenant , task_name) ;
376+ let task_before_ident = TaskDependentIdent :: new_before ( & self . tenant , task_name) ;
396377
397- if let Some ( mut deps ) = seq_dep {
398- deps . 0 . remove ( task_name ) ;
399- update_ops . push ( txn_put_pb ( & target_ident , & deps ) ? ) ;
400- }
401- }
378+ let mut target_idents = Vec :: new ( ) ;
379+ if let Some ( task_after_dependent ) = self . kv_api . get ( & task_after_ident ) . await ? {
380+ target_idents . extend ( task_after_dependent . 0 . into_iter ( ) . map ( |dependent_target| {
381+ TaskDependentIdent :: new_before ( & self . tenant , dependent_target )
382+ } ) ) ;
402383 }
403- let task_before_ident = TaskDependentIdent :: new_generic (
404- & self . tenant ,
405- TaskDependentKey :: new ( DependentType :: Before , task_name. to_string ( ) ) ,
406- ) ;
407384 if let Some ( task_before_dependent) = self . kv_api . get ( & task_before_ident) . await ? {
408- let target_idents = task_before_dependent. 0 . into_iter ( ) . map ( |dependent_target| {
409- let target_key =
410- TaskDependentKey :: new ( DependentType :: After , dependent_target. clone ( ) ) ;
411- TaskDependentIdent :: new_generic ( & self . tenant , target_key. clone ( ) )
412- } ) ;
413- for ( target_ident, seq_dep) in self . kv_api . get_pb_vec ( target_idents) . await ? {
414- check_ops. push ( txn_cond_eq_seq ( & target_ident, seq_dep. seq ( ) ) ) ;
385+ target_idents. extend ( task_before_dependent. 0 . into_iter ( ) . map ( |dependent_target| {
386+ TaskDependentIdent :: new_after ( & self . tenant , dependent_target)
387+ } ) ) ;
388+ }
389+ for ( target_ident, seq_dep) in self . kv_api . get_pb_vec ( target_idents) . await ? {
390+ check_ops. push ( txn_cond_eq_seq ( & target_ident, seq_dep. seq ( ) ) ) ;
415391
416- if let Some ( mut deps) = seq_dep {
417- deps. 0 . remove ( task_name) ;
418- update_ops. push ( txn_put_pb ( & target_ident, & deps) ?) ;
419- }
392+ if let Some ( mut deps) = seq_dep {
393+ deps. 0 . remove ( task_name) ;
394+ update_ops. push ( txn_put_pb ( & target_ident, & deps) ?) ;
420395 }
421396 }
422397 update_ops. push ( TxnOp :: delete (
423- TaskDependentIdent :: new_generic (
424- & self . tenant ,
425- TaskDependentKey :: new ( DependentType :: Before , task_name. to_string ( ) ) ,
426- )
427- . to_string_key ( ) ,
398+ TaskDependentIdent :: new_before ( & self . tenant , task_name) . to_string_key ( ) ,
428399 ) ) ;
429400 update_ops. push ( TxnOp :: delete (
430- TaskDependentIdent :: new_generic (
431- & self . tenant ,
432- TaskDependentKey :: new ( DependentType :: After , task_name. to_string ( ) ) ,
433- )
434- . to_string_key ( ) ,
401+ TaskDependentIdent :: new_after ( & self . tenant , task_name) . to_string_key ( ) ,
435402 ) ) ;
436403 update_ops. push ( TxnOp :: delete (
437404 TaskStateIdent :: new ( & self . tenant , task_name) . to_string_key ( ) ,
@@ -443,16 +410,36 @@ impl TaskMgr {
443410 Ok ( Ok ( ( ) ) )
444411 }
445412
413+ /// Marks the given task as succeeded, and checks all tasks that depend on it.
414+ ///
415+ /// For each task that depends on the completed task (`task_name`), we check if all its
416+ /// predecessor tasks are also succeeded. If so, we mark the dependent task as *not succeeded*
417+ /// to prevent premature execution. Otherwise, we record the dependent task as *ready*
418+ /// for further processing.
419+ ///
420+ /// # Arguments
421+ /// - `task_name`: The name of the task that has just completed successfully.
422+ ///
423+ /// # Returns
424+ /// - `Vec<String>`: A list of dependent task names that are ready to proceed.
425+ ///
426+ /// # Behavior
427+ /// 1. Retrieves all tasks that must be executed *before* the given `task_name`.
428+ /// 2. For each such task, find the tasks that depend on it (`after` tasks).
429+ /// 3. For each `after` task:
430+ /// - If all its dependencies (excluding the current task) are succeeded:
431+ /// - Mark that task as **not succeeded**.
432+ /// - Also mark the current task as succeeded.
433+ /// - Record it as ready for further processing.
434+ /// - Otherwise:
435+ /// - Still mark the current task as succeeded.
446436 #[ async_backtrace:: framed]
447437 #[ fastrace:: trace]
448- pub async fn task_succeeded (
438+ pub async fn get_next_ready_tasks (
449439 & self ,
450440 task_name : & str ,
451441 ) -> Result < Result < Vec < String > , TaskError > , TaskApiError > {
452- let task_before_ident = TaskDependentIdent :: new_generic (
453- & self . tenant ,
454- TaskDependentKey :: new ( DependentType :: Before , task_name. to_string ( ) ) ,
455- ) ;
442+ let task_before_ident = TaskDependentIdent :: new_before ( & self . tenant , task_name) ;
456443 let task_state_key = TaskStateIdent :: new ( & self . tenant , task_name) ;
457444 let succeeded_value = TaskState { is_succeeded : true } ;
458445 let not_succeeded_value = TaskState {
@@ -464,35 +451,31 @@ impl TaskMgr {
464451 } ;
465452 let mut ready_tasks = Vec :: new ( ) ;
466453
467- let target_after_idents = task_before_dependent. 0 . iter ( ) . map ( |before| {
468- let after_dependent = TaskDependentKey :: new ( DependentType :: After , before . clone ( ) ) ;
469- TaskDependentIdent :: new_generic ( & self . tenant , after_dependent )
470- } ) ;
454+ let target_after_idents = task_before_dependent
455+ . 0
456+ . iter ( )
457+ . map ( |before| TaskDependentIdent :: new_after ( & self . tenant , before ) ) ;
471458 for ( target_after_ident, target_after_dependent) in
472459 self . kv_api . get_pb_vec ( target_after_idents) . await ?
473460 {
474461 let Some ( target_after_dependent) = target_after_dependent else {
475462 continue ;
476463 } ;
477-
478464 let mut request = TxnRequest :: new ( vec ! [ ] , vec ! [ ] )
479465 . with_else ( vec ! [ txn_put_pb( & task_state_key, & succeeded_value) ?] ) ;
480466
481467 for target_after_task in target_after_dependent. 0 . iter ( ) {
482468 let task_ident = TaskStateIdent :: new ( & self . tenant , target_after_task) ;
483469 // Only care about the predecessors of this task's successor tasks, excluding this task itself.
484470 if target_after_task != task_name {
485- request =
486- request. push_if_then ( [ ] , [ txn_put_pb ( & task_ident, & not_succeeded_value) ?] ) ;
487- continue ;
488- }
489- request = request. push_if_then (
490- [ TxnCondition :: eq_value (
471+ request. condition . push ( TxnCondition :: eq_value (
491472 task_ident. to_string_key ( ) ,
492473 succeeded_value. to_pb ( ) ?. encode_to_vec ( ) ,
493- ) ] ,
494- [ txn_put_pb ( & task_ident, & not_succeeded_value) ?] ,
495- ) ;
474+ ) ) ;
475+ }
476+ request
477+ . if_then
478+ . push ( txn_put_pb ( & task_ident, & not_succeeded_value) ?) ;
496479 }
497480 let reply = self . kv_api . transaction ( request) . await ?;
498481
0 commit comments