Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 5005 files
29 changes: 27 additions & 2 deletions src/cronjob_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
#include "duckdb/function/scalar_function.hpp"
#include "duckdb/function/table_function.hpp"
#include "duckdb/main/extension_util.hpp"
#include "duckdb/main/client_config.hpp"
#include "ccronexpr.h"
#include <thread>
#include <mutex>
#include <chrono>
#include <unordered_map>

namespace duckdb {

Expand All @@ -23,6 +25,7 @@ struct CronTask {
std::vector<std::pair<std::chrono::system_clock::time_point, string>> execution_history;
std::mutex mutex;
cron_expr expr;
std::unordered_map<string, Value> variables; // Store variables for this task
};

class CronScheduler {
Expand All @@ -42,7 +45,7 @@ class CronScheduler {
}
}

string AddTask(const string &query, const string &schedule) {
string AddTask(const string &query, const string &schedule, ClientContext &context) {
// Parse cron expression
cron_expr expr;
const char* err = NULL;
Expand All @@ -58,6 +61,12 @@ class CronScheduler {
task->expr = expr;
task->next_run = CalculateNextRun(task->expr);

// Capture current variables from the context
auto &config = ClientConfig::GetConfig(context);
for (const auto &var_pair : config.user_variables) {
task->variables[var_pair.first] = var_pair.second;
}

std::lock_guard<std::mutex> lock(tasks_mutex);
string task_id = task->id;
tasks[task_id] = std::move(task);
Expand Down Expand Up @@ -136,6 +145,21 @@ class CronScheduler {
try {
DuckDB db(db_instance);
Connection conn(db);

// Set variables before executing the query
for (const auto &var_pair : task.variables) {
string set_var_query = "SET VARIABLE " + var_pair.first + " = " + var_pair.second.ToSQLString();
auto set_result = conn.Query(set_var_query);
if (set_result->HasError()) {
std::lock_guard<std::mutex> task_lock(task.mutex);
task.execution_history.emplace_back(
std::chrono::system_clock::now(),
"Error setting variable " + var_pair.first + ": " + set_result->GetError()
);
return;
}
}

auto result = conn.Query(task.query);

std::lock_guard<std::mutex> task_lock(task.mutex);
Expand Down Expand Up @@ -225,13 +249,14 @@ static void CronScalarFunction(DataChunk &args, ExpressionState &state, Vector &

auto &query_vector = args.data[0];
auto &schedule_vector = args.data[1];
auto &context = state.GetContext();

UnaryExecutor::Execute<string_t, string_t>(
query_vector, result, args.size(),
[&](string_t query) {
try {
auto schedule = schedule_vector.GetValue(0).ToString();
auto task_id = scheduler->AddTask(query.GetString(), schedule);
auto task_id = scheduler->AddTask(query.GetString(), schedule, context);
return StringVector::AddString(result, task_id);
} catch (const Exception &e) {
throw InvalidInputException("Error scheduling cron task: %s", e.what());
Expand Down
Loading