@@ -167,6 +167,11 @@ public Catalog paimonCatalog() {
return catalog;
}

@Override
public String paimonCatalogName() {
return catalogName;
}

// ======================= database methods ===============================

@Override
@@ -177,7 +182,7 @@ public String[] defaultNamespace() {
@Override
public void createNamespace(String[] namespace, Map<String, String> metadata)
throws NamespaceAlreadyExistsException {
checkNamespace(namespace);
checkNamespace(namespace, catalogName);
try {
String databaseName = getDatabaseNameFromNamespace(namespace);
catalog.createDatabase(databaseName, false, metadata);
@@ -201,7 +206,7 @@ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
if (namespace.length == 0) {
return listNamespaces();
}
checkNamespace(namespace);
checkNamespace(namespace, catalogName);
try {
String databaseName = getDatabaseNameFromNamespace(namespace);
catalog.getDatabase(databaseName);
@@ -214,7 +219,7 @@ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
@Override
public Map<String, String> loadNamespaceMetadata(String[] namespace)
throws NoSuchNamespaceException {
checkNamespace(namespace);
checkNamespace(namespace, catalogName);
try {
String databaseName = getDatabaseNameFromNamespace(namespace);
return catalog.getDatabase(databaseName).options();
@@ -252,7 +257,7 @@ public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException
*/
public boolean dropNamespace(String[] namespace, boolean cascade)
throws NoSuchNamespaceException {
checkNamespace(namespace);
checkNamespace(namespace, catalogName);
try {
String databaseName = getDatabaseNameFromNamespace(namespace);
catalog.dropDatabase(databaseName, false, cascade);
@@ -268,7 +273,7 @@ public boolean dropNamespace(String[] namespace, boolean cascade)
@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes)
throws NoSuchNamespaceException {
checkNamespace(namespace);
checkNamespace(namespace, catalogName);
try {
String databaseName = getDatabaseNameFromNamespace(namespace);
List<PropertyChange> propertyChanges =
@@ -283,7 +288,7 @@ public void alterNamespace(String[] namespace, NamespaceChange... changes)

@Override
public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
checkNamespace(namespace);
checkNamespace(namespace, catalogName);
try {
String databaseName = getDatabaseNameFromNamespace(namespace);
return catalog.listTables(databaseName).stream()
@@ -296,7 +301,7 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {

@Override
public void invalidateTable(Identifier ident) {
catalog.invalidateTable(toIdentifier(ident));
catalog.invalidateTable(toIdentifier(ident, catalogName));
}

@Override
@@ -349,7 +354,7 @@ public org.apache.spark.sql.connector.catalog.Table alterTable(
List<SchemaChange> schemaChanges =
Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList());
try {
catalog.alterTable(toIdentifier(ident), schemaChanges, false);
catalog.alterTable(toIdentifier(ident, catalogName), schemaChanges, false);
return loadTable(ident);
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
@@ -367,7 +372,9 @@ public org.apache.spark.sql.connector.catalog.Table createTable(
throws TableAlreadyExistsException, NoSuchNamespaceException {
try {
catalog.createTable(
toIdentifier(ident), toInitialSchema(schema, partitions, properties), false);
toIdentifier(ident, catalogName),
toInitialSchema(schema, partitions, properties),
false);
return loadTable(ident);
} catch (Catalog.TableAlreadyExistException e) {
throw new TableAlreadyExistsException(ident);
@@ -381,7 +388,7 @@ public org.apache.spark.sql.connector.catalog.Table createTable(
@Override
public boolean dropTable(Identifier ident) {
try {
catalog.dropTable(toIdentifier(ident), false);
catalog.dropTable(toIdentifier(ident, catalogName), false);
return true;
} catch (Catalog.TableNotExistException e) {
return false;
@@ -524,8 +531,8 @@ public void renameTable(Identifier oldIdent, Identifier newIdent)
throws NoSuchTableException, TableAlreadyExistsException {
try {
catalog.renameTable(
toIdentifier(oldIdent),
toIdentifier(removeCatalogName(newIdent, catalogName)),
toIdentifier(oldIdent, catalogName),
toIdentifier(removeCatalogName(newIdent, catalogName), catalogName),
false);
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(oldIdent);
@@ -566,7 +573,7 @@ public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
}
} else if (isDatabaseFunctionNamespace(namespace)) {
try {
Function paimonFunction = catalog.getFunction(toIdentifier(ident));
Function paimonFunction = catalog.getFunction(toIdentifier(ident, catalogName));
FunctionDefinition functionDefinition =
paimonFunction.definition(FUNCTION_DEFINITION_NAME);
if (functionDefinition instanceof FunctionDefinition.LambdaFunctionDefinition) {
@@ -654,13 +661,17 @@ public void dropV1Function(FunctionIdentifier funcIdent, boolean ifExists) throws
protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
Identifier ident, Map<String, String> extraOptions) throws NoSuchTableException {
try {
org.apache.paimon.table.Table paimonTable = catalog.getTable(toIdentifier(ident));
org.apache.paimon.table.Table paimonTable =
catalog.getTable(toIdentifier(ident, catalogName));
if (paimonTable instanceof FormatTable) {
return toSparkFormatTable(ident, (FormatTable) paimonTable);
} else {
return new SparkTable(
copyWithSQLConf(
paimonTable, catalogName, toIdentifier(ident), extraOptions));
paimonTable,
catalogName,
toIdentifier(ident, catalogName),
extraOptions));
}
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
@@ -93,6 +93,10 @@ public Catalog paimonCatalog() {
return this.sparkCatalog.paimonCatalog();
}

@Override
public String paimonCatalogName() {
return catalogName;
}
// ======================= database methods ===============================

@Override
@@ -41,15 +41,15 @@ public interface SupportView extends WithPaimonCatalog {

default List<String> listViews(String[] namespace) throws NoSuchNamespaceException {
try {
checkNamespace(namespace);
checkNamespace(namespace, paimonCatalogName());
return paimonCatalog().listViews(namespace[0]);
} catch (Catalog.DatabaseNotExistException e) {
throw new NoSuchNamespaceException(namespace);
}
}

default View loadView(Identifier ident) throws Catalog.ViewNotExistException {
return paimonCatalog().getView(toIdentifier(ident));
return paimonCatalog().getView(toIdentifier(ident, paimonCatalogName()));
}

default void createView(
@@ -60,7 +60,7 @@ default void createView(
Map<String, String> properties,
Boolean ignoreIfExists)
throws NoSuchNamespaceException {
org.apache.paimon.catalog.Identifier paimonIdent = toIdentifier(ident);
org.apache.paimon.catalog.Identifier paimonIdent = toIdentifier(ident, paimonCatalogName());
try {
paimonCatalog()
.createView(
@@ -82,7 +82,7 @@ default void createView(

default void dropView(Identifier ident, Boolean ignoreIfExists) {
try {
paimonCatalog().dropView(toIdentifier(ident), ignoreIfExists);
paimonCatalog().dropView(toIdentifier(ident, paimonCatalogName()), ignoreIfExists);
} catch (Catalog.ViewNotExistException e) {
throw new RuntimeException("view not exists: " + ident, e);
}
@@ -23,4 +23,6 @@
/** With paimon catalog. */
public interface WithPaimonCatalog {
Catalog paimonCatalog();

String paimonCatalogName();
}
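
For illustration only (not part of this PR; the helper class name is hypothetical and it is assumed to live in the same package as WithPaimonCatalog): the widened interface lets any code holding a Paimon-backed Spark TableCatalog recover both the underlying Paimon catalog and the name under which it was registered in Spark, which is the pattern the procedures below rely on when converting identifiers and reporting errors.

import org.apache.paimon.catalog.Catalog;
import org.apache.spark.sql.connector.catalog.TableCatalog;

/** Hypothetical consumer of the widened WithPaimonCatalog interface (sketch only). */
public final class PaimonCatalogAccess {

    /** Describes a Spark catalog as "<registered name>: <Paimon catalog class>" when Paimon-backed. */
    public static String describe(TableCatalog sparkCatalog) {
        if (!(sparkCatalog instanceof WithPaimonCatalog)) {
            return "not a Paimon-backed catalog";
        }
        WithPaimonCatalog withPaimon = (WithPaimonCatalog) sparkCatalog;
        Catalog paimonCatalog = withPaimon.paimonCatalog();      // existing accessor
        String registeredName = withPaimon.paimonCatalogName();  // accessor added by this PR
        return registeredName + ": " + paimonCatalog.getClass().getSimpleName();
    }
}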
@@ -77,9 +77,10 @@ public StructType outputType() {
@Override
public InternalRow[] call(InternalRow args) {
Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
String paimonCatalogName = ((WithPaimonCatalog) tableCatalog()).paimonCatalogName();
org.apache.spark.sql.connector.catalog.Identifier ident =
toIdentifier(args.getString(0), PARAMETERS[0].name());
Identifier function = CatalogUtils.toIdentifier(ident);
Identifier function = CatalogUtils.toIdentifier(ident, paimonCatalogName);
FunctionChange functionChange =
JsonSerdeUtil.fromJson(args.getString(1), FunctionChange.class);
try {
@@ -91,9 +91,10 @@ public StructType outputType() {
@Override
public InternalRow[] call(InternalRow args) {
Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
String paimonCatalogName = ((WithPaimonCatalog) tableCatalog()).paimonCatalogName();
org.apache.spark.sql.connector.catalog.Identifier ident =
toIdentifier(args.getString(0), PARAMETERS[0].name());
Identifier view = CatalogUtils.toIdentifier(ident);
Identifier view = CatalogUtils.toIdentifier(ident, paimonCatalogName);
ViewChange viewChange;
String dialect =
((GenericInternalRow) args).genericGet(2) == null
@@ -90,9 +90,10 @@ public StructType outputType() {
@Override
public InternalRow[] call(InternalRow args) {
Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
String paimonCatalogName = ((WithPaimonCatalog) tableCatalog()).paimonCatalogName();
org.apache.spark.sql.connector.catalog.Identifier ident =
toIdentifier(args.getString(0), PARAMETERS[0].name());
Identifier function = CatalogUtils.toIdentifier(ident);
Identifier function = CatalogUtils.toIdentifier(ident, paimonCatalogName);
List<DataField> inputParams = getDataFieldsFromArguments(1, args);
List<DataField> returnParams = getDataFieldsFromArguments(2, args);
boolean deterministic = args.isNullAt(3) ? true : args.getBoolean(3);
@@ -70,9 +70,10 @@ public StructType outputType() {
@Override
public InternalRow[] call(InternalRow args) {
Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
String paimonCatalogName = ((WithPaimonCatalog) tableCatalog()).paimonCatalogName();
org.apache.spark.sql.connector.catalog.Identifier ident =
toIdentifier(args.getString(0), PARAMETERS[0].name());
Identifier function = CatalogUtils.toIdentifier(ident);
Identifier function = CatalogUtils.toIdentifier(ident, paimonCatalogName);
try {
paimonCatalog.dropFunction(function, false);
} catch (Exception e) {
@@ -60,15 +60,18 @@
/** Utils of catalog. */
public class CatalogUtils {

public static void checkNamespace(String[] namespace) {
public static void checkNamespace(String[] namespace, String catalogName) {
checkArgument(
namespace.length == 1,
"Paimon only support single namespace, but got %s",
"Current catalog is %s, catalog %s does not exist or Paimon only support single namespace, but got %s",
Reviewer comment (Contributor): Please add a test case for this
catalogName,
namespace.length > 0 ? namespace[0] : "unknown",
Arrays.toString(namespace));
}

public static org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident) {
checkNamespace(ident.namespace());
public static org.apache.paimon.catalog.Identifier toIdentifier(
Identifier ident, String catalogName) {
checkNamespace(ident.namespace(), catalogName);
return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name());
}
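
For context, here is a self-contained sketch (not part of the PR; the class and its main method are illustrative only) of how the reworked check behaves. When a query such as SELECT * FROM paimon1.default.t is resolved while the current catalog is registered as paimon and paimon1 is not a registered Spark catalog, the namespace that reaches the Paimon catalog is {"paimon1", "default"}, so the new message names both the current catalog and the non-existent first level, matching the test added below. The checkArgument call in CatalogUtils is assumed to substitute the %s placeholders and throw IllegalArgumentException; the inlined condition below mimics that.

import java.util.Arrays;

/** Standalone sketch of the reworked namespace check (mirrors CatalogUtils#checkNamespace). */
public class CheckNamespaceSketch {

    static void checkNamespace(String[] namespace, String catalogName) {
        if (namespace.length != 1) {
            throw new IllegalArgumentException(
                    String.format(
                            "Current catalog is %s, catalog %s does not exist or Paimon only support single namespace, but got %s",
                            catalogName,
                            namespace.length > 0 ? namespace[0] : "unknown",
                            Arrays.toString(namespace)));
        }
    }

    public static void main(String[] args) {
        // Passes: a single-level namespace such as `default`.
        checkNamespace(new String[] {"default"}, "paimon");

        // Throws: SELECT * FROM paimon1.default.t inside catalog `paimon` arrives here as a
        // two-level namespace and produces
        // "Current catalog is paimon, catalog paimon1 does not exist or Paimon only support
        // single namespace, but got [paimon1, default]".
        checkNamespace(new String[] {"paimon1", "default"}, "paimon");
    }
}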

@@ -529,6 +529,18 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
assert(withColStat == noColStat)
}

test("Query a non-existent catalog") {
assert(intercept[Exception] {
sql("SELECT * FROM paimon1.default.t")
}.getMessage.contains("Current catalog is paimon, catalog paimon1 does not exist"))
}

test("Query a table with multiple namespaces") {
assert(intercept[Exception] {
sql("SELECT * FROM paimon.x.default.t")
}.getMessage.contains("Paimon only support single namespace"))
}

protected def statsFileCount(tableLocation: Path, fileIO: FileIO): Int = {
fileIO.listStatus(new Path(tableLocation, "statistics")).length
}