with_columns

Overview

This is part of the hamilton pyspark integration. To install, run:

pip install sf-hamilton[pyspark]

Reference Documentation

class hamilton.plugins.h_spark.with_columns(*load_from: Callable | ModuleType, columns_to_pass: List[str] = None, pass_dataframe_as: str = None, select: List[str] = None, namespace: str = None, mode: str = 'append', config_required: List[str] = None)
__init__(*load_from: Callable | ModuleType, columns_to_pass: List[str] = None, pass_dataframe_as: str = None, select: List[str] = None, namespace: str = None, mode: str = 'append', config_required: List[str] = None)
Initializes a with_columns decorator for spark. This allows you to efficiently run groups of map operations on a dataframe, represented as pandas/primitives UDFs. This effectively “linearizes” compute: a DAG of map operations is run as a sequence of .withColumn operations on a single dataframe, so you don’t have to do a complex extract-then-join process on spark, which can be inefficient.

Here’s an example of calling it – if you’ve seen @subdag, you should be familiar with the concepts:

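# my_module.py
import pandas as pd

# _process stands in for any Series -> Series transformation.
def a(a_from_df: pd.Series) -> pd.Series:
    return _process(a_from_df)

def b(b_from_df: pd.Series) -> pd.Series:
    return _process(b_from_df)

def a_plus_b(a_from_df: pd.Series, b_from_df: pd.Series) -> pd.Series:
    return a_from_df + b_from_df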


# the with_columns call
@with_columns(
    my_module, # Load from any module (positional, since the signature is *load_from)
    columns_to_pass=["a_from_df", "b_from_df"], # The columns to pass from the dataframe to
    # the subdag
    select=["a", "b", "a_plus_b"], # The columns to select from the dataframe
)
def final_df(initial_df: ps.DataFrame) -> ps.DataFrame:
    # process, or just return unprocessed
    ...

You can think of the above as a series of withColumn calls on the dataframe, where the operations are applied in topological order. This is significantly more efficient than extracting out the columns, applying the maps, then joining, but also allows you to express the operations individually, making it easy to unit-test and reuse.
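The "series of withColumn calls in topological order" idea can be illustrated without Spark. The following is a minimal sketch, not Hamilton's implementation: rows are plain dicts, the helpers `linearize`, `with_column`, and `apply_linearized` are hypothetical names, and `a_plus_b` is written here to depend on `a` and `b` (rather than on the raw columns) so that ordering actually matters. It uses the standard-library `graphlib` module (Python 3.9+).

```python
import inspect
from graphlib import TopologicalSorter


def a(a_from_df):
    return a_from_df * 2


def b(b_from_df):
    return b_from_df + 1


def a_plus_b(a, b):
    return a + b


def linearize(funcs, columns_to_pass):
    """Return function names ordered so that dependencies come first."""
    by_name = {f.__name__: f for f in funcs}
    graph = {}
    for name, f in by_name.items():
        params = inspect.signature(f).parameters
        # Only edges between functions constrain the order; columns that
        # already exist on the dataframe are available from the start.
        graph[name] = {p for p in params if p in by_name and p not in columns_to_pass}
    return list(TopologicalSorter(graph).static_order())


def with_column(rows, name, func):
    """Mimic a withColumn call: add one derived column to every row."""
    params = list(inspect.signature(func).parameters)
    return [{**row, name: func(*(row[p] for p in params))} for row in rows]


def apply_linearized(rows, funcs, columns_to_pass):
    """Apply every function as one column addition, in dependency order."""
    by_name = {f.__name__: f for f in funcs}
    for name in linearize(funcs, columns_to_pass):
        rows = with_column(rows, name, by_name[name])
    return rows


rows = [{"a_from_df": 1, "b_from_df": 10}, {"a_from_df": 2, "b_from_df": 20}]
order = linearize([a_plus_b, a, b], ["a_from_df", "b_from_df"])
result = apply_linearized(rows, [a_plus_b, a, b], ["a_from_df", "b_from_df"])
```

No column is extracted and joined back: each step only adds a column to the same table, which is the property the decorator relies on.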

Note that the default operation is “append”, meaning that the selected columns are appended onto the dataframe and all original columns are kept. Setting mode to “select” instead keeps only the columns named in select (see the mode parameter below).
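The difference between the two values of the mode parameter can be sketched as a function over column names. This is an illustration only: `output_columns` is a hypothetical helper, and the column ordering and the handling of duplicates are assumptions rather than documented behavior.

```python
def output_columns(original, select, mode="append"):
    """Return the column names the resulting dataframe would expose."""
    if mode == "append":
        # Keep every original column, then add the selected outputs.
        return list(original) + [c for c in select if c not in original]
    if mode == "select":
        # Keep only what was explicitly selected.
        return list(select)
    raise ValueError(f"mode must be 'append' or 'select', got {mode!r}")


original = ["a_from_df", "b_from_df", "unrelated"]
select = ["a", "b", "a_plus_b"]

appended = output_columns(original, select, mode="append")
selected = output_columns(original, select, mode="select")
```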

If the function takes multiple dataframes, the dataframe input to process will always be the first one. This will be passed to the subdag, transformed, and passed back to the function. This follows the hamilton rule of reference by parameter name. To demonstrate this, in the code above, the dataframe that is passed to the subdag is initial_df. That is transformed by the subdag, and then returned as the final dataframe.
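The "first dataframe parameter" rule can be pictured with the standard `inspect` module. This is a sketch under stated assumptions: `DataFrame` is a stand-in class for the Spark dataframe type, `first_dataframe_param` is a hypothetical helper, and how Hamilton actually detects the parameter is not shown in this page.

```python
import inspect


class DataFrame:
    """Stand-in for the Spark dataframe type (assumption for this sketch)."""


def first_dataframe_param(fn):
    """Return the name of the first parameter annotated as a DataFrame."""
    for name, param in inspect.signature(fn).parameters.items():
        if param.annotation is DataFrame:
            return name
    raise ValueError(f"{fn.__name__} takes no DataFrame parameter")


def final_df(initial_df: DataFrame, lookup_df: DataFrame, threshold: int) -> DataFrame:
    return initial_df


# initial_df is the dataframe the subdag would transform;
# lookup_df would reach the function body untouched.
target = first_dataframe_param(final_df)
```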

You can read it as:

“final_df is a function that transforms the upstream dataframe initial_df, running the transformations from my_module. It starts with the columns a_from_df and b_from_df, and then adds the columns a, b, and a_plus_b to the dataframe. The body of final_df then receives the transformed dataframe and can process it further or return it unchanged.”

Parameters:
  • load_from – The functions that will be used to generate the group of map operations.

  • columns_to_pass – The initial schema of the dataframe. This is used to determine which upstream inputs should be taken from the dataframe, and which shouldn’t. Note that, if this is left empty (and external_inputs is as well), we will assume that all dependencies come from the dataframe. This cannot be used in conjunction with pass_dataframe_as.

  • pass_dataframe_as – The name of the dataframe that we’re modifying, as known to the subdag. If you pass this in, you are responsible for extracting columns out. If not provided, you have to pass columns_to_pass in, and we will extract the columns out for you.

  • select – Outputs to select from the subdag, i.e. the functions/modules passed in. If this is left blank, it will add all possible columns from the subdag to the dataframe.

  • namespace – The namespace of the nodes, so they don’t clash with the global namespace and so this can be reused. If it’s left out, there will be no namespace (in which case you’ll want to be careful about repeating it/reusing the nodes in other parts of the DAG).

  • mode – The mode of the operation. This can be either “append” or “select”. If it is “append”, it will keep all original columns in the dataframe, and append what’s in select. If it is “select”, it will do a global select of columns in the dataframe from the select parameter. Note that, if the select parameter is left blank, it will add all columns in the dataframe that are in the subdag. This defaults to append. If you’re using select, use the @select decorator instead.

  • config_required – The list of config keys that are required to resolve any functions. Pass in None if you want the functions/modules to have access to all possible config.
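To make the role of columns_to_pass concrete, the sketch below classifies each function parameter by where its value would come from: a column already on the dataframe, another function in the group, or something upstream of the decorated function. `split_inputs` and the example functions (including the `factor` input) are hypothetical, and this is an illustration of the rule described above, not Hamilton's resolution logic.

```python
import inspect


def a(a_from_df):
    return a_from_df * 2


def b(b_from_df):
    return b_from_df + 1


def scaled(a, factor):
    return a * factor


def split_inputs(funcs, columns_to_pass):
    """Classify every parameter name by its source."""
    produced = {f.__name__ for f in funcs}
    from_df, internal, external = set(), set(), set()
    for f in funcs:
        for p in inspect.signature(f).parameters:
            if p in columns_to_pass:
                from_df.add(p)    # read from the dataframe
            elif p in produced:
                internal.add(p)   # computed by another function in the group
            else:
                external.add(p)   # must be supplied from outside the group
    return sorted(from_df), sorted(internal), sorted(external)


from_df, internal, external = split_inputs(
    [a, b, scaled], columns_to_pass=["a_from_df", "b_from_df"]
)
```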