Airflow¶

For more details, see this Hamilton + Airflow blog post.

TL;DR:

  1. Hamilton complements Airflow. It’ll help you write better, more modular, and testable code.

  2. Hamilton does not replace Airflow.

High-level differences:¶

  • Hamilton is a micro-orchestrator. Airflow is a macro-orchestrator.

  • Hamilton is a Python library that standardizes how you express Python pipelines, while Airflow is a complete platform and system for scheduling and executing pipelines.

  • Hamilton focuses on providing a lightweight, low-dependency, flexible way to define data pipelines as Python functions, whereas Airflow is a whole system that comes with a web-based UI, a scheduler, and executors.

  • Hamilton pipelines are defined in pure Python code that can run anywhere Python runs. While Airflow also uses Python to describe a DAG, that DAG can only be run by the Airflow system.

  • Hamilton complements Airflow: you can use Hamilton within Airflow, but the reverse is not true.

  • You can use Hamilton directly in a Jupyter notebook or a Python web service (see the sketch after the Hamilton example below); you can’t do this with Airflow.

Code examples:¶

Looking at the two examples below, you can see that Hamilton is a more lightweight and flexible way to define data pipelines. No scheduling information is required to run the code, because Hamilton runs the pipeline in the same process as the caller; this makes pipelines easier to test and debug. Airflow, on the other hand, is a complete system for scheduling and executing pipelines, and is more complex to set up and run. Note: if you put the contents of run.py inside a function within example_dag.py, the Hamilton pipeline could be run by an Airflow PythonOperator (see the sketch after the Airflow example below).

Hamilton:¶

The code below shows how you can define a simple data pipeline using Hamilton. The pipeline consists of three functions that are executed in sequence. The pipeline is defined in a module called pipeline.py and executed by a separate script, run.py, which imports the pipeline module and runs it.

# pipeline.py
def raw_data() -> list[int]:
    return [1, 2, 3]

def processed_data(raw_data: list[int]) -> list[int]:
    return [x * 2 for x in raw_data]

def load_data(processed_data: list[int], client: SomeClient) -> dict:
    # SomeClient is a placeholder for whatever client you use to load data.
    metadata = client.send_data(processed_data)
    return metadata

# run.py -- this is the script that executes the pipeline
import pipeline
from hamilton import driver
dr = driver.Builder().with_modules(pipeline).build()
# execute() returns a dict keyed by the requested outputs.
results = dr.execute(['load_data'], inputs=dict(client=SomeClient()))
metadata = results['load_data']
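Because the driver above is just a Python object running in the caller’s process, the same setup works from a Jupyter notebook cell or inside a web service. Below is a minimal sketch, using FastAPI purely as an illustrative framework (app.py, the route, and the function name are hypothetical), that serves the processed_data output of pipeline.py:

# app.py -- illustrative only; any Python web framework (or a notebook cell)
# can host the Hamilton driver the same way.
from fastapi import FastAPI

from hamilton import driver

import pipeline

app = FastAPI()
# Build the driver once at startup; it runs in-process, no scheduler needed.
dr = driver.Builder().with_modules(pipeline).build()

@app.get("/processed-data")
def get_processed_data() -> list[int]:
    # Request only the node we need; this output has no external inputs.
    return dr.execute(["processed_data"])["processed_data"]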

Airflow:¶

The code below shows how you can define the same pipeline using Airflow. The pipeline consists of three tasks that are executed in sequence. The entire pipeline is defined in a single module, example_dag.py, and is executed by the Airflow scheduler.

# example_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    description='A simple DAG',
    schedule_interval=timedelta(days=1),
    # Render the templated XCom values passed via op_args below as native
    # Python objects (lists) rather than strings.
    render_template_as_native_obj=True,
)

def extract_data():
    return [1, 2, 3]

def transform_data(data):
    return [x * 2 for x in data]

def load_data(data):
    # SomeClient is the same placeholder client as in the Hamilton example.
    client = SomeClient()
    client.send_data(data)

extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    op_args=['{{ ti.xcom_pull(task_ids="extract_data") }}'],
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    op_args=['{{ ti.xcom_pull(task_ids="transform_data") }}'],
    dag=dag,
)

extract_task >> transform_task >> load_task
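Finally, as noted above, the two compose. Here is a minimal sketch, not a definitive implementation, of wrapping the contents of run.py in a function and handing it to an Airflow PythonOperator; the file name hamilton_dag.py, the DAG and task ids, and the schedule are illustrative, and SomeClient remains the placeholder client from the examples above:

# hamilton_dag.py -- a sketch of running the Hamilton pipeline within Airflow.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def run_hamilton_pipeline():
    # Essentially the contents of run.py, wrapped in a callable for Airflow.
    import pipeline
    from hamilton import driver

    dr = driver.Builder().with_modules(pipeline).build()
    # SomeClient is the same placeholder client as in the examples above.
    results = dr.execute(['load_data'], inputs=dict(client=SomeClient()))
    return results['load_data']

with DAG(
    dag_id='hamilton_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,  # trigger manually for this sketch
) as dag:
    PythonOperator(
        task_id='run_hamilton_pipeline',
        python_callable=run_hamilton_pipeline,
    )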