Merged
95 commits
0ef5e33
first checkpoint
bcdurak Oct 24, 2025
f7fe096
migration
bcdurak Oct 24, 2025
e5ef506
second check
bcdurak Oct 29, 2025
a2a88b3
docs checkpoint
bcdurak Oct 29, 2025
d7faedf
another checkpoint
bcdurak Oct 29, 2025
96bf1f8
new checkpoint
bcdurak Oct 30, 2025
d634475
solved conflict
bcdurak Oct 30, 2025
185d279
solving things
bcdurak Oct 30, 2025
f4a62fb
some other checkpoint
bcdurak Oct 30, 2025
c189af8
formatting
bcdurak Oct 30, 2025
929bd3a
new changes
bcdurak Oct 30, 2025
ee9973a
checkpoint
bcdurak Oct 30, 2025
1f95b4f
fixing the secret
bcdurak Oct 31, 2025
4127bbd
some broken checkpoint
bcdurak Nov 6, 2025
c733401
new big checkpoint
bcdurak Nov 9, 2025
3467617
merge conflicts resolved
bcdurak Nov 9, 2025
443d92e
removing docs for now
bcdurak Nov 9, 2025
cc39e34
some fixes
bcdurak Nov 9, 2025
fddf02b
fix the local orch
bcdurak Nov 9, 2025
9f50c73
removed unused
bcdurak Nov 11, 2025
50d6833
delete the old step logging
bcdurak Nov 11, 2025
4866c89
some fixes
bcdurak Nov 11, 2025
01da27e
running checkpoint
bcdurak Nov 17, 2025
6f37cb8
Merge branch 'develop' into feature/log-store
bcdurak Nov 17, 2025
d4243af
new changes
bcdurak Nov 18, 2025
07eb569
stack changes
bcdurak Nov 18, 2025
9e49f3b
more fixes
bcdurak Nov 18, 2025
997c23a
new changes
bcdurak Nov 18, 2025
61802c7
new stuff
bcdurak Nov 18, 2025
d223473
new defaults and formatting
bcdurak Nov 18, 2025
6798bf8
moving stuff around
bcdurak Nov 19, 2025
02a724a
adding the dependency
bcdurak Nov 19, 2025
07ecc6c
Merge branch 'develop' into feature/log-store
bcdurak Nov 19, 2025
ddd54bb
fixing the migration
bcdurak Nov 20, 2025
854e96a
new changes
bcdurak Nov 20, 2025
33ea2f2
formating
bcdurak Nov 20, 2025
0a64385
new creation
bcdurak Nov 20, 2025
6a2d9d6
some new import structure
bcdurak Nov 20, 2025
dba0a01
fixing the order
bcdurak Nov 20, 2025
017253a
some minor changes
bcdurak Nov 20, 2025
a783b62
more minor changes
bcdurak Nov 20, 2025
0e71b77
new default constants
bcdurak Nov 20, 2025
2996978
some more minor fixes
bcdurak Nov 20, 2025
36c9752
fix
bcdurak Nov 20, 2025
174a628
Merge branch 'develop' into feature/log-store
bcdurak Nov 21, 2025
440e2e4
some fixes
bcdurak Nov 21, 2025
c9cd728
dev
bcdurak Nov 21, 2025
b1f1199
new try
bcdurak Nov 21, 2025
d338818
Fix infinite loop on debug logs
stefannica Nov 24, 2025
14fdc00
Log exceptions raised during the logger context emit calls
stefannica Nov 24, 2025
c5c377a
Decoupled logging context from the log store and added flush method t…
stefannica Nov 24, 2025
05ac55c
resolve conflicts
bcdurak Nov 25, 2025
9dac534
optimizing
bcdurak Nov 27, 2025
e980104
conflicts resolved
bcdurak Nov 27, 2025
3312473
some changes
bcdurak Nov 30, 2025
56eefb8
Intermediate fixes for scalability
stefannica Dec 1, 2025
8c249dd
Update logging for deployers to use utils
stefannica Dec 1, 2025
a299e4d
Refactored the EOF operation
stefannica Dec 1, 2025
0e10659
Fix first round of bugs after last changes
stefannica Dec 1, 2025
f9da30f
resolving conflicts
bcdurak Dec 2, 2025
49d8eac
formatting
bcdurak Dec 2, 2025
7aab5d5
formatting, linting, docstrings and tests
bcdurak Dec 2, 2025
c2ca752
unit tests
bcdurak Dec 2, 2025
a3829b5
docstrings
bcdurak Dec 2, 2025
b24b7bf
removed old tests
bcdurak Dec 2, 2025
aeee56f
merged develop
bcdurak Dec 2, 2025
635e3b3
format
bcdurak Dec 2, 2025
b8ab51f
Apply code review suggestions
stefannica Dec 2, 2025
b86f425
Another round of code review suggestions
stefannica Dec 2, 2025
dd9e23d
Add metadata to logs and replaced the datadog exporter with the stand…
stefannica Dec 3, 2025
255224f
Fixed datadog log fetching
stefannica Dec 3, 2025
6fee1eb
docstrings
bcdurak Dec 3, 2025
776e8d5
merged develop
bcdurak Dec 3, 2025
04d18d0
Update src/zenml/log_stores/artifact/artifact_log_exporter.py
stefannica Dec 3, 2025
0378082
Removed context, fixed datadog fetch time window, used OTEL handler
stefannica Dec 3, 2025
523a620
Implement generic OTEL exporter
stefannica Dec 4, 2025
8857797
Fix docstrings and spelling errors
stefannica Dec 4, 2025
3d59d00
Fix linter errors
stefannica Dec 4, 2025
1a39e17
merged develop
bcdurak Dec 4, 2025
55563b3
fixing the unit tests
bcdurak Dec 4, 2025
14c2f00
format
bcdurak Dec 4, 2025
7bbac46
Improved otel exporter to use correct fields
stefannica Dec 4, 2025
183f88c
Merge branch 'feature/log-store' of github.com:zenml-io/zenml into fe…
stefannica Dec 4, 2025
fd0368d
small fix to the runner
bcdurak Dec 4, 2025
1d0c7bf
one more
bcdurak Dec 4, 2025
526eb31
removed todo
bcdurak Dec 4, 2025
5b84a26
more minor fixes
bcdurak Dec 4, 2025
a41f4e8
sql zen store changes
bcdurak Dec 4, 2025
3725797
more minor fixes
bcdurak Dec 4, 2025
8c89ef1
another small fix
bcdurak Dec 4, 2025
3562ade
minor fixes
bcdurak Dec 5, 2025
11def57
better log entry fetching
bcdurak Dec 5, 2025
74fea2e
merged develop
bcdurak Dec 5, 2025
8dfcedf
late night changes
bcdurak Dec 6, 2025
9ef088d
proper limits
bcdurak Dec 6, 2025
pyproject.toml (1 addition, 0 deletions)
@@ -38,6 +38,7 @@ dependencies = [
"docker~=7.1.0",
"gitpython>=3.1.18,<4.0.0",
"jsonref",
"opentelemetry-sdk==1.38.0",
"packaging>=24.1",
"psutil>=5.0.0",
"pydantic>=2.0,<=2.11.9",
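The only dependency change is the pinned OpenTelemetry SDK, which brings in the log-export machinery the new log store components build on. As a rough orientation only (this is not ZenML's actual wiring, and the console exporter is just a stand-in), the pieces fit together like this:

# Illustrative sketch of the opentelemetry-sdk logging pieces; not ZenML code.
import logging

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
    BatchLogRecordProcessor,
    ConsoleLogExporter,
)

provider = LoggerProvider()
# Log records are queued and exported in batches on a background thread.
provider.add_log_record_processor(BatchLogRecordProcessor(ConsoleLogExporter()))

# Bridge Python's standard logging into OTEL log records.
logging.getLogger().addHandler(
    LoggingHandler(level=logging.INFO, logger_provider=provider)
)
logging.getLogger(__name__).info("hello from the OTEL logging bridge")
provider.shutdown()  # flush any queued records before exit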
src/zenml/artifacts/utils.py (2 additions, 2 deletions)
@@ -539,7 +539,7 @@ def load_artifact_visualization(
f"Artifact '{artifact.id}' cannot be visualized because the "
"underlying artifact store was deleted."
)
- artifact_store = _load_artifact_store(
+ artifact_store = load_artifact_store(
artifact_store_id=artifact.artifact_store_id, zen_store=zen_store
)
try:
@@ -821,7 +821,7 @@ def _load_artifact_from_uri(
return artifact


- def _load_artifact_store(
+ def load_artifact_store(
artifact_store_id: Union[str, "UUID"],
zen_store: Optional["BaseZenStore"] = None,
) -> "BaseArtifactStore":
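The change in this file simply drops the leading underscore so load_artifact_store becomes part of the module's public surface; the internal call in load_artifact_visualization is updated to match. A hypothetical external caller would look like this (the UUID is a placeholder):

from uuid import UUID

from zenml.artifacts.utils import load_artifact_store

# Placeholder ID; in practice it comes from an artifact or log record.
store = load_artifact_store(
    artifact_store_id=UUID("00000000-0000-0000-0000-000000000000")
)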
src/zenml/cli/stack.py (11 additions, 0 deletions)
@@ -202,6 +202,14 @@ def stack() -> None:
type=str,
required=False,
)
@click.option(
"-l",
"--log_store",
"log_store",
help="Name of the log store for this stack.",
type=str,
required=False,
)
@click.option(
"--set",
"set_stack",
@@ -255,6 +263,7 @@ def register_stack(
data_validator: Optional[str] = None,
image_builder: Optional[str] = None,
deployer: Optional[str] = None,
log_store: Optional[str] = None,
set_stack: bool = False,
provider: Optional[str] = None,
connector: Optional[str] = None,
@@ -278,6 +287,7 @@ def register_stack(
data_validator: Name of the data validator for this stack.
image_builder: Name of the new image builder for this stack.
deployer: Name of the deployer for this stack.
log_store: Name of the log store for this stack.
set_stack: Immediately set this stack as active.
provider: Name of the cloud provider for this stack.
connector: Name of the service connector for this stack.
@@ -522,6 +532,7 @@ def register_stack(
(StackComponentType.DATA_VALIDATOR, data_validator),
(StackComponentType.FEATURE_STORE, feature_store),
(StackComponentType.IMAGE_BUILDER, image_builder),
(StackComponentType.LOG_STORE, log_store),
(StackComponentType.MODEL_DEPLOYER, model_deployer),
(StackComponentType.MODEL_REGISTRY, model_registry),
(StackComponentType.STEP_OPERATOR, step_operator),
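The new -l/--log_store option is threaded through zenml stack register exactly like the other component flags: it ends up as a (StackComponentType.LOG_STORE, log_store) pair in the list of provided components. A rough sketch of that pattern, with made-up component names:

from typing import Dict, List, Optional

from zenml.enums import StackComponentType

provided: Dict[StackComponentType, Optional[str]] = {
    StackComponentType.ORCHESTRATOR: "default",
    StackComponentType.ARTIFACT_STORE: "default",
    # Supplied via the new -l/--log_store flag.
    StackComponentType.LOG_STORE: "my_log_store",
}

# Keep only the components the user actually named, mirroring what the CLI does.
components: Dict[StackComponentType, List[str]] = {
    c_type: [name] for c_type, name in provided.items() if name
}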
src/zenml/constants.py (29 additions, 4 deletions)
@@ -228,15 +228,15 @@ def handle_int_env_var(var: str, default: int = 0) -> int:
ENV_ZENML_LOGGING_VERBOSITY, default="DEBUG"
).upper()
ZENML_STORAGE_LOGGING_VERBOSITY = os.getenv(
- ENV_ZENML_STORAGE_LOGGING_VERBOSITY, default="DEBUG"
- ).upper()
+ ENV_ZENML_STORAGE_LOGGING_VERBOSITY, default=None
+ )
else:
ZENML_LOGGING_VERBOSITY = os.getenv(
ENV_ZENML_LOGGING_VERBOSITY, default="INFO"
).upper()
ZENML_STORAGE_LOGGING_VERBOSITY = os.getenv(
- ENV_ZENML_STORAGE_LOGGING_VERBOSITY, default="INFO"
- ).upper()
+ ENV_ZENML_STORAGE_LOGGING_VERBOSITY, default=None
+ )

INSIDE_ZENML_CONTAINER = handle_bool_env_var(ENV_ZENML_CONTAINER, False)

@@ -547,3 +547,28 @@ def handle_int_env_var(var: str, default: int = 0) -> int:
LOGS_MERGE_INTERVAL_SECONDS = handle_int_env_var(
ENV_ZENML_LOGS_MERGE_INTERVAL_SECONDS, default=10 * 60
)

# OpenTelemetry log storage constants
ENV_ZENML_LOGS_OTEL_MAX_QUEUE_SIZE = "ZENML_LOGS_OTEL_MAX_QUEUE_SIZE"
ENV_ZENML_LOGS_OTEL_SCHEDULE_DELAY_MILLIS = (
"ZENML_LOGS_OTEL_SCHEDULE_DELAY_MILLIS"
)
ENV_ZENML_LOGS_OTEL_MAX_EXPORT_BATCH_SIZE = (
"ZENML_LOGS_OTEL_MAX_EXPORT_BATCH_SIZE"
)
ENV_ZENML_LOGS_OTEL_EXPORT_TIMEOUT_MILLIS = (
"ZENML_LOGS_OTEL_EXPORT_TIMEOUT_MILLIS"
)

LOGS_OTEL_MAX_QUEUE_SIZE = handle_int_env_var(
ENV_ZENML_LOGS_OTEL_MAX_QUEUE_SIZE, default=100000
)
LOGS_OTEL_SCHEDULE_DELAY_MILLIS = handle_int_env_var(
ENV_ZENML_LOGS_OTEL_SCHEDULE_DELAY_MILLIS, default=5000
)
LOGS_OTEL_MAX_EXPORT_BATCH_SIZE = handle_int_env_var(
ENV_ZENML_LOGS_OTEL_MAX_EXPORT_BATCH_SIZE, default=5000
)
LOGS_OTEL_EXPORT_TIMEOUT_MILLIS = handle_int_env_var(
ENV_ZENML_LOGS_OTEL_EXPORT_TIMEOUT_MILLIS, default=15000
)
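Two things happen in constants.py: the storage logging verbosity no longer defaults to the "DEBUG"/"INFO" strings (its default becomes None and the unconditional .upper() call goes away), and a set of OpenTelemetry batch-export tuning knobs is added, each overridable through an environment variable. The new constant names mirror the keyword arguments of the SDK's BatchLogRecordProcessor, so a plausible wiring (assumed here, not confirmed by the diff; the console exporter is a stand-in) would be:

from opentelemetry.sdk._logs.export import (
    BatchLogRecordProcessor,
    ConsoleLogExporter,
)

from zenml.constants import (
    LOGS_OTEL_EXPORT_TIMEOUT_MILLIS,
    LOGS_OTEL_MAX_EXPORT_BATCH_SIZE,
    LOGS_OTEL_MAX_QUEUE_SIZE,
    LOGS_OTEL_SCHEDULE_DELAY_MILLIS,
)

processor = BatchLogRecordProcessor(
    ConsoleLogExporter(),  # stand-in exporter
    max_queue_size=LOGS_OTEL_MAX_QUEUE_SIZE,  # ZENML_LOGS_OTEL_MAX_QUEUE_SIZE, default 100000
    schedule_delay_millis=LOGS_OTEL_SCHEDULE_DELAY_MILLIS,  # default 5000
    max_export_batch_size=LOGS_OTEL_MAX_EXPORT_BATCH_SIZE,  # default 5000
    export_timeout_millis=LOGS_OTEL_EXPORT_TIMEOUT_MILLIS,  # default 15000
)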
src/zenml/deployers/server/service.py (27 additions, 20 deletions)
@@ -63,6 +63,7 @@
from zenml.stack import Stack
from zenml.steps.utils import get_unique_step_output_names
from zenml.utils import env_utils, source_utils
from zenml.utils.logging_utils import setup_run_logging
from zenml.zen_stores.rest_zen_store import RestZenStore

if TYPE_CHECKING:
@@ -313,6 +314,9 @@ def initialize(self) -> None:
)
self._client.zen_store.reinitialize_session()

# Instantiate the active stack here to avoid race conditions later
self._client.active_stack.validate()

# Execution tracking
self.service_start_time = time.time()
self.last_execution_time: Optional[datetime] = None
@@ -382,7 +386,7 @@ def execute_pipeline(
)

except Exception as e:
- logger.error(f"❌ Pipeline execution failed: {e}")
+ logger.exception("❌ Pipeline execution failed")
return self._build_response(
placeholder_run=placeholder_run,
mapped_outputs=None,
@@ -516,16 +520,13 @@ def _prepare_execute_with_orchestrator(
deployment_snapshot = self._client.zen_store.create_snapshot(
deployment_snapshot_request
)

# Create a placeholder run using the new deployment snapshot
placeholder_run = run_utils.create_placeholder_run(
snapshot=deployment_snapshot,
logs=None,
trigger_info=PipelineRunTriggerInfo(
deployment_id=self.deployment.id,
),
)

return placeholder_run, deployment_snapshot

def _execute_with_orchestrator(
@@ -576,23 +577,29 @@ def _execute_with_orchestrator(
)

captured_outputs: Optional[Dict[str, Dict[str, Any]]] = None
- try:
- # Use the new deployment snapshot with pre-configured settings
- orchestrator.run(
- snapshot=deployment_snapshot,
- stack=active_stack,
- placeholder_run=placeholder_run,
- )
+ logging_context = setup_run_logging(
+ pipeline_run=placeholder_run,
+ source="deployment",
+ )

- # Capture in-memory outputs before stopping the runtime context
- if runtime.is_active():
- captured_outputs = runtime.get_outputs()
- except Exception as e:
- logger.exception(f"Failed to execute pipeline: {e}")
- raise RuntimeError(f"Failed to execute pipeline: {e}")
- finally:
- # Always stop deployment runtime context
- runtime.stop()
+ with logging_context:
+ try:
+ # Use the new deployment snapshot with pre-configured settings
+ orchestrator.run(
+ snapshot=deployment_snapshot,
+ stack=active_stack,
+ placeholder_run=placeholder_run,
+ )
+
+ # Capture in-memory outputs before stopping the runtime context
+ if runtime.is_active():
+ captured_outputs = runtime.get_outputs()
+ except Exception as e:
+ logger.exception(f"Failed to execute pipeline: {e}")
+ raise RuntimeError(f"Failed to execute pipeline: {e}")
+ finally:
+ # Always stop deployment runtime context
+ runtime.stop()

return captured_outputs

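Besides validating the active stack during initialize() and wrapping the orchestrator run in the new setup_run_logging context, the top-level handler in execute_pipeline switches from logger.error(f"❌ Pipeline execution failed: {e}") to logger.exception(...). The difference is small but useful: inside an except block, exception() records the full traceback along with the message. A minimal illustration:

import logging

logger = logging.getLogger(__name__)

try:
    1 / 0
except Exception:
    # logger.error("...") would log only the message; exception() adds the traceback.
    logger.exception("Pipeline execution failed")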
src/zenml/enums.py (1 addition, 0 deletions)
@@ -157,6 +157,7 @@ class StackComponentType(StrEnum):
EXPERIMENT_TRACKER = "experiment_tracker"
FEATURE_STORE = "feature_store"
IMAGE_BUILDER = "image_builder"
LOG_STORE = "log_store"
MODEL_DEPLOYER = "model_deployer"
ORCHESTRATOR = "orchestrator"
STEP_OPERATOR = "step_operator"
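The single new member registers the log store as a first-class stack component type. Since StackComponentType is a StrEnum, the member behaves like its string value:

from zenml.enums import StackComponentType

assert StackComponentType.LOG_STORE.value == "log_store"
assert StackComponentType.LOG_STORE == "log_store"  # StrEnum members compare equal to plain strings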
src/zenml/execution/pipeline/dynamic/runner.py (24 additions, 19 deletions)
@@ -18,6 +18,7 @@
import inspect
import itertools
from concurrent.futures import ThreadPoolExecutor
from contextlib import nullcontext
from typing import (
TYPE_CHECKING,
Any,
@@ -53,7 +54,6 @@
from zenml.execution.pipeline.dynamic.utils import _Unmapped
from zenml.execution.step.utils import launch_step
from zenml.logger import get_logger
- from zenml.logging.step_logging import setup_pipeline_logging
from zenml.models import (
ArtifactVersionResponse,
PipelineRunResponse,
@@ -70,6 +70,10 @@
from zenml.steps.entrypoint_function_utils import StepArtifact
from zenml.steps.utils import OutputSignature
from zenml.utils import source_utils
from zenml.utils.logging_utils import (
is_pipeline_logging_enabled,
setup_run_logging,
)

if TYPE_CHECKING:
from zenml.config import DockerSettings
@@ -153,25 +157,27 @@ def pipeline(self) -> "DynamicPipeline":

def run_pipeline(self) -> None:
"""Run the pipeline."""
with setup_pipeline_logging(
source="orchestrator",
snapshot=self._snapshot,
) as logs_request:
if self._run:
run = Client().zen_store.update_run(
run_id=self._run.id,
run_update=PipelineRunUpdate(
orchestrator_run_id=self._orchestrator_run_id,
add_logs=[logs_request] if logs_request else None,
),
)
else:
run = create_placeholder_run(
snapshot=self._snapshot,
if self._run:
run = Client().zen_store.update_run(
run_id=self._run.id,
run_update=PipelineRunUpdate(
orchestrator_run_id=self._orchestrator_run_id,
logs=logs_request,
)
),
)
else:
run = create_placeholder_run(
snapshot=self._snapshot,
orchestrator_run_id=self._orchestrator_run_id,
)

logging_context = nullcontext()
if is_pipeline_logging_enabled(self._snapshot.pipeline_configuration):
logging_context = setup_run_logging(
pipeline_run=run,
source="orchestrator",
)

with logging_context:
with InMemoryArtifactCache():
with DynamicPipelineRunContext(
pipeline=self.pipeline,
@@ -181,7 +187,6 @@ def run_pipeline(self) -> None:
):
self._orchestrator.run_init_hook(snapshot=self._snapshot)
try:
- # TODO: step logging isn't threadsafe
# TODO: what should be allowed as pipeline returns?
# (artifacts, json serializable, anything?)
# how do we show it in the UI?
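The dynamic runner no longer wraps run creation in setup_pipeline_logging(...) as logs_request (and no longer attaches the resulting logs request to the run it creates or updates). Instead, the run is created or updated first, and logging is attached afterwards through setup_run_logging, guarded by is_pipeline_logging_enabled with a nullcontext() fallback. The same pattern shows up again in the next file, which appears to be the Kubernetes orchestrator entrypoint. A self-contained sketch of the shared pattern, with the helpers imported from the new zenml.utils.logging_utils module and everything else hypothetical:

from contextlib import nullcontext

from zenml.utils.logging_utils import (
    is_pipeline_logging_enabled,
    setup_run_logging,
)


def run_with_optional_logging(pipeline_run, snapshot, do_run):
    # Toy version of the shared pattern; not the actual ZenML code.
    logging_context = nullcontext()
    if is_pipeline_logging_enabled(snapshot.pipeline_configuration):
        logging_context = setup_run_logging(
            pipeline_run=pipeline_run, source="orchestrator"
        )
    # nullcontext() turns the `with` block into a no-op when run logging is off.
    with logging_context:
        return do_run()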
@@ -18,6 +18,7 @@
import socket
import threading
import time
from contextlib import nullcontext
from typing import List, Optional, Tuple, cast
from uuid import UUID

@@ -56,7 +57,6 @@
KubernetesOrchestrator,
)
from zenml.logger import get_logger
- from zenml.logging.step_logging import setup_orchestrator_logging
from zenml.models import (
PipelineRunResponse,
PipelineRunUpdate,
@@ -74,6 +74,10 @@
)
from zenml.pipelines.run_utils import create_placeholder_run
from zenml.utils import env_utils
from zenml.utils.logging_utils import (
is_pipeline_logging_enabled,
setup_run_logging,
)

logger = get_logger(__name__)

@@ -244,7 +248,6 @@ def main() -> None:
namespace=namespace,
job_name=job_name,
)
- existing_logs_response = None

if run_id and orchestrator_run_id:
logger.info("Continuing existing run `%s`.", run_id)
@@ -257,13 +260,9 @@ def main() -> None:
)
logger.debug("Reconstructed nodes: %s", nodes)

- # Continue logging to the same log file if it exists
- for log_response in pipeline_run.log_collection or []:
- if log_response.source == "orchestrator":
- existing_logs_response = log_response
- break
else:
orchestrator_run_id = orchestrator_pod_name

if args.run_id:
pipeline_run = client.zen_store.update_run(
run_id=args.run_id,
@@ -293,11 +292,7 @@ def main() -> None:
for step_name, step in snapshot.step_configurations.items()
]

- logs_context = setup_orchestrator_logging(
- run_id=pipeline_run.id,
- snapshot=snapshot,
- logs_response=existing_logs_response,
- )
+ logs_context = nullcontext()
+ if is_pipeline_logging_enabled(snapshot.pipeline_configuration):
+ logs_context = setup_run_logging(
+ pipeline_run=pipeline_run,
+ source="orchestrator",
+ )

with logs_context:
step_command = StepEntrypointConfiguration.get_entrypoint_command()