Skip to content

sql: add schema descriptions as table and column comments #579

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions fdw.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,5 +653,39 @@ func goFdwValidate(coid C.Oid, opts *C.List) {
// or a required option is missing.
}

//export goFdwGetForeignTableComments
func goFdwGetForeignTableComments(servername, schemaname, tablename *C.char) *C.List {
remoteSchema := C.GoString(servername)
localSchema := C.GoString(schemaname)
tableName := C.GoString(tablename)
log.Printf("[TRACE] goFdwGetForeignTableComments, serverName: %s, localSchema: %s, tableName: %s", remoteSchema, localSchema, tableName)

// get the plugin hub,
pluginHub := hub.GetHub()

var sql *C.List

// special handling for the command schema
if remoteSchema == constants.InternalSchema {
log.Printf("[INFO] getting comments for setting tables into %s", remoteSchema)
settingsSchema := pluginHub.GetSettingsSchema()
sql = SchemaToCommentsSql(localSchema, tableName, settingsSchema[tableName])
} else if remoteSchema == constants.LegacyCommandSchema {
log.Printf("[INFO] getting comments for setting tables into %s", remoteSchema)
settingsSchema := pluginHub.GetLegacySettingsSchema()
sql = SchemaToCommentsSql(localSchema, tableName, settingsSchema[tableName])
} else {
schema, err := pluginHub.GetSchema(remoteSchema, localSchema)
if err != nil {
log.Printf("[WARN] goFdwGetForeignTableComments failed: %s", err)
FdwError(err)
return nil
}
sql = SchemaToCommentsSql(localSchema, tableName, schema.Schema[tableName])
}

return sql
}

// required by buildmode=c-archive
func main() {}
3 changes: 2 additions & 1 deletion fdw/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "nodes/bitmapset.h"
#include "nodes/makefuncs.h"
#include "nodes/pg_list.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/inet.h"
#include "utils/jsonb.h"
Expand Down Expand Up @@ -133,4 +134,4 @@ List *deserializeDeparsedSortGroup(List *items);
OpExpr *canonicalOpExpr(OpExpr *opExpr, Relids base_relids);
ScalarArrayOpExpr *canonicalScalarArrayOpExpr(ScalarArrayOpExpr *opExpr, Relids base_relids);
char *getOperatorString(Oid opoid);
#endif // FDW_COMMON_H
#endif // FDW_COMMON_H
81 changes: 81 additions & 0 deletions fdw/fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,23 @@ static void exitHook(int code, Datum arg);

void *serializePlanState(FdwPlanState *state);
FdwExecState *initializeExecState(void *internalstate);

static void
steampipe_fdw_ProcessUtility(PlannedStmt *pstmt,
const char *queryString,
bool readOnlyTree,
ProcessUtilityContext context,
ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *qc);

// Required by postgres, doing basic checks to ensure compatibility,
// such as being compiled against the correct major version.
PG_MODULE_MAGIC;

static ProcessUtility_hook_type next_ProcessUtility_hook = NULL;

// Define the handler function for signal 16
void signal_handler(int sig) {
// elog(NOTICE, "Caught signal %d", sig);
Expand Down Expand Up @@ -63,6 +76,9 @@ void _PG_init(void)
on_proc_exit(&exitHook, PointerGetDatum(NULL));
RegisterXactCallback(pgfdw_xact_callback, NULL);

/* Hook ProcessUtility for adding comments to foreign tables */
next_ProcessUtility_hook = ProcessUtility_hook;
ProcessUtility_hook = steampipe_fdw_ProcessUtility;
}

/*
Expand All @@ -88,6 +104,71 @@ void exitHook(int code, Datum arg)
goFdwShutdown();
}

static void
steampipe_fdw_ProcessUtility(PlannedStmt *pstmt,
const char *queryString,
bool readOnlyTree,
ProcessUtilityContext context,
ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *qc) {
List *cmd_list = NULL;
if (IsA(pstmt->utilityStmt, CreateForeignTableStmt) && context == PROCESS_UTILITY_SUBCOMMAND && dest == None_Receiver) {
const CreateForeignTableStmt *cstmt = (const CreateForeignTableStmt *)pstmt->utilityStmt;
ForeignServer *server = GetForeignServerByName(cstmt->servername, true);
if (server != NULL) {
ForeignDataWrapper *wrapper = GetForeignDataWrapper(server->fdwid);
if (wrapper != NULL && wrapper->fdwname != NULL && strcmp(wrapper->fdwname, STEAMPIPE_DATAWRAPPER_NAME) == 0) {
cmd_list = goFdwGetForeignTableComments(cstmt->servername, cstmt->base.relation->schemaname, cstmt->base.relation->relname);
}
}
}

if (next_ProcessUtility_hook) {
next_ProcessUtility_hook(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, qc);
} else {
standard_ProcessUtility(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, qc);
}

if (cmd_list != NULL) {
ListCell *lc;
foreach(lc, cmd_list) {
char *cmd = (char *)lfirst(lc);
List *raw_parsetree_list = pg_parse_query(cmd);

ListCell *lc2;
foreach(lc2, raw_parsetree_list) {
RawStmt *rs = lfirst_node(RawStmt, lc2);
CommentStmt *comment_stmt = (CommentStmt *)rs->stmt;

if (!IsA(comment_stmt, CommentStmt)) {
elog(ERROR, "unexpected statement type %d where CommentStmt expected", (int)nodeTag(comment_stmt));
}

#if PG_VERSION_NUM < 120000
// Be sure to advance the command counter between subcommands
// Not needed in PG 12+ because standard_ProcessUtility already does this
CommandCounterIncrement();
#endif

pstmt = makeNode(PlannedStmt);
pstmt->commandType = CMD_UTILITY;
pstmt->canSetTag = false;
pstmt->utilityStmt = (Node *) comment_stmt;
pstmt->stmt_location = rs->stmt_location;
pstmt->stmt_len = rs->stmt_len;

/* Execute statement */
ProcessUtility(pstmt, cmd, false,
PROCESS_UTILITY_SUBCOMMAND, NULL, NULL,
None_Receiver, NULL);
}
}

}
}

static bool fdwIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) {
return getenv("STEAMPIPE_FDW_PARALLEL_SAFE") != NULL;
}
Expand Down
4 changes: 3 additions & 1 deletion fdw/fdw_handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// defined and available.
#include "fmgr.h"

#define STEAMPIPE_DATAWRAPPER_NAME "steampipe_postgres_fdw"

static bool fdwIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte);
static void fdwGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid);
static void fdwGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid);
Expand Down Expand Up @@ -44,4 +46,4 @@ Datum fdw_validator(PG_FUNCTION_ARGS) {
List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
goFdwValidate(catalog, options_list);
PG_RETURN_VOID();
}
}
22 changes: 22 additions & 0 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,25 @@ func SchemaToSql(schema map[string]*proto.TableSchema, stmt *C.ImportForeignSche

return commands
}

func SchemaToCommentsSql(localSchema, table string, tableSchema *proto.TableSchema) *C.List {
if tableSchema == nil {
return nil
}

log.Printf("[TRACE] Getting comments for table %s", table)

comments, err := sql.GetCommentsForTable(table, tableSchema, localSchema)
if err != nil {
FdwError(err)
return nil
}

var commands *C.List
for _, c := range comments {
log.Printf("[TRACE] SQL %s", c)
commands = C.lappend(commands, unsafe.Pointer(C.CString(c)))
}

return commands
}
21 changes: 21 additions & 0 deletions sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,27 @@ SERVER %s OPTIONS (table %s)`,
return sql, nil
}

func GetCommentsForTable(table string, tableSchema *proto.TableSchema, localSchema string) ([]string, error) {
localSchema = db_common.PgEscapeName(localSchema)
table = db_common.PgEscapeName(table)

var commentStatements []string
if tableSchema.Description != "" {
tableDescription := db_common.PgEscapeString(tableSchema.Description)
commentStatements = append(commentStatements, fmt.Sprintf("COMMENT ON FOREIGN TABLE %s.%s IS %s", localSchema, table, tableDescription))
}

for _, c := range tableSchema.Columns {
if c.Description != "" {
column := db_common.PgEscapeName(c.Name)
columnDescription := db_common.PgEscapeString(c.Description)
commentStatements = append(commentStatements, fmt.Sprintf("COMMENT ON COLUMN %s.%s.%s IS %s", localSchema, table, column, columnDescription))
}
}

return commentStatements, nil
}

func sqlTypeForColumnType(columnType proto.ColumnType) (string, error) {
switch columnType {
case proto.ColumnType_BOOL:
Expand Down
4 changes: 3 additions & 1 deletion templates/fdw/fdw_handlers.h.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// defined and available.
#include "fmgr.h"

#define STEAMPIPE_DATAWRAPPER_NAME "steampipe_postgres_{{.Plugin}}"

static void fdwGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid);
static void fdwGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid);
static ForeignScan *fdwGetForeignPlan(
Expand Down Expand Up @@ -42,4 +44,4 @@ Datum steampipe_{{.Plugin}}_fdw_validator(PG_FUNCTION_ARGS) {
List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
goFdwValidate(catalog, options_list);
PG_RETURN_VOID();
}
}