Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified dag_example_module.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 3 additions & 1 deletion hamilton/async_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time
import typing
import uuid
from types import ModuleType
from types import FunctionType, ModuleType
from typing import Any, Dict, Optional, Tuple

import hamilton.lifecycle.base as lifecycle_base
Expand Down Expand Up @@ -199,6 +199,7 @@ def __init__(
result_builder: Optional[base.ResultMixin] = None,
adapters: typing.List[lifecycle.LifecycleAdapter] = None,
allow_module_overrides: bool = False,
functions: typing.List[FunctionType] = None,
):
"""Instantiates an asynchronous driver.

Expand Down Expand Up @@ -249,6 +250,7 @@ def __init__(
*async_adapters, # note async adapters will not be called during synchronous execution -- this is for access later
],
allow_module_overrides=allow_module_overrides,
functions=functions,
)
self.initialized = False

Expand Down
24 changes: 20 additions & 4 deletions hamilton/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import typing
import uuid
from datetime import datetime
from types import ModuleType
from types import FunctionType, ModuleType
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -402,6 +402,7 @@ def __init__(
self,
config: Dict[str, Any],
*modules: ModuleType,
functions: List[FunctionType] = None,
adapter: Optional[
Union[lifecycle_base.LifecycleAdapter, List[lifecycle_base.LifecycleAdapter]]
] = None,
Expand Down Expand Up @@ -435,13 +436,15 @@ def __init__(
if adapter.does_hook("pre_do_anything", is_async=False):
adapter.call_all_lifecycle_hooks_sync("pre_do_anything")
error = None
self.graph_functions = functions if functions is not None else []
self.graph_modules = modules
try:
self.graph = graph.FunctionGraph.from_modules(
*modules,
self.graph = graph.FunctionGraph.compile(
modules=list(modules),
functions=functions if functions is not None else [],
config=config,
adapter=adapter,
allow_module_overrides=allow_module_overrides,
allow_node_overrides=allow_module_overrides,
)
if _materializers:
materializer_factories, extractor_factories = self._process_materializers(
Expand Down Expand Up @@ -1866,6 +1869,7 @@ def __init__(self):
# common fields
self.config = {}
self.modules = []
self.functions = []
self.materializers = []

# Allow later modules to override nodes of the same name
Expand Down Expand Up @@ -1927,6 +1931,17 @@ def with_modules(self, *modules: ModuleType) -> "Builder":
self.modules.extend(modules)
return self

def with_functions(self, *functions: FunctionType) -> "Builder":
"""Adds the specified functions to the list.
This can be called multiple times. If you have allow_module_overrides
set this will enabl overwriting modules or previously added functions.

:param functions:
:return: self
"""
self.functions.extend(functions)
return self
Comment on lines +1934 to +1943
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want this? Why can't we just put it in the .with_modules?

E.g. this opens up issues of precedence when using both modules and functions.


def with_adapter(self, adapter: base.HamiltonGraphAdapter) -> "Builder":
"""Sets the adapter to use.

Expand Down Expand Up @@ -2168,6 +2183,7 @@ def build(self) -> Driver:
_graph_executor=graph_executor,
_use_legacy_adapter=False,
allow_module_overrides=self._allow_module_overrides,
functions=self.functions,
)

def copy(self) -> "Builder":
Expand Down
58 changes: 44 additions & 14 deletions hamilton/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import pathlib
import uuid
from enum import Enum
from types import ModuleType
from types import FunctionType, ModuleType
from typing import Any, Callable, Collection, Dict, FrozenSet, List, Optional, Set, Tuple, Type

import hamilton.lifecycle.base as lifecycle_base
Expand Down Expand Up @@ -142,17 +142,18 @@ def update_dependencies(
return nodes


def create_function_graph(
def compile_to_nodes(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new parameter 'allow_node_level_overrides' and corresponding error message look good. Consider clarifying in the docstring that this option controls node‐level duplicate detection, as opposed to module overrides.

*functions: List[Tuple[str, Callable]],
config: Dict[str, Any],
adapter: lifecycle_base.LifecycleAdapterSet = None,
fg: Optional["FunctionGraph"] = None,
allow_module_overrides: bool = False,
allow_node_level_overrides: bool = False,
) -> Dict[str, node.Node]:
"""Creates a graph of all available functions & their dependencies.
:param modules: A set of modules over which one wants to compute the function graph
:param config: Dictionary that we will inspect to get values from in building the function graph.
:param adapter: The adapter that adapts our node type checking based on the context.
:param allow_node_level_overrides: Whether or not to allow node names to override each other
:return: list of nodes in the graph.
If it needs to be more complicated, we'll return an actual networkx graph and get all the rest of the logic for free
"""
Expand All @@ -170,7 +171,7 @@ def create_function_graph(
for n in fm_base.resolve_nodes(f, config):
if n.name in config:
continue # This makes sure we overwrite things if they're in the config...
if n.name in nodes and not allow_module_overrides:
if n.name in nodes and not allow_node_level_overrides:
raise ValueError(
f"Cannot define function {n.name} more than once."
f" Already defined by function {f}"
Expand Down Expand Up @@ -713,13 +714,42 @@ def __init__(
self.nodes = nodes
self.adapter = adapter

@staticmethod
def compile(
modules: List[ModuleType],
functions: List[FunctionType],
config: Dict[str, Any],
adapter: lifecycle_base.LifecycleAdapterSet = None,
allow_node_overrides: bool = False,
) -> "FunctionGraph":
"""Base level static function for compiling a function graph. Note
that this can both use functions (E.G. passing them directly) and modules
(passing them in and crawling.

:param modules: Modules to use
:param functions: Functions to use
:param config: Config to use for setting up the DAG
:param adapter: Adapter to use for node resolution
:param allow_node_overrides: Whether or not to allow node level overrides.
:return: The compiled function graph
"""
module_functions = sum([find_functions(module) for module in modules], [])
nodes = compile_to_nodes(
*module_functions,
*functions,
config=config,
adapter=adapter,
allow_node_level_overrides=allow_node_overrides,
)
return FunctionGraph(nodes, config, adapter)

@staticmethod
def from_modules(
*modules: ModuleType,
config: Dict[str, Any],
adapter: lifecycle_base.LifecycleAdapterSet = None,
allow_module_overrides: bool = False,
):
) -> "FunctionGraph":
"""Initializes a function graph from the specified modules. Note that this was the old
way we constructed FunctionGraph -- this is not a public-facing API, so we replaced it
with a constructor that takes in nodes directly. If you hacked in something using
Expand All @@ -732,28 +762,28 @@ def from_modules(
:return: a function graph.
"""

functions = sum([find_functions(module) for module in modules], [])
return FunctionGraph.from_functions(
*functions,
return FunctionGraph.compile(
modules=modules,
functions=[],
config=config,
adapter=adapter,
allow_module_overrides=allow_module_overrides,
allow_node_overrides=allow_module_overrides,
)

@staticmethod
def from_functions(
*functions,
*functions: FunctionType,
config: Dict[str, Any],
adapter: lifecycle_base.LifecycleAdapterSet = None,
allow_module_overrides: bool = False,
) -> "FunctionGraph":
nodes = create_function_graph(
*functions,
return FunctionGraph.compile(
modules=[],
functions=functions,
config=config,
adapter=adapter,
allow_module_overrides=allow_module_overrides,
allow_node_overrides=allow_module_overrides,
)
return FunctionGraph(nodes, config, adapter)

def with_nodes(self, nodes: Dict[str, Node]) -> "FunctionGraph":
"""Creates a new function graph with the additional specified nodes.
Expand Down
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ docs = [
"diskcache",
# required for all the plugins
"dlt",
# furo -- install from main for now until the next release is out:
"furo @ git+https://github.com/pradyunsg/furo@main",
"furo",
"gitpython", # Required for parsing git info for generation of data-adapter docs
"grpcio-status",
"lightgbm",
Expand Down