Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c34a03c
TAJO-1866: TestHAServiceHDFSImpl:: testAutoFailOver fails occasionall…
blrunner Feb 19, 2016
d0cd5b1
Remove unused codes
blrunner Feb 19, 2016
597fe0e
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Feb 19, 2016
6937257
Clean up codes
blrunner Feb 19, 2016
2020db4
Remove unnecessary lines
blrunner Feb 19, 2016
522267f
Shutdown backup TajoMaster
blrunner Feb 19, 2016
96ec500
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Feb 22, 2016
a2ceadc
Trigger for travis CI build
blrunner Feb 22, 2016
e04c65e
Trigger for travis CI build
blrunner Feb 22, 2016
323f7a4
Trigger for travis CI build
blrunner Feb 22, 2016
9e5f846
Trigger for travis CI build
blrunner Feb 22, 2016
7c07013
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 7, 2016
62895c0
Trigger for travis CI build
blrunner Mar 7, 2016
ef5583f
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 18, 2016
ac79095
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 18, 2016
b964834
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 18, 2016
39b932b
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Mar 30, 2016
68d48c6
Fix a compile bug
blrunner Mar 30, 2016
f262a92
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Apr 19, 2016
fa58739
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Apr 21, 2016
1e34ff5
Fix a compile bug
blrunner Apr 21, 2016
620321e
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Apr 22, 2016
ef97d63
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner Apr 28, 2016
42dc0b1
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo…
blrunner May 8, 2016
9113442
Update parameters of CatalogUtil::newTableMeta
blrunner May 8, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ Tajo Change Log

Release 0.12.0 - unreleased


NEW FEATURES

TAJO-2122: PullServer as an Auxiliary service of Yarn. (jihoon)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,16 @@ public HBaseTestClusterUtil getHBaseUtil() {
////////////////////////////////////////////////////////
// Catalog Section
////////////////////////////////////////////////////////
public CatalogServer startCatalogCluster() throws Exception {
private void initCatalogCluster() throws Exception {
if(isCatalogServerRunning) throw new IOException("Catalog Cluster already running");

CatalogTestingUtil.configureCatalog(conf, clusterTestBuildDir.getAbsolutePath());
LOG.info("Apache Derby repository is set to " + conf.get(CatalogConstants.CATALOG_URI));
conf.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
}

public CatalogServer startCatalogCluster() throws Exception {
initCatalogCluster();
catalogServer = new CatalogServer();
catalogServer.init(conf);
catalogServer.start();
Expand Down Expand Up @@ -460,6 +463,15 @@ private void startTajoWorkers(int numSlaves) throws Exception {
}
}

public void startMaster() throws Exception {
initCatalogCluster();
tajoMaster = new TajoMaster();
tajoMaster.init(conf);
tajoMaster.start();
isCatalogServerRunning = true;
catalogServer = tajoMaster.getCatalogServer();
}

public TajoMaster getMaster() {
return this.tajoMaster;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,102 @@

package org.apache.tajo.ha;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.TpchTestBase;
import org.apache.tajo.*;
import org.apache.tajo.catalog.*;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import static junit.framework.Assert.assertNotNull;
import static junit.framework.Assert.assertTrue;
import static junit.framework.TestCase.assertEquals;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.junit.Assert.assertNotEquals;

public class TestHAServiceHDFSImpl {
private TajoTestingCluster cluster;

private TajoMaster primaryMaster;
private TajoMaster backupMaster;
private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestHAServiceHDFSImpl";
private TajoTestingCluster util;
private FileTablespace sm;
private CatalogService catalog;
private Path testDir;
private TableDesc employee;

@Before
public void setUp() throws Exception {
util = new TajoTestingCluster(true);

util.startMaster();
catalog = util.getCatalogService();

sm = TablespaceManager.getLocalFs();

testDir = CommonTestingUtil.getTestDir(TEST_PATH);

Schema schema = SchemaBuilder.builder()
.add("managerid", TajoDataTypes.Type.INT4)
.add("empid", TajoDataTypes.Type.INT4)
.add("deptname", TajoDataTypes.Type.TEXT)
.build();

TableMeta employeeMeta = CatalogUtil.newTableMeta(BuiltinStorages.TEXT, util.getConfiguration());

Path employeePath = new Path(testDir, "employee.csv");
Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(employeeMeta, schema, employeePath);
appender.enableStats();
appender.init();
appender.flush();
appender.close();

employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri());
catalog.createTable(employee);
}

private Path haPath, activePath, backupPath;
@After
public void tearDown() throws Exception {
CommonTestingUtil.cleanupTestDir(TEST_PATH);
util.shutdownCatalogCluster();
}

//@Test TODO: enable this test after TAJO-1866 fixed
@Test
public final void testAutoFailOver() throws Exception {
cluster = TpchTestBase.getInstance().getTestingCluster();
TajoMaster backupMaster = null;

try {
FileSystem fs = cluster.getDefaultFileSystem();
TajoMaster primaryMaster = util.getMaster();
assertNotNull(primaryMaster);

TajoConf primaryConf = setConfigForHAMaster();
primaryMaster = new TajoMaster();
primaryMaster.init(primaryConf);
primaryMaster.start();

TajoConf backupConf = setConfigForHAMaster();
TajoConf conf = getBackupMasterConfiguration();
backupMaster = new TajoMaster();
backupMaster.init(backupConf);
backupMaster.init(conf);
backupMaster.start();
Assert.assertNotNull(backupMaster);

ServiceTracker tracker = ServiceTrackerFactory.get(primaryConf);

ServiceTracker tracker = ServiceTrackerFactory.get(util.getConfiguration());
assertNotEquals(primaryMaster.getMasterName(), backupMaster.getMasterName());
verifySystemDirectories(fs);

FileSystem fs = sm.getFileSystem();
Path haPath = TajoConf.getSystemHADir(util.getConfiguration());
assertTrue(fs.exists(haPath));

Path activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
assertTrue(fs.exists(activePath));

Path backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
assertTrue(fs.exists(backupPath));

assertEquals(2, fs.listStatus(activePath).length);
assertEquals(1, fs.listStatus(backupPath).length);
Expand All @@ -75,68 +123,43 @@ public final void testAutoFailOver() throws Exception {
assertTrue(fs.exists(new Path(backupPath, backupMaster.getMasterName().replaceAll(":", "_"))));

createDatabaseAndTable(tracker);
verifyDataBaseAndTable(tracker);
existDataBaseAndTable(tracker);

primaryMaster.stop();

verifyDataBaseAndTable(tracker);
existDataBaseAndTable(tracker);

assertTrue(fs.exists(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE)));
assertTrue(fs.exists(new Path(activePath, backupMaster.getMasterName().replaceAll(":", "_"))));

assertEquals(2, fs.listStatus(activePath).length);
assertEquals(0, fs.listStatus(backupPath).length);

assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, "employee"));
} finally {
if (backupMaster != null) {
backupMaster.stop();
backupMaster.close();
}
}

}

private TajoConf setConfigForHAMaster() {
TajoConf conf = new TajoConf(cluster.getConfiguration());

conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
"localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
"localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS,
"localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS,
"localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS,
"localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.REST_SERVICE_ADDRESS,
"localhost:" + NetUtils.getFreeSocketPort());
private TajoConf getBackupMasterConfiguration() {
TajoConf conf = new TajoConf(util.getConfiguration());

conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, "localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, "localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, "localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS, "localhost:" + NetUtils.getFreeSocketPort());
conf.setVar(TajoConf.ConfVars.REST_SERVICE_ADDRESS, "localhost:" + NetUtils.getFreeSocketPort());

conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true);
conf.setIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL, 1000);

//Client API service RPC Server
conf.setIntVar(TajoConf.ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);
conf.setIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);

// Internal RPC Server
conf.setIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2);
conf.setIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2);
conf.setIntVar(TajoConf.ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM, 2);
conf.setIntVar(TajoConf.ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2);
conf.setIntVar(TajoConf.ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2);

return conf;
}

private void verifySystemDirectories(FileSystem fs) throws Exception {
haPath = TajoConf.getSystemHADir(cluster.getConfiguration());
assertTrue(fs.exists(haPath));

activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
assertTrue(fs.exists(activePath));

backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
assertTrue(fs.exists(backupPath));
}

private void createDatabaseAndTable(ServiceTracker tracker) throws Exception {
TajoClient client = null;
try {
Expand All @@ -148,7 +171,7 @@ private void createDatabaseAndTable(ServiceTracker tracker) throws Exception {
}
}

private void verifyDataBaseAndTable(ServiceTracker tracker) throws Exception {
private void existDataBaseAndTable(ServiceTracker tracker) throws Exception {
TajoClient client = null;
try {
client = new TajoClientImpl(tracker);
Expand All @@ -159,4 +182,5 @@ private void verifyDataBaseAndTable(ServiceTracker tracker) throws Exception {
IOUtils.cleanup(null, client);
}
}
}

}