Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,13 @@ public class AmoroManagementConf {
.defaultValue(1260)
.withDescription("Port that the table service thrift server is bound to.");

public static final ConfigOption<String> TABLE_SERVICE_IMPL =
ConfigOptions.key("table-service.impl")
.stringType()
.defaultValue("default")
.withDescription(
"TableService implementation provider name or FQCN. Default is 'default'.");

public static final ConfigOption<Integer> OPTIMIZING_SERVICE_THRIFT_BIND_PORT =
ConfigOptions.key("thrift-server.optimizing-service.bind-port")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.scheduler.inline.InlineTableExecutors;
import org.apache.amoro.server.table.DefaultTableManager;
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.TableServiceLoader;
import org.apache.amoro.server.terminal.TerminalManager;
import org.apache.amoro.server.utils.ThriftServiceProxy;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -167,7 +167,7 @@ public void startRestServices() throws Exception {
}

public void startOptimizingService() throws Exception {
tableService = new DefaultTableService(serviceConfig, catalogManager);
tableService = TableServiceLoader.load(serviceConfig, catalogManager);

optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.amoro.server.persistence.TableRuntimeState;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.base.Objects;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down Expand Up @@ -217,7 +216,7 @@ public TableRuntime getRuntime(Long tableId) {
return tableRuntimeMap.get(tableId);
}

@VisibleForTesting
@Override
public void setRuntime(DefaultTableRuntime tableRuntime) {
checkStarted();
tableRuntimeMap.put(tableRuntime.getTableIdentifier().getId(), tableRuntime);
Expand Down Expand Up @@ -245,8 +244,8 @@ public void dispose() {
}
}

@VisibleForTesting
void exploreTableRuntimes() {
@Override
public void exploreTableRuntimes() {
if (!initialized.isDone()) {
throw new IllegalStateException("TableService is not initialized");
}
Expand Down Expand Up @@ -289,7 +288,7 @@ void exploreTableRuntimes() {
LOG.info("Syncing external catalogs took {} ms.", end - start);
}

@VisibleForTesting
@Override
public void exploreExternalCatalog(ExternalCatalog externalCatalog) {
final List<CompletableFuture<Set<TableIdentity>>> tableIdentifiersFutures =
Lists.newArrayList();
Expand Down Expand Up @@ -508,7 +507,7 @@ private void revertTableRuntimeAdded(
}
}

@VisibleForTesting
@Override
public void disposeTable(ServerTableIdentifier tableIdentifier) {
TableRuntime existedTableRuntime = tableRuntimeMap.get(tableIdentifier.getId());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.amoro.AmoroTable;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.server.catalog.ExternalCatalog;
import org.apache.amoro.server.catalog.InternalCatalog;

public interface TableService extends TableRuntimeHandler {
Expand All @@ -46,4 +47,16 @@ default boolean contains(Long tableId) {
* @return managed table.
*/
AmoroTable<?> loadTable(ServerTableIdentifier identifier);

/** Explore and synchronize table runtimes from catalogs. Intended for periodic sync and tests. */
void exploreTableRuntimes();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who will call those methods?

Copy link
Contributor Author

@wardlican wardlican Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is mentioned in the interface for compatibility with the original test cases in SPI mode, and secondly, this method also needs to be implemented in master-slave mode. @baiyangtx @zhoujinsong


/** Explore and synchronize a specific external catalog. */
void exploreExternalCatalog(ExternalCatalog externalCatalog);

/** Set or replace a runtime for testing or recovery scenarios. */
void setRuntime(DefaultTableRuntime tableRuntime);

/** Dispose a managed table and its runtime. */
void disposeTable(ServerTableIdentifier tableIdentifier);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.table;

import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.catalog.CatalogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.util.ServiceLoader;

public final class TableServiceLoader {

private static final Logger LOG = LoggerFactory.getLogger(TableServiceLoader.class);

private TableServiceLoader() {}

public static TableService load(Configurations conf, CatalogManager catalogManager) {
String impl = conf.getString(AmoroManagementConf.TABLE_SERVICE_IMPL);

// 1) Try named providers via ServiceLoader
ServiceLoader<TableServiceProvider> loader = ServiceLoader.load(TableServiceProvider.class);
for (TableServiceProvider provider : loader) {
try {
if (provider.name().equalsIgnoreCase(impl)) {
LOG.info("Loading TableService from provider name: {} -> {}", impl, provider.getClass());
return provider.create(conf, catalogManager);
}
} catch (Throwable t) {
LOG.warn("Failed to create TableService from provider {}", provider.getClass(), t);
}
}

// 2) Try FQCN
try {
Class<?> clazz = Class.forName(impl);
if (!TableService.class.isAssignableFrom(clazz)) {
LOG.warn("Configured class {} does not implement TableService, fallback to default.", impl);
} else {
try {
Constructor<?> constructor =
clazz.getConstructor(Configurations.class, CatalogManager.class);
LOG.info("Loading TableService from class: {}", impl);
return (TableService) constructor.newInstance(conf, catalogManager);
} catch (NoSuchMethodException nsme) {
LOG.warn(
"No (Configurations, CatalogManager) constructor for {}, fallback to default.", impl);
}
}
} catch (ClassNotFoundException cnfe) {
LOG.info("Configured TableService impl not found as class: {}. Will fallback.", impl);
} catch (Throwable t) {
LOG.warn("Failed to instantiate TableService impl: {}. Will fallback.", impl, t);
}

// 3) Fallback to 'default' provider
for (TableServiceProvider provider : loader) {
if ("default".equalsIgnoreCase(provider.name())) {
LOG.info("Falling back to default TableService provider: {}", provider.getClass());
return provider.create(conf, catalogManager);
}
}

// 4) Last resort: try DefaultTableService directly (avoid circular deps by FQCN)
LOG.info("Falling back to DefaultTableService directly.");
return new org.apache.amoro.server.table.DefaultTableService(conf, catalogManager);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.table;

import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.catalog.CatalogManager;

/**
* SPI provider for {@link TableService}. Implementations should be registered via
* META-INF/services/org.apache.amoro.server.table.TableServiceProvider
*/
public interface TableServiceProvider {

/** Provider name to select by configuration, e.g., "default". */
String name();

/** Create a {@link TableService} instance. */
TableService create(Configurations configuration, CatalogManager catalogManager);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.table.spi;

import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.TableServiceProvider;

public class DefaultTableServiceProvider implements TableServiceProvider {

@Override
public String name() {
return "default";
}

@Override
public TableService create(Configurations configuration, CatalogManager catalogManager) {
return new DefaultTableService(configuration, catalogManager);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.amoro.server.table.spi.DefaultTableServiceProvider
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
import org.apache.amoro.server.manager.EventsManager;
import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.TableServiceLoader;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;

import java.time.Duration;

public abstract class AMSServiceTestBase extends AMSManagerTestBase {
private static DefaultTableService TABLE_SERVICE = null;
private static TableService TABLE_SERVICE = null;
private static DefaultOptimizingService OPTIMIZING_SERVICE = null;

@BeforeClass
Expand All @@ -41,7 +42,7 @@ public static void initTableService() {
configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, Duration.ofMillis(800L));
configurations.set(
AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT, Duration.ofMillis(30000L));
TABLE_SERVICE = new DefaultTableService(new Configurations(), CATALOG_MANAGER);
TABLE_SERVICE = TableServiceLoader.load(configurations, CATALOG_MANAGER);
OPTIMIZING_SERVICE =
new DefaultOptimizingService(
configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER, TABLE_SERVICE);
Expand All @@ -65,7 +66,7 @@ public static void disposeTableService() {
EventsManager.dispose();
}

protected DefaultTableService tableService() {
protected TableService tableService() {
return TABLE_SERVICE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ protected DefaultTableService tableService() {
if (tableService != null) {
return tableService;
} else {
return super.tableService();
return (DefaultTableService) super.tableService();
}
}

Expand Down