Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apps/test-bot/commandkit.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ export default defineConfig({
devtools(),
cache(),
ai(),
tasks(),
tasks({
initializeDefaultDriver: true,
sqliteDriverDatabasePath: './tasks.db',
}),
],
});
4 changes: 0 additions & 4 deletions apps/test-bot/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { Client, Partials } from 'discord.js';
import { Logger, commandkit } from 'commandkit';
import { setDriver } from '@commandkit/tasks';
import { SQLiteDriver } from '@commandkit/tasks/sqlite';
import config from './config.json' with { type: 'json' };

const client = new Client({
Expand All @@ -16,8 +14,6 @@ const client = new Client({
partials: [Partials.Channel, Partials.Message, Partials.User],
});

setDriver(new SQLiteDriver('./tasks.db'));

Logger.log('Application bootstrapped successfully!');

commandkit.setPrefixResolver((message) => {
Expand Down
11 changes: 11 additions & 0 deletions apps/test-bot/src/app/tasks/current-time.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { task } from '@commandkit/tasks';
import { Logger } from 'commandkit';

export default task({
name: 'current-time',
immediate: true,
schedule: '*/10 * * * * *', // every 10 seconds
async execute() {
Logger.info(`The current time is ${new Date().toLocaleString()}`);
},
});
18 changes: 15 additions & 3 deletions apps/website/docs/api-reference/tasks/classes/sqlite-driver.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import MemberDescription from '@site/src/components/MemberDescription';

## SQLiteDriver

<GenerationInfo sourceFile="packages/tasks/src/drivers/sqlite.ts" sourceLine="20" packageName="@commandkit/tasks" />
<GenerationInfo sourceFile="packages/tasks/src/drivers/sqlite.ts" sourceLine="21" packageName="@commandkit/tasks" />

SQLite-based persistent job queue manager for CommandKit tasks.

Expand All @@ -35,7 +35,9 @@ setDriver(driver);

```ts title="Signature"
class SQLiteDriver implements TaskDriver {
constructor(dbPath: = './commandkit-tasks.db')
constructor(dbPath: = './commandkit-tasks.db', pollingInterval: = 5_000)
getPollingInterval() => ;
setPollingInterval(pollingInterval: number) => ;
destroy() => ;
create(task: TaskData) => Promise<string>;
delete(identifier: string) => Promise<void>;
Expand All @@ -50,9 +52,19 @@ class SQLiteDriver implements TaskDriver {

### constructor

<MemberInfo kind="method" type={`(dbPath: = './commandkit-tasks.db') => SQLiteDriver`} />
<MemberInfo kind="method" type={`(dbPath: = './commandkit-tasks.db', pollingInterval: = 5_000) => SQLiteDriver`} />

Create a new SQLiteDriver instance.
### getPollingInterval

<MemberInfo kind="method" type={`() => `} />

Get the polling interval.
### setPollingInterval

<MemberInfo kind="method" type={`(pollingInterval: number) => `} />

Set the polling interval.
### destroy

<MemberInfo kind="method" type={`() => `} />
Expand Down
2 changes: 1 addition & 1 deletion apps/website/docs/api-reference/tasks/classes/task.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { task } from '@commandkit/tasks';

export default task({
name: 'cleanup-old-data',
schedule: { type: 'cron', value: '0 2 * * *' }, // Daily at 2 AM
schedule: '0 2 * * *', // Daily at 2 AM
async prepare(ctx) {
// Only run if there's old data to clean
return await hasOldData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import MemberDescription from '@site/src/components/MemberDescription';

## TasksPlugin

<GenerationInfo sourceFile="packages/tasks/src/plugin.ts" sourceLine="54" packageName="@commandkit/tasks" />
<GenerationInfo sourceFile="packages/tasks/src/plugin.ts" sourceLine="66" packageName="@commandkit/tasks" />

CommandKit plugin that provides task management capabilities.

Expand Down
4 changes: 2 additions & 2 deletions apps/website/docs/api-reference/tasks/functions/task.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import MemberDescription from '@site/src/components/MemberDescription';

## task

<GenerationInfo sourceFile="packages/tasks/src/task.ts" sourceLine="139" packageName="@commandkit/tasks" />
<GenerationInfo sourceFile="packages/tasks/src/task.ts" sourceLine="140" packageName="@commandkit/tasks" />

Creates a new task definition.

Expand All @@ -31,7 +31,7 @@ import { task } from '@commandkit/tasks';
// Simple scheduled task
export default task({
name: 'daily-backup',
schedule: { type: 'cron', value: '0 0 * * *' },
schedule: '0 0 * * *',
async execute(ctx) {
await performBackup();
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Future versions may support customizing the tasks directory path and HMR behavio
```ts title="Signature"
interface TasksPluginOptions {
initializeDefaultDriver?: boolean;
sqliteDriverPollingInterval?: number;
sqliteDriverDatabasePath?: string;
}
```

Expand All @@ -36,6 +38,18 @@ Whether to initialize the default driver.

If true, the plugin will initialize the default driver.
If false, the plugin will not initialize the default driver.
### sqliteDriverPollingInterval

<MemberInfo kind="property" type={`number`} default={`5_000`} />

The polling interval for the default sqlite driver.
Default is 5_000.
### sqliteDriverDatabasePath

<MemberInfo kind="property" type={`string`} default={`'./<a href='/docs/api-reference/commandkit/variables/commandkit#commandkit'>commandkit</a>-<a href='/docs/api-reference/tasks/functions/tasks#tasks'>tasks</a>.db'`} />

The path to the sqlite database file for the default sqlite driver.
Default is './commandkit-tasks.db' but `:memory:` can be used for an in-memory database.


</div>
57 changes: 50 additions & 7 deletions packages/tasks/src/drivers/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { TaskDriver, TaskRunner } from '../driver';
import { TaskData } from '../types';
import { DatabaseSync, StatementSync } from 'node:sqlite';
import cronParser from 'cron-parser';
import { defer } from 'commandkit';

/**
* SQLite-based persistent job queue manager for CommandKit tasks.
Expand All @@ -28,17 +29,39 @@ export class SQLiteDriver implements TaskDriver {
delete: StatementSync;
updateNextRun: StatementSync;
updateCompleted: StatementSync;
findCronByName: StatementSync;
deleteByName: StatementSync;
};

/**
* Create a new SQLiteDriver instance.
* @param dbPath Path to the SQLite database file (default: './commandkit-tasks.db'). Use `:memory:` for an in-memory database.
* @param pollingInterval The interval in milliseconds to poll for jobs (default: 5_000).
*/
constructor(dbPath = './commandkit-tasks.db') {
constructor(
dbPath = './commandkit-tasks.db',
private pollingInterval = 5_000,
) {
this.db = new DatabaseSync(dbPath, { open: true });
this.init();
}

/**
* Get the polling interval.
* @returns The polling interval in milliseconds.
*/
public getPollingInterval() {
return this.pollingInterval;
}

/**
* Set the polling interval.
* @param pollingInterval The interval in milliseconds to poll for jobs.
*/
public setPollingInterval(pollingInterval: number) {
this.pollingInterval = pollingInterval;
}

/**
* Destroy the SQLite driver and stop the polling loop.
*/
Expand Down Expand Up @@ -81,6 +104,12 @@ export class SQLiteDriver implements TaskDriver {
updateCompleted: this.db.prepare(
/* sql */ `UPDATE jobs SET status = 'completed', last_run = ? WHERE id = ?`,
),
findCronByName: this.db.prepare(
/* sql */ `SELECT id FROM jobs WHERE name = ? AND schedule_type = 'cron' AND status = 'pending'`,
),
deleteByName: this.db.prepare(
/* sql */ `DELETE FROM jobs WHERE name = ? AND schedule_type = 'cron'`,
),
};

this.startPolling();
Expand Down Expand Up @@ -110,6 +139,15 @@ export class SQLiteDriver implements TaskDriver {
nextRun = typeof schedule === 'number' ? schedule : schedule.getTime();
}

if (scheduleType === 'cron') {
const existingTask = this.statements.findCronByName.get(name) as
| { id: number }
| undefined;
if (existingTask) {
this.statements.deleteByName.run(name);
}
}

const result = this.statements.insert.run(
name,
JSON.stringify(data ?? {}),
Expand All @@ -120,11 +158,13 @@ export class SQLiteDriver implements TaskDriver {
Date.now(),
);

if (task.immediate) {
await this.runner?.({
name,
data,
timestamp: Date.now(),
if (task.immediate && scheduleType === 'cron') {
defer(() => {
return this.runner?.({
name,
data,
timestamp: Date.now(),
});
});
}

Expand Down Expand Up @@ -153,7 +193,10 @@ export class SQLiteDriver implements TaskDriver {
*/
private startPolling() {
if (this.interval) clearInterval(this.interval);
this.interval = setInterval(() => this.pollJobs(), 1000).unref();
this.interval = setInterval(
() => this.pollJobs(),
this.pollingInterval,
).unref();
// Run immediately on startup
this.pollJobs();
}
Expand Down
35 changes: 32 additions & 3 deletions packages/tasks/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ export interface TasksPluginOptions {
* @default true
*/
initializeDefaultDriver?: boolean;
/**
* The polling interval for the default sqlite driver.
* Default is 5_000.
* @default 5_000
*/
sqliteDriverPollingInterval?: number;
/**
* The path to the sqlite database file for the default sqlite driver.
* Default is './commandkit-tasks.db' but `:memory:` can be used for an in-memory database.
* @default './commandkit-tasks.db'
*/
sqliteDriverDatabasePath?: string;
}

/**
Expand Down Expand Up @@ -74,7 +86,12 @@ export class TasksPlugin extends RuntimePlugin<TasksPluginOptions> {
const { SQLiteDriver } =
require('./drivers/sqlite') as typeof import('./drivers/sqlite');

taskDriverManager.setDriver(new SQLiteDriver());
taskDriverManager.setDriver(
new SQLiteDriver(
this.options.sqliteDriverDatabasePath ?? './commandkit-tasks.db',
this.options.sqliteDriverPollingInterval ?? 5_000,
),
);
} catch (e: any) {
Logger.error(
`Failed to initialize the default driver for tasks plugin: ${e?.stack || e}`,
Expand Down Expand Up @@ -182,6 +199,8 @@ export class TasksPlugin extends RuntimePlugin<TasksPluginOptions> {
name: task.name,
data: {},
schedule: task.schedule,
immediate: task.immediate,
timezone: task.timezone,
});
}

Expand Down Expand Up @@ -225,14 +244,22 @@ export class TasksPlugin extends RuntimePlugin<TasksPluginOptions> {
if (!taskData || !(taskData instanceof Task)) return;

if (this.tasks.has(taskData.name)) {
Logger.info(`Reloading task: ${taskData.name}`);
await taskDriverManager.deleteTask(taskData.name);
if (taskData.isCron()) {
Logger.info(`Replacing cron task: ${taskData.name}`);
// For cron tasks, the SQLiteDriver.create() method will handle the replacement
// No need to manually delete the existing task
} else {
Logger.info(`Reloading task: ${taskData.name}`);
await taskDriverManager.deleteTask(taskData.name);
}
this.tasks.set(taskData.name, taskData);
if (taskData.schedule) {
await taskDriverManager.createTask({
name: taskData.name,
data: {},
schedule: taskData.schedule,
immediate: taskData.immediate,
timezone: taskData.timezone,
});
}
} else {
Expand All @@ -243,6 +270,8 @@ export class TasksPlugin extends RuntimePlugin<TasksPluginOptions> {
name: taskData.name,
data: {},
schedule: taskData.schedule,
immediate: taskData.immediate,
timezone: taskData.timezone,
});
}
}
Expand Down
9 changes: 5 additions & 4 deletions packages/tasks/src/task.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { TaskContext } from './context';
import { TaskDefinition, TaskSchedule } from './types';
import { TaskData, TaskDefinition, TaskSchedule } from './types';

/**
* Represents a task instance with execution logic and metadata.
Expand All @@ -14,7 +14,7 @@ import { TaskDefinition, TaskSchedule } from './types';
*
* export default task({
* name: 'cleanup-old-data',
* schedule: { type: 'cron', value: '0 2 * * *' }, // Daily at 2 AM
* schedule: '0 2 * * *', // Daily at 2 AM
* async prepare(ctx) {
* // Only run if there's old data to clean
* return await hasOldData();
Expand All @@ -40,7 +40,8 @@ export class Task<T extends Record<string, any> = Record<string, any>> {
* Only applicable to cron tasks, defaults to false.
*/
public get immediate(): boolean {
return this.data.immediate ?? false;
if (this.isCron()) return !!this.data.immediate;
return false;
}

/**
Expand Down Expand Up @@ -126,7 +127,7 @@ export class Task<T extends Record<string, any> = Record<string, any>> {
* // Simple scheduled task
* export default task({
* name: 'daily-backup',
* schedule: { type: 'cron', value: '0 0 * * *' },
* schedule: '0 0 * * *',
* async execute(ctx) {
* await performBackup();
* },
Expand Down