Materialization¶

So far, we executed our dataflow using the Driver.execute() method, which can receive an inputs dictionary and return a results dictionary (by default). However, you can also execute code with Driver.materialize() to directly read from / write to external data sources (file, database, cloud data store).

On this page, you’ll learn:

  • The difference between .execute() and .materialize()

  • Why use materialization

  • What are DataSaver and DataLoader objects

  • The basics to write your own materializer

Different ways to write the same dataflow¶

Below are 3 ways to write a dataflow that:

  1. loads a dataframe from a parquet file

  2. preprocesses the dataframe

  3. trains a machine learning model

  4. saves the trained model

The first two options use Driver.execute() and the latter Driver.materialize(). Notice where in the code data is loaded and saved and how it affects the dataflow.

Model training¶

Nodes / dataflow context

Driver context

Materialization

import pandas as pd
import xgboost


def raw_df(data_path: str) -> pd.DataFrame:
    """Load raw data from parquet file"""
    return pd.read_parquet(data_path)


def preprocessed_df(raw_df: pd.DataFrame) -> pd.DataFrame:
    """preprocess raw data"""
    return ...


def model(preprocessed_df: pd.DataFrame) -> xgboost.XGBModel:
    """Train model on preprocessed data"""
    return ...


def save_model(model: xgboost.XGBModel, model_dir: str) -> None:
    """Save trained model to JSON format"""
    model.save_model(f"{model_dir}/model.json")


if __name__ == "__main__":
    import __main__

    from hamilton import driver

    dr = driver.Builder().with_modules(__main__).build()

    data_path = "..."
    model_dir = "..."
    inputs = dict(data_path=data_path, model_dir=model_dir)
    final_vars = ["save_model"]

    results = dr.execute(final_vars, inputs=inputs)
    # results["save_model"] == None
import pandas as pd
import xgboost


def preprocessed_df(raw_df: pd.DataFrame) -> pd.DataFrame:
    """preprocess raw data"""
    return ...


def model(preprocessed_df: pd.DataFrame) -> xgboost.XGBModel:
    """Train model on preprocessed data"""
    return ...


if __name__ == "__main__":
    import __main__

    from hamilton import driver

    dr = driver.Builder().with_modules(__main__).build()

    data_path = "..."
    model_dir = "..."
    inputs = dict(raw_df=pd.read_parquet(data_path))
    final_vars = ["model"]

    results = dr.execute(final_vars, inputs=inputs)
    results["model"].save_model(f"{model_dir}/model.json")
import pandas as pd
import xgboost


def preprocessed_df(raw_df: pd.DataFrame) -> pd.DataFrame:
    """preprocess raw data"""
    return ...


def model(preprocessed_df: pd.DataFrame) -> xgboost.XGBModel:
    """Train model on preprocessed data"""
    return ...


if __name__ == "__main__":
    import __main__

    from hamilton import driver
    from hamilton.io.materialization import from_, to

    # this registers DataSaver and DataLoader objects
    from hamilton.plugins import pandas_extensions, xgboost_extensions  # noqa: F401

    dr = driver.Builder().with_modules(__main__).build()

    data_path = "..."
    model_dir = "..."
    materializers = [
        from_.parquet(path=data_path, target="raw_df"),
        to.json(path=f"{model_dir}/model.json", dependencies=["model"], id="model__json"),
    ]

    dr.materialize(*materializers)
../../_images/node_ctx.png ../../_images/driver_ctx.png ../../_images/materializer_ctx.png

As explained previously, Driver.execute() walks the graph to compute the list of nodes you requested by name. For Driver.materialize(), you give it a list of data savers (from_) and data loaders (to). Each one will add a node to the dataflow before execution.

Note

Driver.materialize() can do everything Driver.execute() does, and more. It can receive inputs and overrides. Instead of using final_vars, you can use additional_vars to request nodes that you don’t want to materialize/save.

Why use materialization¶

Let’s compare the benefits of the 3 different approaches

Nodes / dataflow context¶

This approach defines data loading and saving as part of the dataflow and uses Driver.execute(). It is usually the simplest approach and the one you should start with.

Benefits

  • the functions raw_df() and save_model() are transparent as to how they load/save data

  • can easily change data location using the strings data_path and model_dir as inputs

  • all operations are part of the dataflow

Limitations

  • need to write a unique function for each loaded parquet file and saved model. To reduce code duplication, one could write a utility function _load_parquet()

  • can be too restrictive as to how to load data. Using override in the .execute() call can add flexibility.

Driver context¶

This approach loads and saves data outside the dataflow and uses Driver.execute(). Since the Driver is responsible for executing your dataflow, it makes sense to handle data loading/saving in the context of the “driver code” (e.g., run.py) if they change often.

Benefits

  • Driver users is responsible for loading/saving data

  • fewer dataflow functions to define and maintain

  • the functions for raw_df() and save_model() can live in another Python module that you can optionally build the Driver with.

Limitations

  • add complexity to the “driver code”.

  • lose the benefits of Hamilton for loading and saving operations (visualize, lifecycle hook, etc.)

  • to add flexibility to data loading/saving, one can adopt the nodes/dataflow context approach and add functions with @config for alternative implementations (see Select functions to include).

Materialization¶

This approach tries to strike a balance between the two previous methods and uses Driver.materialize().

Unique benefits

  • Use the Hamilton logic to combine nodes (more on that later)

  • Get tested code for common data loading and saving out-of-the-box (e.g., JSON, CSV, Parquet, pickle)

  • Easily save the same node to multiple formats

Benefits

  • Flexibility for Driver users to change data location

  • Less dataflow functions to define and maintain

  • All operations are part of the dataflow

Limitations

  • Writing a custom DataSaver or DataLoader requires more effort than adding a function to the dataflow.

  • Adds some complexity to the Driver (e.g., run.py).

DataLoader and DataSaver¶

In Hamilton, DataLoader and DataSaver are classes that define how to load or save a particular data format. Calling Driver.materialize(DataLoader(), DataSaver()) adds nodes to the dataflow (see visualizations above).

Here are simplified snippets for saving and loading an XGBoost model to/from JSON.

DataLoader

DataSaver

import dataclasses
from os import PathLike
from typing import Any, Collection, Dict, Tuple, Type, Union

import xgboost

from hamilton.io import utils
from hamilton.io.data_adapters import DataLoader


@dataclasses.dataclass
class XGBoostJsonReader(DataLoader):
    path: Union[str, bytearray, PathLike]

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [xgboost.XGBModel]

    def load_data(self, type_: Type) -> Tuple[xgboost.XGBModel, Dict[str, Any]]:
        # uses the XGBoost library
        model = type_()
        model.load_model(self.path)
        metadata = utils.get_file_metadata(self.path)
        return model, metadata

    @classmethod
    def name(cls) -> str:
        return "json"  # the name for `from_.{name}`
import dataclasses
from os import PathLike
from typing import Any, Collection, Dict, Type, Union

import xgboost

from hamilton.io import utils
from hamilton.io.data_adapters import DataSaver


@dataclasses.dataclass
class XGBoostJsonWriter(DataSaver):
    path: Union[str, PathLike]

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [xgboost.XGBModel]

    def save_data(self, data: xgboost.XGBModel) -> Dict[str, Any]:
        # uses the XGBoost library
        data.save_model(self.path)
        return utils.get_file_metadata(self.path)

    @classmethod
    def name(cls) -> str:
        return "json"  # the name for `to.{name}`

To define your own DataSaver and DataLoader, the Hamilton XGBoost extension provides a good example

@load_from and @save_to¶

Also, the data loaders and savers power the @load_from and @save_to Load and save external data