Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/meta/app/src/principal/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub enum TaskMessage {
// Schedule Task will try to spawn a thread in Query to continue running according to the time set in schedule
ScheduleTask(Task),
// Delete the task information and try to cancel the scheduled task in the query.
DeleteTask(String),
DeleteTask(String, Option<WarehouseOptions>),
// After Task will bind Task to the tasks in Task.afters.
// When Execute Task is executed, after all the after tasks of Task are completed,
// the execution will continue.
Expand All @@ -123,15 +123,15 @@ impl TaskMessage {
TaskMessage::ExecuteTask(task)
| TaskMessage::ScheduleTask(task)
| TaskMessage::AfterTask(task) => task.task_name.as_str(),
TaskMessage::DeleteTask(task_name) => task_name.as_str(),
TaskMessage::DeleteTask(task_name, _) => task_name.as_str(),
}
}

pub fn ty(&self) -> TaskMessageType {
match self {
TaskMessage::ExecuteTask(_) => TaskMessageType::Execute,
TaskMessage::ScheduleTask(_) => TaskMessageType::Schedule,
TaskMessage::DeleteTask(_) => TaskMessageType::Delete,
TaskMessage::DeleteTask(_, _) => TaskMessageType::Delete,
TaskMessage::AfterTask(_) => TaskMessageType::After,
}
}
Expand Down Expand Up @@ -162,4 +162,13 @@ impl TaskMessage {
pub fn prefix_range() -> (i64, i64) {
(0, 1)
}

pub fn warehouse_options(&self) -> Option<&WarehouseOptions> {
match self {
TaskMessage::ExecuteTask(task)
| TaskMessage::ScheduleTask(task)
| TaskMessage::AfterTask(task) => task.warehouse_options.as_ref(),
TaskMessage::DeleteTask(_, warehouse_options) => warehouse_options.as_ref(),
}
}
}
23 changes: 21 additions & 2 deletions src/meta/proto-conv/src/task_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use chrono::Utc;
use databend_common_meta_app::principal as mt;
use databend_common_meta_app::principal::task::Status;
use databend_common_protos::pb;
use databend_common_protos::pb::task_message::DeleteTask;
use databend_common_protos::pb::task_message::Message;

use crate::reader_check_msg;
Expand Down Expand Up @@ -155,7 +156,17 @@ impl FromToProto for mt::TaskMessage {
Message::ScheduleTask(task) => {
mt::TaskMessage::ScheduleTask(mt::Task::from_pb(task)?)
}
Message::DeleteTask(task_name) => mt::TaskMessage::DeleteTask(task_name),
Message::DeleteTask(task_name) => mt::TaskMessage::DeleteTask(task_name, None),
Message::DeleteTaskV2(DeleteTask {
task_name,
warehouse_options,
}) => {
let warehouse = warehouse_options.as_ref().map(|w| mt::WarehouseOptions {
warehouse: w.warehouse.clone(),
using_warehouse_size: w.using_warehouse_size.clone(),
});
mt::TaskMessage::DeleteTask(task_name, warehouse)
}
Message::AfterTask(task) => mt::TaskMessage::AfterTask(mt::Task::from_pb(task)?),
},
})
Expand All @@ -165,7 +176,15 @@ impl FromToProto for mt::TaskMessage {
let message = match self {
mt::TaskMessage::ExecuteTask(task) => Message::ExecuteTask(task.to_pb()?),
mt::TaskMessage::ScheduleTask(task) => Message::ScheduleTask(task.to_pb()?),
mt::TaskMessage::DeleteTask(task_name) => Message::DeleteTask(task_name.clone()),
mt::TaskMessage::DeleteTask(task_name, warehouse_options) => {
Message::DeleteTaskV2(DeleteTask {
task_name: task_name.clone(),
warehouse_options: warehouse_options.as_ref().map(|w| pb::WarehouseOptions {
warehouse: w.warehouse.clone(),
using_warehouse_size: w.using_warehouse_size.clone(),
}),
})
}
mt::TaskMessage::AfterTask(task) => Message::AfterTask(task.to_pb()?),
};

Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
(137, "2025-07-22: Add: GrantConnectionObject and UserPrivilegeType AccessConnection, AccessConnection"),
(138, "2025-07-23: Add: TableStatistics add index size"),
(139, "2025-07-25: Add: Grant/OwnershipSequenceObject and UserPrivilegeType AccessSequence, AccessSequence"),
(140, "2025-07-24: Add: TaskMessage::Delete add WarehouseOptions"),
// Dear developer:
// If you're gonna add a new metadata version, you'll have to add a test for it.
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,4 @@ mod v136_add_task;
mod v137_add_grant_object_connection;
mod v138_table_statistics;
mod v139_add_grant_ownership_object_sequence;
mod v140_task_message;
2 changes: 1 addition & 1 deletion src/meta/proto-conv/tests/it/v136_add_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ fn test_decode_v136_task_message() -> anyhow::Result<()> {
vec![26, 6, 116, 97, 115, 107, 95, 99, 160, 6, 136, 1, 168, 6, 24];
let want_delete = || {
let task = want_task();
mt::TaskMessage::DeleteTask(task.task_name)
mt::TaskMessage::DeleteTask(task.task_name, None)
};

common::test_pb_from_to(func_name!(), want_delete())?;
Expand Down
156 changes: 156 additions & 0 deletions src/meta/proto-conv/tests/it/v140_task_message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::DateTime;
use databend_common_meta_app::principal as mt;
use databend_common_meta_app::principal::ScheduleOptions;
use databend_common_meta_app::principal::ScheduleType;
use databend_common_meta_app::principal::WarehouseOptions;
use fastrace::func_name;
use maplit::btreemap;

use crate::common;

#[test]
fn test_decode_v140_task_message() -> anyhow::Result<()> {
let want_task = || mt::Task {
task_id: 11,
task_name: "task_c".to_string(),
query_text: "SELECT * FROM t1".to_string(),
when_condition: Some("c1 > 1".to_string()),
after: vec!["task_a".to_string(), "task_b".to_string()],
comment: Some("comment".to_string()),
owner: "public".to_string(),
owner_user: "me".to_string(),
schedule_options: Some(ScheduleOptions {
interval: Some(11),
cron: Some("30 12 * * *".to_string()),
time_zone: Some("UTC".to_string()),
schedule_type: ScheduleType::IntervalType,
milliseconds_interval: Some(11),
}),
warehouse_options: Some(WarehouseOptions {
warehouse: Some("warehouse_a".to_string()),
using_warehouse_size: Some("10".to_string()),
}),
next_scheduled_at: Some(DateTime::from_timestamp(10, 0).unwrap()),
suspend_task_after_num_failures: Some(10),
error_integration: None,
status: mt::Status::Suspended,
created_at: DateTime::from_timestamp(11, 0).unwrap(),
updated_at: DateTime::from_timestamp(12, 0).unwrap(),
last_suspended_at: Some(DateTime::from_timestamp(13, 0).unwrap()),
session_params: btreemap! { s("a") => s("b") },
};

{
let task_message_execute_v140 = vec![
10, 239, 1, 8, 11, 18, 6, 116, 97, 115, 107, 95, 99, 34, 16, 83, 69, 76, 69, 67, 84,
32, 42, 32, 70, 82, 79, 77, 32, 116, 49, 42, 7, 99, 111, 109, 109, 101, 110, 116, 50,
6, 112, 117, 98, 108, 105, 99, 58, 22, 8, 11, 18, 11, 51, 48, 32, 49, 50, 32, 42, 32,
42, 32, 42, 26, 3, 85, 84, 67, 40, 11, 66, 17, 10, 11, 119, 97, 114, 101, 104, 111,
117, 115, 101, 95, 97, 18, 2, 49, 48, 74, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49,
32, 48, 48, 58, 48, 48, 58, 49, 48, 32, 85, 84, 67, 80, 10, 114, 23, 49, 57, 55, 48,
45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 49, 32, 85, 84, 67, 122, 23,
49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 50, 32, 85, 84,
67, 130, 1, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49,
51, 32, 85, 84, 67, 138, 1, 6, 116, 97, 115, 107, 95, 97, 138, 1, 6, 116, 97, 115, 107,
95, 98, 146, 1, 6, 99, 49, 32, 62, 32, 49, 154, 1, 6, 10, 1, 97, 18, 1, 98, 170, 1, 2,
109, 101, 160, 6, 140, 1, 168, 6, 24, 160, 6, 140, 1, 168, 6, 24,
];
let want_execute = || mt::TaskMessage::ExecuteTask(want_task());

common::test_pb_from_to(func_name!(), want_execute())?;
common::test_load_old(
func_name!(),
task_message_execute_v140.as_slice(),
140,
want_execute(),
)?;
}
{
let task_message_schedule_v140 = vec![
18, 239, 1, 8, 11, 18, 6, 116, 97, 115, 107, 95, 99, 34, 16, 83, 69, 76, 69, 67, 84,
32, 42, 32, 70, 82, 79, 77, 32, 116, 49, 42, 7, 99, 111, 109, 109, 101, 110, 116, 50,
6, 112, 117, 98, 108, 105, 99, 58, 22, 8, 11, 18, 11, 51, 48, 32, 49, 50, 32, 42, 32,
42, 32, 42, 26, 3, 85, 84, 67, 40, 11, 66, 17, 10, 11, 119, 97, 114, 101, 104, 111,
117, 115, 101, 95, 97, 18, 2, 49, 48, 74, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49,
32, 48, 48, 58, 48, 48, 58, 49, 48, 32, 85, 84, 67, 80, 10, 114, 23, 49, 57, 55, 48,
45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 49, 32, 85, 84, 67, 122, 23,
49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 50, 32, 85, 84,
67, 130, 1, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49,
51, 32, 85, 84, 67, 138, 1, 6, 116, 97, 115, 107, 95, 97, 138, 1, 6, 116, 97, 115, 107,
95, 98, 146, 1, 6, 99, 49, 32, 62, 32, 49, 154, 1, 6, 10, 1, 97, 18, 1, 98, 170, 1, 2,
109, 101, 160, 6, 140, 1, 168, 6, 24, 160, 6, 140, 1, 168, 6, 24,
];
let want_schedule = || mt::TaskMessage::ScheduleTask(want_task());

common::test_pb_from_to(func_name!(), want_schedule())?;
common::test_load_old(
func_name!(),
task_message_schedule_v140.as_slice(),
140,
want_schedule(),
)?;
}
{
let task_message_after_v140 = vec![
34, 239, 1, 8, 11, 18, 6, 116, 97, 115, 107, 95, 99, 34, 16, 83, 69, 76, 69, 67, 84,
32, 42, 32, 70, 82, 79, 77, 32, 116, 49, 42, 7, 99, 111, 109, 109, 101, 110, 116, 50,
6, 112, 117, 98, 108, 105, 99, 58, 22, 8, 11, 18, 11, 51, 48, 32, 49, 50, 32, 42, 32,
42, 32, 42, 26, 3, 85, 84, 67, 40, 11, 66, 17, 10, 11, 119, 97, 114, 101, 104, 111,
117, 115, 101, 95, 97, 18, 2, 49, 48, 74, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49,
32, 48, 48, 58, 48, 48, 58, 49, 48, 32, 85, 84, 67, 80, 10, 114, 23, 49, 57, 55, 48,
45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 49, 32, 85, 84, 67, 122, 23,
49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 50, 32, 85, 84,
67, 130, 1, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49,
51, 32, 85, 84, 67, 138, 1, 6, 116, 97, 115, 107, 95, 97, 138, 1, 6, 116, 97, 115, 107,
95, 98, 146, 1, 6, 99, 49, 32, 62, 32, 49, 154, 1, 6, 10, 1, 97, 18, 1, 98, 170, 1, 2,
109, 101, 160, 6, 140, 1, 168, 6, 24, 160, 6, 140, 1, 168, 6, 24,
];
let want_after = || mt::TaskMessage::AfterTask(want_task());

common::test_pb_from_to(func_name!(), want_after())?;
common::test_load_old(
func_name!(),
task_message_after_v140.as_slice(),
140,
want_after(),
)?;
}
{
let task_message_delete_v140 = vec![
42, 27, 10, 6, 116, 97, 115, 107, 95, 99, 18, 17, 10, 11, 119, 97, 114, 101, 104, 111,
117, 115, 101, 95, 97, 18, 2, 49, 48, 160, 6, 140, 1, 168, 6, 24,
];
let want_delete = || {
let task = want_task();
mt::TaskMessage::DeleteTask(task.task_name, task.warehouse_options)
};

common::test_pb_from_to(func_name!(), want_delete())?;
common::test_load_old(
func_name!(),
task_message_delete_v140.as_slice(),
140,
want_delete(),
)?;
}

Ok(())
}

fn s(ss: impl ToString) -> String {
ss.to_string()
}
6 changes: 6 additions & 0 deletions src/meta/protos/proto/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,17 @@ message TaskMessage {
uint64 ver = 100;
uint64 min_reader_ver = 101;

message DeleteTask {
string task_name = 1;
WarehouseOptions warehouse_options = 2;
}

oneof message {
Task execute_task = 1;
Task schedule_task = 2;
string delete_task = 3;
Task after_task = 4;
DeleteTask delete_task_v2 = 5;
}
}

Expand Down
13 changes: 10 additions & 3 deletions src/query/management/src/task/task_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,17 @@ impl TaskMgr {
let req = UpsertPB::delete(key).with(MatchSeq::GE(1));
let res = self.kv_api.upsert_pb(&req).await?;

self.send(TaskMessage::DeleteTask(task_name.to_string()))
.await?;
if res.is_changed() {
Ok(res.prev.as_ref().map(|prev| Task::clone(prev)))
let Some(task) = res.prev.as_ref().map(|prev| Task::clone(prev)) else {
return Ok(None);
};
self.send(TaskMessage::DeleteTask(
task_name.to_string(),
task.warehouse_options.clone(),
))
.await?;

Ok(Some(task))
} else {
Ok(None)
}
Expand Down
24 changes: 20 additions & 4 deletions src/query/service/src/task/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use databend_common_ast::ast::AlterTaskOptions;
use databend_common_base::base::GlobalInstance;
use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::table_context::TableContext;
use databend_common_config::GlobalConfig;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
Expand Down Expand Up @@ -202,6 +203,22 @@ impl TaskService {
while let Some(result) = steam.next().await {
let (_, task_message) = result?;
let task_key = TaskMessageIdent::new(tenant, task_message.key());

if let Some(WarehouseOptions {
warehouse: Some(warehouse),
..
}) = task_message.warehouse_options()
{
if warehouse
!= &self
.create_context(None)
.await?
.get_cluster()
.get_warehouse_id()?
{
continue;
}
}
match task_message {
// ScheduleTask is always monitored by all Query nodes, and ExecuteTask is sent serially to avoid repeated sending.
TaskMessage::ScheduleTask(mut task) => {
Expand Down Expand Up @@ -271,7 +288,7 @@ impl TaskService {
let Some(_guard) = fn_lock(&task_service, &task_key, duration.as_millis() as u64).await? else {
continue;
};
if !Self::check_when(&task, &owner, &task_service).await.unwrap() {
if !Self::check_when(&task, &owner, &task_service).await? {
continue;
}
fn_new_task_run(&task_service, &task).await?;
Expand Down Expand Up @@ -438,7 +455,7 @@ impl TaskService {
None,
)?;
}
TaskMessage::DeleteTask(task_name) => {
TaskMessage::DeleteTask(task_name, _) => {
if let Some(token) = scheduled_tasks.remove(&task_name) {
token.cancel();
}
Expand Down Expand Up @@ -529,8 +546,7 @@ impl TaskService {
};
let result = task_service
.execute_sql(Some(user.clone()), &format!("SELECT {when_condition}"))
.await
.unwrap();
.await?;
Ok(result
.first()
.and_then(|block| block.get_by_offset(0).index(0))
Expand Down
4 changes: 1 addition & 3 deletions src/query/sql/src/planner/binder/ddl/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ impl Binder {
sql,
session_parameters,
} = stmt;
if (schedule_opts.is_none() && after.is_empty())
|| (schedule_opts.is_some() && !after.is_empty())
{
if schedule_opts.is_some() && !after.is_empty() {
return Err(ErrorCode::SyntaxException(
"task must be defined with either given time schedule as a root task or run after other task as a DAG".to_string(),
));
Expand Down
Loading
Loading