From ea66ac248f7da3254f1f3996089b76ea628cf3ae Mon Sep 17 00:00:00 2001 From: Isaac Garzon Date: Fri, 25 Jul 2025 12:03:49 +0300 Subject: [PATCH] sql: add schema descriptions as table and column comments The steampipe CLI does this, and it's very helpful when trying to figure out what a specific table or column holds. --- fdw.go | 34 +++++++++++++ fdw/common.h | 3 +- fdw/fdw.c | 81 +++++++++++++++++++++++++++++++ fdw/fdw_handlers.h | 4 +- schema.go | 22 +++++++++ sql/sql.go | 21 ++++++++ templates/fdw/fdw_handlers.h.tmpl | 4 +- 7 files changed, 166 insertions(+), 3 deletions(-) diff --git a/fdw.go b/fdw.go index 5626ed58..f76790c0 100644 --- a/fdw.go +++ b/fdw.go @@ -653,5 +653,39 @@ func goFdwValidate(coid C.Oid, opts *C.List) { // or a required option is missing. } +//export goFdwGetForeignTableComments +func goFdwGetForeignTableComments(servername, schemaname, tablename *C.char) *C.List { + remoteSchema := C.GoString(servername) + localSchema := C.GoString(schemaname) + tableName := C.GoString(tablename) + log.Printf("[TRACE] goFdwGetForeignTableComments, serverName: %s, localSchema: %s, tableName: %s", remoteSchema, localSchema, tableName) + + // get the plugin hub, + pluginHub := hub.GetHub() + + var sql *C.List + + // special handling for the command schema + if remoteSchema == constants.InternalSchema { + log.Printf("[INFO] getting comments for setting tables into %s", remoteSchema) + settingsSchema := pluginHub.GetSettingsSchema() + sql = SchemaToCommentsSql(localSchema, tableName, settingsSchema[tableName]) + } else if remoteSchema == constants.LegacyCommandSchema { + log.Printf("[INFO] getting comments for setting tables into %s", remoteSchema) + settingsSchema := pluginHub.GetLegacySettingsSchema() + sql = SchemaToCommentsSql(localSchema, tableName, settingsSchema[tableName]) + } else { + schema, err := pluginHub.GetSchema(remoteSchema, localSchema) + if err != nil { + log.Printf("[WARN] goFdwGetForeignTableComments failed: %s", err) + FdwError(err) + return nil + } + sql = SchemaToCommentsSql(localSchema, tableName, schema.Schema[tableName]) + } + + return sql +} + // required by buildmode=c-archive func main() {} diff --git a/fdw/common.h b/fdw/common.h index ebf9e670..c27eb379 100644 --- a/fdw/common.h +++ b/fdw/common.h @@ -13,6 +13,7 @@ #include "nodes/bitmapset.h" #include "nodes/makefuncs.h" #include "nodes/pg_list.h" +#include "tcop/utility.h" #include "utils/builtins.h" #include "utils/inet.h" #include "utils/jsonb.h" @@ -133,4 +134,4 @@ List *deserializeDeparsedSortGroup(List *items); OpExpr *canonicalOpExpr(OpExpr *opExpr, Relids base_relids); ScalarArrayOpExpr *canonicalScalarArrayOpExpr(ScalarArrayOpExpr *opExpr, Relids base_relids); char *getOperatorString(Oid opoid); -#endif // FDW_COMMON_H \ No newline at end of file +#endif // FDW_COMMON_H diff --git a/fdw/fdw.c b/fdw/fdw.c index f620198b..1fcb2d5b 100644 --- a/fdw/fdw.c +++ b/fdw/fdw.c @@ -16,10 +16,23 @@ static void exitHook(int code, Datum arg); void *serializePlanState(FdwPlanState *state); FdwExecState *initializeExecState(void *internalstate); + +static void +steampipe_fdw_ProcessUtility(PlannedStmt *pstmt, + const char *queryString, + bool readOnlyTree, + ProcessUtilityContext context, + ParamListInfo params, + QueryEnvironment *queryEnv, + DestReceiver *dest, + QueryCompletion *qc); + // Required by postgres, doing basic checks to ensure compatibility, // such as being compiled against the correct major version. PG_MODULE_MAGIC; +static ProcessUtility_hook_type next_ProcessUtility_hook = NULL; + // Define the handler function for signal 16 void signal_handler(int sig) { // elog(NOTICE, "Caught signal %d", sig); @@ -63,6 +76,9 @@ void _PG_init(void) on_proc_exit(&exitHook, PointerGetDatum(NULL)); RegisterXactCallback(pgfdw_xact_callback, NULL); + /* Hook ProcessUtility for adding comments to foreign tables */ + next_ProcessUtility_hook = ProcessUtility_hook; + ProcessUtility_hook = steampipe_fdw_ProcessUtility; } /* @@ -88,6 +104,71 @@ void exitHook(int code, Datum arg) goFdwShutdown(); } +static void +steampipe_fdw_ProcessUtility(PlannedStmt *pstmt, + const char *queryString, + bool readOnlyTree, + ProcessUtilityContext context, + ParamListInfo params, + QueryEnvironment *queryEnv, + DestReceiver *dest, + QueryCompletion *qc) { + List *cmd_list = NULL; + if (IsA(pstmt->utilityStmt, CreateForeignTableStmt) && context == PROCESS_UTILITY_SUBCOMMAND && dest == None_Receiver) { + const CreateForeignTableStmt *cstmt = (const CreateForeignTableStmt *)pstmt->utilityStmt; + ForeignServer *server = GetForeignServerByName(cstmt->servername, true); + if (server != NULL) { + ForeignDataWrapper *wrapper = GetForeignDataWrapper(server->fdwid); + if (wrapper != NULL && wrapper->fdwname != NULL && strcmp(wrapper->fdwname, STEAMPIPE_DATAWRAPPER_NAME) == 0) { + cmd_list = goFdwGetForeignTableComments(cstmt->servername, cstmt->base.relation->schemaname, cstmt->base.relation->relname); + } + } + } + + if (next_ProcessUtility_hook) { + next_ProcessUtility_hook(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, qc); + } else { + standard_ProcessUtility(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, qc); + } + + if (cmd_list != NULL) { + ListCell *lc; + foreach(lc, cmd_list) { + char *cmd = (char *)lfirst(lc); + List *raw_parsetree_list = pg_parse_query(cmd); + + ListCell *lc2; + foreach(lc2, raw_parsetree_list) { + RawStmt *rs = lfirst_node(RawStmt, lc2); + CommentStmt *comment_stmt = (CommentStmt *)rs->stmt; + + if (!IsA(comment_stmt, CommentStmt)) { + elog(ERROR, "unexpected statement type %d where CommentStmt expected", (int)nodeTag(comment_stmt)); + } + +#if PG_VERSION_NUM < 120000 + // Be sure to advance the command counter between subcommands + // Not needed in PG 12+ because standard_ProcessUtility already does this + CommandCounterIncrement(); +#endif + + pstmt = makeNode(PlannedStmt); + pstmt->commandType = CMD_UTILITY; + pstmt->canSetTag = false; + pstmt->utilityStmt = (Node *) comment_stmt; + pstmt->stmt_location = rs->stmt_location; + pstmt->stmt_len = rs->stmt_len; + + /* Execute statement */ + ProcessUtility(pstmt, cmd, false, + PROCESS_UTILITY_SUBCOMMAND, NULL, NULL, + None_Receiver, NULL); + } + } + + } +} + static bool fdwIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) { return getenv("STEAMPIPE_FDW_PARALLEL_SAFE") != NULL; } diff --git a/fdw/fdw_handlers.h b/fdw/fdw_handlers.h index a282bdea..745b6374 100644 --- a/fdw/fdw_handlers.h +++ b/fdw/fdw_handlers.h @@ -2,6 +2,8 @@ // defined and available. #include "fmgr.h" +#define STEAMPIPE_DATAWRAPPER_NAME "steampipe_postgres_fdw" + static bool fdwIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte); static void fdwGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid); static void fdwGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid); @@ -44,4 +46,4 @@ Datum fdw_validator(PG_FUNCTION_ARGS) { List *options_list = untransformRelOptions(PG_GETARG_DATUM(0)); goFdwValidate(catalog, options_list); PG_RETURN_VOID(); -} \ No newline at end of file +} diff --git a/schema.go b/schema.go index c3f00dc5..a9b29d85 100644 --- a/schema.go +++ b/schema.go @@ -71,3 +71,25 @@ func SchemaToSql(schema map[string]*proto.TableSchema, stmt *C.ImportForeignSche return commands } + +func SchemaToCommentsSql(localSchema, table string, tableSchema *proto.TableSchema) *C.List { + if tableSchema == nil { + return nil + } + + log.Printf("[TRACE] Getting comments for table %s", table) + + comments, err := sql.GetCommentsForTable(table, tableSchema, localSchema) + if err != nil { + FdwError(err) + return nil + } + + var commands *C.List + for _, c := range comments { + log.Printf("[TRACE] SQL %s", c) + commands = C.lappend(commands, unsafe.Pointer(C.CString(c))) + } + + return commands +} diff --git a/sql/sql.go b/sql/sql.go index 38b0d722..b80bc415 100644 --- a/sql/sql.go +++ b/sql/sql.go @@ -45,6 +45,27 @@ SERVER %s OPTIONS (table %s)`, return sql, nil } +func GetCommentsForTable(table string, tableSchema *proto.TableSchema, localSchema string) ([]string, error) { + localSchema = db_common.PgEscapeName(localSchema) + table = db_common.PgEscapeName(table) + + var commentStatements []string + if tableSchema.Description != "" { + tableDescription := db_common.PgEscapeString(tableSchema.Description) + commentStatements = append(commentStatements, fmt.Sprintf("COMMENT ON FOREIGN TABLE %s.%s IS %s", localSchema, table, tableDescription)) + } + + for _, c := range tableSchema.Columns { + if c.Description != "" { + column := db_common.PgEscapeName(c.Name) + columnDescription := db_common.PgEscapeString(c.Description) + commentStatements = append(commentStatements, fmt.Sprintf("COMMENT ON COLUMN %s.%s.%s IS %s", localSchema, table, column, columnDescription)) + } + } + + return commentStatements, nil +} + func sqlTypeForColumnType(columnType proto.ColumnType) (string, error) { switch columnType { case proto.ColumnType_BOOL: diff --git a/templates/fdw/fdw_handlers.h.tmpl b/templates/fdw/fdw_handlers.h.tmpl index d1e735ea..5b5366b9 100644 --- a/templates/fdw/fdw_handlers.h.tmpl +++ b/templates/fdw/fdw_handlers.h.tmpl @@ -2,6 +2,8 @@ // defined and available. #include "fmgr.h" +#define STEAMPIPE_DATAWRAPPER_NAME "steampipe_postgres_{{.Plugin}}" + static void fdwGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid); static void fdwGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid); static ForeignScan *fdwGetForeignPlan( @@ -42,4 +44,4 @@ Datum steampipe_{{.Plugin}}_fdw_validator(PG_FUNCTION_ARGS) { List *options_list = untransformRelOptions(PG_GETARG_DATUM(0)); goFdwValidate(catalog, options_list); PG_RETURN_VOID(); -} \ No newline at end of file +}