Drivers

General

Use this driver in a general Python context, e.g. batch jobs, Jupyter notebooks, etc.

class hamilton.driver.Driver(config: Dict[str, Any], *modules: module, adapter: HamiltonGraphAdapter = None)

This class orchestrates creating and executing the DAG. With the default adapter, the result is a pandas DataFrame.

```python
import importlib

from hamilton import base
from hamilton import driver

# 1. Set up the config or invariant inputs.
config = {}

# 2. Tell Hamilton where to load function definitions from.
import my_functions  # your module of Hamilton functions
# ...or load it programmatically (e.g. if you script module loading).
module_name = 'my_functions'
my_functions = importlib.import_module(module_name)

# 3. Determine the return type -- the default is a pandas.DataFrame.
adapter = base.SimplePythonDataFrameGraphAdapter()  # See the GraphAdapter docs for more details.

# These all feed into creating the driver, and thus the DAG.
dr = driver.Driver(config, my_functions, adapter=adapter)
```
__init__(config: Dict[str, Any], *modules: module, adapter: HamiltonGraphAdapter = None)

Constructor: creates a DAG given the configuration & modules to crawl.

Parameters:
  • config – This is a dictionary of initial data & configuration. The contents are used to help create the DAG.

  • modules – Python module objects you want to inspect for Hamilton Functions.

  • adapter – Optional. A way to wire in a different way of “executing” a Hamilton graph. Defaults to the original Hamilton adapter, which runs single-threaded, in-memory Python.
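
Hamilton builds the DAG by crawling the modules you pass in: each function becomes a node named after the function, and each parameter name refers either to another node or to an external input. The sketch below models only that naming convention with the standard library (`inspect`, `types`); it is not Hamilton's implementation and ignores config, decorators and type checking. `build_dependency_map` and the in-memory module are hypothetical.

```python
import inspect
import types
from typing import Dict, List


def build_dependency_map(*modules: types.ModuleType) -> Dict[str, List[str]]:
    """Map each public function in the modules to its parameter names (its dependencies)."""
    graph: Dict[str, List[str]] = {}
    for module in modules:
        for name, fn in inspect.getmembers(module, inspect.isfunction):
            # Skip private helpers and functions imported from elsewhere.
            if name.startswith("_") or fn.__module__ != module.__name__:
                continue
            graph[name] = list(inspect.signature(fn).parameters)
    return graph


# A throwaway in-memory module standing in for `my_functions`.
my_functions = types.ModuleType("my_functions")
exec(
    "def spend_per_signup(spend, signups):\n"
    "    return spend / signups\n"
    "\n"
    "def spend_zero_mean(spend):\n"
    "    return spend - spend.mean()\n",
    my_functions.__dict__,
)

print(build_dependency_map(my_functions))
# {'spend_per_signup': ['spend', 'signups'], 'spend_zero_mean': ['spend']}
```

Names that appear only as parameters (`spend` and `signups` here) have no defining function; those are the external inputs you supply via `config` or `inputs`.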

capture_constructor_telemetry(error: Optional[str], modules: Tuple[module], config: Dict[str, Any], adapter: HamiltonGraphAdapter)

Captures constructor telemetry.

Notes: (1) we want to do this in a way that never breaks the caller's code. (2) we need to account for all possible states, e.g. someone passing in None, and we cannot assume that the entire constructor ran without issue (e.g. that the adapter was assigned to self).

Parameters:
  • error – the sanitized error string to send.

  • modules – the list of modules, could be None.

  • config – the config dict passed, could be None.

  • adapter – the adapter passed in, might not be attached to self yet.
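
The notes above describe a defensive pattern: telemetry must never raise, and every argument may be None or only partially initialized. A minimal sketch of that pattern, assuming a hypothetical `send_event` callable; this is not Hamilton's telemetry code, and the payload fields are illustrative only.

```python
import types
from typing import Any, Callable, Dict, Optional, Tuple


def capture_constructor_event(
    send_event: Callable[[Dict[str, Any]], None],
    error: Optional[str],
    modules: Optional[Tuple[types.ModuleType, ...]],
    config: Optional[Dict[str, Any]],
    adapter: Optional[object],
) -> bool:
    """Build and send a payload; return False instead of raising on any failure."""
    try:
        payload = {
            "error": error,
            # Tolerate None everywhere rather than assuming the constructor finished.
            "number_of_modules": len(modules) if modules else 0,
            "number_of_config_items": len(config) if config else 0,
            "adapter": type(adapter).__name__ if adapter is not None else None,
        }
        send_event(payload)
        return True
    except Exception:
        # Telemetry must never break the caller's code.
        return False
```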

capture_execute_telemetry(error: Optional[str], final_vars: List[str], inputs: Dict[str, Any], overrides: Dict[str, Any], run_successful: bool, duration: float)

Captures telemetry after execute has run.

Notes: (1) we want to be quite defensive in not breaking anyone’s code with things we do here. (2) thus we want to double-check that values exist before doing something with them.

Parameters:
  • error – the sanitized error string to capture, if any.

  • final_vars – the list of final variables to get.

  • inputs – the inputs to the execute function.

  • overrides – any overrides to the execute function.

  • run_successful – whether this run was successful.

  • duration – time it took to run execute.
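
`capture_execute_telemetry` receives `run_successful` and `duration` for each call. A minimal sketch of one way to gather those values around a call with `time.perf_counter` and `try`/`finally`, reporting even when the call fails and never letting a reporting failure mask the call's own result or error. `timed_run` and `report` are hypothetical names; this is not Hamilton's code.

```python
import time
from typing import Any, Callable, Dict


def timed_run(fn: Callable[[], Any], report: Callable[[Dict[str, Any]], None]) -> Any:
    """Run fn, always report success and duration, and re-raise any error from fn."""
    start = time.perf_counter()
    run_successful = False
    error = None
    try:
        result = fn()
        run_successful = True
        return result
    except Exception as e:
        error = type(e).__name__  # a sanitized error string: only the exception type
        raise
    finally:
        duration = time.perf_counter() - start
        try:
            report({"error": error, "run_successful": run_successful, "duration": duration})
        except Exception:
            pass  # reporting failures are swallowed so they never mask fn's outcome
```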

display_all_functions(output_file_path: str, render_kwargs: dict = None, graphviz_kwargs: dict = None) → Optional[graphviz.Digraph]

Displays the graph of all functions loaded!

Parameters:
  • output_file_path – the full URI of path + file name to save the dot file to. E.g. ‘some/path/graph-all.dot’

  • render_kwargs – a dictionary of values we’ll pass to graphviz render function. Defaults to viewing. If you do not want to view the file, pass in {‘view’:False}. See https://graphviz.readthedocs.io/en/stable/api.html#graphviz.Graph.render for other options.

  • graphviz_kwargs – Optional. Kwargs to be passed to the graphviz graph object to configure it. E.g. dict(graph_attr={‘ratio’: ‘1’}) will set the aspect ratio of the produced image to 1:1. See https://graphviz.org/doc/info/attrs.html for options.

Returns:

the graphviz object if you want to do more with it. If returned as the result in a Jupyter Notebook cell, it will render.
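
The saved file contains Graphviz DOT source. For a rough idea of what that looks like, the toy writer below renders a small `{node: [dependencies]}` dict as DOT, including how a `graph_attr` mapping such as `{'ratio': '1'}` becomes a graph-level attribute statement. It only illustrates the DOT format; the real output is produced by the `graphviz` package and carries more styling. `to_dot` is hypothetical, and node names are assumed to be plain identifiers, which DOT accepts unquoted.

```python
from typing import Dict, List, Optional


def to_dot(graph: Dict[str, List[str]], graph_attr: Optional[Dict[str, str]] = None) -> str:
    """Render {node: [dependencies]} as DOT, with edges pointing dependency -> node."""
    lines = ["digraph {"]
    for key, value in (graph_attr or {}).items():
        lines.append(f'\tgraph [{key}="{value}"]')
    for node, deps in graph.items():
        lines.append(f"\t{node}")
        for dep in deps:
            lines.append(f"\t{dep} -> {node}")
    lines.append("}")
    return "\n".join(lines)


print(to_dot({"spend_per_signup": ["spend", "signups"]}, graph_attr={"ratio": "1"}))
# digraph {
# 	graph [ratio="1"]
# 	spend_per_signup
# 	spend -> spend_per_signup
# 	signups -> spend_per_signup
# }
```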

display_downstream_of(*node_names: str, output_file_path: str, render_kwargs: dict, graphviz_kwargs: dict) → Optional[graphviz.Digraph]

Creates a visualization of the DAG starting from the passed in function name(s).

Note: for any “node” visualized, we will also add its parents to the visualization as well, so there could be more nodes visualized than strictly what is downstream of the passed in function name(s).
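
Put differently, the drawn set is the downstream set plus the direct parents of every node in it. A sketch of that set arithmetic on a plain `{node: [dependencies]}` dict; the graph and helper names are hypothetical, and Hamilton's own traversal may differ in detail.

```python
from collections import deque
from typing import Dict, List, Set


def downstream_of(graph: Dict[str, List[str]], *starts: str) -> Set[str]:
    """All nodes reachable from starts along dependency -> dependent edges, inclusive."""
    children: Dict[str, List[str]] = {}
    for node, deps in graph.items():
        for dep in deps:
            children.setdefault(dep, []).append(node)
    seen, queue = set(starts), deque(starts)
    while queue:
        for child in children.get(queue.popleft(), []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


def nodes_to_draw(graph: Dict[str, List[str]], *starts: str) -> Set[str]:
    """Downstream nodes plus the direct parents of each of them."""
    nodes = downstream_of(graph, *starts)
    return nodes | {dep for node in nodes for dep in graph.get(node, [])}


graph = {"b": ["a"], "c": ["b", "x"], "d": ["y"]}
print(sorted(downstream_of(graph, "b")))  # ['b', 'c']
print(sorted(nodes_to_draw(graph, "b")))  # ['a', 'b', 'c', 'x']
```

Here `a` and `x` are drawn even though neither is downstream of `b`, which is exactly the effect the note describes.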

Parameters:
  • node_names – names of function(s) that are starting points for traversing the graph.

  • output_file_path – the full URI of path + file name to save the dot file to. E.g. ‘some/path/graph.dot’. Pass in None to skip saving any file.

  • render_kwargs – a dictionary of values we’ll pass to graphviz render function. Defaults to viewing. If you do not want to view the file, pass in {‘view’:False}.

  • graphviz_kwargs – Kwargs to be passed to the graphviz graph object to configure it. E.g. dict(graph_attr={‘ratio’: ‘1’}) will set the aspect ratio of the produced image to 1:1.

Returns:

the graphviz object if you want to do more with it. If returned as the result in a Jupyter Notebook cell, it will render.

display_upstream_of(*node_names: str, output_file_path: str, render_kwargs: dict, graphviz_kwargs: dict) → Optional[graphviz.Digraph]

Creates a visualization of the DAG going backwards from the passed in function name(s).

Note: for any “node” visualized, we will also add its parents to the visualization as well, so there could be more nodes visualized than strictly what is upstream of the passed in function name(s).

Parameters:
  • node_names – names of function(s) that are starting points for traversing the graph.

  • output_file_path – the full URI of path + file name to save the dot file to. E.g. ‘some/path/graph.dot’. Pass in None to skip saving any file.

  • render_kwargs – a dictionary of values we’ll pass to graphviz render function. Defaults to viewing. If you do not want to view the file, pass in {‘view’:False}.

  • graphviz_kwargs – Kwargs to be passed to the graphviz graph object to configure it. E.g. dict(graph_attr={‘ratio’: ‘1’}) will set the aspect ratio of the produced image to 1:1.

Returns:

the graphviz object if you want to do more with it. If returned as the result in a Jupyter Notebook cell, it will render.

execute(final_vars: List[Union[str, Callable, Variable]], overrides: Dict[str, Any] = None, display_graph: bool = False, inputs: Dict[str, Any] = None) → Any

Executes computation.

Parameters:
  • final_vars – the final list of outputs we want to compute.

  • overrides – values that will override “nodes” in the DAG.

  • display_graph – DEPRECATED. Whether we want to display the graph being computed.

  • inputs – Runtime inputs to the DAG.

Returns:

an object consisting of the variables requested, matching the type returned by the GraphAdapter. See the constructor for how the GraphAdapter is initialized. The default adapter currently returns a pandas DataFrame.
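
To make the roles of `inputs` and `overrides` concrete, here is a toy executor over plain Python functions. In this model, `inputs` supply values for external nodes, while an `overrides` entry replaces a node's value outright, so its function is never called and anything needed only by it is never computed. It is a simplified model under those assumptions, not Hamilton's execution engine; `toy_execute` and the sample functions are hypothetical.

```python
import inspect
from typing import Any, Callable, Dict, List, Optional


def toy_execute(
    functions: Dict[str, Callable[..., Any]],
    final_vars: List[str],
    inputs: Optional[Dict[str, Any]] = None,
    overrides: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Compute final_vars depth-first; overrides and inputs pre-seed the result cache."""
    # Overrides are merged last, so they win over inputs with the same name.
    computed: Dict[str, Any] = {**(inputs or {}), **(overrides or {})}

    def compute(name: str) -> Any:
        if name not in computed:  # overridden nodes never reach this branch
            if name not in functions:
                raise ValueError(f"Missing required input: {name}")
            fn = functions[name]
            kwargs = {dep: compute(dep) for dep in inspect.signature(fn).parameters}
            computed[name] = fn(**kwargs)
        return computed[name]

    return {name: compute(name) for name in final_vars}


def spend_per_signup(spend: float, signups: int) -> float:
    return spend / signups


def double_spend_per_signup(spend_per_signup: float) -> float:
    return 2 * spend_per_signup


functions = {f.__name__: f for f in (spend_per_signup, double_spend_per_signup)}
print(toy_execute(functions, ["double_spend_per_signup"], inputs={"spend": 10.0, "signups": 5}))
# {'double_spend_per_signup': 4.0}
print(toy_execute(functions, ["double_spend_per_signup"], overrides={"spend_per_signup": 1.5}))
# {'double_spend_per_signup': 3.0}
```

The second call needs neither `spend` nor `signups`, because the overridden node is never computed.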

has_cycles(final_vars: List[Union[str, Callable, Variable]]) → bool

Checks whether the created graph has cycles.

Parameters:

final_vars – the outputs we want to compute.

Returns:

boolean True for cycles, False for no cycles.
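
Cycle detection of this kind is typically a depth-first search that reports a cycle when it reaches a node still on the current path. A minimal sketch over a `{node: [dependencies]}` dict, restricted to the nodes reachable from `final_vars` as the signature suggests; this is the standard algorithm, not necessarily how Hamilton implements `has_cycles`, and the graphs are hypothetical.

```python
from typing import Dict, List, Set


def has_cycles(graph: Dict[str, List[str]], final_vars: List[str]) -> bool:
    """Return True if any node needed for final_vars depends, transitively, on itself."""
    visiting: Set[str] = set()  # nodes on the current DFS path
    done: Set[str] = set()  # nodes fully explored without finding a cycle

    def visit(node: str) -> bool:
        if node in done:
            return False
        if node in visiting:
            return True  # back on a node from the current path: a cycle
        visiting.add(node)
        if any(visit(dep) for dep in graph.get(node, [])):
            return True
        visiting.remove(node)
        done.add(node)
        return False

    return any(visit(var) for var in final_vars)


print(has_cycles({"b": ["a"], "c": ["b"]}, ["c"]))  # False
print(has_cycles({"a": ["c"], "b": ["a"], "c": ["b"]}, ["c"]))  # True
```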

list_available_variables() → List[Variable]

Returns available variables, i.e. outputs.

These variables correspond 1:1 with nodes in the DAG and contain the following information:

  • name: the name of the node

  • tags: the tags associated with this node

  • type: the type of data this node returns

  • is_external_input: whether this node represents an external input (required from outside) or not (has a function specifying its behavior).

Returns:

list of available variables (i.e. outputs).
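
Since each variable carries the fields listed above, the result is easy to filter. The stand-in dataclass below mirrors those four fields so the common patterns can be shown without Hamilton installed; `ToyVariable` and the sample data are hypothetical, and the real `Variable` class may carry additional fields.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Type


@dataclass
class ToyVariable:
    name: str
    type: Type
    tags: Dict[str, str] = field(default_factory=dict)
    is_external_input: bool = False


variables: List[ToyVariable] = [
    ToyVariable("spend", float, is_external_input=True),
    ToyVariable("signups", int, is_external_input=True),
    ToyVariable("spend_per_signup", float, tags={"owner": "marketing"}),
]

# Inputs you must supply via config or `inputs` at runtime.
required_inputs = [v.name for v in variables if v.is_external_input]
# Outputs selected by a tag value.
marketing_outputs = [v.name for v in variables if v.tags.get("owner") == "marketing"]
print(required_inputs)    # ['spend', 'signups']
print(marketing_outputs)  # ['spend_per_signup']
```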

raw_execute(final_vars: List[str], overrides: Dict[str, Any] = None, display_graph: bool = False, inputs: Dict[str, Any] = None) → Dict[str, Any]

Raw execute function that does the meat of execute.

It does not try to stitch the results together, which allows wrapper functions around it, such as execute, to shape the output data.
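
The layering can be pictured as: the raw call hands back a plain `{name: value}` dict, and a wrapper passes that dict to a result builder that shapes the final output. A sketch of that second step with a hypothetical stdlib-only builder that stitches equal-length columns into row dicts, standing in for the pandas DataFrame the default adapter builds; `build_rows` and the sample data are assumptions, not Hamilton code.

```python
from typing import Any, Dict, List


def build_rows(outputs: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Stitch equal-length columns into a list of row dicts (a stand-in for a DataFrame)."""
    lengths = {len(column) for column in outputs.values()}
    if len(lengths) > 1:
        raise ValueError(f"Cannot stitch columns of different lengths: {sorted(lengths)}")
    names = list(outputs)
    return [dict(zip(names, values)) for values in zip(*outputs.values())]


raw = {"spend": [10.0, 20.0], "signups": [1, 4]}  # what a raw execute call would return
print(build_rows(raw))
# [{'spend': 10.0, 'signups': 1}, {'spend': 20.0, 'signups': 4}]
```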

Parameters:
  • final_vars – Final variables to compute

  • overrides – Overrides to run.

  • display_graph – DEPRECATED. DO NOT USE. Whether or not to display the graph when running it

  • inputs – Runtime inputs to the DAG

Returns:

a dictionary mapping each requested variable name to its computed value.

validate_inputs(user_nodes: Collection[Node], inputs: Optional[Dict[str, Any]] = None, nodes_set: Collection[Node] = None)

Validates that inputs meet our expectations. This means that:

  • the runtime inputs don’t clash with the graph’s config

  • all expected graph inputs are provided, either in config or at runtime

Parameters:
  • user_nodes – The required nodes we need for computation.

  • inputs – the user inputs provided.

  • nodes_set – the set of nodes to use for validation; Optional.
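
The two checks can be sketched as set operations. This assumes that a "clash" means the same key appears in both the graph's config and the runtime `inputs`, and that the required inputs are the external nodes the requested outputs depend on; Hamilton's actual validation (for example, of input types) may go further. `check_inputs` is hypothetical.

```python
from typing import Any, Dict, Iterable, Optional


def check_inputs(
    required: Iterable[str],
    config: Dict[str, Any],
    inputs: Optional[Dict[str, Any]] = None,
) -> None:
    """Raise ValueError if runtime inputs clash with config or a required input is missing."""
    inputs = inputs or {}
    clashes = sorted(set(inputs) & set(config))
    if clashes:
        raise ValueError(f"Runtime inputs clash with config: {clashes}")
    missing = sorted(set(required) - set(config) - set(inputs))
    if missing:
        raise ValueError(f"Missing required inputs: {missing}")


check_inputs({"spend", "signups"}, config={"spend": 10.0}, inputs={"signups": 5})  # passes
```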

visualize_execution(final_vars: List[Union[str, Callable, Variable]], output_file_path: str, render_kwargs: dict, inputs: Dict[str, Any] = None, graphviz_kwargs: dict = None) → Optional[graphviz.Digraph]

Visualizes execution.

Note: overrides are not handled at this time.

Shapes:

  • ovals are nodes/functions

  • rectangles are nodes/functions that are requested as output

  • shapes with dotted lines are inputs required to run the DAG.
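
The shape conventions above map naturally onto standard DOT node attributes. A toy sketch that emits a DOT node statement for each kind of node; the attribute choices (`shape=oval`, `shape=rectangle`, `style=dotted`) are assumptions matching the legend, not necessarily what Hamilton emits, and `node_line` is hypothetical.

```python
from typing import Set


def node_line(name: str, final_vars: Set[str], inputs: Set[str]) -> str:
    """Return a DOT node statement styled according to the legend."""
    # Requested outputs are rectangles; every other node is an oval.
    attrs = ["shape=rectangle" if name in final_vars else "shape=oval"]
    if name in inputs:
        attrs.append("style=dotted")  # inputs required to run the DAG
    return f"{name} [{', '.join(attrs)}]"


for name in ("spend", "spend_per_signup", "spend_zero_mean"):
    print(node_line(name, final_vars={"spend_per_signup"}, inputs={"spend"}))
# spend [shape=oval, style=dotted]
# spend_per_signup [shape=rectangle]
# spend_zero_mean [shape=oval]
```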

Parameters:
  • final_vars – the outputs we want to compute. They will become rectangles in the graph.

  • output_file_path – the full URI of path + file name to save the dot file to. E.g. ‘some/path/graph.dot’

  • render_kwargs – a dictionary of values we’ll pass to graphviz render function. Defaults to viewing. If you do not want to view the file, pass in {‘view’:False}. See https://graphviz.readthedocs.io/en/stable/api.html#graphviz.Graph.render for other options.

  • inputs – Optional. Runtime inputs to the DAG.

  • graphviz_kwargs – Optional. Kwargs to be passed to the graphviz graph object to configure it. E.g. dict(graph_attr={‘ratio’: ‘1’}) will set the aspect ratio of the produced image to 1:1. See https://graphviz.org/doc/info/attrs.html for options.

Returns:

the graphviz object if you want to do more with it. If returned as the result in a Jupyter Notebook cell, it will render.

visualize_path_between(upstream_node_name: str, downstream_node_name: str, output_file_path: Optional[str] = None, render_kwargs: dict = None, graphviz_kwargs: dict = None, strict_path_visualization: bool = False) → Optional[graphviz.Digraph]

Visualizes the path between two nodes.

This is useful for debugging and understanding the path between two nodes.

Parameters:
  • upstream_node_name – the name of the node that we want to start from.

  • downstream_node_name – the name of the node that we want to end at.

  • output_file_path – the full URI of path + file name to save the dot file to. E.g. ‘some/path/graph.dot’. Pass in None to skip saving any file.

  • render_kwargs – a dictionary of values we’ll pass to graphviz render function. Defaults to viewing. If you do not want to view the file, pass in {‘view’:False}.

  • graphviz_kwargs – Kwargs to be passed to the graphviz graph object to configure it. E.g. dict(graph_attr={‘ratio’: ‘1’}) will set the aspect ratio of the produced image to 1:1.

  • strict_path_visualization – If True, only the nodes in the path will be visualized. If False, the nodes in the path and their dependencies, i.e. parents, will be visualized.

Returns:

graphviz object.

Raises:

ValueError – if the upstream or downstream node names are not found in the graph, or there is no path between them.
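
With `strict_path_visualization=False` the drawn set is the path plus the direct parents of each node on it; with `True`, only the path itself. A sketch of that choice, given a path that has already been computed; the graph, path and `nodes_for_path` are hypothetical.

```python
from typing import Dict, List, Set


def nodes_for_path(graph: Dict[str, List[str]], path: Set[str], strict: bool) -> Set[str]:
    """Nodes to draw for a path: only the path if strict, else the path plus direct parents."""
    if strict:
        return set(path)
    return set(path) | {dep for node in path for dep in graph.get(node, [])}


graph = {"b": ["a"], "c": ["b", "x"]}
path = {"a", "b", "c"}
print(sorted(nodes_for_path(graph, path, strict=True)))   # ['a', 'b', 'c']
print(sorted(nodes_for_path(graph, path, strict=False)))  # ['a', 'b', 'c', 'x']
```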

what_is_downstream_of(*node_names: str) → List[Variable]

Tells you what is downstream of this function(s), i.e. node(s).

Parameters:

node_names – names of function(s) that are starting points for traversing the graph.

Returns:

list of “variables” (i.e. nodes), inclusive of the function names, that are downstream of the passed in function names.

what_is_the_path_between(upstream_node_name: str, downstream_node_name: str) → List[Variable]

Tells you what nodes are on the path between two nodes.

Note: this is inclusive of the two nodes, and returns an unsorted list of nodes.

Parameters:
  • upstream_node_name – the name of the node that we want to start from.

  • downstream_node_name – the name of the node that we want to end at.

Returns:

Nodes representing the path between the two nodes, inclusive of the two nodes, unsorted. Returns empty list if no path exists.

Raises:

ValueError – if the upstream or downstream node name is not in the graph.
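
One way to compute such a path: take everything downstream of the upstream node and intersect it with everything upstream of the downstream node. Both sets include their start node, so the result is inclusive, and an empty intersection means there is no path. A sketch under those assumptions on a `{node: [dependencies]}` dict, returning an unordered set; `path_between` is hypothetical, and the `ValueError` mirrors the documented behavior for unknown names.

```python
from typing import Dict, List, Set


def _reachable(edges: Dict[str, List[str]], start: str) -> Set[str]:
    """Nodes reachable from start by following edges, including start itself."""
    seen, stack = {start}, [start]
    while stack:
        for nxt in edges.get(stack.pop(), []):
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen


def path_between(graph: Dict[str, List[str]], upstream: str, downstream: str) -> Set[str]:
    """Nodes on any path from upstream to downstream, inclusive; empty if there is none."""
    known = set(graph) | {dep for deps in graph.values() for dep in deps}
    for name in (upstream, downstream):
        if name not in known:
            raise ValueError(f"{name} is not in the graph")
    children: Dict[str, List[str]] = {}
    for node, deps in graph.items():
        for dep in deps:
            children.setdefault(dep, []).append(node)
    return _reachable(children, upstream) & _reachable(graph, downstream)


graph = {"b": ["a"], "c": ["b", "x"], "d": ["y"]}
print(sorted(path_between(graph, "a", "c")))  # ['a', 'b', 'c']
print(path_between(graph, "a", "d"))          # set()
```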

what_is_upstream_of(*node_names: str) → List[Variable]

Tells you what is upstream of this function(s), i.e. node(s).

Parameters:

node_names – names of function(s) that are starting points for traversing the graph backwards.

Returns:

list of “variables” (i.e. nodes), inclusive of the function names, that are upstream of the passed in function names.
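
A common use of the upstream set is finding which external inputs an output needs: walk the dependencies backwards, then keep the nodes that have no defining function. A stdlib sketch on a `{node: [dependencies]}` dict; `upstream_of` is hypothetical, and on the driver the equivalent information is the `is_external_input` flag of each returned variable.

```python
from typing import Dict, List, Set


def upstream_of(graph: Dict[str, List[str]], *starts: str) -> Set[str]:
    """All nodes the starts depend on, transitively, including the starts themselves."""
    seen, stack = set(starts), list(starts)
    while stack:
        for dep in graph.get(stack.pop(), []):
            if dep not in seen:
                seen.add(dep)
                stack.append(dep)
    return seen


graph = {"b": ["a"], "c": ["b", "x"], "d": ["y"]}
upstream = upstream_of(graph, "c")
external_inputs = sorted(n for n in upstream if n not in graph)  # no defining function
print(sorted(upstream))  # ['a', 'b', 'c', 'x']
print(external_inputs)   # ['a', 'x']
```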

Async Context

Use this driver in an async context, e.g. with FastAPI.

class hamilton.experimental.h_async.AsyncDriver(config, *modules, result_builder: Optional[ResultMixin] = None)

Async driver. This is a driver that uses the AsyncGraphAdapter to execute the graph.

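```python
from hamilton import base
from hamilton.experimental import h_async

dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())
results = await dr.execute([...], inputs=...)  # must be awaited inside an async function
```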
__init__(config, *modules, result_builder: Optional[ResultMixin] = None)

Instantiates an asynchronous driver.

Parameters:
  • config – Config to build the graph

  • modules – Modules to crawl for fns/graph nodes

  • result_builder – Results mixin to compile the graph’s final results. TBD whether this should be included in the long run.

capture_constructor_telemetry(error: Optional[str], modules: Tuple[module], config: Dict[str, Any], adapter: HamiltonGraphAdapter)

Ensures we capture constructor telemetry the right way in an async context.

This is a simpler wrapper around what’s in the driver class.

Parameters:
  • error – sanitized error string, if any.

  • modules – tuple of modules to build DAG from.

  • config – config to create the driver.

  • adapter – adapter class object.

async execute(final_vars: List[str], overrides: Dict[str, Any] = None, display_graph: bool = False, inputs: Dict[str, Any] = None) → Any

Executes computation.

Parameters:
  • final_vars – the final list of variables we want to compute.

  • overrides – values that will override “nodes” in the DAG.

  • display_graph – DEPRECATED. Whether we want to display the graph being computed.

  • inputs – Runtime inputs to the DAG.

Returns:

an object consisting of the variables requested, matching the type returned by the GraphAdapter. See the constructor for how the GraphAdapter is initialized. The default adapter currently returns a pandas DataFrame.
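
The async driver is meant for graphs whose node functions may be `async def` coroutines. The toy sketch below illustrates the general idea with `asyncio`: dependencies are resolved concurrently with `asyncio.gather`, each node is scheduled at most once as a task, and a result is awaited only when the function returned an awaitable, so plain and `async def` functions can be mixed. It is a simplified model written for illustration, not the `AsyncGraphAdapter` implementation; `toy_async_execute` and the sample functions are hypothetical.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, List, Optional


async def toy_async_execute(
    functions: Dict[str, Callable[..., Any]],
    final_vars: List[str],
    inputs: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Compute final_vars, awaiting any node whose function returns an awaitable."""
    provided = dict(inputs or {})
    tasks: Dict[str, asyncio.Task] = {}

    async def run(name: str) -> Any:
        fn = functions[name]
        deps = list(inspect.signature(fn).parameters)
        # Resolve all dependencies concurrently.
        values = await asyncio.gather(*(compute(dep) for dep in deps))
        result = fn(**dict(zip(deps, values)))
        return await result if inspect.isawaitable(result) else result

    async def compute(name: str) -> Any:
        if name in provided:
            return provided[name]
        if name not in tasks:
            # One task per node, so shared dependencies are computed only once.
            tasks[name] = asyncio.create_task(run(name))
        return await tasks[name]

    return {name: await compute(name) for name in final_vars}


async def signups(user_id: str) -> int:
    await asyncio.sleep(0)  # stand-in for I/O such as a database or HTTP call
    return 5


def spend() -> float:
    return 10.0


def spend_per_signup(spend: float, signups: int) -> float:
    return spend / signups


functions = {f.__name__: f for f in (signups, spend, spend_per_signup)}
print(asyncio.run(toy_async_execute(functions, ["spend_per_signup"], inputs={"user_id": "u1"})))
# {'spend_per_signup': 2.0}
```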

async raw_execute(final_vars: List[str], overrides: Dict[str, Any] = None, display_graph: bool = False, inputs: Dict[str, Any] = None) → Dict[str, Any]

Executes the graph, returning a dictionary of strings (node keys) to final results.

Parameters:
  • final_vars – Variables to execute (+ upstream)

  • overrides – Overrides for nodes

  • display_graph – whether or not to display graph – this is not supported.

  • inputs – Inputs for DAG runtime calculation

Returns:

A dict of key -> result