Skip to content

Commit 3129ed8

Browse files
[SDP] SystemMetadata
1 parent 60339c4 commit 3129ed8

File tree

5 files changed

+99
-1
lines changed

5 files changed

+99
-1
lines changed

docs/declarative-pipelines/FlowSystemMetadata.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,20 @@
11
# FlowSystemMetadata
22

3+
`FlowSystemMetadata` is a [SystemMetadata](SystemMetadata.md) associated with a [Flow](#flow).
4+
5+
## Creating Instance
6+
7+
`FlowSystemMetadata` takes the following to be created:
8+
9+
* <span id="context"> [PipelineUpdateContext](PipelineUpdateContext.md)
10+
* <span id="flow"> [Flow](Flow.md)
11+
* <span id="graph"> [DataflowGraph](DataflowGraph.md)
12+
13+
`FlowSystemMetadata` is created when:
14+
15+
* `FlowPlanner` is requested to [plan a StreamingFlow for execution](FlowPlanner.md#plan)
16+
* `State` is requested to [clear out the state of a flow](State.md#reset)
17+
318
## latestCheckpointLocation { #latestCheckpointLocation }
419

520
```scala

docs/declarative-pipelines/PipelineExecution.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ startPipeline(): Unit
3434

3535
`startPipeline` [resolves and validates the dataflow graph](#resolveGraph) (of this pipeline update).
3636

37+
For a full-refresh update, `startPipeline` [resets the state of all the flows](State.md#reset) in the [DataflowGraph](DataflowGraph.md).
38+
3739
`startPipeline` [materializes the datasets](DatasetManager.md#materializeDatasets) (of this dataflow graph).
3840

3941
`startPipeline` creates a new [TriggeredGraphExecution](#graphExecution) for the materialized dataflow graph.

docs/declarative-pipelines/PipelineUpdateContext.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,25 @@
44

55
## Contract (Subset)
66

7+
### fullRefreshTables { #fullRefreshTables }
8+
9+
```scala
10+
fullRefreshTables: TableFilter
11+
```
12+
13+
`TableFilter` of the tables to be fully refreshed in a pipeline update run
14+
15+
See:
16+
17+
* [PipelineUpdateContextImpl](PipelineUpdateContextImpl.md#fullRefreshTables)
18+
19+
Used when:
20+
21+
* `DatasetManager` is requested to [constructFullRefreshSet](DatasetManager.md#constructFullRefreshSet)
22+
* `PipelineExecution` is requested to [start a pipeline update](PipelineExecution.md#startPipeline)
23+
* `PipelineUpdateContext` is requested to [refreshFlows](PipelineUpdateContext.md#refreshFlows)
24+
* `State` is requested to [find the inputs to reset (state of)](State.md#findElementsToReset)
25+
726
### refreshTables Table Filter { #refreshTables }
827

928
```scala
Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,48 @@
11
# State
22

3-
`State` is...FIXME
3+
## Reset State of All Flows { #reset }
4+
5+
```scala
6+
reset(
7+
resolvedGraph: DataflowGraph,
8+
env: PipelineUpdateContext): Seq[Input]
9+
reset(
10+
flow: ResolvedFlow,
11+
env: PipelineUpdateContext,
12+
graph: DataflowGraph): Unit // (1)!
13+
```
14+
15+
1. A private method
16+
17+
`reset` [finds ResolvedFlows to reset](#findElementsToReset) in the given [DataflowGraph](DataflowGraph.md) (and the [PipelineUpdateContext](PipelineUpdateContext.md)).
18+
19+
!!! info
20+
`reset` handles [ResolvedFlow](ResolvedFlow.md)s only.
21+
22+
`reset` prints out the following INFO message to the logs:
23+
24+
```text
25+
Clearing out state for flow [displayName]
26+
```
27+
28+
`reset` creates a [FlowSystemMetadata](FlowSystemMetadata.md).
29+
30+
For no [checkpoint directory](FlowSystemMetadata.md#latestCheckpointLocationOpt) available for the [ResolvedFlow](ResolvedFlow.md), `reset` prints out the following INFO message to the logs and exits.
31+
32+
```text
33+
Skipping resetting flow [identifier]
34+
since its destination not been previously materialized
35+
and we can't find the checkpoint location.
36+
```
37+
38+
Otherwise, when there is a [checkpoint directory](FlowSystemMetadata.md#latestCheckpointLocationOpt) available, `reset` creates a new checkpoint directory (by incrementing the checkpoint number) and prints out the following INFO message to the logs:
39+
40+
```text
41+
Created new checkpoint for stream [displayName] at [checkpoint_path].
42+
```
43+
44+
---
45+
46+
`reset` is used when:
47+
48+
* `PipelineExecution` is requested to [run a pipeline update](PipelineExecution.md#startPipeline) (with full-refresh update)
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# SystemMetadata
2+
3+
## getLatestCheckpointDir { #getLatestCheckpointDir }
4+
5+
```scala
6+
getLatestCheckpointDir(
7+
rootDir: Path,
8+
createNewCheckpointDir: Boolean = false): String
9+
```
10+
11+
`getLatestCheckpointDir`...FIXME
12+
13+
---
14+
15+
`getLatestCheckpointDir` is used when:
16+
17+
* `FlowSystemMetadata` is requested to [get the latest checkpoint location](FlowSystemMetadata.md#latestCheckpointLocation) and [get the latest checkpoint location if available](#latestCheckpointLocationOpt)

0 commit comments

Comments
 (0)