Skip to content

Commit 300aa09

Browse files
ChenyuLInx and emmyoop authored
Support artifact upload (#11419)
* wip * reorganize * changie * retry * nits * nits * improve retry, adjust error, adjust host name * adjust logic * pr_feedback * Update .changes/unreleased/Features-20250323-151625.yaml Co-authored-by: Emily Rockman <emily.rockman@dbtlabs.com> --------- Co-authored-by: Emily Rockman <emily.rockman@dbtlabs.com>
1 parent 4930084 commit 300aa09

19 files changed

+645
-28
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
kind: Features
2+
body: Support artifact upload to dbt Cloud
3+
time: 2025-03-23T15:16:25.160263-07:00
4+
custom:
5+
Author: ChenyuLInx
6+
Issue: "11418"

core/dbt/cli/artifact_upload.py

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
import os
2+
import time
3+
import uuid
4+
import zipfile
5+
6+
import requests
7+
from pydantic import BaseSettings
8+
9+
import dbt.tracking
10+
from dbt.config.runtime import UnsetProfile, load_project
11+
from dbt.constants import (
12+
CATALOG_FILENAME,
13+
MANIFEST_FILE_NAME,
14+
RUN_RESULTS_FILE_NAME,
15+
SOURCE_RESULT_FILE_NAME,
16+
)
17+
from dbt.events.types import ArtifactUploadSkipped, ArtifactUploadSuccess
18+
from dbt.exceptions import DbtProjectError
19+
from dbt_common.events.functions import fire_event
20+
from dbt_common.exceptions import DbtBaseException as DbtException
21+
22+
# Default number of attempts the retry helper makes for each HTTP call.
MAX_RETRIES = 3

# Artifact files uploaded for every command that maps to this list below.
EXECUTION_ARTIFACTS = [MANIFEST_FILE_NAME, RUN_RESULTS_FILE_NAME]

# Maps a CLI command name to the artifact file names that are zipped and
# uploaded for it. Commands missing from this mapping upload nothing.
ARTIFACTS_TO_UPLOAD = dict.fromkeys(
    (
        "retry",
        "clone",
        "build",
        "run",
        "run-operation",
        "seed",
        "snapshot",
        "test",
    ),
    EXECUTION_ARTIFACTS,
)
ARTIFACTS_TO_UPLOAD["freshness"] = [MANIFEST_FILE_NAME, SOURCE_RESULT_FILE_NAME]
ARTIFACTS_TO_UPLOAD["generate"] = [MANIFEST_FILE_NAME, CATALOG_FILENAME]
38+
39+
40+
class ArtifactUploadConfig(BaseSettings):
    """Settings needed to talk to the artifact ingest endpoints.

    ``tenant_hostname`` is passed in by the caller.
    NOTE(review): the upper-case fields are presumably populated by
    ``BaseSettings`` from same-named environment variables -- confirm.
    """

    tenant_hostname: str
    DBT_CLOUD_TOKEN: str
    DBT_CLOUD_ACCOUNT_ID: str
    DBT_CLOUD_ENVIRONMENT_ID: str

    def get_ingest_url(self):
        """Return the ingests collection URL for the configured account and environment."""
        account_part = f"accounts/{self.DBT_CLOUD_ACCOUNT_ID}"
        environment_part = f"environments/{self.DBT_CLOUD_ENVIRONMENT_ID}"
        return f"https://{self.tenant_hostname}/api/private/{account_part}/{environment_part}/ingests/"

    def get_complete_url(self, ingest_id):
        """Return the URL of the single ingest identified by ``ingest_id``."""
        base_url = self.get_ingest_url()
        return f"{base_url}{ingest_id}/"

    def get_headers(self, invocation_id=None):
        """Return the request headers, generating a random invocation id when none is given."""
        request_invocation_id = str(uuid.uuid4()) if invocation_id is None else invocation_id
        headers = {
            "Accept": "application/json",
            "X-Invocation-Id": request_invocation_id,
            "Authorization": f"Token {self.DBT_CLOUD_TOKEN}",
        }
        return headers
60+
61+
62+
def _retry_with_backoff(operation_name, func, max_retries=MAX_RETRIES, retry_codes=None):
    """Execute a function with exponential backoff retry logic.

    Args:
        operation_name: Name of the operation, used in error messages.
        func: Zero-argument callable returning ``(success, result)``. When
            ``success`` is false, ``result`` must expose ``status_code``.
        max_retries: Total number of attempts to make; must be at least 1.
        retry_codes: Status codes for which a failed attempt is retried.
            Defaults to ``[500, 502, 503, 504]``.

    Returns:
        The ``result`` of the first successful call to ``func``.

    Raises:
        DbtException: If ``max_retries`` is less than 1, if ``func`` reports a
            failure whose status code is not in ``retry_codes``, or if every
            attempt fails.
    """
    if max_retries < 1:
        # Without this guard the loop body never runs and the function would
        # silently return None, which callers then try to use as a response.
        raise DbtException(
            f"Error {operation_name}: max_retries must be at least 1, got {max_retries}"
        )
    if retry_codes is None:
        retry_codes = [500, 502, 503, 504]

    retry_delay = 1
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            success, result = func()
        except requests.RequestException as e:
            if is_last_attempt:
                # Chain the original error so the underlying cause stays visible.
                raise DbtException(f"Error {operation_name}: {str(e)}") from e
        else:
            if success:
                return result
            # Give up immediately on non-retryable codes, or once attempts run out.
            if is_last_attempt or result.status_code not in retry_codes:
                raise DbtException(f"Error {operation_name}: {result}")

        time.sleep(retry_delay)
        retry_delay *= 2  # exponential backoff
95+
96+
97+
def upload_artifacts(project_dir, target_path, command):
    """Zip the artifacts produced by ``command`` and upload them via the ingest API.

    The upload is a three-step exchange, each step retried with backoff:
    create an ingest (POST), send the zip to the returned ``upload_url`` (PUT),
    then mark the ingest as successful (PATCH).

    Args:
        project_dir: Project directory passed to ``load_project`` to read
            ``dbt_cloud.tenant_hostname`` from dbt_project.yml.
        target_path: Directory holding the artifact files; falls back to
            ``"target"`` when falsy.
        command: Name of the command that ran. Commands not present in
            ``ARTIFACTS_TO_UPLOAD`` are skipped with an ``ArtifactUploadSkipped``
            event.

    Raises:
        DbtProjectError: If the tenant hostname cannot be read from the project.
        DbtException: If any of the HTTP steps fails after retries.
        OSError: If an expected artifact file cannot be read from ``target_path``.
    """
    # Local import: only this function needs it for the in-memory archive.
    import io

    # Check if there are artifacts to upload for this command
    if command not in ARTIFACTS_TO_UPLOAD:
        fire_event(ArtifactUploadSkipped(msg=f"No artifacts to upload for command {command}"))
        return

    # read configurations
    try:
        project = load_project(
            project_dir, version_check=False, profile=UnsetProfile(), cli_vars=None
        )
        if not project.dbt_cloud or "tenant_hostname" not in project.dbt_cloud:
            raise DbtProjectError("dbt_cloud.tenant_hostname not found in dbt_project.yml")
        tenant_hostname = project.dbt_cloud["tenant_hostname"]
        if not tenant_hostname:
            raise DbtProjectError("dbt_cloud.tenant_hostname is empty in dbt_project.yml")
    except Exception as e:
        # Keep the original wrapping message, but chain the cause for debugging.
        raise DbtProjectError(
            f"Error reading dbt_cloud.tenant_hostname from dbt_project.yml: {str(e)}"
        ) from e

    config = ArtifactUploadConfig(tenant_hostname=tenant_hostname)

    if not target_path:
        target_path = "target"

    # Build the zip in memory so no "target.zip" is left behind in (or
    # overwritten inside) the current working directory.
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w") as z:
        for artifact in ARTIFACTS_TO_UPLOAD[command]:
            z.write(os.path.join(target_path, artifact), artifact)
    file_data = archive.getvalue()

    # Step 1: Create ingest request with retry
    def create_ingest():
        response = requests.post(url=config.get_ingest_url(), headers=config.get_headers())
        return response.status_code == 200, response

    response = _retry_with_backoff("creating ingest request", create_ingest)
    response_data = response.json()
    ingest_id = response_data["data"]["id"]
    upload_url = response_data["data"]["upload_url"]

    # Step 2: Upload the zip file to the provided URL with retry
    def upload_file():
        upload_response = requests.put(url=upload_url, data=file_data)
        return upload_response.status_code in (200, 204), upload_response

    _retry_with_backoff("uploading artifacts", upload_file)

    # Step 3: Mark the ingest as successful with retry
    def complete_ingest():
        complete_response = requests.patch(
            url=config.get_complete_url(ingest_id),
            headers=config.get_headers(),
            json={"upload_status": "SUCCESS"},
        )
        return complete_response.status_code == 204, complete_response

    _retry_with_backoff("completing ingest", complete_ingest)

    fire_event(ArtifactUploadSuccess(msg=f"command {command} completed successfully"))
    if dbt.tracking.active_user is not None:
        dbt.tracking.track_artifact_upload({"command": command})

core/dbt/cli/main.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ def global_flags(func):
141141
@p.warn_error_options
142142
@p.write_json
143143
@p.use_fast_test_edges
144+
@p.upload_artifacts
144145
@functools.wraps(func)
145146
def wrapper(*args, **kwargs):
146147
return func(*args, **kwargs)

core/dbt/cli/params.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,3 +761,10 @@ def _version_callback(ctx, _param, value):
761761
help="Whether or not to write the manifest.json and run_results.json files to the target directory",
762762
default=True,
763763
)
764+
765+
upload_artifacts = click.option(
766+
"--upload-to-artifacts-ingest-api/--no-upload-to-artifacts-ingest-api",
767+
envvar="DBT_UPLOAD_TO_ARTIFACTS_INGEST_API",
768+
help="Whether or not to upload the artifacts to the dbt Cloud API",
769+
default=False,
770+
)

core/dbt/cli/requires.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import dbt.tracking
1111
from dbt.adapters.factory import adapter_management, get_adapter, register_adapter
12+
from dbt.cli.artifact_upload import upload_artifacts
1213
from dbt.cli.exceptions import ExceptionExit, ResultExit
1314
from dbt.cli.flags import Flags
1415
from dbt.config import RuntimeConfig
@@ -17,6 +18,7 @@
1718
from dbt.context.query_header import generate_query_header_context
1819
from dbt.events.logging import setup_event_logger
1920
from dbt.events.types import (
21+
ArtifactUploadError,
2022
CommandCompleted,
2123
MainEncounteredError,
2224
MainReportArgs,
@@ -26,7 +28,7 @@
2628
ResourceReport,
2729
)
2830
from dbt.exceptions import DbtProjectError, FailFastError
29-
from dbt.flags import get_flag_dict, set_flags
31+
from dbt.flags import get_flag_dict, get_flags, set_flags
3032
from dbt.mp_context import get_mp_context
3133
from dbt.parser.manifest import parse_manifest
3234
from dbt.plugins import set_up_plugin_manager
@@ -164,6 +166,15 @@ def wrapper(*args, **kwargs):
164166
finally:
165167
# Fire ResourceReport, but only on systems which support the resource
166168
# module. (Skip it on Windows).
169+
try:
170+
if get_flags().upload_to_artifacts_ingest_api:
171+
upload_artifacts(
172+
get_flags().project_dir, get_flags().target_path, ctx.command.name
173+
)
174+
175+
except Exception as e:
176+
fire_event(ArtifactUploadError(msg=str(e)))
177+
167178
if importlib.util.find_spec("resource") is not None:
168179
import resource
169180

core/dbt/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,6 @@
2222
MINIMUM_REQUIRED_TIME_SPINE_GRANULARITY = TimeGranularity.DAY
2323
PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
2424
PACKAGE_LOCK_HASH_KEY = "sha1_hash"
25+
RUN_RESULTS_FILE_NAME = "run_results.json"
26+
CATALOG_FILENAME = "catalog.json"
27+
SOURCE_RESULT_FILE_NAME = "sources.json"

core/dbt/contracts/state.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from dbt.artifacts.schemas.freshness import FreshnessExecutionResultArtifact
66
from dbt.artifacts.schemas.manifest import WritableManifest
77
from dbt.artifacts.schemas.run import RunResultsArtifact
8+
from dbt.constants import RUN_RESULTS_FILE_NAME
89
from dbt.contracts.graph.manifest import Manifest
910
from dbt.events.types import WarnStateTargetEqual
1011
from dbt_common.events.functions import fire_event
@@ -43,7 +44,7 @@ def __init__(self, state_path: Path, target_path: Path, project_root: Path) -> N
4344
exc.add_filename(str(manifest_path))
4445
raise
4546

46-
results_path = self.project_root / self.state_path / "run_results.json"
47+
results_path = self.project_root / self.state_path / RUN_RESULTS_FILE_NAME
4748
self.results = load_result_state(results_path)
4849

4950
sources_path = self.project_root / self.state_path / "sources.json"

core/dbt/events/core_types.proto

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2231,3 +2231,34 @@ message ResourceReportMsg {
22312231
CoreEventInfo info = 1;
22322232
ResourceReport data = 2;
22332233
}
2234+
2235+
2236+
// Z061
2237+
message ArtifactUploadError {
2238+
string msg = 1;
2239+
}
2240+
2241+
message ArtifactUploadErrorMsg {
2242+
CoreEventInfo info = 1;
2243+
ArtifactUploadError data = 2;
2244+
}
2245+
2246+
// Z062
2247+
message ArtifactUploadSuccess {
2248+
string msg = 1;
2249+
}
2250+
2251+
message ArtifactUploadSuccessMsg {
2252+
CoreEventInfo info = 1;
2253+
ArtifactUploadSuccess data = 2;
2254+
}
2255+
2256+
// Z063
2257+
message ArtifactUploadSkipped {
2258+
string msg = 1;
2259+
}
2260+
2261+
message ArtifactUploadSkippedMsg {
2262+
CoreEventInfo info = 1;
2263+
ArtifactUploadSkipped data = 2;
2264+
}

core/dbt/events/core_types_pb2.py

Lines changed: 14 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/dbt/events/types.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2185,3 +2185,30 @@ def code(self) -> str:
21852185

21862186
def message(self) -> str:
21872187
return f"Resource report: {self.to_json()}"
2188+
2189+
2190+
# Artifact Upload Events #
2191+
2192+
2193+
class ArtifactUploadError(ErrorLevel):
    """Event reporting that uploading artifacts failed; ``msg`` carries the error text."""

    def code(self) -> str:
        return "Z061"

    def message(self) -> str:
        detail = self.msg
        return f"Error uploading artifacts to artifact ingestion API: {detail}"
2199+
2200+
2201+
class ArtifactUploadSuccess(InfoLevel):
    """Event reporting that artifacts were uploaded; ``msg`` carries the detail text."""

    def code(self) -> str:
        return "Z062"

    def message(self) -> str:
        detail = self.msg
        return f"Artifacts uploaded successfully to artifact ingestion API: {detail}"
2207+
2208+
2209+
class ArtifactUploadSkipped(DebugLevel):
    """Event reporting that the artifact upload was skipped; ``msg`` explains why."""

    def code(self) -> str:
        return "Z063"

    def message(self) -> str:
        # ``msg`` is a complete explanation (e.g. "No artifacts to upload for
        # command X"), so use a neutral prefix instead of the former
        # "Artifacts skipped for command : ", which had a stray space and
        # produced a garbled sentence.
        return f"Artifact upload skipped: {self.msg}"

0 commit comments

Comments
 (0)