From 2e1123f813508302d1bcd8835fa6775f543872c5 Mon Sep 17 00:00:00 2001 From: ganesh Date: Mon, 4 Aug 2025 21:54:45 +0530 Subject: [PATCH 1/2] Fix: Docs, Context Issue & Improved CLI Output. --- cmd/http.go | 16 ++++++++-------- cmd/init.go | 34 ++++++++++++++++++++++++++-------- internal/dbpool/dbpool.go | 2 +- 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/cmd/http.go b/cmd/http.go index e1d9737..3fc7fb1 100644 --- a/cmd/http.go +++ b/cmd/http.go @@ -18,7 +18,7 @@ var reValidateName = regexp.MustCompile("(?i)^[a-z0-9-_:]+$") // is passed, it returns the raw SQL bodies as well. func handleGetTasksList(w http.ResponseWriter, r *http.Request) { var ( - co = r.Context().Value("core").(*core.Core) + co = r.Context().Value(coreKey).(*core.Core) ) tasks := co.GetTasks() @@ -41,7 +41,7 @@ func handleGetTasksList(w http.ResponseWriter, r *http.Request) { // handleGetJobStatus returns the status of a given jobID. func handleGetJobStatus(w http.ResponseWriter, r *http.Request) { var ( - co = r.Context().Value("core").(*core.Core) + co = r.Context().Value(coreKey).(*core.Core) jobID = chi.URLParam(r, "jobID") ) @@ -59,7 +59,7 @@ func handleGetJobStatus(w http.ResponseWriter, r *http.Request) { // handleGetGroupStatus returns the status of a given groupID. func handleGetGroupStatus(w http.ResponseWriter, r *http.Request) { var ( - co = r.Context().Value("core").(*core.Core) + co = r.Context().Value(coreKey).(*core.Core) groupID = chi.URLParam(r, "groupID") ) @@ -77,7 +77,7 @@ func handleGetGroupStatus(w http.ResponseWriter, r *http.Request) { // handleGetPendingJobs returns pending jobs in a given queue. func handleGetPendingJobs(w http.ResponseWriter, r *http.Request) { var ( - co = r.Context().Value("core").(*core.Core) + co = r.Context().Value(coreKey).(*core.Core) queue = chi.URLParam(r, "queue") ) @@ -94,7 +94,7 @@ func handleGetPendingJobs(w http.ResponseWriter, r *http.Request) { // handlePostJob creates a new job against a given task name. func handlePostJob(w http.ResponseWriter, r *http.Request) { var ( - co = r.Context().Value("core").(*core.Core) + co = r.Context().Value(coreKey).(*core.Core) taskName = chi.URLParam(r, "taskName") ) @@ -134,7 +134,7 @@ func handlePostJob(w http.ResponseWriter, r *http.Request) { // handlePostJobGroup creates multiple jobs under a group. func handlePostJobGroup(w http.ResponseWriter, r *http.Request) { var ( - co = r.Context().Value("core").(*core.Core) + co = r.Context().Value(coreKey).(*core.Core) decoder = json.NewDecoder(r.Body) req models.GroupReq @@ -160,7 +160,7 @@ func handlePostJobGroup(w http.ResponseWriter, r *http.Request) { // it is cancelled first and then deleted. func handleCancelJob(w http.ResponseWriter, r *http.Request) { var ( - co = r.Context().Value("core").(*core.Core) + co = r.Context().Value(coreKey).(*core.Core) jobID = chi.URLParam(r, "jobID") purge, _ = strconv.ParseBool(r.URL.Query().Get("purge")) @@ -178,7 +178,7 @@ func handleCancelJob(w http.ResponseWriter, r *http.Request) { // If the job is running, it is cancelled first, and then deleted. func handleCancelGroupJob(w http.ResponseWriter, r *http.Request) { var ( - co = r.Context().Value("core").(*core.Core) + co = r.Context().Value(coreKey).(*core.Core) groupID = chi.URLParam(r, "groupID") purge, _ = strconv.ParseBool(r.URL.Query().Get("purge")) diff --git a/cmd/init.go b/cmd/init.go index f4e4c5f..0a27dcb 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -22,6 +22,10 @@ import ( "github.com/zerodha/dungbeetle/v2/internal/resultbackends/sqldb" ) +type contextKey string + +const coreKey contextKey = "core" + var ( //go:embed config.sample.toml efs embed.FS @@ -30,9 +34,21 @@ var ( func initFlags(ko *koanf.Koanf) { // Command line flags. f := flag.NewFlagSet("config", flag.ContinueOnError) + + const usage = ` +DungBeetle is a distributed SQL job queue and worker system. +It allows you to run SQL queries as jobs in a distributed manner, with support for job queues, concurrency, and result storage. + +Usage: + dungbeetle [flags] +Flags: +` + f.Usage = func() { - lo.Info("DungBeetle") - lo.Info(f.FlagUsages()) + fmt.Fprint(os.Stderr, usage) + f.SetOutput(os.Stderr) + f.PrintDefaults() + fmt.Fprintln(os.Stderr) os.Exit(0) } @@ -52,18 +68,20 @@ func initFlags(ko *koanf.Koanf) { } func initConfig(ko *koanf.Koanf) { - lo.Info("buildstring", "value", buildString) // Generate new config file. if ok := ko.Bool("new-config"); ok { if err := generateConfig(); err != nil { - fmt.Println(err) + fmt.Fprintln(os.Stderr, err) os.Exit(1) } - fmt.Println("config.toml generated. Edit and run --install.") + fmt.Fprintln(os.Stderr, "config.toml generated.") os.Exit(0) } + // Print build string. + lo.Info("buildstring", "value", buildString) + // Load the config file. if err := ko.Load(file.Provider(ko.String("config")), toml.Parser()); err != nil { slog.Error("error reading config", "error", err) @@ -123,7 +141,7 @@ func initHTTP(co *core.Core) { "content-length", r.ContentLength, "form", r.Form, ) - ctx := context.WithValue(r.Context(), "core", co) + ctx := context.WithValue(r.Context(), coreKey, co) next.ServeHTTP(w, r.WithContext(ctx)) }) }) @@ -199,10 +217,10 @@ func initCore(ko *koanf.Koanf) (*core.Core, error) { } if v := ko.MustString("job_queue.broker.type"); v != "redis" { - return nil, fmt.Errorf("unsupported job_queue.broker.type '%s'. Only 'redis' is supported.", v) + return nil, fmt.Errorf("unsupported job_queue.broker.type '%s'. Only 'redis' is supported", v) } if v := ko.MustString("job_queue.state.type"); v != "redis" { - return nil, fmt.Errorf("unsupported job_queue.state.type '%s'. Only 'redis' is supported.", v) + return nil, fmt.Errorf("unsupported job_queue.state.type '%s'. Only 'redis' is supported", v) } lo := slog.Default() diff --git a/internal/dbpool/dbpool.go b/internal/dbpool/dbpool.go index e690dd6..4de9f55 100644 --- a/internal/dbpool/dbpool.go +++ b/internal/dbpool/dbpool.go @@ -39,7 +39,7 @@ func New(mp map[string]Config) (map[string]*sql.DB, error) { return out, nil } -// connectDB creates and returns a database connection. +// NewConn creates and returns a database connection. func NewConn(cfg Config) (*sql.DB, error) { db, err := sql.Open(cfg.Type, cfg.DSN) if err != nil { From 1597831c53fa180605ac108489b4dab8b94c3f0d Mon Sep 17 00:00:00 2001 From: ganesh Date: Tue, 2 Sep 2025 21:27:08 +0530 Subject: [PATCH 2/2] Fix typos --- README.md | 4 ++-- cmd/init.go | 6 ++---- internal/core/tasks.go | 2 +- internal/resultbackends/sqldb/sqldb.go | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 2aa9431..cc06168 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ Once the reports are generated (SQL queries finish executing), it's natural for ## Concepts #### Task -A task is a named SQL query that is loaded into the server on startup. Tasks are defined in .sql files in the simple [goyesql](https://github.com/knadh/goyesql) format. Such queries are self-contained and produce the desired final output with neatly named columns. They can take arbitrary positional arguments for execution. A task can be attached to one or more specific databases defined in the configuration using the `-- db:` tag. In case of multiple databases, the query will be executed against a random one from the list, unless a specific database is specified in the API request (`db`). A `-- queue:` tag to always route the task to a particular queue, unless it's overriden by the `queue` param when making a job request. A `-- results:` tag specifies the results backend to which the results of a task will be written. If there are multiple result backends specified, the results are written a random one. +A task is a named SQL query that is loaded into the server on startup. Tasks are defined in .sql files in the simple [goyesql](https://github.com/knadh/goyesql) format. Such queries are self-contained and produce the desired final output with neatly named columns. They can take arbitrary positional arguments for execution. A task can be attached to one or more specific databases defined in the configuration using the `-- db:` tag. In case of multiple databases, the query will be executed against a random one from the list, unless a specific database is specified in the API request (`db`). A `-- queue:` tag to always route the task to a particular queue, unless it's overridden by the `queue` param when making a job request. A `-- results:` tag specifies the results backend to which the results of a task will be written. If there are multiple result backends specified, the results are written a random one. Example: ```sql @@ -51,7 +51,7 @@ SELECT * FROM entries WHERE user_id = ? AND timestamp > ? and timestamp < ?; SELECT * FROM entries WHERE user_id = ? AND timestamp > ? and timestamp < ?; ``` -Here, when the server starts, the queries `get_profit_summary` and `get_profit_entries` are registered automatically as tasks. Internally, the server validates and prepares these SQL statements (unless `raw: 1`). `?` are MySQL value placholders. For Postgres, the placeholders are `$1, $2 ...` +Here, when the server starts, the queries `get_profit_summary` and `get_profit_entries` are registered automatically as tasks. Internally, the server validates and prepares these SQL statements (unless `raw: 1`). `?` are MySQL value placeholders. For Postgres, the placeholders are `$1, $2 ...` #### Job A job is an instance of a task that has been queued to run. Each job has an ID that can be used to track its status. If an ID is not passed explicitly, it is generated internally and returned. These IDs need not be unique, but only a single job with a certain ID can run at any given point. For the next job with the same ID to be scheduled, the previous job has to finish execution. Using non-unique IDs like this is useful in cases where users can be prevented from sending multiple requests for the same reports, like in our usecases. diff --git a/cmd/init.go b/cmd/init.go index 0a27dcb..bc206bc 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -45,10 +45,8 @@ Flags: ` f.Usage = func() { - fmt.Fprint(os.Stderr, usage) - f.SetOutput(os.Stderr) - f.PrintDefaults() - fmt.Fprintln(os.Stderr) + fmt.Print(usage) + fmt.Println(f.FlagUsages()) os.Exit(0) } diff --git a/internal/core/tasks.go b/internal/core/tasks.go index 16c7f68..87b2083 100644 --- a/internal/core/tasks.go +++ b/internal/core/tasks.go @@ -71,7 +71,7 @@ func (co *Core) loadTasks(dir string) (Tasks, error) { // A map of DBs are attached to every query. This can be // DBs tagged specifically to queries in the SQL file, - // or will be the map of all avaliable DBs. During execution + // or will be the map of all available DBs. During execution // one of these DBs will be picked randomly. srcDBs dbpool.Pool diff --git a/internal/resultbackends/sqldb/sqldb.go b/internal/resultbackends/sqldb/sqldb.go index 3cb7266..d3f8caf 100644 --- a/internal/resultbackends/sqldb/sqldb.go +++ b/internal/resultbackends/sqldb/sqldb.go @@ -36,7 +36,7 @@ type SqlDB struct { // The result schemas (CREATE TABLE ...) are dynamically // generated everytime queries are executed based on their result columns. - // They're cached here so as to avoid repetetive generation. + // They're cached here so as to avoid repetitive generation. resTableSchemas map[string]insertSchema schemaMutex sync.RWMutex }