Decorators#

While the 1:1 mapping of output -> function implementation is powerful, we’ve implemented a few decorators to promote business-logic reuse. Source for these decorators can be found in the function_modifiers module.

For reference, we list the available decorators for Hamilton here. Note: use from hamilton.function_modifiers import DECORATOR_NAME to use these decorators:
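
For example, to bring a few of the decorators documented below into scope:

from hamilton.function_modifiers import config, tag, extract_columns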

@config*#

class hamilton.function_modifiers.config(resolves: Callable[[Dict[str, Any]], bool], target_name: str = None, config_used: List[str] = None)#

Decorator class that determines whether a function should be in the DAG based on some configuration variable.

Notes:

  1. Currently, functions that exist in all configurations have to be disjoint.

  2. There is currently no @config.otherwise(...) decorator, so make sure your config.when conditions cover the full set of configuration possibilities. Any missing cases will not have that output (and subsequent downstream functions may error out if they ask for it).

  3. To make this easier, we have a few more @config decorators:
    • @config.when_not(param=value) Will be included if the parameter is _not_ equal to the value specified.

    • @config.when_in(param=[value1, value2, ...]) Will be included if the parameter is equal to one of the specified values.

    • @config.when_not_in(param=[value1, value2, ...]) Will be included if the parameter is not equal to any of the specified values.

    • @config If you’re feeling adventurous, you can pass in a lambda function that takes in the entire configuration and resolves to True or False. You probably don’t want to do this.
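
For instance, a minimal sketch of the raw @config form (env here is a hypothetical config key; per the note above, the lambda receives the entire configuration dictionary):

@config(lambda conf: conf.get("env") == "prod")
def prod_only_output(raw_input: pd.Series) -> pd.Series:
    # only present in the DAG when the lambda resolves to True
    ...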

Example:

@config.when_in(business_line=["mens","kids"], region=["uk"])
def LEAD_LOG_BASS_MODEL_TIMES_TREND(
     TREND_BSTS_WOMENS_ACQUISITIONS: pd.Series,
     LEAD_LOG_BASS_MODEL_SIGNUPS_NON_REFERRAL: pd.Series) -> pd.Series:
     # logic
     ...

Example - use of __suffix to differentiate between functions with the same name. This is required if you want to use the same function name in multiple configurations. Hamilton will automatically drop the suffix for you. The following will ensure only one function is registered with the name my_transform:

@config.when(region="us")
def my_transform__us(some_input: pd.Series, some_input_b: pd.Series) -> pd.Series:
     # logic
     ...

@config.when(region="uk")
def my_transform__uk(some_input: pd.Series, some_input_c: pd.Series) -> pd.Series:
     # logic
     ...

__init__(resolves: Callable[[Dict[str, Any]], bool], target_name: str = None, config_used: List[str] = None)#

Decorator that resolves a function based on the configuration…

Parameters:
  • resolves – the python function to use to resolve whether the wrapped function should exist in the graph or not.

  • target_name – Optional. The name of the “function”/”node” that we want to attach @config to.

  • config_used – Optional. The list of config names that this function uses.

static when(name=None, **key_value_pairs) → config#

Yields a decorator that resolves the function if all keys in the config are equal to the corresponding value.

Parameters:

key_value_pairs – Keys and corresponding values to look up in the config

Returns:

a configuration decorator

static when_in(name=None, **key_value_group_pairs: Collection[Any]) → config#

Yields a decorator that resolves the function if all of the values corresponding to the config keys are equal to one of the items in the list of values.

@config.when_in(param=[value1, value2, ...]) Will be included if the parameter is equal to one of the specified values.

Parameters:

key_value_group_pairs – pairs of key-value mappings where the value is a list of possible values

Returns:

a configuration decorator

static when_not(name=None, **key_value_pairs: Any) → config#

Yields a decorator that resolves the function if none of the keys in the config are equal to the corresponding value.

@config.when_not(param=value) will be included if the parameter is _not_ equal to the value specified.

Parameters:

key_value_pairs – Keys and corresponding values to look up in the config

Returns:

a configuration decorator
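
Since there is no @config.otherwise(...), a common pattern is to pair when with when_not so that every configuration produces the output — a sketch (function and config names hypothetical), using the __suffix convention described above:

@config.when(region="us")
def compute_metric__us(input_data: pd.Series) -> pd.Series:
    # US-specific logic
    ...

@config.when_not(region="us")
def compute_metric__other(input_data: pd.Series) -> pd.Series:
    # logic for all other regions
    ...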

static when_not_in(**key_value_group_pairs: Collection[Any]) → config#

Yields a decorator that resolves the function only if none of the config values for the given keys appear in the corresponding lists of values.

@config.when_not_in(param=[value1, value2, ...]) Will be included if the parameter is not equal to any of the specified values.

Parameters:

key_value_group_pairs – pairs of key-value mappings where the value is a list of possible values

Returns:

a configuration decorator

@config.when_not_in(business_line=["mens","kids"], region=["uk"])
def LEAD_LOG_BASS_MODEL_TIMES_TREND(
    TREND_BSTS_WOMENS_ACQUISITIONS: pd.Series,
    LEAD_LOG_BASS_MODEL_SIGNUPS_NON_REFERRAL: pd.Series) -> pd.Series:
    # logic
    ...

The above will resolve for a config that has {"business_line": "womens", "region": "us"}, but not for configs that have {"business_line": "mens", "region": "us"}, {"business_line": "kids", "region": "us"}, or {"region": "uk"}.

See also: config.when_not.

@tag*#

class hamilton.function_modifiers.tag(*, target_: Union[str, Collection[str], None, ellipsis] = None, **tags: str)#

Decorator class that adds a tag to a node. Tags take the form of key/value pairings. Tags can have dots to specify namespaces (keys with dots), but this is usually reserved for special cases (E.G. subdecorators) that utilize them. Usually one will pass in tags as kwargs, so we expect tags to be un-namespaced in most uses.

That is using:

@tag(my_tag='tag_value')
def my_function(...) -> ...:

is un-namespaced because you cannot put a . in the keyword part (the part before the ‘=’).

But using:

@tag(**{'my.tag': 'tag_value'})
def my_function(...) -> ...:

allows you to add dots that allow you to namespace your tags.

Currently, tag values are restricted to allowing strings only, although we may consider changing this in the future (E.G. thinking of lists).

Hamilton also reserves the right to change the following:

  • adding purely positional arguments

  • not allowing users to use a certain set of top-level prefixes (E.G. any tag where the top level is one of the values in RESERVED_TAG_PREFIX).

Example usage:

@tag(foo='bar', a_tag_key='a_tag_value', **{'namespace.tag_key': 'tag_value'})
def my_function(...) -> ...:
   ...
__init__(*, target_: Union[str, Collection[str], None, ellipsis] = None, **tags: str)#

Constructor for adding tag annotations to a function.

Parameters:
  • target_

    Target nodes to decorate. This can be one of the following:

    • None: tag all nodes outputted by this that are “final” (i.e. no other node outputted by this depends on them)

    • Ellipsis (…): tag all nodes outputted by this

    • Collection[str]: tag only the nodes with the specified names

    • str: tag only the node with the specified name

  • tags – the keys are always going to be strings, so the type annotation here means the values are strings. Implicitly this is Dict[str, str] but the PEP guideline is to only annotate it with str.
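
As a sketch of target_ usage (tag keys and node names hypothetical): since @extract_columns (documented below) makes the function output multiple nodes, you can tag just one of them by name:

@tag(owner='data-science', target_='a')
@extract_columns('a', 'b')
def raw_dataframe() -> pd.DataFrame:
    ...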

class hamilton.function_modifiers.tag_outputs(**tag_mapping: Dict[str, str])#
__init__(**tag_mapping: Dict[str, str])#

Creates a tag_outputs decorator.

Note that this currently does not validate whether the nodes are spelled correctly as it takes in a superset of nodes.

Parameters:

tag_mapping – Mapping of output name to tags – this is akin to applying @tag to individual outputs produced by the function.

Example usage:

@tag_outputs(**{'a': {'a_tag': 'a_tag_value'}, 'b': {'b_tag': 'b_tag_value'}})
@extract_columns("a", "b")
def example_tag_outputs() -> pd.DataFrame:
    return pd.DataFrame.from_records({"a": [1], "b": [2]})

@extract*#

class hamilton.function_modifiers.extract_columns(*columns: Union[Tuple[str, str], str], fill_with: Any = None)#
__init__(*columns: Union[Tuple[str, str], str], fill_with: Any = None)#

Constructor for a modifier that expands a single function into the following nodes:

  • n functions, each of which takes in the original dataframe and outputs a specific column

  • 1 function that outputs the original dataframe

Parameters:
  • columns – Columns to extract. These can be a list of tuples of (name, documentation), or just names.

  • fill_with – If you want to extract a column that doesn’t exist, do you want to fill it with a default value? Or do you want to error out? Leave empty/None to error out; set fill_with to dynamically create a column.
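
For example, a minimal sketch (column and input names hypothetical):

import pandas as pd
from hamilton.function_modifiers import extract_columns

@extract_columns('spend', 'signups')
def marketing_data(marketing_data_path: str) -> pd.DataFrame:
    '''Outputs the full dataframe; spend and signups also become their own nodes.'''
    return pd.read_csv(marketing_data_path)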

class hamilton.function_modifiers.extract_fields(fields: dict, fill_with: Any = None)#

Extracts fields from a dictionary of output.

__init__(fields: dict, fill_with: Any = None)#

Constructor for a modifier that expands a single function into the following nodes:

  • n functions, each of which takes in the original dict and outputs a specific field

  • 1 function that outputs the original dict

Parameters:
  • fields – Fields to extract. A dict of ‘field_name’ -> ‘field_type’.

  • fill_with – If you want to extract a field that doesn’t exist, do you want to fill it with a default value? Or do you want to error out? Leave empty/None to error out; set fill_with to dynamically create a field value.
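
For example, a minimal sketch (field names hypothetical):

from hamilton.function_modifiers import extract_fields

@extract_fields({'train_data': dict, 'test_data': dict})
def split_data(raw_data: dict) -> dict:
    '''Outputs the full dict; train_data and test_data also become their own nodes.'''
    return {'train_data': {'x': [1, 2]}, 'test_data': {'x': [3]}}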

@check_output*#

class hamilton.function_modifiers.check_output(importance: str = 'warn', default_decorator_candidates: Type[BaseDefaultValidator] = None, target_: Union[str, Collection[str], None, ellipsis] = None, **default_validator_kwargs: Any)#

The @check_output decorator enables you to add simple data quality checks to your code.

For example:

import pandas as pd
import numpy as np
from hamilton.function_modifiers import check_output

@check_output(
    data_type=np.int64,
    data_in_range=(0,100),
    importance="warn",
)
def some_int_data_between_0_and_100() -> pd.Series:
    ...

The check_output decorator takes in arguments that each correspond to one of the default validators. These arguments tell it to add the default validator to the list. The above thus creates two validators, one that checks the datatype of the series, and one that checks whether the data is in a certain range.

Pandera example that shows how to use the check_output decorator with a Pandera schema:

import pandas as pd
import pandera as pa
from hamilton.function_modifiers import check_output
from hamilton.function_modifiers import extract_columns

schema = pa.DataFrameSchema(...)

@extract_columns('col1', 'col2')
@check_output(schema=schema, target_="builds_dataframe", importance="fail")
def builds_dataframe(...) -> pd.DataFrame:
    ...
__init__(importance: str = 'warn', default_decorator_candidates: Type[BaseDefaultValidator] = None, target_: Union[str, Collection[str], None, ellipsis] = None, **default_validator_kwargs: Any)#

Creates the check_output validator.

This constructs the default validator class.

Note that this creates a whole set of default validators. TODO – enable construction of custom validators using check_output.custom(*validators).

Parameters:
  • importance – For the default validator, how important it is that this passes.

  • default_validator_kwargs – keyword arguments to be passed to the validator.

  • target_ – a target specifying which nodes to decorate. See the docs in check_output_custom for a quick overview and the docs in function_modifiers.base.NodeTransformer for more detail.

class hamilton.function_modifiers.check_output_custom(*validators: DataValidator, target_: Union[str, Collection[str], None, ellipsis] = None)#

Class to use if you want to implement your own custom validators.

Come chat with us in Slack if you’re interested in this!

__init__(*validators: DataValidator, target_: Union[str, Collection[str], None, ellipsis] = None)#

Creates a check_output_custom decorator. This allows passing of custom validators that implement the DataValidator interface.

Parameters:
  • validators – Validators to use.

  • target_

    The nodes to check the output of. For more detail read the docs in function_modifiers.base.NodeTransformer, but your options are:

    1. None: This will check just the “final node” (the node that is returned by the decorated function).

    2. … (Ellipsis): This will check all nodes in the subDAG created by this.

    3. string: This will check the node with the given name.

    4. Collection[str]: This will check all nodes specified in the list.

    In all likelihood, you don’t want ..., but the others are useful.
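
Usage is then a sketch along the following lines, where MyCustomValidator is a hypothetical implementation of the DataValidator interface (constructor arguments are also hypothetical):

@check_output_custom(MyCustomValidator(importance="fail"))
def some_output() -> pd.Series:
    ...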

@parameterize*#

Classes to help with @parameterize:#

class hamilton.function_modifiers.ParameterizedExtract(outputs: Tuple[str, ...], input_mapping: Dict[str, ParametrizedDependency])#

Dataclass to hold inputs for @parameterize and @parameterize_extract_columns.

Parameters:
  • outputs – A tuple of strings, each of which is the name of an output.

  • input_mapping – A dictionary of string to ParametrizedDependency. The string is the name of the python parameter of the decorated function, and the value is a “source”/”value” which will be passed as input for that parameter to the function.

class hamilton.function_modifiers.source(dependency_on: Any)#

Specifies that a parameterized dependency comes from an upstream source.

This means that it comes from a node somewhere else. E.G. source(“foo”) means that it should be assigned the value that “foo” outputs.

Parameters:

dependency_on – Upstream function (i.e. node) to come from.

Returns:

An UpstreamDependency object – a signifier to the internal framework of the dependency type.

class hamilton.function_modifiers.value(literal_value: Any)#

Specifies that a parameterized dependency comes from a “literal” source.

E.G. value(“foo”) means that the value is actually the string value “foo”.

Parameters:

literal_value – Python literal value to use.

Returns:

A LiteralDependency object – a signifier to the internal framework of the dependency type.

class hamilton.function_modifiers.group(*dependency_args: ParametrizedDependency, **dependency_kwargs: ParametrizedDependency)#

Specifies that a parameterized dependency comes from a “grouped” source.

This means that it gets injected into a list of dependencies that are grouped together. E.G. dep=group(source(“foo”), source(“bar”)) for the function:

@inject(dep=group(source("foo"), source("bar")))
def f(dep: List[pd.Series]) -> pd.Series:
    return ...

Would result in dep getting foo and bar dependencies injected.

Parameters:
  • dependency_args – Dependencies, list of dependencies (e.g. source(“foo”), source(“bar”))

  • dependency_kwargs – Dependencies, kwarg dependencies (e.g. foo=source(“foo”))

Returns:

A grouped dependency – a signifier to the internal framework of the dependency type.

Actual decorators:#

class hamilton.function_modifiers.parameterize(**parametrization: Union[Dict[str, ParametrizedDependency], Tuple[Dict[str, ParametrizedDependency], str]])#

Decorator to use to create many functions.

Expands a single function into n, each of which corresponds to a function in which the parameter value is replaced either by:

  1. A specified literal value, denoted value(‘literal_value’).

  2. The output from a specified upstream function (i.e. node), denoted source(‘upstream_function_name’).

Note that parameterize can take the place of @parameterize_sources or @parameterize_values decorators below. In fact, they delegate to this!

Examples expressing different syntax:

@parameterize(
    # tuple of assignments (consisting of literals/upstream specifications), and docstring.
    replace_no_parameters=({}, 'fn with no parameters replaced'),
)
def no_param_function() -> Any:
    ...

@parameterize(
    # tuple of assignments (consisting of literals/upstream specifications), and docstring.
    replace_just_upstream_parameter=(
        {'upstream_source': source('foo_source')},
        'fn with upstream_parameter set to node foo'
    ),
)
def param_is_upstream_function(upstream_source: Any) -> Any:
    '''Doc string that can also be parameterized: {upstream_source}.'''
    ...

@parameterize(
    replace_just_literal_parameter={'literal_parameter': value('bar')},
)
def param_is_literal_value(literal_parameter: Any) -> Any:
    '''Doc string that can also be parameterized: {literal_parameter}.'''
    ...

@parameterize(
    replace_both_parameters={
        'upstream_parameter': source('foo_source'),
        'literal_parameter': value('bar')
    }
)
def concat(upstream_parameter: Any, literal_parameter: str) -> Any:
    '''Adding {literal_parameter} to {upstream_parameter} to create {output_name}.'''
    return upstream_parameter + literal_parameter

You also have the capability to “group” parameters, which will combine them into a list.

@parameterize(
    a_plus_b_plus_c={
        'to_concat' : group(source('a'), value('b'), source('c'))
    }
)
def concat(to_concat: List[str]) -> Any:
    '''Concatenating the grouped inputs to create {output_name}.'''
    return sum(to_concat, '')
__init__(**parametrization: Union[Dict[str, ParametrizedDependency], Tuple[Dict[str, ParametrizedDependency], str]])#

Decorator to use to create many functions.

Parameters:

parametrization

**kwargs with one of two things:

  • a tuple of assignments (consisting of literals/upstream specifications), and docstring.

  • just assignments, in which case it parametrizes the existing docstring.

class hamilton.function_modifiers.parameterize_sources(**parameterization: Dict[str, Dict[str, str]])#

Expands a single function into n, each of which corresponds to a function in which the parameters specified are mapped to the specified inputs. Note this decorator and @parameterize_values are quite similar, except that the input here is another DAG node(s), i.e. column/input, rather than a specific scalar/static value.

import pandas as pd
from hamilton.function_modifiers import parameterize_sources

@parameterize_sources(
   D_ELECTION_2016_shifted=dict(one_off_date='D_ELECTION_2016'),
   SOME_OUTPUT_NAME=dict(one_off_date='SOME_INPUT_NAME')
)
def date_shifter(one_off_date: pd.Series) -> pd.Series:
   '''{one_off_date} shifted by 1 to create {output_name}'''
   return one_off_date.shift(1)
__init__(**parameterization: Dict[str, Dict[str, str]])#

Constructor for a modifier that expands a single function into n, each of which corresponds to replacing some subset of the specified parameters with specific upstream nodes.

Note this decorator and @parametrized_input are similar, except this one allows multiple parameters to be mapped to multiple function arguments (and it fixes the spelling mistake).

parameterize_sources allows you to keep your code DRY by reusing the same function, replacing its inputs to create multiple corresponding distinct outputs. The keyword arguments passed have to have the following structure:

> OUTPUT_NAME = Mapping of function argument to input that should go into it.

The documentation for the output is taken from the function. The documentation string can be templatized with the parameter names of the function and the reserved value output_name - those will be replaced with the corresponding values from the parameterization.

Parameters:

**parameterization – kwargs of output name to dict of parameter mappings.

class hamilton.function_modifiers.parameterize_values(parameter: str, assigned_output: Dict[Tuple[str, str], Any])#

Expands a single function into n, each of which corresponds to a function in which the parameter value is replaced by that specific value.

import pandas as pd
from hamilton.function_modifiers import parameterize_values
import internal_package_with_logic

ONE_OFF_DATES = {
     #output name        # doc string               # input value to function
    ('D_ELECTION_2016', 'US Election 2016 Dummy'): '2016-11-12',
    ('SOME_OUTPUT_NAME', 'Doc string for this thing'): 'value to pass to function',
}
            # parameter matches the name of the argument in the function below
@parameterize_values(parameter='one_off_date', assigned_output=ONE_OFF_DATES)
def create_one_off_dates(date_index: pd.Series, one_off_date: str) -> pd.Series:
    '''Given a date index, produces a series where a 1 is placed at the date index that would contain that event.'''
    one_off_dates = internal_package_with_logic.get_business_week(one_off_date)
    return internal_package_with_logic.bool_to_int(date_index.isin([one_off_dates]))
__init__(parameter: str, assigned_output: Dict[Tuple[str, str], Any])#

Constructor for a modifier that expands a single function into n, each of which corresponds to a function in which the parameter value is replaced by that specific value.

Parameters:
  • parameter – Parameter to expand on.

  • assigned_output – A map of tuples of (output name, documentation) to values

class hamilton.function_modifiers.parameterize_extract_columns(*extract_config: ParameterizedExtract, reassign_columns: bool = True)#

@parameterize_extract_columns gives you the power of both @extract_columns and @parameterize in one decorator.

It takes in a list of ParameterizedExtract objects, each of which is composed of: (1) a list of columns to extract, and (2) a parameterization that gets used.

In the following case, we produce four columns, two for each parameterization:

import pandas as pd
from hamilton.function_modifiers import parameterize_extract_columns, ParameterizedExtract, source, value
@parameterize_extract_columns(
    ParameterizedExtract(
        ("outseries1a", "outseries2a"),
        {"input1": source("inseries1a"), "input2": source("inseries1b"), "input3": value(10)},
    ),
    ParameterizedExtract(
        ("outseries1b", "outseries2b"),
        {"input1": source("inseries2a"), "input2": source("inseries2b"), "input3": value(100)},
    ),
)
def fn(input1: pd.Series, input2: pd.Series, input3: float) -> pd.DataFrame:
    return pd.concat([input1 * input2 * input3, input1 + input2 + input3], axis=1)
__init__(*extract_config: ParameterizedExtract, reassign_columns: bool = True)#

Initializes a parameterize_extract_columns decorator. Note this currently works for series, but the plan is to extend it to fields as well…

Parameters:
  • extract_config – A configuration consisting of a list of ParameterizedExtract objects. These contain the information of a @parameterize and @extract_columns combined.

  • reassign_columns – Whether we want to reassign the columns as part of the function.

class hamilton.experimental.decorators.parameterize_frame.parameterize_frame(parameterization: DataFrame)#

EXPERIMENTAL! Instantiates a parameterize_extract decorator using a dataframe to specify a set of extracts + parameterizations.

This is an experimental decorator and the API may change in the future; please provide feedback whether this API does or does not work for you.

Parameters:

parameterization – Parameterization dataframe. See below.

This is of a specific shape:

  1. Index - Level 0: list of parameter names

  2. Index - Level 1: types of things to inject, either:

    • “out” (meaning this is an output),

    • “value” (meaning this is a literal value)

    • “source” (meaning this node comes from an upstream value)

  3. Contents:

  • Each row corresponds to the index; each row produces a set of output nodes from this.

Note your function has to take in the column-names and output a dataframe with those names – we will likely change it so that’s not the case, and it can just use the position of the columns.

Example usage:

import pandas as pd

from hamilton.experimental.decorators.parameterize_frame import parameterize_frame

df = pd.DataFrame(
    [
        ["outseries1a", "outseries2a", "inseries1a", "inseries2a", 5.0],
        ["outseries1b", "outseries2b", "inseries1b", "inseries2b", 0.2],
    ],
    # specify column names corresponding to function arguments and,
    # if outputting multiple columns, output dataframe columns.
    columns=[
        ["output1", "output2", "input1", "input2", "input3"],
        ["out", "out", "source", "source", "value"],
    ],
)

@parameterize_frame(df)
def my_func(
    input1: pd.Series, input2: pd.Series, input3: float
) -> pd.DataFrame:
   ...
__init__(parameterization: DataFrame)#

Initializes a parameterize_frame decorator. Note this currently works for series, but the plan is to extend it to fields as well…

Parameters:

parameterization – The parameterization dataframe, of the shape described above.

@inject#

class hamilton.function_modifiers.inject(**key_mapping: ParametrizedDependency)#

@inject allows you to replace parameters with values passed in. You can think of it as a @parameterize call that has only one parameterization, the result of which is the name of the function. See the following examples:

from typing import List

from hamilton.function_modifiers import inject, source, value, group

@inject(nums=group(source('a'), value(10), source('b'), value(2)))
def a_plus_10_plus_b_plus_2(nums: List[int]) -> int:
    return sum(nums)

This would be equivalent to:

@parameterize(
    a_plus_10_plus_b_plus_2={
        'nums': group(source('a'), value(10), source('b'), value(2))
    })
def sum_numbers(nums: List[int]) -> int:
    return sum(nums)

Something to note – we currently do not support the case in which the same parameter is utilized multiple times as an injection. E.G. two lists, a list and a dict, two sources, etc…

This is considered undefined behavior, and should be avoided.

__init__(**key_mapping: ParametrizedDependency)#

Instantiates an @inject decorator with the given key_mapping.

Parameters:

key_mapping – A dictionary of string to dependency spec. This is the same as the input mapping in @parameterize.

@does#

class hamilton.function_modifiers.does(replacing_function: Callable, **argument_mapping: Union[str, List[str]])#

@does is a decorator that essentially allows you to run a function over all the input parameters. So you can’t pass any old function to @does; instead, the function passed has to take any number of inputs and process them all in the same way.

import pandas as pd
from hamilton.function_modifiers import does
import internal_package_with_logic

def sum_series(**series: pd.Series) -> pd.Series:
    '''This function takes any number of inputs and sums them all together.'''
    ...

@does(sum_series)
def D_XMAS_GC_WEIGHTED_BY_DAY(D_XMAS_GC_WEIGHTED_BY_DAY_1: pd.Series,
                              D_XMAS_GC_WEIGHTED_BY_DAY_2: pd.Series) -> pd.Series:
    '''Adds D_XMAS_GC_WEIGHTED_BY_DAY_1 and D_XMAS_GC_WEIGHTED_BY_DAY_2'''
    pass

@does(internal_package_with_logic.identity_function)
def copy_of_x(x: pd.Series) -> pd.Series:
    '''Just returns x'''
    pass

The first example here is a function whose sole purpose is to sum all the parameters together. So we can annotate it with the @does decorator and pass it the sum_series function. The @does decorator is currently limited to allowing only functions that take a single argument: a generic **kwargs.
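
For reference, a runnable sketch of the sum_series function above could be:

def sum_series(**series: pd.Series) -> pd.Series:
    '''Takes any number of series and sums them all together.'''
    return sum(series.values())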

__init__(replacing_function: Callable, **argument_mapping: Union[str, List[str]])#

Constructor for a modifier that replaces the annotated function’s functionality with something else. Right now this has very strict validation requirements to make compliance with the framework easy.

Parameters:
  • replacing_function – The function to replace the original function with.

  • argument_mapping – A mapping of argument name in the replacing function to argument name in the decorating function.

@subdag#

class hamilton.function_modifiers.subdag(*load_from: Union[module, Callable], inputs: Dict[str, ParametrizedDependency] = None, config: Dict[str, Any] = None, namespace: str = None, final_node_name: str = None, external_inputs: List[str] = None)#

The @subdag decorator enables you to rerun components of your DAG with varying parameters. That is, it enables you to “chain” what you could express with a driver into a single DAG.

That is, instead of using Hamilton within itself:

def feature_engineering(source_path: str) -> pd.DataFrame:
    '''You could recursively use Hamilton within itself.'''
    dr = driver.Driver({}, feature_modules)
    df = dr.execute(["feature_df"], inputs={"path": source_path})
    return df

You instead can use the @subdag decorator to do the same thing, with the added benefit of visibility into the whole DAG:

@subdag(
    feature_modules,
    inputs={"path": source("source_path")},
    config={}
)
def feature_engineering(feature_df: pd.DataFrame) -> pd.DataFrame:
    return feature_df

Note that this is immensely powerful – if we draw analogies from Hamilton to standard procedural programming paradigms, we might have the following correspondence:

  • config.when + friends – if/else statements

  • parameterize/extract_columns – for loops

  • does – effectively macros

And so on. @subdag takes this one step further:

  • @subdag – subroutine definition

E.G. take a certain set of nodes, and run them with specified parameters.

@subdag declares parameters that are outputs of its subdags. Note that, if you want to use outputs of other components of the DAG, you can use the external_inputs parameter to declare the parameters that do not come from the subDAG.

Why might you want to use this? Let’s take a look at some examples:

  1. You have a feature engineering pipeline that you want to run on multiple datasets. If it’s exactly the same, this is perfect. If not, this works perfectly well too – you just have to utilize different functions in each, or use the config.when + config parameter to rerun it.

  2. You want to train multiple models in the same DAG that share some logic (in features or training) – this allows you to reuse and continually add more.

  3. You want to combine multiple similar DAGs (e.g. one for each business line) into one so you can build a cross-business line model.

This basically bridges the gap between the flexibility of non-declarative pipelining frameworks and the readability/maintainability of declarative ones.

__init__(*load_from: Union[module, Callable], inputs: Dict[str, ParametrizedDependency] = None, config: Dict[str, Any] = None, namespace: str = None, final_node_name: str = None, external_inputs: List[str] = None)#

Adds a subDAG to the main DAG.

Parameters:
  • load_from – The functions that will be used to generate this subDAG.

  • inputs – Parameterized dependencies to inject into all sources of this subDAG. These should not be intermediate nodes in the subDAG.

  • config – A configuration dictionary for just this subDAG. Note that this passed-in value takes precedence over the DAG’s config.

  • namespace – Namespace with which to prefix nodes. This is optional – if not included, this will default to the function name.

  • final_node_name – Name of the final node in the subDAG. This is optional – if not included, this will default to the function name.

  • external_inputs – Parameters in the function that are not produced by the functions passed to the subdag. This is useful if you want to perform some logic with other inputs in the subdag’s processing function.
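
A sketch of external_inputs (node names hypothetical) – here cutoff_date is produced outside the subdag, so it must be declared:

@subdag(
    feature_modules,
    inputs={"path": source("source_path")},
    external_inputs=["cutoff_date"],
)
def feature_engineering(feature_df: pd.DataFrame, cutoff_date: str) -> pd.DataFrame:
    # filter the subdag's output using a node from the outer DAG
    return feature_df[feature_df["date"] < cutoff_date]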

@parameterized_subdag#

class hamilton.function_modifiers.parameterized_subdag(*load_from: Union[module, Callable], inputs: Dict[str, Union[ParametrizedDependency, LiteralDependency]] = None, config: Dict[str, Any] = None, external_inputs: List[str] = None, **parameterization: SubdagParams)#

@parameterized_subdag is for when you want to create multiple subdags at one time. Why might you want to do this?

  1. You have multiple data sets you want to run the same feature engineering pipeline on.

  2. You want to run some sort of optimization routine with a variety of results

  3. You want to run some sort of pipeline over slightly different configuration (E.G. region/business line)

Note that this really is just syntactic sugar for creating multiple subdags, just as @parameterize is syntactic sugar for creating multiple nodes from a function. That said, it is common that you won’t know what you want until compile time (E.G. when you have the config available), so this decorator, along with the @resolve decorator, is a good way to make that feasible. Note that we are getting into advanced Hamilton here – we don’t recommend starting with this. In fact, we generally recommend repeating subdags multiple times if you don’t have too many. That said, that can get cumbersome if you have a lot, so this decorator is a good way to help with that.

Let’s take a look at an example:

@parameterized_subdag(
    feature_modules,
    from_datasource_1={"inputs": {"data": value("datasource_1.csv")}},
    from_datasource_2={"inputs": {"data": value("datasource_2.csv")}},
    from_datasource_3={
        "inputs": {"data": value("datasource_3.csv")},
        "config": {"filter": "only_even_client_ids"}
    }
)
def feature_engineering(feature_df: pd.DataFrame) -> pd.DataFrame:
    return feature_df

This is (obviously) contrived, but what it does is create three subdags, each with a different data source. The third one also applies a configuration to that subdag. Note that we can also pass inputs/config to the decorator itself, which will be applied to all subdags.

The following is effectively the same as the example above:

@parameterized_subdag(
    feature_modules,
    inputs={"data": value("datasource_1.csv")},
    from_datasource_1={},
    from_datasource_2={
        "inputs": {"data": value("datasource_2.csv")}
    },
    from_datasource_3={
        "inputs": {"data": value("datasource_3.csv")},
        "config": {"filter": "only_even_client_ids"},
    }
)

Again, think about whether this feature is really the one you want – oftentimes verbose, static DAGs are far more readable than very concise, highly parameterized DAGs.

__init__(*load_from: Union[module, Callable], inputs: Dict[str, Union[ParametrizedDependency, LiteralDependency]] = None, config: Dict[str, Any] = None, external_inputs: List[str] = None, **parameterization: SubdagParams)#

Initializes a parameterized_subdag decorator.

Parameters:
  • load_from – Modules to load from

  • inputs – Inputs for each subdag generated by the decorated function

  • config – Config for each subdag generated by the decorated function

  • external_inputs – External inputs to all parameterized subdags. Note that if you pass in any external inputs from local subdags, it overrides this (does not merge).

  • parameterization

    Parameterizations for each subdag generated. Note that this overrides any inputs/config passed to the decorator itself.

    Furthermore, note the following:

    1. The parameterizations passed to the constructor are **kwargs, so you are not allowed to name these load_from, inputs, or config. That’s a good thing, as these are not good names for variables anyway.

    2. Any empty items (not included) will default to an empty dict (or an empty list in the case of parameterization)

@resolve#

class hamilton.function_modifiers.resolve(*, when: ResolveAt, decorate_with: Callable[[...], NodeTransformLifecycle])#

Decorator class to delay evaluation of decorators until after the configuration is available. Note: this is a power-user feature, and you have to enable power-user mode! To do so, you have to add the configuration hamilton.enable_power_user_mode=True to the config you pass into the driver.

If not, this will break when it tries to instantiate a DAG.

This is particularly useful when you don’t know how you want your functions to resolve until configuration time. Say, for example, we want to add two series, and we need to pass the set of series to add as a configuration parameter, as we’ll be changing it regularly. Without this, you would have to have them as part of the same dataframe. E.G.

@parameterize(
    series_sum_1={"s1": value("series_1"), "s2": value("series_2")},
    series_sum_2={"s1": value("series_3"), "s2": value("series_4")},
)
def summation(df: pd.DataFrame, s1: str, s2: str) -> pd.Series:
    return df[s1] + df[s2]

This works, but it is a workaround for the fact that we cannot configure the dependencies. With the @resolve decorator, we can instead dynamically set the shape of the DAG based on config:

from hamilton.function_modifiers import resolve, ResolveAt

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda first_series_sum, second_series_sum: parameterize_sources(
        series_sum_1={"s1": first_series_sum[0], "s2": first_series_sum[1]},
        series_sum_2={"s1": second_series_sum[0], "s2": second_series_sum[1]},
    )
)
def summation(s1: pd.Series, s2: pd.Series) -> pd.Series:
    return s1 + s2

Note how this works:

  1. The decorate_with argument is a function that gives you the decorator you want to apply. Currently it’s “hamilton-esque” – while we do not require it to be typed, you can use a separate configuration-resolver function (and include type information). This function must return a decorator.

  2. The when argument is the point at which you want to resolve the decorator. Currently, we only support ResolveAt.CONFIG_AVAILABLE, which means that the decorator will be resolved at compile time, E.G. when the driver is instantiated.

  3. This is then run and dynamically resolved.

This is powerful, but the code is uglier. It’s meant to be used in some very specific cases, E.G. when you want time-series data on a per-column basis (E.G. once per month), and don’t want that hardcoded. While it is possible to store this in a JSON file and run parameterization on the loaded result as a global variable, it is much cleaner to pass it through the DAG, which is why we support it. However, since the code goes against one of Hamilton’s primary tenets (that all code is highly readable), we require that you enable power_user_mode.

We highly recommend that you put all functions decorated with this in their own module, keeping it separate from the rest of your functions. This way, you can import/build DAGs from the rest of your functions without turning on power-user mode.
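
As a sketch, enabling power-user mode for the example above might look like the following in the driver (the series-name config keys and module name are hypothetical):

from hamilton import driver

dr = driver.Driver(
    {
        "hamilton.enable_power_user_mode": True,
        "first_series_sum": ["series_1", "series_2"],
        "second_series_sum": ["series_3", "series_4"],
    },
    module_with_resolve_functions,
)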

__init__(*, when: ResolveAt, decorate_with: Callable[[...], NodeTransformLifecycle])#

Initializes a delayed decorator that gets called at some specific resolution time.

Parameters:
  • decorate_with – Function that takes required (and optional) parameters and returns a decorator.

  • when – When to resolve the decorator. Currently only supports ResolveAt.CONFIG_AVAILABLE.

@load_from#

class hamilton.function_modifiers.load_from#

Decorator to inject externally loaded data into a function. Ideally, anything that is not a pure transform should either call this, or accept inputs from an external location.

This decorator functions by “injecting” a parameter into the function. For example, the following code will load the json file, and inject it into the function as the parameter input_data. Note that the path for the JSON file comes from another node called raw_data_path (which could also be passed in as an external input).

@load_from.json(path=source("raw_data_path"))
def raw_data(input_data: dict) -> dict:
    return input_data

The decorator can also be used with value to inject a constant value into the loader. In the following case, we use the literal value “some/path.json” as the path to the JSON file.

@load_from.json(path=value("some/path.json"))
def raw_data(input_data: dict) -> dict:
    return input_data

You can also utilize the inject_ parameter in the loader if you want to inject the data into a specific param. For example, the following code will load the json file, and inject it into the function as the parameter data.

@load_from.json(path=source("raw_data_path"), inject_="data")
def raw_data(data: dict, valid_keys: List[str]) -> dict:
    return {key: val for key, val in data.items() if key in valid_keys}
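
As a sketch, running the above through the driver might look like this (module name hypothetical):

from hamilton import driver

dr = driver.Driver({}, my_loading_module)
output = dr.execute(['raw_data'], inputs={'raw_data_path': '/path/my_data.json'})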

This is a highly pluggable functionality – here’s the basics of how it works:

1. Every “key” (json above, but others include csv, literal, file, pickle, etc…) corresponds to a set of loader classes. For example, the json key corresponds to the JSONLoader class in default_data_loaders. Loader classes implement the classmethod name; once they are registered with the central registry, they become available to the decorator under that name (E.G. load_from.json).

2. Every data loader class (all of which are dataclasses) implements the load_targets method, which returns a list of types it can load to. For example, the JSONLoader class can load data of type dict. Note that the set of potential loading candidate classes is evaluated in reverse order, so the most recently registered loader class is the one that is used. That way, you can register custom ones.

3. The loader class is instantiated with the kwargs passed to the decorator. For example, the JSONLoader class takes a path kwarg, which is the path to the JSON file.

4. The decorator then creates a node that loads the data, and modifies the node that runs the function to accept that. It also returns metadata (customizable at the loader-class-level) to enable debugging after the fact. This is unstructured, but can be used down the line to describe any metadata to help debug.

The “core” hamilton library contains a few basic data loaders that can be implemented within the confines of python’s standard library. pandas_extensions contains a few more that require pandas to be installed.

Note that these can have default arguments, specified by defaults in the dataclass fields. For the full set of “keys” and “types” (e.g. load_from.json, etc…), look for all classes that inherit from DataLoader in the hamilton library. We plan to improve documentation shortly to make this discoverable.

__init__()#

@save_to#

class hamilton.function_modifiers.save_to#

Decorator that outputs data to some external source. You can think about this as the inverse of load_from.

This decorates a function, takes the final node produced by that function and then appends an additional node that saves the output of that function.

As the load_from decorator does, this decorator can be referred to in a dynamic way. For instance, @save_to.json will save the output of the function to a json file. Note that this means that the output of the function must be a dictionary (or subclass thereof), otherwise the decorator will fail.

Looking at the json example:

@save_to.json(path=source("raw_data_path"), output_name_="data_save_output")
def final_output(data: dict, valid_keys: List[str]) -> dict:
    return {key: val for key, val in data.items() if key in valid_keys}

This adds a final node to the DAG with the name “data_save_output” that accepts the output of the function “final_output” and saves it to a json file. In this case, the JSONSaver accepts a path parameter, which is provided by the upstream node (or input) named “raw_data_path”. The output_name_ parameter then says how to refer to the output of this node in the DAG.

If you called this with the driver:

dr = driver.Driver({}, my_module)
output = dr.execute(['final_output'], inputs={'raw_data_path': '/path/my_data.json'})

You would just get the final result, and nothing would be saved.

If you called this with the driver:

dr = driver.Driver({}, my_module)
output = dr.execute(['data_save_output'], inputs={'raw_data_path': '/path/my_data.json'})

You would get a dictionary of metadata (about the saving output), and the final result would be saved to a path.

Note that you can also hardcode the path, rather than using a dependency:

@save_to.json(path=value('/path/my_data.json'), output_name_="data_save_output")
def final_output(data: dict, valid_keys: List[str]) -> dict:
    return {key: val for key, val in data.items() if key in valid_keys}

For a list of available “keys” (E.G. json), you currently have to look at the classes that implement DataSaver. In the future, this will be more discoverable with documentation.

__init__()#