diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index d388cb51bfd7..26693ed3f717 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -167,6 +167,11 @@ public Catalog paimonCatalog() { return catalog; } + @Override + public String paimonCatalogName() { + return catalogName; + } + // ======================= database methods =============================== @Override @@ -177,7 +182,7 @@ public String[] defaultNamespace() { @Override public void createNamespace(String[] namespace, Map metadata) throws NamespaceAlreadyExistsException { - checkNamespace(namespace); + checkNamespace(namespace, catalogName); try { String databaseName = getDatabaseNameFromNamespace(namespace); catalog.createDatabase(databaseName, false, metadata); @@ -201,7 +206,7 @@ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceExcep if (namespace.length == 0) { return listNamespaces(); } - checkNamespace(namespace); + checkNamespace(namespace, catalogName); try { String databaseName = getDatabaseNameFromNamespace(namespace); catalog.getDatabase(databaseName); @@ -214,7 +219,7 @@ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceExcep @Override public Map loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException { - checkNamespace(namespace); + checkNamespace(namespace, catalogName); try { String databaseName = getDatabaseNameFromNamespace(namespace); return catalog.getDatabase(databaseName).options(); @@ -252,7 +257,7 @@ public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException */ public boolean dropNamespace(String[] namespace, boolean cascade) throws NoSuchNamespaceException { - checkNamespace(namespace); + checkNamespace(namespace, catalogName); try { String databaseName = getDatabaseNameFromNamespace(namespace); catalog.dropDatabase(databaseName, false, cascade); @@ -268,7 +273,7 @@ public boolean dropNamespace(String[] namespace, boolean cascade) @Override public void alterNamespace(String[] namespace, NamespaceChange... changes) throws NoSuchNamespaceException { - checkNamespace(namespace); + checkNamespace(namespace, catalogName); try { String databaseName = getDatabaseNameFromNamespace(namespace); List propertyChanges = @@ -283,7 +288,7 @@ public void alterNamespace(String[] namespace, NamespaceChange... changes) @Override public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException { - checkNamespace(namespace); + checkNamespace(namespace, catalogName); try { String databaseName = getDatabaseNameFromNamespace(namespace); return catalog.listTables(databaseName).stream() @@ -296,7 +301,7 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti @Override public void invalidateTable(Identifier ident) { - catalog.invalidateTable(toIdentifier(ident)); + catalog.invalidateTable(toIdentifier(ident, catalogName)); } @Override @@ -349,7 +354,7 @@ public org.apache.spark.sql.connector.catalog.Table alterTable( List schemaChanges = Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList()); try { - catalog.alterTable(toIdentifier(ident), schemaChanges, false); + catalog.alterTable(toIdentifier(ident, catalogName), schemaChanges, false); return loadTable(ident); } catch (Catalog.TableNotExistException e) { throw new NoSuchTableException(ident); @@ -367,7 +372,9 @@ public org.apache.spark.sql.connector.catalog.Table createTable( throws TableAlreadyExistsException, NoSuchNamespaceException { try { catalog.createTable( - toIdentifier(ident), toInitialSchema(schema, partitions, properties), false); + toIdentifier(ident, catalogName), + toInitialSchema(schema, partitions, properties), + false); return loadTable(ident); } catch (Catalog.TableAlreadyExistException e) { throw new TableAlreadyExistsException(ident); @@ -381,7 +388,7 @@ public org.apache.spark.sql.connector.catalog.Table createTable( @Override public boolean dropTable(Identifier ident) { try { - catalog.dropTable(toIdentifier(ident), false); + catalog.dropTable(toIdentifier(ident, catalogName), false); return true; } catch (Catalog.TableNotExistException e) { return false; @@ -524,8 +531,8 @@ public void renameTable(Identifier oldIdent, Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException { try { catalog.renameTable( - toIdentifier(oldIdent), - toIdentifier(removeCatalogName(newIdent, catalogName)), + toIdentifier(oldIdent, catalogName), + toIdentifier(removeCatalogName(newIdent, catalogName), catalogName), false); } catch (Catalog.TableNotExistException e) { throw new NoSuchTableException(oldIdent); @@ -566,7 +573,7 @@ public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionExcep } } else if (isDatabaseFunctionNamespace(namespace)) { try { - Function paimonFunction = catalog.getFunction(toIdentifier(ident)); + Function paimonFunction = catalog.getFunction(toIdentifier(ident, catalogName)); FunctionDefinition functionDefinition = paimonFunction.definition(FUNCTION_DEFINITION_NAME); if (functionDefinition instanceof FunctionDefinition.LambdaFunctionDefinition) { @@ -654,13 +661,17 @@ public void dropV1Function(FunctionIdentifier funcIdent, boolean ifExists) throw protected org.apache.spark.sql.connector.catalog.Table loadSparkTable( Identifier ident, Map extraOptions) throws NoSuchTableException { try { - org.apache.paimon.table.Table paimonTable = catalog.getTable(toIdentifier(ident)); + org.apache.paimon.table.Table paimonTable = + catalog.getTable(toIdentifier(ident, catalogName)); if (paimonTable instanceof FormatTable) { return toSparkFormatTable(ident, (FormatTable) paimonTable); } else { return new SparkTable( copyWithSQLConf( - paimonTable, catalogName, toIdentifier(ident), extraOptions)); + paimonTable, + catalogName, + toIdentifier(ident, catalogName), + extraOptions)); } } catch (Catalog.TableNotExistException e) { throw new NoSuchTableException(ident); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java index 098b73a50b9b..e79af9a0b488 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java @@ -93,6 +93,10 @@ public Catalog paimonCatalog() { return this.sparkCatalog.paimonCatalog(); } + @Override + public String paimonCatalogName() { + return catalogName; + } // ======================= database methods =============================== @Override diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java index 567b08569148..3408b1fb4500 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java @@ -41,7 +41,7 @@ public interface SupportView extends WithPaimonCatalog { default List listViews(String[] namespace) throws NoSuchNamespaceException { try { - checkNamespace(namespace); + checkNamespace(namespace, paimonCatalogName()); return paimonCatalog().listViews(namespace[0]); } catch (Catalog.DatabaseNotExistException e) { throw new NoSuchNamespaceException(namespace); @@ -49,7 +49,7 @@ default List listViews(String[] namespace) throws NoSuchNamespaceExcepti } default View loadView(Identifier ident) throws Catalog.ViewNotExistException { - return paimonCatalog().getView(toIdentifier(ident)); + return paimonCatalog().getView(toIdentifier(ident, paimonCatalogName())); } default void createView( @@ -60,7 +60,7 @@ default void createView( Map properties, Boolean ignoreIfExists) throws NoSuchNamespaceException { - org.apache.paimon.catalog.Identifier paimonIdent = toIdentifier(ident); + org.apache.paimon.catalog.Identifier paimonIdent = toIdentifier(ident, paimonCatalogName()); try { paimonCatalog() .createView( @@ -82,7 +82,7 @@ default void createView( default void dropView(Identifier ident, Boolean ignoreIfExists) { try { - paimonCatalog().dropView(toIdentifier(ident), ignoreIfExists); + paimonCatalog().dropView(toIdentifier(ident, paimonCatalogName()), ignoreIfExists); } catch (Catalog.ViewNotExistException e) { throw new RuntimeException("view not exists: " + ident, e); } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/WithPaimonCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/WithPaimonCatalog.java index 6c227ce1bd42..63a84cbdd44d 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/WithPaimonCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/WithPaimonCatalog.java @@ -23,4 +23,6 @@ /** With paimon catalog. */ public interface WithPaimonCatalog { Catalog paimonCatalog(); + + String paimonCatalogName(); } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterFunctionProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterFunctionProcedure.java index daf2a1676f59..137fa8120b8c 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterFunctionProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterFunctionProcedure.java @@ -77,9 +77,10 @@ public StructType outputType() { @Override public InternalRow[] call(InternalRow args) { Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog(); + String paimonCatalogName = ((WithPaimonCatalog) tableCatalog()).paimonCatalogName(); org.apache.spark.sql.connector.catalog.Identifier ident = toIdentifier(args.getString(0), PARAMETERS[0].name()); - Identifier function = CatalogUtils.toIdentifier(ident); + Identifier function = CatalogUtils.toIdentifier(ident, paimonCatalogName); FunctionChange functionChange = JsonSerdeUtil.fromJson(args.getString(1), FunctionChange.class); try { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterViewDialectProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterViewDialectProcedure.java index 029ecb29e63f..a924c5df018b 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterViewDialectProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterViewDialectProcedure.java @@ -91,9 +91,10 @@ public StructType outputType() { @Override public InternalRow[] call(InternalRow args) { Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog(); + String paimonCatalogName = ((WithPaimonCatalog) tableCatalog()).paimonCatalogName(); org.apache.spark.sql.connector.catalog.Identifier ident = toIdentifier(args.getString(0), PARAMETERS[0].name()); - Identifier view = CatalogUtils.toIdentifier(ident); + Identifier view = CatalogUtils.toIdentifier(ident, paimonCatalogName); ViewChange viewChange; String dialect = ((GenericInternalRow) args).genericGet(2) == null diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateFunctionProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateFunctionProcedure.java index a2c47739eed1..bfee9693fb9d 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateFunctionProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateFunctionProcedure.java @@ -90,9 +90,10 @@ public StructType outputType() { @Override public InternalRow[] call(InternalRow args) { Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog(); + String paimonCatalogName = ((WithPaimonCatalog) tableCatalog()).paimonCatalogName(); org.apache.spark.sql.connector.catalog.Identifier ident = toIdentifier(args.getString(0), PARAMETERS[0].name()); - Identifier function = CatalogUtils.toIdentifier(ident); + Identifier function = CatalogUtils.toIdentifier(ident, paimonCatalogName); List inputParams = getDataFieldsFromArguments(1, args); List returnParams = getDataFieldsFromArguments(2, args); boolean deterministic = args.isNullAt(3) ? true : args.getBoolean(3); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropFunctionProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropFunctionProcedure.java index 2a63f6eff6b0..62dcfc18a4b5 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropFunctionProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropFunctionProcedure.java @@ -70,9 +70,10 @@ public StructType outputType() { @Override public InternalRow[] call(InternalRow args) { Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog(); + String paimonCatalogName = ((WithPaimonCatalog) tableCatalog()).paimonCatalogName(); org.apache.spark.sql.connector.catalog.Identifier ident = toIdentifier(args.getString(0), PARAMETERS[0].name()); - Identifier function = CatalogUtils.toIdentifier(ident); + Identifier function = CatalogUtils.toIdentifier(ident, paimonCatalogName); try { paimonCatalog.dropFunction(function, false); } catch (Exception e) { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java index 25882f5e86a1..dece479251a8 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java @@ -60,15 +60,18 @@ /** Utils of catalog. */ public class CatalogUtils { - public static void checkNamespace(String[] namespace) { + public static void checkNamespace(String[] namespace, String catalogName) { checkArgument( namespace.length == 1, - "Paimon only support single namespace, but got %s", + "Current catalog is %s, catalog %s does not exist or Paimon only support single namespace, but got %s", + catalogName, + namespace.length > 0 ? namespace[0] : "unknown", Arrays.toString(namespace)); } - public static org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident) { - checkNamespace(ident.namespace()); + public static org.apache.paimon.catalog.Identifier toIdentifier( + Identifier ident, String catalogName) { + checkNamespace(ident.namespace(), catalogName); return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name()); } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala index 8170c5836442..1e276866c6d7 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala @@ -529,6 +529,18 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase { assert(withColStat == noColStat) } + test("Query a non-existent catalog") { + assert(intercept[Exception] { + sql("SELECT * FROM paimon1.default.t") + }.getMessage.contains("Current catalog is paimon, catalog paimon1 does not exist")) + } + + test("Query a table with multiple namespaces") { + assert(intercept[Exception] { + sql("SELECT * FROM paimon.x.default.t") + }.getMessage.contains("Paimon only support single namespace")) + } + protected def statsFileCount(tableLocation: Path, fileIO: FileIO): Int = { fileIO.listStatus(new Path(tableLocation, "statistics")).length }