diff --git a/duckdb b/duckdb index 1986445..0b83e5d 160000 --- a/duckdb +++ b/duckdb @@ -1 +1 @@ -Subproject commit 19864453f7d0ed095256d848b46e7b8630989bac +Subproject commit 0b83e5d2f68bc02dfefde74b846bd039f078affa diff --git a/extension-ci-tools b/extension-ci-tools index 83f847f..929896d 160000 --- a/extension-ci-tools +++ b/extension-ci-tools @@ -1 +1 @@ -Subproject commit 83f847f8467a760f6c66dc7996c13300210220a8 +Subproject commit 929896d8132ed0a3b376722380b38ea023505bd4 diff --git a/src/cronjob_extension.cpp b/src/cronjob_extension.cpp index d80052f..188f303 100644 --- a/src/cronjob_extension.cpp +++ b/src/cronjob_extension.cpp @@ -6,10 +6,12 @@ #include "duckdb/function/scalar_function.hpp" #include "duckdb/function/table_function.hpp" #include "duckdb/main/extension_util.hpp" +#include "duckdb/main/client_config.hpp" #include "ccronexpr.h" #include #include #include +#include namespace duckdb { @@ -23,6 +25,7 @@ struct CronTask { std::vector> execution_history; std::mutex mutex; cron_expr expr; + std::unordered_map variables; // Store variables for this task }; class CronScheduler { @@ -42,7 +45,7 @@ class CronScheduler { } } - string AddTask(const string &query, const string &schedule) { + string AddTask(const string &query, const string &schedule, ClientContext &context) { // Parse cron expression cron_expr expr; const char* err = NULL; @@ -58,6 +61,12 @@ class CronScheduler { task->expr = expr; task->next_run = CalculateNextRun(task->expr); + // Capture current variables from the context + auto &config = ClientConfig::GetConfig(context); + for (const auto &var_pair : config.user_variables) { + task->variables[var_pair.first] = var_pair.second; + } + std::lock_guard lock(tasks_mutex); string task_id = task->id; tasks[task_id] = std::move(task); @@ -136,6 +145,21 @@ class CronScheduler { try { DuckDB db(db_instance); Connection conn(db); + + // Set variables before executing the query + for (const auto &var_pair : task.variables) { + string set_var_query = "SET VARIABLE " + var_pair.first + " = " + var_pair.second.ToSQLString(); + auto set_result = conn.Query(set_var_query); + if (set_result->HasError()) { + std::lock_guard task_lock(task.mutex); + task.execution_history.emplace_back( + std::chrono::system_clock::now(), + "Error setting variable " + var_pair.first + ": " + set_result->GetError() + ); + return; + } + } + auto result = conn.Query(task.query); std::lock_guard task_lock(task.mutex); @@ -225,13 +249,14 @@ static void CronScalarFunction(DataChunk &args, ExpressionState &state, Vector & auto &query_vector = args.data[0]; auto &schedule_vector = args.data[1]; + auto &context = state.GetContext(); UnaryExecutor::Execute( query_vector, result, args.size(), [&](string_t query) { try { auto schedule = schedule_vector.GetValue(0).ToString(); - auto task_id = scheduler->AddTask(query.GetString(), schedule); + auto task_id = scheduler->AddTask(query.GetString(), schedule, context); return StringVector::AddString(result, task_id); } catch (const Exception &e) { throw InvalidInputException("Error scheduling cron task: %s", e.what());