@@ -27,7 +27,7 @@ use databend_common_meta_api::util::txn_put_pb;
2727use databend_common_meta_api:: SchemaApi ;
2828use databend_common_meta_app:: principal:: task;
2929use databend_common_meta_app:: principal:: task:: TaskMessage ;
30- use databend_common_meta_app:: principal:: task:: TaskState ;
30+ use databend_common_meta_app:: principal:: task:: TaskStateValue ;
3131use databend_common_meta_app:: principal:: task_dependent_ident:: TaskDependentIdent ;
3232use databend_common_meta_app:: principal:: task_message_ident:: TaskMessageIdent ;
3333use databend_common_meta_app:: principal:: task_state_ident:: TaskStateIdent ;
@@ -47,6 +47,7 @@ use databend_common_meta_types::TxnOp;
4747use databend_common_meta_types:: TxnRequest ;
4848use databend_common_meta_types:: With ;
4949use databend_common_proto_conv:: FromToProto ;
50+ use futures:: StreamExt ;
5051use futures:: TryStreamExt ;
5152use prost:: Message ;
5253use seq_marked:: SeqValue ;
@@ -266,37 +267,38 @@ impl TaskMgr {
266267 task_name : & str ,
267268 new_afters : & [ String ] ,
268269 ) -> Result < Result < ( ) , TaskError > , TaskApiError > {
269- let mut update_ops = Vec :: new ( ) ;
270- let mut check_ops = Vec :: with_capacity ( new_afters. len ( ) ) ;
271-
270+ let mut request = TxnRequest :: new ( vec ! [ ] , vec ! [ ] ) ;
272271 let after_dependent_ident = TaskDependentIdent :: new_after ( & self . tenant , task_name) ;
273272
274273 let after_seq_deps = self . kv_api . get_pb ( & after_dependent_ident) . await ?;
275- check_ops . push ( txn_cond_eq_seq (
274+ request . condition . push ( txn_cond_eq_seq (
276275 & after_dependent_ident,
277276 after_seq_deps. seq ( ) ,
278277 ) ) ;
279278
280279 let mut after_deps = after_seq_deps. unwrap_or_default ( ) ;
281280 after_deps. 0 . extend ( new_afters. iter ( ) . cloned ( ) ) ;
282- update_ops. push ( txn_put_pb ( & after_dependent_ident, & after_deps) ?) ;
281+ request
282+ . if_then
283+ . push ( txn_put_pb ( & after_dependent_ident, & after_deps) ?) ;
283284
284285 let before_dependent_idents = new_afters
285286 . iter ( )
286287 . map ( |after| TaskDependentIdent :: new_before ( & self . tenant , after) ) ;
287288 for ( before_dependent_ident, before_seq_deps) in
288289 self . kv_api . get_pb_vec ( before_dependent_idents) . await ?
289290 {
290- check_ops . push ( txn_cond_eq_seq (
291+ request . condition . push ( txn_cond_eq_seq (
291292 & before_dependent_ident,
292293 before_seq_deps. seq ( ) ,
293294 ) ) ;
294295
295296 let mut deps = before_seq_deps. unwrap_or_default ( ) ;
296297 deps. 0 . insert ( task_name. to_string ( ) ) ;
297- update_ops. push ( txn_put_pb ( & before_dependent_ident, & deps) ?) ;
298+ request
299+ . if_then
300+ . push ( txn_put_pb ( & before_dependent_ident, & deps) ?) ;
298301 }
299- let request = TxnRequest :: new ( check_ops, update_ops) ;
300302 let reply = self . kv_api . transaction ( request) . await ?;
301303
302304 if !reply. success {
@@ -316,12 +318,11 @@ impl TaskMgr {
316318 task_name : & str ,
317319 remove_afters : & [ String ] ,
318320 ) -> Result < Result < ( ) , TaskError > , TaskApiError > {
319- let mut update_ops = Vec :: new ( ) ;
320- let mut check_ops = Vec :: with_capacity ( remove_afters. len ( ) ) ;
321+ let mut request = TxnRequest :: new ( vec ! [ ] , vec ! [ ] ) ;
321322
322323 let after_dependent_ident = TaskDependentIdent :: new_after ( & self . tenant , task_name) ;
323324 let after_seq_deps = self . kv_api . get_pb ( & after_dependent_ident) . await ?;
324- check_ops . push ( txn_cond_eq_seq (
325+ request . condition . push ( txn_cond_eq_seq (
325326 & after_dependent_ident,
326327 after_seq_deps. seq ( ) ,
327328 ) ) ;
@@ -330,7 +331,9 @@ impl TaskMgr {
330331 for remove_after in remove_afters {
331332 deps. 0 . remove ( remove_after) ;
332333 }
333- update_ops. push ( txn_put_pb ( & after_dependent_ident, & deps) ?) ;
334+ request
335+ . if_then
336+ . push ( txn_put_pb ( & after_dependent_ident, & deps) ?) ;
334337 }
335338
336339 let before_dependent_idents = remove_afters
@@ -339,17 +342,18 @@ impl TaskMgr {
339342 for ( before_dependent_ident, before_seq_deps) in
340343 self . kv_api . get_pb_vec ( before_dependent_idents) . await ?
341344 {
342- check_ops . push ( txn_cond_eq_seq (
345+ request . condition . push ( txn_cond_eq_seq (
343346 & before_dependent_ident,
344347 before_seq_deps. seq ( ) ,
345348 ) ) ;
346349
347350 if let Some ( mut deps) = before_seq_deps {
348351 deps. 0 . remove ( task_name) ;
349- update_ops. push ( txn_put_pb ( & before_dependent_ident, & deps) ?) ;
352+ request
353+ . if_then
354+ . push ( txn_put_pb ( & before_dependent_ident, & deps) ?) ;
350355 }
351356 }
352- let request = TxnRequest :: new ( check_ops, update_ops) ;
353357 let reply = self . kv_api . transaction ( request) . await ?;
354358
355359 if !reply. success {
@@ -369,8 +373,7 @@ impl TaskMgr {
369373 & self ,
370374 task_name : & str ,
371375 ) -> Result < Result < ( ) , TaskError > , TaskApiError > {
372- let mut check_ops = Vec :: new ( ) ;
373- let mut update_ops = Vec :: new ( ) ;
376+ let mut request = TxnRequest :: new ( vec ! [ ] , vec ! [ ] ) ;
374377
375378 let task_after_ident = TaskDependentIdent :: new_after ( & self . tenant , task_name) ;
376379 let task_before_ident = TaskDependentIdent :: new_before ( & self . tenant , task_name) ;
@@ -387,24 +390,33 @@ impl TaskMgr {
387390 } ) ) ;
388391 }
389392 for ( target_ident, seq_dep) in self . kv_api . get_pb_vec ( target_idents) . await ? {
390- check_ops. push ( txn_cond_eq_seq ( & target_ident, seq_dep. seq ( ) ) ) ;
393+ request
394+ . condition
395+ . push ( txn_cond_eq_seq ( & target_ident, seq_dep. seq ( ) ) ) ;
391396
392397 if let Some ( mut deps) = seq_dep {
393398 deps. 0 . remove ( task_name) ;
394- update_ops . push ( txn_put_pb ( & target_ident, & deps) ?) ;
399+ request . if_then . push ( txn_put_pb ( & target_ident, & deps) ?) ;
395400 }
396401 }
397- update_ops . push ( TxnOp :: delete (
402+ request . if_then . push ( TxnOp :: delete (
398403 TaskDependentIdent :: new_before ( & self . tenant , task_name) . to_string_key ( ) ,
399404 ) ) ;
400- update_ops . push ( TxnOp :: delete (
405+ request . if_then . push ( TxnOp :: delete (
401406 TaskDependentIdent :: new_after ( & self . tenant , task_name) . to_string_key ( ) ,
402407 ) ) ;
403- update_ops. push ( TxnOp :: delete (
404- TaskStateIdent :: new ( & self . tenant , task_name) . to_string_key ( ) ,
405- ) ) ;
408+ let mut stream = self
409+ . kv_api
410+ . list_pb_keys ( & DirName :: new ( TaskStateIdent :: new (
411+ & self . tenant ,
412+ task_name,
413+ "" ,
414+ ) ) )
415+ . await ?;
406416
407- let request = TxnRequest :: new ( check_ops, update_ops) ;
417+ while let Some ( result) = stream. next ( ) . await {
418+ request. if_then . push ( TxnOp :: delete ( result?. to_string_key ( ) ) )
419+ }
408420 let _ = self . kv_api . transaction ( request) . await ?;
409421
410422 Ok ( Ok ( ( ) ) )
@@ -439,9 +451,8 @@ impl TaskMgr {
439451 task_name : & str ,
440452 ) -> Result < Result < Vec < String > , TaskError > , TaskApiError > {
441453 let task_before_ident = TaskDependentIdent :: new_before ( & self . tenant , task_name) ;
442- let task_state_key = TaskStateIdent :: new ( & self . tenant , task_name) ;
443- let succeeded_value = TaskState { is_succeeded : true } ;
444- let not_succeeded_value = TaskState {
454+ let succeeded_value = TaskStateValue { is_succeeded : true } ;
455+ let not_succeeded_value = TaskStateValue {
445456 is_succeeded : false ,
446457 } ;
447458
@@ -460,13 +471,19 @@ impl TaskMgr {
460471 let Some ( target_after_dependent) = target_after_dependent else {
461472 continue ;
462473 } ;
463- let mut request = TxnRequest :: new ( vec ! [ ] , vec ! [ ] )
464- . with_else ( vec ! [ txn_put_pb( & task_state_key, & succeeded_value) ?] ) ;
465-
466- for target_after_task in target_after_dependent. 0 . iter ( ) {
467- let task_ident = TaskStateIdent :: new ( & self . tenant , target_after_task) ;
474+ let target_after = & target_after_ident. name ( ) . source ;
475+ let this_task_to_target_state =
476+ TaskStateIdent :: new ( & self . tenant , task_name, target_after) ;
477+ let mut request = TxnRequest :: new ( vec ! [ ] , vec ! [ ] ) . with_else ( vec ! [ txn_put_pb(
478+ & this_task_to_target_state,
479+ & succeeded_value,
480+ ) ?] ) ;
481+
482+ for before_target_after in target_after_dependent. 0 . iter ( ) {
483+ let task_ident =
484+ TaskStateIdent :: new ( & self . tenant , before_target_after, target_after) ;
468485 // Only care about the predecessors of this task's successor tasks, excluding this task itself.
469- if target_after_task != task_name {
486+ if before_target_after != task_name {
470487 request. condition . push ( TxnCondition :: eq_value (
471488 task_ident. to_string_key ( ) ,
472489 succeeded_value. to_pb ( ) ?. encode_to_vec ( ) ,
@@ -485,15 +502,6 @@ impl TaskMgr {
485502 Ok ( Ok ( ready_tasks) )
486503 }
487504
488- #[ async_backtrace:: framed]
489- #[ fastrace:: trace]
490- pub async fn clean_task_state ( & self , task_name : & str ) -> Result < ( ) , TaskApiError > {
491- let key = TaskStateIdent :: new ( & self . tenant , task_name) ;
492- let req = UpsertPB :: delete ( key) . with ( MatchSeq :: GE ( 1 ) ) ;
493- let _ = self . kv_api . upsert_pb ( & req) . await ?;
494- Ok ( ( ) )
495- }
496-
497505 async fn create_task_inner (
498506 & self ,
499507 task : Task ,
0 commit comments