plugins.h_ddog.DDOGTracer

class hamilton.plugins.h_ddog.DDOGTracer(root_name: str, include_causal_links: bool = False, service: str = None)

Lifecycle adapter to use Datadog to run tracing on node execution. This works with the following execution environments:

  1. Vanilla Hamilton – no task-based computation, just nodes
  2. Task-based, synchronous
  3. Task-based with multithreading, Ray, and Dask

It will likely work with others, although we have not yet tested them. This does not work with async (yet).

Note that this is not a typical use of Datadog if you’re not using Hamilton for a microservice. It does work quite nicely, however! Monitoring ETLs is not a typical Datadog use case (you can’t see relationships between nodes/tasks or data summaries), but it is easy enough to work with and gives some basic information.

This tracer bypasses context management so we can more accurately track relationships between nodes/tags. We also plan to get this working with OpenTelemetry and to use that for the Datadog integration.

To use this, run pip install sf-hamilton[ddog] (or pip install "sf-hamilton[ddog]" if using zsh).

__init__(root_name: str, include_causal_links: bool = False, service: str = None)

Creates a DDOGTracer. This has the option to specify some parameters.

Parameters:
  • root_name – Name of the root trace/span. Because of the way Datadog handles span inheritance, the root span will inherit from any span that is already active.

  • include_causal_links – Whether or not to include span causal links. Note that there are some edge cases here. This is in beta for Datadog and is actually broken in the current client, but it has been fixed and will be released shortly: https://github.com/DataDog/dd-trace-py/issues/8049. Furthermore, the query on Datadog is slow for displaying causal links. We’ve disabled this by default, but feel free to test it out – it’s likely they’ll be improving the docum

  • service – Service name. If not provided, it is picked up from the environment through Datadog.
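
A minimal usage sketch for the constructor parameters above, assuming Hamilton's standard driver.Builder API and an installed sf-hamilton[ddog]. The function name build_traced_driver and the values "my_dag_run" and "my-etl" are illustrative and not part of the library. The imports are deferred into the function so the snippet can be loaded even when the optional dependency is missing.

```python
from types import ModuleType


def build_traced_driver(dag_module: ModuleType):
    """Build a Hamilton driver whose node executions are traced in Datadog."""
    # Deferred imports: these require `pip install "sf-hamilton[ddog]"`.
    from hamilton import driver
    from hamilton.plugins import h_ddog

    tracer = h_ddog.DDOGTracer(
        root_name="my_dag_run",      # illustrative name for the root span
        include_causal_links=False,  # the documented default
        service="my-etl",            # illustrative; omit to use the environment's value
    )
    # Attach the tracer as a lifecycle adapter when building the driver.
    return driver.Builder().with_modules(dag_module).with_adapters(tracer).build()
```

Calling build_traced_driver(my_module).execute([...]) should then emit one root span per run, assuming a Datadog agent is configured in the environment.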

post_graph_execute(*, run_id: str, graph: FunctionGraph, success: bool, error: Exception | None, results: Dict[str, Any] | None)

Just delegates to the interface method, passing in the right data.

post_node_execute(*, run_id: str, node_: Node, kwargs: Dict[str, Any], success: bool, error: Exception | None, result: Any | None, task_id: str | None = None)

Wraps the after_execution method, providing a bridge to an external-facing API. Do not override this!

post_task_execute(*, run_id: str, task_id: str, nodes: List[Node], results: Dict[str, Any] | None, success: bool, error: Exception)

Hook called immediately after task execution. Note that this is only useful in dynamic execution, although we reserve the right to add this back into the standard hamilton execution pattern.

Parameters:
  • run_id – ID of the run, unique in scope of the driver.

  • task_id – ID of the task

  • nodes – Nodes that were executed

  • results – Results of the task

  • success – Whether or not the task executed successfully

  • error – The error that was raised, if any

pre_graph_execute(*, run_id: str, graph: FunctionGraph, final_vars: List[str], inputs: Dict[str, Any], overrides: Dict[str, Any])

Implementation of the pre_graph_execute hook. This just converts the inputs to the format the user-facing hook is expecting – performing a walk of the DAG to pass in the set of nodes to execute. Delegates to the interface method.

pre_node_execute(*, run_id: str, node_: Node, kwargs: Dict[str, Any], task_id: str | None = None)

Wraps the before_execution method, providing a bridge to an external-facing API. Do not override this!

pre_task_execute(*, run_id: str, task_id: str, nodes: List[Node], inputs: Dict[str, Any], overrides: Dict[str, Any])

Hook that is called immediately prior to task execution. Note that this is only useful in dynamic execution, although we reserve the right to add this back into the standard hamilton execution pattern.

Parameters:
  • run_id – ID of the run, unique in scope of the driver.

  • task_id – ID of the task, unique in scope of the driver.

  • nodes – Nodes that are being executed

  • inputs – Inputs to the task

  • overrides – Overrides to task execution

run_after_graph_execution(*, error: Exception | None, run_id: str, **future_kwargs: Any)

Runs after graph execution. Garbage collects + finishes the root span.

Parameters:
  • error – Error the graph raised when running, if any

  • run_id – ID of the run

  • future_kwargs – reserved for future keyword arguments/backwards compatibility.

run_after_node_execution(*, node_name: str, error: Exception | None, task_id: str | None, run_id: str, **future_kwargs: Any)

Runs after a node’s execution – completes the span.

Parameters:
  • node_name – Name of the node

  • error – Error that the node raised, if any

  • task_id – Task ID that spawned the node

  • run_id – ID of the run.

  • future_kwargs – reserved for future keyword arguments/backwards compatibility.

run_after_task_execution(*, task_id: str, run_id: str, error: Exception, **future_kwargs)

Runs after task execution. Finishes task-level spans.

Parameters:
  • task_id – ID of the task

  • run_id – ID of the run

  • error – Error the task raised when running, if any

  • future_kwargs – Future keyword arguments for backwards compatibility

run_before_graph_execution(*, run_id: str, **future_kwargs: Any)

Runs before graph execution – sets the state so future ones can reference it.

Parameters:
  • run_id – ID of the run

  • future_kwargs – reserved for future keyword arguments/backwards compatibility.
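
The pairing of run_before_graph_execution and run_after_graph_execution can be pictured as per-run state that is created up front and torn down at the end. The sketch below is a stdlib-only illustration under that assumption. FakeSpan and RunStateSketch are hypothetical names, not ddtrace or Hamilton classes, and the real plugin may store its state differently.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class FakeSpan:
    """Hypothetical stand-in for a tracing span (not a ddtrace class)."""
    name: str
    finished: bool = False
    error: Optional[Exception] = None

    def finish(self, error: Optional[Exception] = None) -> None:
        self.error = error
        self.finished = True


class RunStateSketch:
    """Keeps one root span per run_id between the two graph-level hooks."""

    def __init__(self, root_name: str) -> None:
        self.root_name = root_name
        self.root_spans: Dict[str, FakeSpan] = {}
        self.finished: List[FakeSpan] = []  # kept only so the sketch is inspectable

    def run_before_graph_execution(self, *, run_id: str, **future_kwargs: Any) -> None:
        # Store the root span so later hooks can look it up by run_id.
        self.root_spans[run_id] = FakeSpan(self.root_name)

    def run_after_graph_execution(
        self, *, error: Optional[Exception], run_id: str, **future_kwargs: Any
    ) -> None:
        # Drop the per-run state, then finish the root span.
        span = self.root_spans.pop(run_id)
        span.finish(error)
        self.finished.append(span)
```

Keying the state by run_id lets the same adapter instance serve several runs without their spans colliding.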

run_before_node_execution(*, node_name: str, node_kwargs: Dict[str, Any], node_tags: Dict[str, Any], task_id: str | None, run_id: str, **future_kwargs: Any)

Runs before a node’s execution. Sets up/stores spans.

Parameters:
  • node_name – Name of the node.

  • node_kwargs – Keyword arguments of the node.

  • node_tags – Tags of the node (they’ll get stored as datadog tags)

  • task_id – Task ID that spawned the node

  • run_id – ID of the run.

  • future_kwargs – reserved for future keyword arguments/backwards compatibility.
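
Because a node's span is opened in one hook and closed in another, it cannot live inside a single with block. That is one way to read the remark that this tracer bypasses context management. The following stdlib-only sketch shows that pattern with spans stored in a dictionary between hook calls. NodeSpanSketch and the (run_id, task_id, node_name) key are assumptions made for illustration, not the plugin's actual internals.

```python
import time
from typing import Any, Dict, List, Optional, Tuple

SpanKey = Tuple[str, Optional[str], str]  # (run_id, task_id, node_name)


class NodeSpanSketch:
    """Opens a span record before a node runs and closes it afterwards."""

    def __init__(self) -> None:
        self.open_spans: Dict[SpanKey, Dict[str, Any]] = {}
        self.finished_spans: List[Dict[str, Any]] = []

    def run_before_node_execution(
        self, *, node_name: str, node_kwargs: Dict[str, Any],
        node_tags: Dict[str, Any], task_id: Optional[str], run_id: str,
        **future_kwargs: Any,
    ) -> None:
        # Store the span so the matching "after" hook can find it.
        self.open_spans[(run_id, task_id, node_name)] = {
            "name": node_name,
            "tags": dict(node_tags),  # node tags are carried onto the span
            "start": time.monotonic(),
        }

    def run_after_node_execution(
        self, *, node_name: str, error: Optional[Exception],
        task_id: Optional[str], run_id: str, **future_kwargs: Any,
    ) -> None:
        # Look up the stored span, record the outcome, and close it.
        span = self.open_spans.pop((run_id, task_id, node_name))
        span["duration"] = time.monotonic() - span["start"]
        span["error"] = error
        self.finished_spans.append(span)
```

Including task_id in the key keeps spans distinct when the same node name runs in more than one task.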

run_before_task_execution(*, task_id: str, run_id: str, **future_kwargs)

Runs before task execution. Sets up the task span.

Parameters:
  • task_id – ID of the task

  • run_id – ID of the run

  • future_kwargs – reserved for future keyword arguments/backwards compatibility.
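
One plausible reading of the task-level hooks is that a task span sits between the root span and the node spans whenever task-based execution is used. The sketch below illustrates that parent lookup with plain strings standing in for spans. The function resolve_parent and the hierarchy it encodes are assumptions for illustration, not a description of the plugin's internals.

```python
from typing import Dict, Optional, Tuple


def resolve_parent(
    run_id: str,
    task_id: Optional[str],
    root_spans: Dict[str, str],
    task_spans: Dict[Tuple[str, str], str],
) -> str:
    """Return the span a node span would attach to in this sketch."""
    # Task-based execution: attach to the task span if one was set up.
    if task_id is not None and (run_id, task_id) in task_spans:
        return task_spans[(run_id, task_id)]
    # Vanilla execution (task_id is None): attach directly to the root span.
    return root_spans[run_id]


roots = {"run-1": "root-span"}
tasks = {("run-1", "task-a"): "task-a-span"}
parent_in_task = resolve_parent("run-1", "task-a", roots, tasks)
parent_vanilla = resolve_parent("run-1", None, roots, tasks)
```

Here parent_in_task is the task span and parent_vanilla is the root span, which mirrors the two execution modes listed at the top of this page.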