README.md (4 changes: 2 additions & 2 deletions)
@@ -27,7 +27,7 @@ Once the reports are generated (SQL queries finish executing), it's natural for

## Concepts
#### Task
- A task is a named SQL query that is loaded into the server on startup. Tasks are defined in .sql files in the simple [goyesql](https://github.com/knadh/goyesql) format. Such queries are self-contained and produce the desired final output with neatly named columns. They can take arbitrary positional arguments for execution. A task can be attached to one or more specific databases defined in the configuration using the `-- db:` tag. In case of multiple databases, the query will be executed against a random one from the list, unless a specific database is specified in the API request (`db`). A `-- queue:` tag to always route the task to a particular queue, unless it's overriden by the `queue` param when making a job request. A `-- results:` tag specifies the results backend to which the results of a task will be written. If there are multiple result backends specified, the results are written a random one.
+ A task is a named SQL query that is loaded into the server on startup. Tasks are defined in .sql files in the simple [goyesql](https://github.com/knadh/goyesql) format. Such queries are self-contained and produce the desired final output with neatly named columns. They can take arbitrary positional arguments for execution. A task can be attached to one or more specific databases defined in the configuration using the `-- db:` tag. In case of multiple databases, the query is executed against a random one from the list, unless a specific database is specified in the API request (`db`). A `-- queue:` tag routes the task to a particular queue, unless it's overridden by the `queue` param when making a job request. A `-- results:` tag specifies the results backend to which the results of a task will be written. If multiple result backends are specified, the results are written to a random one.

Example:
```sql
@@ -51,7 +51,7 @@
SELECT * FROM entries WHERE user_id = ? AND timestamp > ? and timestamp < ?;
```

- Here, when the server starts, the queries `get_profit_summary` and `get_profit_entries` are registered automatically as tasks. Internally, the server validates and prepares these SQL statements (unless `raw: 1`). `?` are MySQL value placholders. For Postgres, the placeholders are `$1, $2 ...`
+ Here, when the server starts, the queries `get_profit_summary` and `get_profit_entries` are registered automatically as tasks. Internally, the server validates and prepares these SQL statements (unless `raw: 1`). `?` are MySQL value placeholders. For Postgres, the placeholders are `$1, $2 ...`

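To make the placeholder difference concrete, here is a minimal database/sql sketch using the `entries` query from the example above. The DSNs and driver imports are illustrative assumptions, not part of DungBeetle:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql" // MySQL driver: uses '?' placeholders
	_ "github.com/lib/pq"              // Postgres driver: uses '$1, $2, ...' placeholders
)

func main() {
	// MySQL: positional arguments bind to '?' in order.
	my, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/reports") // hypothetical DSN
	if err != nil {
		log.Fatal(err)
	}
	defer my.Close()
	myRows, err := my.Query(
		"SELECT * FROM entries WHERE user_id = ? AND timestamp > ? AND timestamp < ?",
		42, "2024-01-01", "2024-02-01")
	if err == nil {
		myRows.Close()
	}

	// Postgres: the same query is written with numbered placeholders.
	pg, err := sql.Open("postgres", "postgres://user:pass@localhost/reports?sslmode=disable") // hypothetical DSN
	if err != nil {
		log.Fatal(err)
	}
	defer pg.Close()
	pgRows, err := pg.Query(
		"SELECT * FROM entries WHERE user_id = $1 AND timestamp > $2 AND timestamp < $3",
		42, "2024-01-01", "2024-02-01")
	if err == nil {
		pgRows.Close()
	}
}
```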
#### Job
A job is an instance of a task that has been queued to run. Each job has an ID that can be used to track its status. If an ID is not passed explicitly, it is generated internally and returned. These IDs need not be unique, but only a single job with a given ID can run at any point; for the next job with the same ID to be scheduled, the previous one has to finish execution. Non-unique IDs like this are useful for preventing users from sending multiple requests for the same report, as in our use cases.
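To make the "one running job per ID" rule concrete, here is a toy, in-memory sketch of the idea. It is only an illustration: DungBeetle itself tracks job state in its queue and state backends, not in a local map.

```go
package main

import (
	"fmt"
	"sync"
)

// runningSet records which job IDs are currently executing.
type runningSet struct {
	mu  sync.Mutex
	ids map[string]bool
}

// tryStart reports whether a job with this ID may start now.
func (r *runningSet) tryStart(id string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.ids[id] {
		return false // a job with this ID is already running
	}
	r.ids[id] = true
	return true
}

// finish marks the job as done so the same ID can be used again.
func (r *runningSet) finish(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.ids, id)
}

func main() {
	rs := &runningSet{ids: map[string]bool{}}
	fmt.Println(rs.tryStart("monthly_report_user_42")) // true: starts
	fmt.Println(rs.tryStart("monthly_report_user_42")) // false: already running
	rs.finish("monthly_report_user_42")
	fmt.Println(rs.tryStart("monthly_report_user_42")) // true again
}
```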
cmd/http.go (16 changes: 8 additions & 8 deletions)
@@ -18,7 +18,7 @@ var reValidateName = regexp.MustCompile("(?i)^[a-z0-9-_:]+$")
// is passed, it returns the raw SQL bodies as well.
func handleGetTasksList(w http.ResponseWriter, r *http.Request) {
var (
- co = r.Context().Value("core").(*core.Core)
+ co = r.Context().Value(coreKey).(*core.Core)
)

tasks := co.GetTasks()
@@ -41,7 +41,7 @@ func handleGetTasksList(w http.ResponseWriter, r *http.Request) {
// handleGetJobStatus returns the status of a given jobID.
func handleGetJobStatus(w http.ResponseWriter, r *http.Request) {
var (
- co = r.Context().Value("core").(*core.Core)
+ co = r.Context().Value(coreKey).(*core.Core)

jobID = chi.URLParam(r, "jobID")
)
@@ -59,7 +59,7 @@ func handleGetJobStatus(w http.ResponseWriter, r *http.Request) {
// handleGetGroupStatus returns the status of a given groupID.
func handleGetGroupStatus(w http.ResponseWriter, r *http.Request) {
var (
- co = r.Context().Value("core").(*core.Core)
+ co = r.Context().Value(coreKey).(*core.Core)

groupID = chi.URLParam(r, "groupID")
)
@@ -77,7 +77,7 @@ func handleGetGroupStatus(w http.ResponseWriter, r *http.Request) {
// handleGetPendingJobs returns pending jobs in a given queue.
func handleGetPendingJobs(w http.ResponseWriter, r *http.Request) {
var (
- co = r.Context().Value("core").(*core.Core)
+ co = r.Context().Value(coreKey).(*core.Core)
queue = chi.URLParam(r, "queue")
)

@@ -94,7 +94,7 @@ func handleGetPendingJobs(w http.ResponseWriter, r *http.Request) {
// handlePostJob creates a new job against a given task name.
func handlePostJob(w http.ResponseWriter, r *http.Request) {
var (
- co = r.Context().Value("core").(*core.Core)
+ co = r.Context().Value(coreKey).(*core.Core)

taskName = chi.URLParam(r, "taskName")
)
@@ -134,7 +134,7 @@ func handlePostJob(w http.ResponseWriter, r *http.Request) {
// handlePostJobGroup creates multiple jobs under a group.
func handlePostJobGroup(w http.ResponseWriter, r *http.Request) {
var (
- co = r.Context().Value("core").(*core.Core)
+ co = r.Context().Value(coreKey).(*core.Core)

decoder = json.NewDecoder(r.Body)
req models.GroupReq
@@ -160,7 +160,7 @@ func handlePostJobGroup(w http.ResponseWriter, r *http.Request) {
// it is cancelled first and then deleted.
func handleCancelJob(w http.ResponseWriter, r *http.Request) {
var (
- co = r.Context().Value("core").(*core.Core)
+ co = r.Context().Value(coreKey).(*core.Core)

jobID = chi.URLParam(r, "jobID")
purge, _ = strconv.ParseBool(r.URL.Query().Get("purge"))
@@ -178,7 +178,7 @@ func handleCancelJob(w http.ResponseWriter, r *http.Request) {
// If the job is running, it is cancelled first, and then deleted.
func handleCancelGroupJob(w http.ResponseWriter, r *http.Request) {
var (
- co = r.Context().Value("core").(*core.Core)
+ co = r.Context().Value(coreKey).(*core.Core)

groupID = chi.URLParam(r, "groupID")
purge, _ = strconv.ParseBool(r.URL.Query().Get("purge"))
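All of the handler changes above replace the plain string key "core" with the typed coreKey that cmd/init.go now defines (see the next file). A minimal, standalone sketch of why a typed context key matters; this is not DungBeetle code, just the general pattern:

```go
package main

import (
	"context"
	"fmt"
)

// An unexported named type makes the key unique to the package that defines
// it; linters such as staticcheck flag plain string keys because any other
// package using the same string could collide with them.
type contextKey string

const coreKey contextKey = "core"

func main() {
	ctx := context.WithValue(context.Background(), coreKey, "the core object")

	// The untyped string "core" no longer matches: context.Value compares
	// both the dynamic type and the value, and string != contextKey.
	fmt.Println(ctx.Value("core"))  // <nil>
	fmt.Println(ctx.Value(coreKey)) // the core object
}
```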
cmd/init.go (32 changes: 24 additions & 8 deletions)
@@ -22,6 +22,10 @@ import (
"github.com/zerodha/dungbeetle/v2/internal/resultbackends/sqldb"
)

+ type contextKey string
+
+ const coreKey contextKey = "core"
+
var (
//go:embed config.sample.toml
efs embed.FS
@@ -30,9 +34,19 @@ var (
func initFlags(ko *koanf.Koanf) {
// Command line flags.
f := flag.NewFlagSet("config", flag.ContinueOnError)

Review discussion on the usage/flag output change below:

Collaborator: This I feel is unnecessary. Let's leave the flags stdout untouched. The rest of the changes are fine.

Author: Ok, let's keep the flags stdout as is.

+ const usage = `
+ DungBeetle is a distributed SQL job queue and worker system.
+ It allows you to run SQL queries as jobs in a distributed manner, with support for job queues, concurrency, and result storage.
+
+ Usage:
+   dungbeetle [flags]
+ Flags:
+ `

f.Usage = func() {
lo.Info("DungBeetle")
lo.Info(f.FlagUsages())
fmt.Print(usage)
fmt.Println(f.FlagUsages())
os.Exit(0)
}

@@ -52,18 +66,20 @@ func initFlags(ko *koanf.Koanf) {
}

func initConfig(ko *koanf.Koanf) {
lo.Info("buildstring", "value", buildString)

// Generate new config file.
if ok := ko.Bool("new-config"); ok {
if err := generateConfig(); err != nil {
- fmt.Println(err)
+ fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
fmt.Println("config.toml generated. Edit and run --install.")
fmt.Fprintln(os.Stderr, "config.toml generated.")
os.Exit(0)
}

+ // Print build string.
+ lo.Info("buildstring", "value", buildString)

// Load the config file.
if err := ko.Load(file.Provider(ko.String("config")), toml.Parser()); err != nil {
slog.Error("error reading config", "error", err)
@@ -123,7 +139,7 @@ func initHTTP(co *core.Core) {
"content-length", r.ContentLength,
"form", r.Form,
)
- ctx := context.WithValue(r.Context(), "core", co)
+ ctx := context.WithValue(r.Context(), coreKey, co)
next.ServeHTTP(w, r.WithContext(ctx))
})
})
@@ -199,10 +215,10 @@ func initCore(ko *koanf.Koanf) (*core.Core, error) {
}

if v := ko.MustString("job_queue.broker.type"); v != "redis" {
return nil, fmt.Errorf("unsupported job_queue.broker.type '%s'. Only 'redis' is supported.", v)
return nil, fmt.Errorf("unsupported job_queue.broker.type '%s'. Only 'redis' is supported", v)
}
if v := ko.MustString("job_queue.state.type"); v != "redis" {
return nil, fmt.Errorf("unsupported job_queue.state.type '%s'. Only 'redis' is supported.", v)
return nil, fmt.Errorf("unsupported job_queue.state.type '%s'. Only 'redis' is supported", v)
}

lo := slog.Default()
internal/core/tasks.go (2 changes: 1 addition & 1 deletion)
@@ -71,7 +71,7 @@ func (co *Core) loadTasks(dir string) (Tasks, error) {

// A map of DBs are attached to every query. This can be
// DBs tagged specifically to queries in the SQL file,
- // or will be the map of all avaliable DBs. During execution
+ // or will be the map of all available DBs. During execution
// one of these DBs will be picked randomly.
srcDBs dbpool.Pool

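The comment above notes that one of the attached databases is picked at random at execution time. A toy sketch of such a pick over a name-to-connection map; this is not the actual dbpool.Pool selection code, and the nil entries are placeholders for real connections:

```go
package main

import (
	"database/sql"
	"fmt"
	"math/rand"
)

// pickRandomDB returns one connection from a named pool at random.
func pickRandomDB(pool map[string]*sql.DB) (string, *sql.DB) {
	names := make([]string, 0, len(pool))
	for name := range pool {
		names = append(names, name)
	}
	name := names[rand.Intn(len(names))]
	return name, pool[name]
}

func main() {
	pool := map[string]*sql.DB{"replica_a": nil, "replica_b": nil, "replica_c": nil}
	name, _ := pickRandomDB(pool)
	fmt.Println("query would run against:", name)
}
```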
internal/dbpool/dbpool.go (2 changes: 1 addition & 1 deletion)
@@ -39,7 +39,7 @@ func New(mp map[string]Config) (map[string]*sql.DB, error) {
return out, nil
}

- // connectDB creates and returns a database connection.
+ // NewConn creates and returns a database connection.
func NewConn(cfg Config) (*sql.DB, error) {
db, err := sql.Open(cfg.Type, cfg.DSN)
if err != nil {
internal/resultbackends/sqldb/sqldb.go (2 changes: 1 addition & 1 deletion)
@@ -36,7 +36,7 @@ type SqlDB struct {

// The result schemas (CREATE TABLE ...) are dynamically
// generated every time queries are executed based on their result columns.
- // They're cached here so as to avoid repetetive generation.
+ // They're cached here so as to avoid repetitive generation.
resTableSchemas map[string]insertSchema
schemaMutex sync.RWMutex
}
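The comment describes a read-through cache guarded by a sync.RWMutex. Below is a minimal sketch of that pattern; the CREATE TABLE string formatting and the cache key are simplified assumptions for illustration and do not mirror DungBeetle's real schema generation:

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

type schemaCache struct {
	mu      sync.RWMutex
	schemas map[string]string // task name -> CREATE TABLE statement
}

// get returns a cached schema, generating and storing it on a miss.
func (c *schemaCache) get(task string, cols []string) string {
	// Fast path: many readers can check the cache concurrently.
	c.mu.RLock()
	s, ok := c.schemas[task]
	c.mu.RUnlock()
	if ok {
		return s
	}

	// Slow path: take the write lock and generate once.
	c.mu.Lock()
	defer c.mu.Unlock()
	if s, ok := c.schemas[task]; ok { // re-check under the write lock
		return s
	}
	s = fmt.Sprintf("CREATE TABLE results_%s (%s TEXT);", task, strings.Join(cols, " TEXT, "))
	c.schemas[task] = s
	return s
}

func main() {
	c := &schemaCache{schemas: map[string]string{}}
	fmt.Println(c.get("get_profit_summary", []string{"total", "entry_date"}))
	fmt.Println(c.get("get_profit_summary", []string{"total", "entry_date"})) // served from cache
}
```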