From 719052f7a5644839eac604fd6b5e9dad85adf9fa Mon Sep 17 00:00:00 2001 From: Aditya Pratap Singh Date: Fri, 16 Aug 2024 15:00:25 +0530 Subject: [PATCH 1/6] merged dagNodeStateStore and failedDagNodeStateStore tables --- .../service/modules/flowgraph/Dag.java | 8 ++ .../DagStateStoreWithDagNodes.java | 3 +- .../MySqlDagManagementStateStore.java | 6 +- .../MysqlDagStateStoreWithDagNodes.java | 76 ++++++++++--------- .../MysqlDagStateStoreWithDagNodesTest.java | 26 +++++++ 5 files changed, 79 insertions(+), 40 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java index 29ffc485e02..0c743c4e2e7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java @@ -17,6 +17,7 @@ package org.apache.gobblin.service.modules.flowgraph; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -48,6 +49,8 @@ public class Dag { // Map to maintain parent to children mapping. private Map>> parentChildMap; private List> nodes; + @Setter + private boolean isFailedDag; @Setter @Deprecated // because this field is not persisted in mysql and contains information in very limited cases @@ -259,11 +262,16 @@ public static class DagNode { private T value; //List of parent Nodes that are dependencies of this Node. private List> parentNodes; + private boolean isFailedDag; //Constructor public DagNode(T value) { this.value = value; } + public DagNode(T value,boolean isFailedDag) { + this.value = value; + this.isFailedDag = isFailedDag; + } public void addParentNode(DagNode node) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java index 03aaf41520c..64c11f81e12 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java @@ -40,7 +40,8 @@ public interface DagStateStoreWithDagNodes extends DagStateStore { * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one * Refer */ - int updateDagNode(DagManager.DagId dagId, Dag.DagNode dagNode) throws IOException; + int updateDagNode(DagManager.DagId dagId, Dag.DagNode dagNode, boolean isFailedDag) throws IOException; + /** * Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index c0984f835b2..34fc0b15a2f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -134,10 +134,8 @@ public void addDag(Dag dag) throws IOException { @Override public void markDagFailed(DagManager.DagId dagId) throws IOException { Dag dag = this.dagStateStore.getDag(dagId); + dag.setFailedDag(true); this.failedDagStateStore.writeCheckpoint(dag); - this.dagStateStore.cleanUp(dagId); - // todo - updated failedDagStateStore iff cleanup returned 1 - // or merge dagStateStore and failedDagStateStore and change the flag that marks a dag `failed` log.info("Marked dag failed {}", dagId); } @@ -161,7 +159,7 @@ public Optional> getFailedDag(DagManager.DagId dagId) thro @Override public synchronized void addDagNodeState(Dag.DagNode dagNode, DagManager.DagId dagId) throws IOException { - this.dagStateStore.updateDagNode(dagId, dagNode); + this.dagStateStore.updateDagNode(dagId, dagNode, false); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java index 2692e20697a..121579fbab7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java @@ -77,20 +77,20 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes protected final GsonSerDe> serDe; private final JobExecutionPlanDagFactory jobExecPlanDagFactory; - // todo add a column that tells if it is a running dag or a failed dag - protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" - + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " - + "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " - + "dag_node JSON NOT NULL, " - + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " - + "PRIMARY KEY (dag_node_id), " - + "UNIQUE INDEX dag_node_index (dag_node_id), " - + "INDEX dag_index (parent_dag_id))"; - - protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) " - + "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node"; - protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?"; - protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?"; + protected static final String CREATE_TABLE_STATEMENT = + "CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR(" + + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, " + + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " + + "is_failed_dag INT NOT NULL DEFAULT 0, " + + "PRIMARY KEY (dag_node_id), " + "UNIQUE INDEX dag_node_index (dag_node_id), " + + "INDEX dag_index (parent_dag_id))"; + + protected static final String INSERT_STATEMENT = + "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) " + + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag"; + protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE parent_dag_id = ?"; + protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE dag_node_id = ?"; protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?"; private final ContextAwareCounter totalDagCount; @@ -105,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); try (Connection connection = dataSource.getConnection(); - PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) { + PreparedStatement createStatement = connection.prepareStatement( + String.format(CREATE_TABLE_STATEMENT, tableName))) { createStatement.executeUpdate(); connection.commit(); } catch (SQLException e) { @@ -126,12 +127,11 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo } @Override - public void writeCheckpoint(Dag dag) - throws IOException { + public void writeCheckpoint(Dag dag) throws IOException { DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); boolean newDag = false; for (Dag.DagNode dagNode : dag.getNodes()) { - if (updateDagNode(dagId, dagNode) == 1) { + if (updateDagNode(dagId, dagNode, dag.isFailedDag()) == 1) { newDag = true; } } @@ -153,7 +153,8 @@ public boolean cleanUp(DagManager.DagId dagId) throws IOException { return deleteStatement.executeUpdate() != 0; } catch (SQLException e) { throw new IOException(String.format("Failure deleting dag for %s", dagId), e); - }}, true); + } + }, true); this.totalDagCount.dec(); return true; } @@ -167,7 +168,8 @@ public void cleanUp(String dagId) throws IOException { @Override public List> getDags() throws IOException { throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with " - + "the DagManager that is replaced by DagProcessingEngine"); } + + "the DagManager that is replaced by DagProcessingEngine"); + } @Override public Dag getDag(DagManager.DagId dagId) throws IOException { @@ -195,33 +197,37 @@ private Dag convertDagNodesIntoDag(Set dagNode) throws IOException { + public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode dagNode, boolean isFailedDag) + throws IOException { String dagNodeId = dagNode.getValue().getId().toString(); return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> { try { insertStatement.setString(1, dagNodeId); insertStatement.setString(2, parentDagId.toString()); insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue()))); + insertStatement.setInt(4, isFailedDag ? 1 : 0); return insertStatement.executeUpdate(); } catch (SQLException e) { throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e); - }}, true); + } + }, true); } @Override public Set> getDagNodes(DagManager.DagId parentDagId) throws IOException { - return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> { - getStatement.setString(1, parentDagId.toString()); - HashSet> dagNodes = new HashSet<>(); - try (ResultSet rs = getStatement.executeQuery()) { - while (rs.next()) { - dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0))); - } - return dagNodes; - } catch (SQLException e) { - throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e); - } - }, true); + return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), + getStatement -> { + getStatement.setString(1, parentDagId.toString()); + HashSet> dagNodes = new HashSet<>(); + try (ResultSet rs = getStatement.executeQuery()) { + while (rs.next()) { + dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0), rs.getBoolean(2))); + } + return dagNodes; + } catch (SQLException e) { + throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e); + } + }, true); } @Override @@ -230,7 +236,7 @@ public Optional> getDagNode(DagNodeId dagNodeId) t getStatement.setString(1, dagNodeId.toString()); try (ResultSet rs = getStatement.executeQuery()) { if (rs.next()) { - return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0))); + return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0),rs.getBoolean(2))); } return Optional.empty(); } catch (SQLException e) { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java index 131186b3d12..4c10a0dc5fb 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java @@ -137,4 +137,30 @@ public void testAddGetAndDeleteDag() throws Exception{ Assert.assertNull(this.dagStateStore.getDag(dagId1)); Assert.assertNull(this.dagStateStore.getDag(dagId2)); } + + @Test + public void testMarkDagAsFailed() throws Exception { + //Set up initial conditions + Dag dag = DagTestUtils.buildDag("test_dag", 789L); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); + + this.dagStateStore.writeCheckpoint(dag); + //Check Initial State + for (Dag.DagNode node : dag.getNodes()) { + Assert.assertFalse(node.isFailedDag()); + } + dag.setFailedDag(true); + + this.dagStateStore.writeCheckpoint(dag); + + Dag updatedDag = this.dagStateStore.getDag(dagId); + for (Dag.DagNode node : updatedDag.getNodes()) { + Assert.assertTrue(node.isFailedDag()); + } + + // Cleanup + dagStateStore.cleanUp(dagId); + Assert.assertNull(this.dagStateStore.getDag(dagId)); + } + } \ No newline at end of file From 68f6ea2a14b0e7f0290269c8be399b27e2364db8 Mon Sep 17 00:00:00 2001 From: Aditya Pratap Singh Date: Fri, 16 Aug 2024 15:00:25 +0530 Subject: [PATCH 2/6] merged dagNodeStateStore and failedDagNodeStateStore tables --- .../service/modules/flowgraph/Dag.java | 8 ++ .../DagStateStoreWithDagNodes.java | 3 +- .../MySqlDagManagementStateStore.java | 6 +- .../MysqlDagStateStoreWithDagNodes.java | 83 +++++++++++-------- .../MysqlDagStateStoreWithDagNodesTest.java | 25 ++++++ 5 files changed, 84 insertions(+), 41 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java index 29ffc485e02..0c743c4e2e7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java @@ -17,6 +17,7 @@ package org.apache.gobblin.service.modules.flowgraph; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -48,6 +49,8 @@ public class Dag { // Map to maintain parent to children mapping. private Map>> parentChildMap; private List> nodes; + @Setter + private boolean isFailedDag; @Setter @Deprecated // because this field is not persisted in mysql and contains information in very limited cases @@ -259,11 +262,16 @@ public static class DagNode { private T value; //List of parent Nodes that are dependencies of this Node. private List> parentNodes; + private boolean isFailedDag; //Constructor public DagNode(T value) { this.value = value; } + public DagNode(T value,boolean isFailedDag) { + this.value = value; + this.isFailedDag = isFailedDag; + } public void addParentNode(DagNode node) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java index 03aaf41520c..64c11f81e12 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java @@ -40,7 +40,8 @@ public interface DagStateStoreWithDagNodes extends DagStateStore { * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one * Refer */ - int updateDagNode(DagManager.DagId dagId, Dag.DagNode dagNode) throws IOException; + int updateDagNode(DagManager.DagId dagId, Dag.DagNode dagNode, boolean isFailedDag) throws IOException; + /** * Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index c0984f835b2..34fc0b15a2f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -134,10 +134,8 @@ public void addDag(Dag dag) throws IOException { @Override public void markDagFailed(DagManager.DagId dagId) throws IOException { Dag dag = this.dagStateStore.getDag(dagId); + dag.setFailedDag(true); this.failedDagStateStore.writeCheckpoint(dag); - this.dagStateStore.cleanUp(dagId); - // todo - updated failedDagStateStore iff cleanup returned 1 - // or merge dagStateStore and failedDagStateStore and change the flag that marks a dag `failed` log.info("Marked dag failed {}", dagId); } @@ -161,7 +159,7 @@ public Optional> getFailedDag(DagManager.DagId dagId) thro @Override public synchronized void addDagNodeState(Dag.DagNode dagNode, DagManager.DagId dagId) throws IOException { - this.dagStateStore.updateDagNode(dagId, dagNode); + this.dagStateStore.updateDagNode(dagId, dagNode, false); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java index 2692e20697a..9d858070e3d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java @@ -77,20 +77,20 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes protected final GsonSerDe> serDe; private final JobExecutionPlanDagFactory jobExecPlanDagFactory; - // todo add a column that tells if it is a running dag or a failed dag - protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" - + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " - + "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " - + "dag_node JSON NOT NULL, " - + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " - + "PRIMARY KEY (dag_node_id), " - + "UNIQUE INDEX dag_node_index (dag_node_id), " - + "INDEX dag_index (parent_dag_id))"; - - protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) " - + "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node"; - protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?"; - protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?"; + protected static final String CREATE_TABLE_STATEMENT = + "CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR(" + + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, " + + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " + + "is_failed_dag INT NOT NULL DEFAULT 0, " + "PRIMARY KEY (dag_node_id), " + + "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))"; + + protected static final String INSERT_STATEMENT = + "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) " + + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag"; + protected static final String GET_DAG_NODES_STATEMENT = + "SELECT dag_node,is_failed_dag FROM %s WHERE parent_dag_id = ?"; + protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE dag_node_id = ?"; protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?"; private final ContextAwareCounter totalDagCount; @@ -105,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); try (Connection connection = dataSource.getConnection(); - PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) { + PreparedStatement createStatement = connection.prepareStatement( + String.format(CREATE_TABLE_STATEMENT, tableName))) { createStatement.executeUpdate(); connection.commit(); } catch (SQLException e) { @@ -126,12 +127,11 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo } @Override - public void writeCheckpoint(Dag dag) - throws IOException { + public void writeCheckpoint(Dag dag) throws IOException { DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); boolean newDag = false; for (Dag.DagNode dagNode : dag.getNodes()) { - if (updateDagNode(dagId, dagNode) == 1) { + if (updateDagNode(dagId, dagNode, dag.isFailedDag()) == 1) { newDag = true; } } @@ -153,7 +153,8 @@ public boolean cleanUp(DagManager.DagId dagId) throws IOException { return deleteStatement.executeUpdate() != 0; } catch (SQLException e) { throw new IOException(String.format("Failure deleting dag for %s", dagId), e); - }}, true); + } + }, true); this.totalDagCount.dec(); return true; } @@ -167,7 +168,8 @@ public void cleanUp(String dagId) throws IOException { @Override public List> getDags() throws IOException { throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with " - + "the DagManager that is replaced by DagProcessingEngine"); } + + "the DagManager that is replaced by DagProcessingEngine"); + } @Override public Dag getDag(DagManager.DagId dagId) throws IOException { @@ -191,37 +193,46 @@ private Dag convertDagNodesIntoDag(Set dag = + jobExecPlanDagFactory.createDag(dagNodes.stream().map(Dag.DagNode::getValue).collect(Collectors.toList())); + if (dagNodes.stream().anyMatch(Dag.DagNode::isFailedDag)) { + dag.setFailedDag(true); + } + return dag; } @Override - public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode dagNode) throws IOException { + public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode dagNode, boolean isFailedDag) + throws IOException { String dagNodeId = dagNode.getValue().getId().toString(); return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> { try { insertStatement.setString(1, dagNodeId); insertStatement.setString(2, parentDagId.toString()); insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue()))); + insertStatement.setInt(4, isFailedDag ? 1 : 0); return insertStatement.executeUpdate(); } catch (SQLException e) { throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e); - }}, true); + } + }, true); } @Override public Set> getDagNodes(DagManager.DagId parentDagId) throws IOException { - return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> { - getStatement.setString(1, parentDagId.toString()); - HashSet> dagNodes = new HashSet<>(); - try (ResultSet rs = getStatement.executeQuery()) { - while (rs.next()) { - dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0))); - } - return dagNodes; - } catch (SQLException e) { - throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e); - } - }, true); + return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), + getStatement -> { + getStatement.setString(1, parentDagId.toString()); + HashSet> dagNodes = new HashSet<>(); + try (ResultSet rs = getStatement.executeQuery()) { + while (rs.next()) { + dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0), rs.getBoolean(2))); + } + return dagNodes; + } catch (SQLException e) { + throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e); + } + }, true); } @Override @@ -230,7 +241,7 @@ public Optional> getDagNode(DagNodeId dagNodeId) t getStatement.setString(1, dagNodeId.toString()); try (ResultSet rs = getStatement.executeQuery()) { if (rs.next()) { - return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0))); + return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0), rs.getBoolean(2))); } return Optional.empty(); } catch (SQLException e) { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java index 131186b3d12..b030b6fe5e0 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java @@ -137,4 +137,29 @@ public void testAddGetAndDeleteDag() throws Exception{ Assert.assertNull(this.dagStateStore.getDag(dagId1)); Assert.assertNull(this.dagStateStore.getDag(dagId2)); } + + @Test + public void testMarkDagAsFailed() throws Exception { + //Set up initial conditions + Dag dag = DagTestUtils.buildDag("test_dag", 789L); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); + + this.dagStateStore.writeCheckpoint(dag); + //Check Initial State + for (Dag.DagNode node : dag.getNodes()) { + Assert.assertFalse(node.isFailedDag()); + } + dag.setFailedDag(true); + this.dagStateStore.writeCheckpoint(dag); + + Dag updatedDag = this.dagStateStore.getDag(dagId); + for (Dag.DagNode node : updatedDag.getNodes()) { + Assert.assertTrue(node.isFailedDag()); + } + + // Cleanup + dagStateStore.cleanUp(dagId); + Assert.assertNull(this.dagStateStore.getDag(dagId)); + } + } \ No newline at end of file From 8614a0dfe4fa2a79cc61f41635e1173e149cd8b8 Mon Sep 17 00:00:00 2001 From: Aditya Pratap Singh Date: Tue, 20 Aug 2024 12:04:56 +0530 Subject: [PATCH 3/6] added isFailedDag at Node level --- .../java/org/apache/gobblin/service/modules/flowgraph/Dag.java | 1 + .../modules/orchestration/MysqlDagStateStoreWithDagNodes.java | 3 +++ 2 files changed, 4 insertions(+) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java index 0c743c4e2e7..210e4ad88b2 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java @@ -262,6 +262,7 @@ public static class DagNode { private T value; //List of parent Nodes that are dependencies of this Node. private List> parentNodes; + @Setter private boolean isFailedDag; //Constructor diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java index 9d858070e3d..1282dbaa8a8 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java @@ -197,6 +197,9 @@ private Dag convertDagNodesIntoDag(Set Date: Tue, 20 Aug 2024 12:17:40 +0530 Subject: [PATCH 4/6] cleaned up failed dag state store references --- .../orchestration/DagManagementStateStore.java | 13 +------------ .../MySqlDagManagementStateStore.java | 18 +----------------- .../orchestration/proc/ResumeDagProc.java | 4 ++-- .../MySqlDagManagementStateStoreTest.java | 4 +--- 4 files changed, 5 insertions(+), 34 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index 139082545b2..e1979ee71c3 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -77,22 +77,11 @@ public interface DagManagementStateStore { /** * This marks the dag as a failed one. - * Failed dags are queried using {@link DagManagementStateStore#getFailedDag(DagManager.DagId)} ()} later to be retried. + * Failed dags are queried using {@link DagManagementStateStore#getDag(DagManager.DagId)} ()} later to be retried. * @param dagId failing dag's dagId */ void markDagFailed(DagManager.DagId dagId) throws IOException; - /** - * Returns the failed dag. - * If the dag is not found because it was never marked as failed through - * {@link DagManagementStateStore#markDagFailed(org.apache.gobblin.service.modules.orchestration.DagManager.DagId)}, - * it returns Optional.absent. - * @param dagId dag id of the failed dag - */ - Optional> getFailedDag(DagManager.DagId dagId) throws IOException; - - void deleteFailedDag(DagManager.DagId dagId) throws IOException; - /** * Adds state of a {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store. * Note that a DagNode is a part of a Dag and must already be present in the store through diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index 34fc0b15a2f..53f14503eba 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -61,15 +61,12 @@ @Slf4j @Singleton public class MySqlDagManagementStateStore implements DagManagementStateStore { - // todo - these two stores should merge private DagStateStoreWithDagNodes dagStateStore; - private DagStateStoreWithDagNodes failedDagStateStore; private final JobStatusRetriever jobStatusRetriever; private boolean dagStoresInitialized = false; private final UserQuotaManager quotaManager; Map topologySpecMap; private final Config config; - public static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore"; public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass"; FlowCatalog flowCatalog; @Getter @@ -91,8 +88,6 @@ public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, User private synchronized void start() { if (!dagStoresInitialized) { this.dagStateStore = createDagStateStore(config, topologySpecMap); - this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), - topologySpecMap); // This implementation does not need to update quota usage when the service restarts or when its leadership status // changes because quota usage are persisted in mysql table. For the same reason, there is no need to call getDags also. // Also, calling getDags during startUp may fail, because the topologies that are required to deserialize dags may @@ -135,7 +130,7 @@ public void addDag(Dag dag) throws IOException { public void markDagFailed(DagManager.DagId dagId) throws IOException { Dag dag = this.dagStateStore.getDag(dagId); dag.setFailedDag(true); - this.failedDagStateStore.writeCheckpoint(dag); + this.dagStateStore.writeCheckpoint(dag); log.info("Marked dag failed {}", dagId); } @@ -145,17 +140,6 @@ public void deleteDag(DagManager.DagId dagId) throws IOException { log.info("Deleted dag {}", dagId); } - @Override - public void deleteFailedDag(DagManager.DagId dagId) throws IOException { - this.failedDagStateStore.cleanUp(dagId); - log.info("Deleted failed dag {}", dagId); - } - - @Override - public Optional> getFailedDag(DagManager.DagId dagId) throws IOException { - return Optional.of(this.failedDagStateStore.getDag(dagId)); - } - @Override public synchronized void addDagNodeState(Dag.DagNode dagNode, DagManager.DagId dagId) throws IOException { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java index 8326d83f095..773b6a605e7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java @@ -54,7 +54,7 @@ public ResumeDagProc(ResumeDagTask resumeDagTask, Config config) { @Override protected Optional> initialize(DagManagementStateStore dagManagementStateStore) throws IOException { - return dagManagementStateStore.getFailedDag(getDagId()); + return dagManagementStateStore.getDag(getDagId()); } @Override @@ -92,7 +92,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional Date: Tue, 20 Aug 2024 12:38:36 +0530 Subject: [PATCH 5/6] refactored --- .../MySqlDagManagementStateStore.java | 2 +- .../MysqlDagStateStoreWithDagNodes.java | 41 ++++++++----------- 2 files changed, 19 insertions(+), 24 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index 53f14503eba..7ab6e8b34aa 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -143,7 +143,7 @@ public void deleteDag(DagManager.DagId dagId) throws IOException { @Override public synchronized void addDagNodeState(Dag.DagNode dagNode, DagManager.DagId dagId) throws IOException { - this.dagStateStore.updateDagNode(dagId, dagNode, false); + this.dagStateStore.updateDagNode(dagId, dagNode, false);// isFailedDag is set as false because addDagNodeState adds a new DagNode, doesn't update an existing dagNode as failed. } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java index 1282dbaa8a8..82359aa1d2c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java @@ -77,19 +77,17 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes protected final GsonSerDe> serDe; private final JobExecutionPlanDagFactory jobExecPlanDagFactory; - protected static final String CREATE_TABLE_STATEMENT = - "CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH - + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR(" - + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, " - + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " - + "is_failed_dag INT NOT NULL DEFAULT 0, " + "PRIMARY KEY (dag_node_id), " - + "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))"; - - protected static final String INSERT_STATEMENT = - "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) " - + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag"; - protected static final String GET_DAG_NODES_STATEMENT = - "SELECT dag_node,is_failed_dag FROM %s WHERE parent_dag_id = ?"; + protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" + + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR(" + + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, " + + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " + + "is_failed_dag INT NOT NULL DEFAULT 0, " + "PRIMARY KEY (dag_node_id), " + + "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))"; + + protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) " + + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag"; + protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE parent_dag_id = ?"; protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE dag_node_id = ?"; protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?"; private final ContextAwareCounter totalDagCount; @@ -105,8 +103,7 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); try (Connection connection = dataSource.getConnection(); - PreparedStatement createStatement = connection.prepareStatement( - String.format(CREATE_TABLE_STATEMENT, tableName))) { + PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) { createStatement.executeUpdate(); connection.commit(); } catch (SQLException e) { @@ -168,8 +165,7 @@ public void cleanUp(String dagId) throws IOException { @Override public List> getDags() throws IOException { throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with " - + "the DagManager that is replaced by DagProcessingEngine"); - } + + "the DagManager that is replaced by DagProcessingEngine"); } @Override public Dag getDag(DagManager.DagId dagId) throws IOException { @@ -193,13 +189,12 @@ private Dag convertDagNodesIntoDag(Set dag = - jobExecPlanDagFactory.createDag(dagNodes.stream().map(Dag.DagNode::getValue).collect(Collectors.toList())); - if (dagNodes.stream().anyMatch(Dag.DagNode::isFailedDag)) { + Dag dag = jobExecPlanDagFactory.createDag(dagNodes.stream().map(Dag.DagNode::getValue).collect(Collectors.toList())); + + // if any node of the dag is failed it means that the dag has been marked as failed, update the is_failed_dag field of the dag and it's nodes as true + if (dag.getNodes().stream().anyMatch(Dag.DagNode::isFailedDag)) { dag.setFailedDag(true); - for (Dag.DagNode dagNode : dag.getNodes()) { - dagNode.setFailedDag(true); - } + dag.getNodes().forEach(node -> node.setFailedDag(true)); } return dag; } From 7e3e5560cecee226cdd20be692699e7ee832e6f2 Mon Sep 17 00:00:00 2001 From: Aditya Pratap Singh Date: Wed, 21 Aug 2024 20:44:19 +0530 Subject: [PATCH 6/6] addressed PR comments --- .../service/modules/flowgraph/Dag.java | 6 -- .../MysqlDagStateStoreWithDagNodes.java | 76 +++++++++---------- .../MysqlDagStateStoreWithDagNodesTest.java | 66 +++++++++++++--- 3 files changed, 88 insertions(+), 60 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java index 3016354b2b9..5f6e73ebd6c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java @@ -261,17 +261,11 @@ public static class DagNode { private T value; //List of parent Nodes that are dependencies of this Node. private List> parentNodes; - @Setter - private boolean isFailedDag; //Constructor public DagNode(T value) { this.value = value; } - public DagNode(T value,boolean isFailedDag) { - this.value = value; - this.isFailedDag = isFailedDag; - } public void addParentNode(DagNode node) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java index 82359aa1d2c..afe24c740a2 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java @@ -77,18 +77,20 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes protected final GsonSerDe> serDe; private final JobExecutionPlanDagFactory jobExecPlanDagFactory; - protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" - + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH - + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR(" - + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, " - + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " - + "is_failed_dag INT NOT NULL DEFAULT 0, " + "PRIMARY KEY (dag_node_id), " - + "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))"; - - protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) " - + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag"; - protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE parent_dag_id = ?"; - protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE dag_node_id = ?"; + protected static final String CREATE_TABLE_STATEMENT = + "CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR(" + + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, " + + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " + + "is_failed_dag TINYINT(1) DEFAULT 0, " + "PRIMARY KEY (dag_node_id), " + + "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))"; + + protected static final String INSERT_STATEMENT = + "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) " + + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag"; + protected static final String GET_DAG_NODES_STATEMENT = + "SELECT dag_node FROM %s WHERE parent_dag_id = ?"; + protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?"; protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?"; private final ContextAwareCounter totalDagCount; @@ -103,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); try (Connection connection = dataSource.getConnection(); - PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) { + PreparedStatement createStatement = connection.prepareStatement( + String.format(CREATE_TABLE_STATEMENT, tableName))) { createStatement.executeUpdate(); connection.commit(); } catch (SQLException e) { @@ -150,8 +153,7 @@ public boolean cleanUp(DagManager.DagId dagId) throws IOException { return deleteStatement.executeUpdate() != 0; } catch (SQLException e) { throw new IOException(String.format("Failure deleting dag for %s", dagId), e); - } - }, true); + }}, true); this.totalDagCount.dec(); return true; } @@ -165,7 +167,7 @@ public void cleanUp(String dagId) throws IOException { @Override public List> getDags() throws IOException { throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with " - + "the DagManager that is replaced by DagProcessingEngine"); } + + "the DagManager that is replaced by DagProcessingEngine");} @Override public Dag getDag(DagManager.DagId dagId) throws IOException { @@ -189,19 +191,11 @@ private Dag convertDagNodesIntoDag(Set dag = jobExecPlanDagFactory.createDag(dagNodes.stream().map(Dag.DagNode::getValue).collect(Collectors.toList())); - - // if any node of the dag is failed it means that the dag has been marked as failed, update the is_failed_dag field of the dag and it's nodes as true - if (dag.getNodes().stream().anyMatch(Dag.DagNode::isFailedDag)) { - dag.setFailedDag(true); - dag.getNodes().forEach(node -> node.setFailedDag(true)); - } - return dag; + return jobExecPlanDagFactory.createDag(dagNodes.stream().map(Dag.DagNode::getValue).collect(Collectors.toList())); } @Override - public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode dagNode, boolean isFailedDag) - throws IOException { + public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode dagNode, boolean isFailedDag) throws IOException { String dagNodeId = dagNode.getValue().getId().toString(); return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> { try { @@ -212,25 +206,23 @@ public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode> getDagNodes(DagManager.DagId parentDagId) throws IOException { - return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), - getStatement -> { - getStatement.setString(1, parentDagId.toString()); - HashSet> dagNodes = new HashSet<>(); - try (ResultSet rs = getStatement.executeQuery()) { - while (rs.next()) { - dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0), rs.getBoolean(2))); - } - return dagNodes; - } catch (SQLException e) { - throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e); - } - }, true); + return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> { + getStatement.setString(1, parentDagId.toString()); + HashSet> dagNodes = new HashSet<>(); + try (ResultSet rs = getStatement.executeQuery()) { + while (rs.next()) { + dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0))); + } + return dagNodes; + } catch (SQLException e) { + throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e); + } + }, true); } @Override @@ -239,7 +231,7 @@ public Optional> getDagNode(DagNodeId dagNodeId) t getStatement.setString(1, dagNodeId.toString()); try (ResultSet rs = getStatement.executeQuery()) { if (rs.next()) { - return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0), rs.getBoolean(2))); + return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0))); } return Optional.empty(); } catch (SQLException e) { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java index b030b6fe5e0..8054f4f7faa 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java @@ -17,10 +17,18 @@ package org.apache.gobblin.service.modules.orchestration; +import java.io.IOException; import java.net.URI; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import lombok.extern.slf4j.Slf4j; + import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -36,17 +44,22 @@ import org.apache.gobblin.service.ExecutionStatus; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; - +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; +import org.apache.gobblin.util.DBStatementExecutor; /** * Mainly testing functionalities related to DagStateStore but not Mysql-related components. */ +@Slf4j public class MysqlDagStateStoreWithDagNodesTest { private DagStateStore dagStateStore; - private static final String TEST_USER = "testUser"; private static ITestMetastoreDatabase testDb; + private DBStatementExecutor dbStatementExecutor; + private static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node, is_failed_dag FROM %s WHERE parent_dag_id = ?"; + private static final String tableName = "dag_node_state_store"; @BeforeClass public void setUp() throws Exception { @@ -63,6 +76,8 @@ public void setUp() throws Exception { URI specExecURI = new URI(specExecInstance); topologySpecMap.put(specExecURI, topologySpec); this.dagStateStore = new MysqlDagStateStoreWithDagNodes(configBuilder.build(), topologySpecMap); + dbStatementExecutor = new DBStatementExecutor( + MysqlDataSourceFactory.get(configBuilder.build(), SharedResourcesBrokerFactory.getImplicitBroker()), log); } @AfterClass(alwaysRun = true) @@ -74,7 +89,7 @@ public void tearDown() throws Exception { } @Test - public void testAddGetAndDeleteDag() throws Exception{ + public void testAddGetAndDeleteDag() throws Exception { Dag originalDag1 = DagTestUtils.buildDag("random_1", 123L); Dag originalDag2 = DagTestUtils.buildDag("random_2", 456L); DagManager.DagId dagId1 = DagManagerUtils.generateDagId(originalDag1); @@ -140,26 +155,53 @@ public void testAddGetAndDeleteDag() throws Exception{ @Test public void testMarkDagAsFailed() throws Exception { - //Set up initial conditions + // Set up initial conditions Dag dag = DagTestUtils.buildDag("test_dag", 789L); DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); this.dagStateStore.writeCheckpoint(dag); - //Check Initial State - for (Dag.DagNode node : dag.getNodes()) { - Assert.assertFalse(node.isFailedDag()); + + // Fetch all initial states into a list + List initialStates = fetchDagNodeStates(dagId.toString()); + + // Check Initial State + for (Boolean state : initialStates) { + Assert.assertFalse(state); } + // Set the DAG as failed dag.setFailedDag(true); this.dagStateStore.writeCheckpoint(dag); - Dag updatedDag = this.dagStateStore.getDag(dagId); - for (Dag.DagNode node : updatedDag.getNodes()) { - Assert.assertTrue(node.isFailedDag()); - } + // Fetch all states after marking the DAG as failed + List failedStates = fetchDagNodeStates(dagId.toString()); - // Cleanup + // Check if all states are now true (indicating failure) + for (Boolean state : failedStates) { + Assert.assertTrue(state); + } dagStateStore.cleanUp(dagId); Assert.assertNull(this.dagStateStore.getDag(dagId)); } + private List fetchDagNodeStates(String dagId) throws IOException { + List states = new ArrayList<>(); + + dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> { + + getStatement.setString(1, dagId.toString()); + + HashSet> dagNodes = new HashSet<>(); + + try (ResultSet rs = getStatement.executeQuery()) { + while (rs.next()) { + states.add(rs.getBoolean(2)); + } + return dagNodes; + } catch (SQLException e) { + throw new IOException(String.format("Failure get dag nodes for dag %s", dagId), e); + } + }, true); + + return states; + } } \ No newline at end of file