h_async.AsyncGraphAdapter

class hamilton.experimental.h_async.AsyncGraphAdapter(result_builder: ResultMixin = None)

Graph adapter for use with the AsyncDriver class.

__init__(result_builder: ResultMixin = None)

Creates an AsyncGraphAdapter. Note that this only works with the AsyncDriver class.

Some things to note:

  1. This executes everything at the end (recursively), i.e. the final DAG nodes are awaited.

  2. This does not work with decorators when the async function itself is being decorated. The decorator calls that function directly, so the adapter cannot await it.
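The second point can be seen with the standard library alone. The sketch below is not Hamilton code: `call_directly` and `fetch_value` are hypothetical names, and `call_directly` merely stands in for a decorator body that calls the wrapped function synchronously. It shows that such a call yields a coroutine object rather than a value.

```python
import asyncio
import inspect


async def fetch_value() -> int:
    """Hypothetical async node function."""
    return 21


def call_directly(fn):
    """Stand-in for a decorator body that calls the wrapped function synchronously."""
    return fn()


# Calling the async function without awaiting it returns a coroutine object.
result = call_directly(fetch_value)
is_coro = inspect.iscoroutine(result)

# The value only materializes once something awaits the coroutine.
value = asyncio.run(result)
```

Any decorator logic that inspects or transforms the return value would therefore operate on the coroutine object, not on the result it eventually produces.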

static build_dataframe_with_dataframes(outputs: Dict[str, Any]) -> DataFrame

Builds a dataframe from the outputs in an “outer join” manner based on index.

The behavior of pd.DataFrame(outputs) is that it does an outer join based on the indexes of the Series passed in. To handle dataframes, we unpack each dataframe into a dict of series and check, in a rolling fashion in the order the outputs were requested, that no columns are redefined. The result is an “enlarged” outputs dict that is then passed to pd.DataFrame(outputs) to get the final dataframe.

Parameters:

outputs – The outputs to build the dataframe from.

Returns:

A dataframe with the outputs.
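The unpack-then-join logic described above can be sketched without pandas. This is only an illustration under stated assumptions, not the library's implementation: plain dicts stand in for Series (index label to value) and DataFrames (column name to Series), `flatten_outputs` and `outer_join` are hypothetical helpers, and raising ValueError on a redefined column is an assumption about how the check reports a conflict.

```python
from typing import Any, Dict

Column = Dict[Any, Any]  # index label -> value, standing in for a pandas Series


def flatten_outputs(outputs: Dict[str, Any]) -> Dict[str, Column]:
    """Hypothetical helper: unpack frame-like outputs into columns, in request order."""
    flattened: Dict[str, Column] = {}
    for name, value in outputs.items():
        # A non-empty dict whose values are all dicts stands in for a DataFrame.
        is_frame = bool(value) and all(isinstance(v, dict) for v in value.values())
        columns = value if is_frame else {name: value}
        for column_name, column in columns.items():
            if column_name in flattened:
                # Assumption: a redefined column is treated as an error.
                raise ValueError(f"Column {column_name!r} is defined more than once")
            flattened[column_name] = column
    return flattened


def outer_join(columns: Dict[str, Column]) -> Dict[Any, Dict[str, Any]]:
    """Align columns on the union of their index labels; gaps become None."""
    index = sorted(set().union(*(column.keys() for column in columns.values())))
    return {
        label: {name: column.get(label) for name, column in columns.items()}
        for label in index
    }


outputs = {
    "a": {0: 1, 1: 2},               # series-like output
    "frame": {"b": {1: 10, 2: 20}},  # dataframe-like output with one column
}
rows = outer_join(flatten_outputs(outputs))
```

Here `rows` has one entry per index label in the union of both indexes, with None where an output has no value for that label. With pandas the missing cells would be NaN instead.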

build_result(**outputs: Dict[str, Any]) -> Any

Currently this is a no-op: it just delegates to the result builder. We could make it async, but that feels wrong, since this is only called after raw_execute.

Parameters:

outputs – Outputs (awaited) from the graph.

Returns:

The final results.

static check_input_type(node_type: Type, input_value: Any) -> bool

Used to check whether the user inputs match what the execution strategy & functions can handle.

Static purely for legacy reasons.

Parameters:
  • node_type – The type of the node.

  • input_value – The actual value that we want to check against our expectation.

Returns:

True if the input is valid, False otherwise.

static check_node_type_equivalence(node_type: Type, input_type: Type) -> bool

Used to check whether two types are equivalent.

Static, purely for legacy reasons.

This is used when the function graph is being created and we’re statically type checking the annotations for compatibility.

Parameters:
  • node_type – The type of the node.

  • input_type – The type of the input that would flow into the node.

Returns:

True if the types are equivalent, False otherwise.

static check_pandas_index_types_match(all_index_types: Dict[str, List[str]], time_indexes: Dict[str, List[str]], no_indexes: Dict[str, List[str]]) -> bool

Checks that pandas index types match.

This only logs warnings and, if debug logging is enabled, a debug statement listing the index types.

do_build_result(outputs: Dict[str, Any]) -> Any

Implements the do_build_result method from the BaseDoBuildResult class. This is hidden from the user, as the public-facing API is build_result, which allows us to change the API/implementation of the internal set of hooks.

do_check_edge_types_match(type_from: type, type_to: type) -> bool

Method that checks whether two types are equivalent. This is used when the function graph is being created.

Parameters:
  • type_from – The type of the node that is the source of the edge.

  • type_to – The type of the node that is the destination of the edge.

Returns:

Whether or not they are equivalent.

do_node_execute(run_id: str, node_: Node, kwargs: Dict[str, Any], task_id: str | None = None) -> Any

Method that is called to implement node execution. This can replace the execution of a node with something else altogether, augment it, or delegate it.

Parameters:
  • run_id – ID of the run, unique in scope of the driver.

  • node_ – Node that is being executed

  • kwargs – Keyword arguments that are being passed into the node

  • task_id – ID of the task, defaults to None if not in a task setting

do_validate_input(node_type: type, input_value: Any) -> bool

Method that validates that an input value matches an expected type.

Parameters:
  • node_type – The type of the node.

  • input_value – The value that we want to validate.

Returns:

Whether or not the input value matches the expected type.

execute_node(node: Node, kwargs: Dict[str, Any]) -> Any

Executes a node. Note that this doesn’t actually execute it; rather, it returns a task. This does not use async def, as we want the task to be awaited later: the await happens when processing the parameters of downstream functions or the final results. We can ensure this because we also control the driver that this adapter corresponds to.

Note that this treats everything as awaitable, even if it isn’t. A value that is not awaitable is simply wrapped in one.

Parameters:
  • node – Node to wrap

  • kwargs – Keyword arguments (either coroutines or raw values) to call it with

Returns:

A task
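The deferred-await pattern described for execute_node can be sketched with asyncio alone. This is a minimal illustration, not the adapter's actual code: `schedule_node`, `resolve`, `base`, and `doubled` are hypothetical names, and the node is reduced to a plain callable. The scheduling function is a regular def that returns a task, and each task awaits its own upstream inputs when it runs.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict


async def resolve(value: Any) -> Any:
    """Await the value if it is awaitable; otherwise return it unchanged."""
    if inspect.isawaitable(value):
        return await value
    return value


def schedule_node(fn: Callable[..., Any], kwargs: Dict[str, Any]) -> "asyncio.Task[Any]":
    """Hypothetical stand-in for execute_node: a plain def that returns a task."""

    async def run() -> Any:
        # Upstream tasks (or raw values) are resolved only when this task runs.
        resolved = {name: await resolve(value) for name, value in kwargs.items()}
        # Works for both sync and async functions: the result is awaited if needed.
        return await resolve(fn(**resolved))

    return asyncio.create_task(run())


async def base() -> int:
    return 2


def doubled(base: int) -> int:
    return base * 2


async def main() -> int:
    upstream = schedule_node(base, {})
    downstream = schedule_node(doubled, {"base": upstream})
    # Awaiting the final node recursively awaits everything it depends on.
    return await downstream


final = asyncio.run(main())
```

Because `schedule_node` calls `asyncio.create_task`, it has to be invoked while an event loop is running, which is why the calls sit inside `main`.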

input_types() -> List[Type[Type]]

Currently this just shoves anything into a dataframe. We should probably tighten this up.

output_type() -> Type

Returns the output type of this result builder.

Returns:

The type that this creates.

static pandas_index_types(outputs: Dict[str, Any]) -> Tuple[Dict[str, List[str]], Dict[str, List[str]], Dict[str, List[str]]]

This function creates three dictionaries according to whether there is an index type or not.

The three dicts we create are:

  1. Dict of index type to list of outputs that match it.

  2. Dict of time series / categorical index types to list of outputs that match it.

  3. Dict of no-index key to list of outputs with no index type.

Parameters:

outputs – the dict we’re trying to create a result from.

Returns:

Dict of all index types, dict of time series/categorical index types, and dict of outputs with no index.