h_dask.DaskGraphAdapter¶

Runs the entire Hamilton DAG on Dask.

class hamilton.plugins.h_dask.DaskGraphAdapter(dask_client: Client, result_builder: ResultMixin = None, visualize_kwargs: dict = None, use_delayed: bool = True, compute_at_end: bool = True)¶

Class representing what’s required to make Hamilton run on Dask.

This walks the graph and translates it to run on Dask.

Use pip install sf-hamilton[dask] to get the dependencies required to run this.

Try this adapter when:

  1. Dask is a good choice for scaling computation when you can no longer do things in memory with pandas. For most simple pandas operations, you should not have to do anything to scale! You just need to load your data via Dask rather than pandas.

  2. Dask can help you scale to larger data sets when running on a cluster – in that case you'll just have to switch to natively using Dask's object types (set use_delayed=False and compute_at_end=False).

  3. You want to utilize multiple cores on a single machine, or to scale to large data set sizes with a Dask cluster that you can connect to.

  4. The ONLY CAVEAT is really whether you use delayed, Dask datatypes, or both.

Please read the following notes about its limitations.
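
A minimal usage sketch (the module my_functions and the output names here are hypothetical; the wiring follows the standard Hamilton driver pattern):

    import dask.distributed

    from hamilton import driver
    from hamilton.plugins import h_dask

    import my_functions  # hypothetical module containing your Hamilton functions

    cluster = dask.distributed.LocalCluster()  # multi-core on a single machine
    client = dask.distributed.Client(cluster)

    adapter = h_dask.DaskGraphAdapter(client)  # defaults: use_delayed=True, compute_at_end=True
    dr = driver.Driver({}, my_functions, adapter=adapter)

    # With the defaults, each function is wrapped with dask.delayed and
    # .compute() is called at the end, so this returns a materialized result
    # (a pandas dataframe, given the default result builder).
    df = dr.execute(["some_output", "another_output"])
    client.close()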

Notes on scaling:¶

  • Multi-core on single machine ✅

  • Distributed computation on a Dask cluster ✅

  • Scales to any size of data supported by Dask ✅, assuming you load it appropriately via Dask loaders (see Loading Data below).

  • Works best with pandas 2.0+ and the pyarrow backend.

Function return object types supported:¶

  • Works for any python object that can be serialized by the Dask framework. ✅

Pandas?¶

Dask implements a good subset of the Pandas API:
  • You might be able to get away with scaling without having to change your code at all!

  • See https://docs.dask.org/en/latest/dataframe-api.html for the pandas APIs that Dask supports.

  • If something is not supported by their API, you will then have to read up and think about how to structure your Hamilton function computation – https://docs.dask.org/en/latest/dataframe.html

  • If paired with DaskDataFrameResult, use_delayed=False, and compute_at_end=False, this adapter will help you produce a Dask dataframe as a result, which you can then convert back to pandas if you want (see the sketch below).
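
For example, a sketch of staying Dask-native end to end (the module and column names are hypothetical):

    import dask.distributed

    from hamilton import driver
    from hamilton.plugins import h_dask

    import my_dask_functions  # hypothetical module whose functions operate on Dask objects

    client = dask.distributed.Client(dask.distributed.LocalCluster())
    adapter = h_dask.DaskGraphAdapter(
        client,
        result_builder=h_dask.DaskDataFrameResult(),
        use_delayed=False,     # the functions already return lazy Dask objects
        compute_at_end=False,  # keep the result as a Dask dataframe
    )
    dr = driver.Driver({}, my_dask_functions, adapter=adapter)
    ddf = dr.execute(["col_a", "col_b"])  # a Dask dataframe -- still lazy
    pandas_df = ddf.compute()  # convert back to pandas when you want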

Loading Data:¶
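
To actually scale, load data with Dask's own loaders inside your Hamilton functions, so it stays lazy and partitioned rather than being materialized in memory. A sketch (the path and column name are hypothetical):

    import dask.dataframe as dd


    def raw_data(data_path: str) -> dd.DataFrame:
        # Load via a Dask loader rather than pandas, so the data never
        # has to fit in memory all at once.
        return dd.read_parquet(data_path)


    def positive_rows(raw_data: dd.DataFrame) -> dd.DataFrame:
        # Downstream functions operate on the lazy Dask dataframe.
        return raw_data[raw_data["value"] > 0]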

CAVEATS with use_delayed=True:¶

  • If using use_delayed=True, serialization costs can outweigh the benefits of parallelism, so you should benchmark your code to see whether it’s worth it.

  • With use_delayed=True, this adapter naively wraps all your functions with dask.delayed, which means they will be scheduled and executed across the Dask workers. This is a good choice if your computation is slow or your Hamilton graph is highly parallelizable; see the conceptual sketch below.
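
Conceptually, the wrapping looks like the following simplified sketch (not the adapter’s actual internals):

    import dask


    def _execute_node(fn, kwargs: dict):
        # Each Hamilton function becomes a lazy task; nothing runs until
        # .compute() is called on the final result, at which point the
        # scheduler farms the tasks out across the Dask workers.
        return dask.delayed(fn)(**kwargs)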

DISCLAIMER – this class is experimental, so signature changes are a possibility! But we’ll aim to be backwards compatible where possible.

__init__(dask_client: Client, result_builder: ResultMixin = None, visualize_kwargs: dict = None, use_delayed: bool = True, compute_at_end: bool = True)¶

Constructor

You can pass a ResultMixin object to the constructor to control the return type produced by running on Dask.

Parameters:
  • dask_client – The Dask client – we don’t do anything with it, but thought it would be useful to wire through here.

  • result_builder – The ResultMixin that will build the result. Optional; defaults to producing a pandas dataframe.

  • visualize_kwargs – Arguments to visualize the graph using Dask’s internals. None means no visualization; a dict means visualize – see https://docs.dask.org/en/latest/api.html?highlight=visualize#dask.visualize for what to pass in (and the sketch below).

  • use_delayed – Default is True for backwards compatibility. Whether to wrap every function with dask.delayed. Note: it is probably not necessary to mix this with using Dask objects, e.g. dataframes/series; those are lazily computed by nature and operate over the Dask data types, so you don’t need to wrap them with delayed. Use delayed if you want to farm out computation.

  • compute_at_end – Default is True for backwards compatibility. Whether to call compute() at the end. That is, whether .compute() should be called in the result builder to kick off computation.
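
For instance, a sketch of passing visualize_kwargs (the filename is arbitrary; the keys are forwarded to dask.visualize):

    adapter = h_dask.DaskGraphAdapter(
        client,  # an existing dask.distributed.Client
        visualize_kwargs={"filename": "dask_graph.png"},  # have Dask render the task graph
    )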

build_result(**outputs: Dict[str, Any]) Any¶

Builds the result and brings it back to this running process.

Parameters:

outputs – the dictionary of key -> Union[delayed object reference | value]

Returns:

The type of object returned by self.result_builder. Note the following behaviors:

  • If use_delayed=True (and compute_at_end=False), the result will be a delayed object.

  • If use_delayed=True & compute_at_end=True, the result will be the return type of self.result_builder.

  • If use_delayed=False & compute_at_end=True, this will only work if self.result_builder returns a Dask type, as we will try to compute it.

  • If use_delayed=False & compute_at_end=False, this will return the result of self.result_builder.

static check_input_type(node_type: Type, input_value: Any) bool¶

Used to check whether the user inputs match what the execution strategy & functions can handle.

Static purely for legacy reasons.

Parameters:
  • node_type – The type of the node.

  • input_value – The actual value that we want to check matches our expectation.

Returns:

True if the input is valid, False otherwise.

static check_node_type_equivalence(node_type: Type, input_type: Type) bool¶

Used to check whether two types are equivalent.

Static, purely for legacy reasons.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

True if the types are equivalent, False otherwise.

do_build_result(outputs: Dict[str, Any]) Any¶

Implements the do_build_result method from the BaseDoBuildResult class. This is hidden from the user, as the public-facing API is build_result; that allows us to change the API/implementation of the internal set of hooks.

do_check_edge_types_match(type_from: type, type_to: type) bool¶

Method that checks whether two types are equivalent. This is used when the function graph is being created.

Parameters:
  • type_from – The type of the node that is the source of the edge.

  • type_to – The type of the node that is the destination of the edge.

Return bool:

Whether or not they are equivalent

do_node_execute(run_id: str, node_: Node, kwargs: Dict[str, Any], task_id: str | None = None) Any¶

Method that is called to implement node execution. This can replace the execution of a node with something else altogether, augment it, or delegate it.

Parameters:
  • run_id – ID of the run, unique in scope of the driver.

  • node_ – Node that is being executed

  • kwargs – Keyword arguments that are being passed into the node

  • task_id – ID of the task, defaults to None if not in a task setting

do_validate_input(node_type: type, input_value: Any) bool¶

Method that validates that an input value matches the expected type.

Parameters:
  • node_type – The type of the node.

  • input_value – The value that we want to validate.

Returns:

Whether or not the input value matches the expected type.

execute_node(node: Node, kwargs: Dict[str, Any]) Any¶

Function that is called as we walk the graph to determine how to execute a Hamilton function.

Parameters:
  • node – the node from the graph.

  • kwargs – the arguments that should be passed to it.

Returns:

A dask delayed object.

input_types() List[Type[Type]]¶

Gives the types applicable to this result builder. This is optional for backwards compatibility, but is recommended.

Returns:

A list of types that this can apply to.

output_type() Type¶

Returns the output type of this result builder.

Returns:

The type that this creates.