Graph Adapters#

Graph adapters control how functions are executed as the graph is walked.

class hamilton.base.HamiltonGraphAdapter#

Any GraphAdapter should implement this interface to adapt the HamiltonGraph for that particular context.

Note: since this inherits ResultMixin, HamiltonGraphAdapters need a build_result function too.
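
To make the interface concrete, here is a minimal, hypothetical sketch of a custom adapter that runs nodes in-process, logs each node it executes, and returns a plain dict. The class name and method bodies are illustrative assumptions, not part of Hamilton:

    from typing import Any, Dict, Type

    from hamilton import base
    from hamilton.node import Node


    class LoggingDictGraphAdapter(base.HamiltonGraphAdapter):
        """Hypothetical adapter: executes nodes in-process and returns a dict of outputs."""

        @staticmethod
        def check_input_type(node_type: Type, input_value: Any) -> bool:
            # Delegate to the default single-process adapter's check.
            return base.SimplePythonDataFrameGraphAdapter.check_input_type(node_type, input_value)

        @staticmethod
        def check_node_type_equivalence(node_type: Type, input_type: Type) -> bool:
            # Simplest possible equivalence: exact type match.
            return node_type == input_type

        def execute_node(self, node: Node, kwargs: Dict[str, Any]) -> Any:
            # Call the function backing the node directly.
            print(f"executing {node.name}")
            return node.callable(**kwargs)

        def build_result(self, **outputs: Dict[str, Any]) -> Any:
            # build_result comes from ResultMixin -- here we simply hand back the dict.
            return outputs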

abstract static check_input_type(node_type: Type, input_value: Any) bool#

Used to check whether the user inputs match what the execution strategy & functions can handle.

Parameters:
  • node_type – The type of the node.

  • input_value – An actual value that we want to check against our expectation.

Returns:

True if the input value is compatible with the node type; False otherwise.

abstract static check_node_type_equivalence(node_type: Type, input_type: Type) bool#

Used to check whether two types are equivalent.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

True if the types are considered equivalent; False otherwise.

abstract execute_node(node: Node, kwargs: Dict[str, Any]) Any#

Given a node that represents a hamilton function, execute it. Note, in some adapters this might just return some type of “future”.

Parameters:
  • node – the Hamilton Node

  • kwargs – the kwargs required to exercise the node function.

Returns:

the result of exercising the node.

class hamilton.base.SimplePythonDataFrameGraphAdapter#

This is the default (original Hamilton) graph adapter. It uses plain python and builds a dataframe result.

This executes the Hamilton dataflow locally on a machine in a single threaded, single process fashion. It assumes a pandas dataframe as a result.

Use this when you want to execute on a single machine, without parallelization, and you want a pandas dataframe as output.
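
For example, a minimal sketch of wiring this adapter into the standard driver, where my_functions is a hypothetical module of Hamilton functions:

    from hamilton import base, driver

    import my_functions  # hypothetical module containing your Hamilton functions

    # This adapter is the default, so passing it explicitly is optional.
    adapter = base.SimplePythonDataFrameGraphAdapter()
    dr = driver.Driver({}, my_functions, adapter=adapter)  # empty config for illustration
    df = dr.execute(["some_output", "another_output"])  # returns a pandas DataFrame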

static check_input_type(node_type: Type, input_value: Any) bool#

Used to check whether the user inputs match what the execution strategy & functions can handle.

Parameters:
  • node_type – The type of the node.

  • input_value – An actual value that we want to check against our expectation.

Returns:

True if the input value is compatible with the node type; False otherwise.

static check_node_type_equivalence(node_type: Type, input_type: Type) bool#

Used to check whether two types are equivalent.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

True if the types are considered equivalent; False otherwise.

execute_node(node: Node, kwargs: Dict[str, Any]) Any#

Given a node that represents a hamilton function, execute it. Note, in some adapters this might just return some type of “future”.

Parameters:
  • node – the Hamilton Node

  • kwargs – the kwargs required to exercise the node function.

Returns:

the result of exercising the node.

class hamilton.base.SimplePythonGraphAdapter(result_builder: ResultMixin)#

This class allows you to swap out the build_result very easily.

This executes the Hamilton dataflow locally on a machine in a single threaded, single process fashion. It allows you to specify a ResultBuilder to control the return type of what execute() returns.

__init__(result_builder: ResultMixin)#

Allows you to swap out the build_result very easily.

Parameters:

result_builder – A ResultMixin object that will be used to build the result.
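
For example, a minimal sketch that swaps in the dict-based result builder (base.DictResult) so that execute() returns a dict instead of a DataFrame; my_functions is a hypothetical module of Hamilton functions:

    from hamilton import base, driver

    import my_functions  # hypothetical module of Hamilton functions

    # Swap out the result builder: execute() now returns a dict keyed by output name.
    adapter = base.SimplePythonGraphAdapter(base.DictResult())
    dr = driver.Driver({}, my_functions, adapter=adapter)
    result = dr.execute(["some_output"])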

static build_dataframe_with_dataframes(outputs: Dict[str, Any]) DataFrame#

Builds a dataframe from the outputs in an “outer join” manner based on index.

The behavior of pd.DataFrame(outputs) is that it will do an outer join based on the indexes of the Series passed in. To handle dataframes, we unpack each dataframe into a dict of series and check, going in order of the outputs requested, that no columns are redefined. This results in an “enlarged” outputs dict that is then passed to pd.DataFrame(outputs) to get the final dataframe.

Parameters:

outputs – The outputs to build the dataframe from.

Returns:

A dataframe with the outputs.
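
To illustrate the outer-join-on-index behavior this relies on, here is a small, standalone pandas example:

    import pandas as pd

    s1 = pd.Series([1.0, 2.0], index=[0, 1])
    s2 = pd.Series([3.0, 4.0], index=[1, 2])

    # pd.DataFrame does an outer join on the indexes: rows 0, 1 and 2 appear,
    # with NaN wherever a series has no value for that index.
    df = pd.DataFrame({"a": s1, "b": s2})
    #      a    b
    # 0  1.0  NaN
    # 1  2.0  3.0
    # 2  NaN  4.0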

build_result(**outputs: Dict[str, Any]) Any#

Delegates to the result builder function supplied.

static check_input_type(node_type: Type, input_value: Any) bool#

Used to check whether the user inputs match what the execution strategy & functions can handle.

Parameters:
  • node_type – The type of the node.

  • input_value – An actual value that we want to check against our expectation.

Returns:

True if the input value is compatible with the node type; False otherwise.

static check_node_type_equivalence(node_type: Type, input_type: Type) bool#

Used to check whether two types are equivalent.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

True if the types are considered equivalent; False otherwise.

static check_pandas_index_types_match(all_index_types: Dict[str, List[str]], time_indexes: Dict[str, List[str]], no_indexes: Dict[str, List[str]]) bool#

Checks that pandas index types match.

This only logs warnings; if debug logging is enabled, it also logs a debug statement listing the index types.

execute_node(node: Node, kwargs: Dict[str, Any]) Any#

Given a node that represents a hamilton function, execute it. Note, in some adapters this might just return some type of “future”.

Parameters:
  • node – the Hamilton Node

  • kwargs – the kwargs required to exercise the node function.

Returns:

the result of exercising the node.

static pandas_index_types(outputs: Dict[str, Any]) Tuple[Dict[str, List[str]], Dict[str, List[str]], Dict[str, List[str]]]#

This function creates three dictionaries according to whether there is an index type or not.

The three dicts we create are:

  1. Dict of index type to list of outputs that match it.

  2. Dict of time series / categorical index types to list of outputs that match it.

  3. Dict of no-index key to list of outputs with no index type.

Parameters:

outputs – the dict we’re trying to create a result from.

Returns:

dict of all index types, dict of time series/categorical index types, dict of outputs with no index

Experimental Graph Adapters#

The following are considered experimental; there is a possibility of their API changing. That said, the code is stable, and you should feel comfortable giving the code a spin – let us know how it goes, and what the rough edges are if you find any. We’d love feedback if you are using these, so that we know how to improve them or graduate them.

Ray#

class hamilton.experimental.h_ray.RayGraphAdapter(result_builder: ResultMixin)#

Class representing what’s required to make Hamilton run on Ray.

This walks the graph and translates it to run on Ray.

Use pip install sf-hamilton[ray] to get the dependencies required to run this.

Use this if:

  • you want to utilize multiple cores on a single machine, or you want to scale to larger data set sizes with a Ray cluster that you can connect to. Note (1): you are still constrained by machine memory size with Ray; you can’t just scale to any dataset size. Note (2): serialization costs can outweigh the benefits of parallelism so you should benchmark your code to see if it’s worth it.

Notes on scaling:#

  • Multi-core on single machine ✅

  • Distributed computation on a Ray cluster ✅

  • Scales to any size of data ⛔️; you are LIMITED by the memory on the instance/computer 💻.

Function return object types supported:#

  • Works for any python object that can be serialized by the Ray framework. ✅

Pandas?#

  • ⛔️ Ray DOES NOT do anything special about Pandas.

CAVEATS#

  • Serialization costs can outweigh the benefits of parallelism, so you should benchmark your code to see if it’s worth it.

DISCLAIMER – this class is experimental, so signature changes are a possibility!

__init__(result_builder: ResultMixin)#

Constructor

You have the ability to pass in a ResultMixin object to the constructor to control the return type that gets produced by running on Ray.

Parameters:

result_builder – Required. An implementation of base.ResultMixin.
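
A minimal sketch of using this adapter, assuming a local Ray instance and a hypothetical my_functions module:

    import ray

    from hamilton import base, driver
    from hamilton.experimental import h_ray

    import my_functions  # hypothetical module of Hamilton functions

    ray.init()  # or ray.init(address=...) to connect to an existing cluster
    adapter = h_ray.RayGraphAdapter(result_builder=base.PandasDataFrameResult())
    dr = driver.Driver({}, my_functions, adapter=adapter)
    df = dr.execute(["some_output"])
    ray.shutdown()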

build_result(**outputs: Dict[str, Any]) Any#

Builds the result and brings it back to this running process.

Parameters:

outputs – the dictionary of key -> Union[ray object reference | value]

Returns:

The type of object returned by self.result_builder.

static check_input_type(node_type: Type, input_value: Any) bool#

Used to check whether the user inputs match what the execution strategy & functions can handle.

Parameters:
  • node_type – The type of the node.

  • input_value – An actual value that we want to check against our expectation.

Returns:

True if the input value is compatible with the node type; False otherwise.

static check_node_type_equivalence(node_type: Type, input_type: Type) bool#

Used to check whether two types are equivalent.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

True if the types are considered equivalent; False otherwise.

execute_node(node: Node, kwargs: Dict[str, Any]) Any#

Function that is called as we walk the graph to determine how to execute a hamilton function.

Parameters:
  • node – the node from the graph.

  • kwargs – the arguments that should be passed to it.

Returns:

returns a ray object reference.

class hamilton.experimental.h_ray.RayWorkflowGraphAdapter(result_builder: ResultMixin, workflow_id: str)#

Class representing what’s required to make Hamilton run on Ray Workflows.

Use pip install sf-hamilton[ray] to get the dependencies required to run this.

Ray workflows is a more robust way to scale computation for any type of Hamilton graph.

What’s the difference between this and RayGraphAdapter?#

  • Ray workflows offer durable computation. That is, they save and checkpoint each function.

  • This enables one to run a workflow, and not have to restart it if something fails, assuming correct Ray workflow usage.

Tips#

See https://docs.ray.io/en/latest/workflows/basics.html for the source of the following:

  1. Functions should be idempotent.

  2. The workflow ID is what Ray uses to resume/restart the workflow if it is run a second time.

  3. Nothing is run until the entire DAG has been walked and set up, and build_result is called.

Notes on scaling:#

  • Multi-core on single machine ✅

  • Distributed computation on a Ray cluster ✅

  • Scales to any size of data ⛔️; you are LIMITED by the memory on the instance/computer 💻.

Function return object types supported:#

  • Works for any python object that can be serialized by the Ray framework. ✅

Pandas?#

  • ⛔️ Ray DOES NOT do anything special about Pandas.

CAVEATS#

  • Serialization costs can outweigh the benefits of parallelism, so you should benchmark your code to see if it’s worth it.

DISCLAIMER – this class is experimental, so signature changes are a possibility!

__init__(result_builder: ResultMixin, workflow_id: str)#

Constructor

Parameters:
  • result_builder – Required. An implementation of base.ResultMixin.

  • workflow_id – Required. An ID to give the ray workflow to identify it for durability purposes.

  • max_retries – Optional. The function will be retried for the given number of times if an exception is raised.
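
A minimal sketch of constructing this adapter, assuming a hypothetical my_functions module; reusing the same workflow ID is what lets Ray resume a previously started run:

    import ray

    from hamilton import base, driver
    from hamilton.experimental import h_ray

    import my_functions  # hypothetical module of Hamilton functions

    ray.init()
    adapter = h_ray.RayWorkflowGraphAdapter(
        result_builder=base.PandasDataFrameResult(),
        workflow_id="my-workflow-run-1",  # hypothetical ID; reuse it to resume a failed run
    )
    dr = driver.Driver({}, my_functions, adapter=adapter)
    df = dr.execute(["some_output"])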

build_result(**outputs: Dict[str, Any]) Any#

Builds the result and brings it back to this running process.

Parameters:

outputs – the dictionary of key -> Union[ray object reference | value]

Returns:

The type of object returned by self.result_builder.

static check_input_type(node_type: Type, input_value: Any) bool#

Used to check whether the user inputs match what the execution strategy & functions can handle.

Parameters:
  • node_type – The type of the node.

  • input_value – An actual value that we want to check against our expectation.

Returns:

True if the input value is compatible with the node type; False otherwise.

static check_node_type_equivalence(node_type: Type, input_type: Type) bool#

Used to check whether two types are equivalent.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

True if the types are considered equivalent; False otherwise.

execute_node(node: Node, kwargs: Dict[str, Any]) Any#

Given a node that represents a hamilton function, execute it. Note, in some adapters this might just return some type of “future”.

Parameters:
  • node – the Hamilton Node

  • kwargs – the kwargs required to exercise the node function.

Returns:

the result of exercising the node.

Dask#

class hamilton.experimental.h_dask.DaskGraphAdapter(dask_client: Client, result_builder: ResultMixin = None, visualize_kwargs: dict = None)#

Class representing what’s required to make Hamilton run on Dask.

This walks the graph and translates it to run on Dask.

Use pip install sf-hamilton[dask] to get the dependencies required to run this.

Try this adapter when:

  1. You really can’t do things in memory anymore with pandas – Dask is a good choice to scale computation, and for most simple pandas operations you should not have to change anything to scale!

  2. You want to scale computation generally – you’ll just have to switch to natively using Dask’s object types if that’s the case.

  3. You want to utilize multiple cores on a single machine, or you want to scale to large data set sizes with a Dask cluster that you can connect to.

Please read the following notes about its limitations.

Notes on scaling:#

  • Multi-core on single machine ✅

  • Distributed computation on a Dask cluster ✅

  • Scales to any size of data supported by Dask ✅; assuming you load it appropriately via Dask loaders.

Function return object types supported:#

  • Works for any python object that can be serialized by the Dask framework. ✅

Pandas?#

Dask implements a good subset of the Pandas API.

Loading Data:#

CAVEATS#

  • Serialization costs can outweigh the benefits of parallelism, so you should benchmark your code to see if it’s worth it.

DISCLAIMER – this class is experimental, so signature changes are a possibility!

__init__(dask_client: Client, result_builder: ResultMixin = None, visualize_kwargs: dict = None)#

Constructor

You have the ability to pass in a ResultMixin object to the constructor to control the return type that gets produced by running on Dask.

Parameters:
  • dask_client – the Dask client – we don’t do anything with it, but we thought it would be useful to wire it through here.

  • result_builder – The function that will build the result. Optional, defaults to pandas dataframe.

  • visualize_kwargs – Arguments to visualize the graph using Dask’s internals. None means no visualization; a dict means visualize – see https://docs.dask.org/en/latest/api.html?highlight=visualize#dask.visualize for what to pass in.
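
A minimal sketch of wiring this adapter up with a local Dask client; my_functions is a hypothetical module of Hamilton functions:

    from dask.distributed import Client

    from hamilton import base, driver
    from hamilton.experimental import h_dask

    import my_functions  # hypothetical module of Hamilton functions

    client = Client()  # local cluster; pass an address to connect to a remote one
    adapter = h_dask.DaskGraphAdapter(client, base.PandasDataFrameResult())
    dr = driver.Driver({}, my_functions, adapter=adapter)
    df = dr.execute(["some_output"])
    client.close()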

build_result(**outputs: Dict[str, Any]) Any#

Builds the result and brings it back to this running process.

Parameters:

outputs – the dictionary of key -> Union[delayed object reference | value]

Returns:

The type of object returned by self.result_builder.

static check_input_type(node_type: Type, input_value: Any) bool#

Used to check whether the user inputs match what the execution strategy & functions can handle.

Parameters:
  • node_type – The type of the node.

  • input_value – An actual value that we want to check against our expectation.

Returns:

True if the input value is compatible with the node type; False otherwise.

static check_node_type_equivalence(node_type: Type, input_type: Type) bool#

Used to check whether two types are equivalent.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

True if the types are considered equivalent; False otherwise.

execute_node(node: Node, kwargs: Dict[str, Any]) Any#

Function that is called as we walk the graph to determine how to execute a hamilton function.

Parameters:
  • node – the node from the graph.

  • kwargs – the arguments that should be passed to it.

Returns:

returns a dask delayed object.

Pandas on Spark (Koalas)#

class hamilton.experimental.h_spark.SparkKoalasGraphAdapter(spark_session, result_builder: ResultMixin, spine_column: str)#

Class representing what’s required to make Hamilton run on Spark with Koalas, i.e. Pandas on Spark.

This walks the graph and translates it to run on Apache Spark using the Pandas API on Spark.

Use pip install sf-hamilton[spark] to get the dependencies required to run this.

Currently, this class assumes you’re running SPARK 3.2+. You’d generally use this if you have an existing spark cluster running in your workplace, and you want to scale to very large data set sizes.

Some tips on Koalas (from before it was merged into Spark 3.2) may still be useful.

Spark is a more heavyweight choice for scaling computation for Hamilton graphs that create a Pandas DataFrame.

Notes on scaling:#

  • Multi-core on single machine ✅ (if you set up Spark locally to do so)

  • Distributed computation on a Spark cluster ✅

  • Scales to any size of data as permitted by Spark ✅

Function return object types supported:#

  • ⛔️ Not generic. This does not work for every Hamilton graph.

  • ✅ Currently we’re targeting this at Pandas/Koalas types [dataframes, series].

Pandas?#

  • ✅ Koalas on Spark 3.2+ implements a good subset of the pandas API. Keep it simple and you should be good to go!

CAVEATS#

  • Serialization costs can outweigh the benefits of parallelism, so you should benchmark your code to see if it’s worth it.

DISCLAIMER – this class is experimental, so signature changes are a possibility!

__init__(spark_session, result_builder: ResultMixin, spine_column: str)#

Constructor

You only have the ability to return either a Pandas on Spark Dataframe or a Pandas Dataframe. To do that you either use the stock base.PandasDataFrameResult class, or you use h_spark.KoalasDataframeResult.

Parameters:
  • spark_session – the spark session to use.

  • result_builder – the function to build the result – currently only Pandas and Koalas are “supported”.

  • spine_column – the column we should use first as the spine and then subsequently join against.
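
A minimal sketch, assuming Spark 3.2+ and a hypothetical my_functions module; the spine column named here is illustrative and should be a column/series your dataflow actually produces:

    from pyspark.sql import SparkSession

    from hamilton import base, driver
    from hamilton.experimental import h_spark

    import my_functions  # hypothetical module of Hamilton functions

    spark = SparkSession.builder.getOrCreate()
    adapter = h_spark.SparkKoalasGraphAdapter(
        spark_session=spark,
        result_builder=base.PandasDataFrameResult(),  # bring the result back as a pandas DataFrame
        spine_column="some_index_column",  # hypothetical column to join everything against
    )
    dr = driver.Driver({}, my_functions, adapter=adapter)
    df = dr.execute(["some_output"])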

build_result(**outputs: Dict[str, Any]) Union[DataFrame, DataFrame, dict]#

This function builds the result given the computed values.

static check_input_type(node_type: Type, input_value: Any) bool#

Function to equate an input value with the expected node type.

We need this to equate pandas and koalas objects/types.

Parameters:
  • node_type – the declared node type

  • input_value – the actual input value

Returns:

whether this is okay, or not.

static check_node_type_equivalence(node_type: Type, input_type: Type) bool#

Function to help equate pandas with koalas types.

Parameters:
  • node_type – the declared node type.

  • input_type – the type of what we want to pass into it.

Returns:

whether this is okay, or not.

execute_node(node: Node, kwargs: Dict[str, Any]) Any#

Function that is called as we walk the graph to determine how to execute a hamilton function.

Parameters:
  • node – the node from the graph.

  • kwargs – the arguments that should be passed to it.

Returns:

returns a Koalas column.

PySpark UDFs#

class hamilton.experimental.h_spark.PySparkUDFGraphAdapter#

UDF graph adapter for PySpark.

This graph adapter enables one to write Hamilton functions that can be executed as UDFs in PySpark.

Core to this is the mapping of function arguments to Spark columns available in the passed-in dataframe.

This adapter currently supports:

  • regular UDFs – these are executed in a row-based fashion;

  • a single variant of Pandas UDFs: func(series+) -> series;

  • it can also run regular Hamilton functions, which will execute on the Spark driver side.

DISCLAIMER – this class is experimental, so signature changes are a possibility!
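
A minimal, heavily hedged sketch: the function in the hypothetical my_spark_udfs module is a plain Hamilton function that this adapter would run as a row-based UDF over matching columns, and the way the source DataFrame is supplied via inputs below follows the pyspark_udfs example pattern – treat the exact wiring as an assumption and check that example for details:

    import pandas as pd
    from pyspark.sql import SparkSession

    from hamilton import driver
    from hamilton.experimental import h_spark

    import my_spark_udfs  # hypothetical module, e.g. containing:
    #
    #     def spend_per_signup(spend: float, signups: float) -> float:
    #         """Row-based UDF: 'spend' and 'signups' map to columns of the DataFrame."""
    #         return spend / signups

    spark = SparkSession.builder.getOrCreate()
    input_df = spark.createDataFrame(pd.DataFrame({"spend": [10.0, 20.0], "signups": [1.0, 4.0]}))

    adapter = h_spark.PySparkUDFGraphAdapter()
    dr = driver.Driver({}, my_spark_udfs, adapter=adapter)
    # Assumption: each required column name maps to the DataFrame that provides it.
    inputs = {column: input_df for column in input_df.columns}
    result_df = dr.execute(["spend_per_signup"], inputs=inputs)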

__init__()#

build_result(**outputs: Dict[str, Any]) DataFrame#

Builds the result and brings it back to this running process.

Parameters:

outputs – the dictionary of key -> Union[object reference | value]

Returns:

The type of object returned by self.result_builder.

static check_input_type(node_type: Type, input_value: Any) bool#

If the input is a pyspark DataFrame, skip the check; otherwise delegate the check.

static check_node_type_equivalence(node_type: Type, input_type: Type) bool#

Checks for the htype.column annotation and deals with it.

execute_node(node: Node, kwargs: Dict[str, Any]) Any#

Given a node to execute, process it and apply a UDF if applicable.

Parameters:
  • node – the node we’re processing.

  • kwargs – the inputs to the function.

Returns:

the result of the function.

Async Python#

class hamilton.experimental.h_async.AsyncGraphAdapter(result_builder: ResultMixin = None)#

Graph adapter for use with the AsyncDriver class.

__init__(result_builder: ResultMixin = None)#

Creates an AsyncGraphAdapter class. Note this will only work with the AsyncDriver class.

Some things to note:

  1. This executes everything at the end (recursively), e.g. the final DAG nodes are awaited.

  2. This does not work with decorators when the async function is being decorated. That is because that function is called directly within the decorator, so we cannot await it.
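
A minimal sketch, assuming the AsyncDriver constructor mirrors the regular driver’s (config, *modules) and that my_async_module is a hypothetical module of async (and/or regular) Hamilton functions:

    import asyncio

    from hamilton.experimental import h_async

    import my_async_module  # hypothetical module, e.g. containing:
    #
    #     async def some_output(raw_value: int) -> int:
    #         return raw_value + 1

    async def main():
        # Assumption: AsyncDriver takes (config, *modules) like driver.Driver does.
        dr = h_async.AsyncDriver({}, my_async_module)
        result = await dr.execute(["some_output"], inputs={"raw_value": 1})
        print(result)

    asyncio.run(main())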

static build_dataframe_with_dataframes(outputs: Dict[str, Any]) DataFrame#

Builds a dataframe from the outputs in an “outer join” manner based on index.

The behavior of pd.DataFrame(outputs) is that it will do an outer join based on the indexes of the Series passed in. To handle dataframes, we unpack each dataframe into a dict of series and check, going in order of the outputs requested, that no columns are redefined. This results in an “enlarged” outputs dict that is then passed to pd.DataFrame(outputs) to get the final dataframe.

Parameters:

outputs – The outputs to build the dataframe from.

Returns:

A dataframe with the outputs.

build_result(**outputs: Dict[str, Any]) Any#

Currently this is a no-op – it just delegates to the result builder supplied. That said, we could make it async, but it feels wrong – this will just be called after raw_execute.

Parameters:

outputs – Outputs (awaited) from the graph.

Returns:

The final results.

static check_input_type(node_type: Type, input_value: Any) bool#

Used to check whether the user inputs match what the execution strategy & functions can handle.

Parameters:
  • node_type – The type of the node.

  • input_value – An actual value that we want to check against our expectation.

Returns:

True if the input value is compatible with the node type; False otherwise.

static check_node_type_equivalence(node_type: Type, input_type: Type) bool#

Used to check whether two types are equivalent.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

True if the types are considered equivalent; False otherwise.

static check_pandas_index_types_match(all_index_types: Dict[str, List[str]], time_indexes: Dict[str, List[str]], no_indexes: Dict[str, List[str]]) bool#

Checks that pandas index types match.

This only logs warnings; if debug logging is enabled, it also logs a debug statement listing the index types.

execute_node(node: Node, kwargs: Dict[str, Any]) Any#

Executes a node. Note this doesn’t actually execute it – rather, it returns a task. This does not use async def, as we want it to be awaited on later – that await happens when processing the parameters of downstream functions or the final results. We can ensure this because we also run the driver that this adapter corresponds to.

Note that this assumes everything is awaitable, even if it isn’t; in that case, the value just gets wrapped in an awaitable.

Parameters:
  • node – Node to wrap

  • kwargs – Keyword arguments (either coroutines or raw values) to call it with

Returns:

A task

static pandas_index_types(outputs: Dict[str, Any]) Tuple[Dict[str, List[str]], Dict[str, List[str]], Dict[str, List[str]]]#

This function creates three dictionaries according to whether there is an index type or not.

The three dicts we create are:

  1. Dict of index type to list of outputs that match it.

  2. Dict of time series / categorical index types to list of outputs that match it.

  3. Dict of no-index key to list of outputs with no index type.

Parameters:

outputs – the dict we’re trying to create a result from.

Returns:

dict of all index types, dict of time series/categorical index types, dict of outputs with no index