h_spark.SparkKoalasGraphAdapter¶

This is an experimental GraphAdapter; there is a possibility of its API changing. That said, the code is stable, and you should feel comfortable taking it for a spin. Let us know how it goes, and what rough edges you find, if any. We'd love feedback from anyone using these so we know how to improve them or graduate them.

class hamilton.plugins.h_spark.SparkKoalasGraphAdapter(spark_session, result_builder: ResultMixin, spine_column: str)¶

Class representing what’s required to make Hamilton run on Spark with Koalas, i.e. Pandas on Spark.

This walks the graph and translates it to run on Apache Spark using the Pandas API on Spark.

Use pip install sf-hamilton[spark] to get the dependencies required to run this.

Currently, this class assumes you're running Spark 3.2+. You'd generally use this if you have an existing Spark cluster running in your workplace and you want to scale to very large dataset sizes.

Some tips on Koalas (from before it was merged into Spark 3.2) can be found in the Koalas documentation.

Spark is a comparatively heavyweight choice for scaling computation of Hamilton graphs that create a Pandas DataFrame.

Notes on scaling:¶

  • Multi-core on a single machine ✅ (if you set up Spark locally to do so; see the session sketch after this list)

  • Distributed computation on a Spark cluster ✅

  • Scales to any size of data as permitted by Spark ✅
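
For local multi-core runs, here is a minimal sketch of a local Spark session (the app name is hypothetical):

    from pyspark.sql import SparkSession

    # "local[*]" asks Spark to use all available cores on this machine.
    spark = (
        SparkSession.builder.master("local[*]")
        .appName("hamilton-koalas")  # hypothetical app name
        .getOrCreate()
    )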

Function return object types supported:¶

  • ⛔ Not generic. This does not work for every Hamilton graph.

  • ✅ Currently we’re targeting this at Pandas/Koalas types [dataframes, series].

Pandas?¶

  • ✅ Koalas on Spark 3.2+ implements a good subset of the pandas API. Keep it simple and you should be good to go; see the sketch below.
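
As an illustration of keeping it simple, a minimal sketch of a Hamilton function written against the Pandas API on Spark (the column names spend and signups are hypothetical):

    import pyspark.pandas as ps

    def spend_per_signup(spend: ps.Series, signups: ps.Series) -> ps.Series:
        """Cost per signup. Column-wise arithmetic like this behaves the same in
        pandas and in the Pandas API on Spark."""
        return spend / signups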

CAVEATS¶

  • Serialization costs can outweigh the benefits of parallelism, so you should benchmark your code to see if it’s worth it.

DISCLAIMER – this class is experimental, so signature changes are a possibility!

__init__(spark_session, result_builder: ResultMixin, spine_column: str)¶

Constructor.

You can only return either a Pandas-on-Spark DataFrame or a Pandas DataFrame. To do so, use either the stock base.PandasDataFrameResult class or h_spark.KoalasDataframeResult. A wiring sketch follows the parameter list below.

Parameters:
  • spark_session – the spark session to use.

  • result_builder – the function to build the result; currently only Pandas and Koalas are “supported”.

  • spine_column – the column we should use first as the spine and then subsequently join against.
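
A minimal wiring sketch, assuming a hypothetical module my_functions that defines Hamilton functions (such as spend_per_signup above) and a hypothetical spine column named signups:

    import pyspark.pandas as ps
    from pyspark.sql import SparkSession

    from hamilton import base, driver
    from hamilton.plugins import h_spark

    import my_functions  # hypothetical module of Hamilton functions

    spark = SparkSession.builder.getOrCreate()
    adapter = h_spark.SparkKoalasGraphAdapter(
        spark_session=spark,
        result_builder=base.PandasDataFrameResult(),  # final result: a Pandas DataFrame
        spine_column="signups",  # hypothetical column to use as the spine
    )
    dr = driver.Driver({}, my_functions, adapter=adapter)
    df = dr.execute(
        ["spend_per_signup"],
        inputs={  # illustrative input data for the required source columns
            "spend": ps.Series([10.0, 100.0]),
            "signups": ps.Series([1, 10]),
        },
    )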

build_result(**outputs: Dict[str, Any]) → pandas.DataFrame | pyspark.pandas.DataFrame | dict¶

Given a set of outputs, build the result.

Parameters:

outputs – the outputs from the execution of the graph.

Returns:

the result of the execution of the graph.
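
The driver normally calls this for you. A hypothetical direct call, for an adapter constructed as in the sketch above with spine_column="signups", might look like:

    import pyspark.pandas as ps

    result = adapter.build_result(
        signups=ps.Series([1, 10, 50]),
        spend=ps.Series([10.0, 100.0, 500.0]),
    )
    # With base.PandasDataFrameResult, result would be a Pandas DataFrame whose
    # columns are joined against the "signups" spine.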

static check_input_type(node_type: Type, input_value: Any) → bool¶

Function to check an input value against the expected node type.

We need this to equate pandas and Koalas objects/types.

Parameters:
  • node_type – the declared node type

  • input_value – the actual input value

Returns:

whether this is okay, or not.
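
For example, under the pandas/Koalas equivalence this adapter implements, a pandas-on-Spark Series is expected to satisfy a declared pd.Series type (a sketch with illustrative values):

    import pandas as pd
    import pyspark.pandas as ps

    from hamilton.plugins import h_spark

    # Expected to be True under the equivalence described above.
    ok = h_spark.SparkKoalasGraphAdapter.check_input_type(pd.Series, ps.Series([1, 2, 3]))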

static check_node_type_equivalence(node_type: Type, input_type: Type) → bool¶

Function to help equate pandas with Koalas types.

Parameters:
  • node_type – the declared node type.

  • input_type – the type of what we want to pass into it.

Returns:

whether this is okay, or not.
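
A sketch of the type-level counterpart, under the same assumed equivalence:

    import pandas as pd
    import pyspark.pandas as ps

    from hamilton.plugins import h_spark

    # Expected to be True: pandas and pandas-on-Spark Series types are treated
    # as equivalent by this adapter.
    ok = h_spark.SparkKoalasGraphAdapter.check_node_type_equivalence(ps.Series, pd.Series)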

do_build_result(outputs: Dict[str, Any]) → Any¶

Implements the do_build_result method from the BaseDoBuildResult class. This is hidden from the user, as the public-facing API is build_result; this allows us to change the API/implementation of the internal set of hooks.

do_check_edge_types_match(type_from: type, type_to: type) → bool¶

Method that checks whether two types are equivalent. This is used when the function graph is being created.

Parameters:
  • type_from – The type of the node that is the source of the edge.

  • type_to – The type of the node that is the destination of the edge.

Return bool:

Whether or not they are equivalent

do_node_execute(run_id: str, node_: Node, kwargs: Dict[str, Any], task_id: str | None = None) → Any¶

Method that is called to implement node execution. This can replace the execution of a node entirely, augment it, or delegate it.

Parameters:
  • run_id – ID of the run, unique in scope of the driver.

  • node_ – Node that is being executed

  • kwargs – Keyword arguments that are being passed into the node

  • task_id – ID of the task, defaults to None if not in a task setting

do_validate_input(node_type: type, input_value: Any) → bool¶

Method that validates that an input value matches the expected type.

Parameters:
  • node_type – The type of the node.

  • input_value – The value that we want to validate.

Returns:

Whether or not the input value matches the expected type.

execute_node(node: Node, kwargs: Dict[str, Any]) → Any¶

Function that is called as we walk the graph to determine how to execute a Hamilton function.

Parameters:
  • node – the node from the graph.

  • kwargs – the arguments that should be passed to it.

Returns:

a Koalas column

input_types() → List[Type[Type]]¶

Gives the applicable types to this result builder. This is optional for backwards compatibility, but is recommended.

Returns:

A list of types that this can apply to.

output_type() → Type¶

Returns the output type of this result builder.

Returns:

the type that this creates.