Data Flow Tracking for AI Agents

Daita's Lineage Plugin lets AI agents automatically track, query, and visualize data flows, capturing SQL dependencies, function transforms, and pipeline steps in a queryable, impact-aware graph.

Daita Team

February 24, 2026

When an agent modifies data, a reasonable question follows: "Where did that data come from, and where does it go next?" In a simple, single-table script this is obvious. In a production data stack with dozens of tables, ETL pipelines, API feeds, and transformation functions, the answer is rarely obvious, and getting it wrong is expensive.

Data lineage is the discipline of tracking those flows: understanding which systems feed which downstream consumers, what transformations are applied along the way, and critically, what breaks when something upstream changes. It is well understood in traditional data engineering tools. But for AI agents, it has been largely absent.

Our Lineage Plugin brings data lineage directly into the agent layer. It gives agents, and the developers building them, four ways to register data flows, backed by three core capabilities:

  • A graph-based query system for tracing those flows in any direction
  • Automated SQL parsing that captures dependencies without any manual annotation
  • An impact analysis engine that tells you what is at risk when you are about to change something

This post covers the full developer API: how to register flows, how to query them, how impact analysis works, and how to export the graph for visualization.


The Problem: Data Flows Without a Map

Modern data systems have a lineage problem even without agents involved. A table gets populated by an ETL job, that job reads from three upstream tables, one of those tables is sourced from an API, and the final output feeds four different downstream reports. When someone proposes a schema change to one of the upstream tables, it takes significant manual investigation to understand what will break.

Add AI agents to this picture and the problem compounds. Agents execute queries, run transformations, write to targets, and trigger downstream processes, often autonomously, at high frequency, and across complex pipelines. Without automatic lineage capture, the graph of data dependencies is invisible.

The consequences are familiar to anyone who has managed a data platform:

  • Blind changes: Schema modifications are made without knowing which consumers depend on the changed fields.
  • Silent failures: A pipeline breaks because an upstream dependency changed and nothing tracked the relationship.
  • Manual archaeology: Engineers spend hours tracing data flow by reading code, querying job logs, and asking colleagues.
  • Compliance gaps: Regulatory frameworks like GDPR and HIPAA require organizations to know where data originates and where it flows. Without lineage, this is impossible to demonstrate.

The Lineage Plugin solves this at the framework level. Lineage is captured automatically during normal agent operation, stored in a queryable graph, and made available for both programmatic queries and LLM-driven analysis.


Architecture Overview

The Lineage Plugin operates as a directed graph where nodes are data entities and edges are flows between them. Every registered flow adds or updates two nodes and one directed edge in the graph.

┌──────────────────────────────────────────────────────────────────┐
│                          Agent Session                           │
│                                                                  │
│  Four registration patterns:                                     │
│  • register_flow(source_id, target_id, ...)   — manual           │
│  • capture_sql_lineage(sql, ...)              — SQL auto-capture │
│  • @lineage.track(source=..., target=...)     — decorator        │
│  • register_pipeline(name, steps, ...)        — multi-step ETL   │
│                                                                  │
│  Query and analysis APIs:                                        │
│  • trace_lineage(entity_id, direction)        — graph traversal  │
│  • analyze_impact(entity_id, change_type)     — blast radius     │
│  • export_lineage(entity_id, format)          — visualization    │
│  • prune_stale_lineage(max_age_hours)         — graph hygiene    │
└──────────────────────────────────────────────────────────────────┘
                              │
                              ▼
              ┌───────────────────────────────┐
              │         Graph Backend         │
              │                               │
              │  Local dev: Local             │
              │  Production: Dynamo           │
              │  Standalone: in-memory        │
              └───────────────────────────────┘

The graph backend is selected automatically based on the runtime environment. In local development it uses a local graph store. In production (after daita push) it upgrades to a managed backend. In standalone mode, where no backend is configured, flows are tracked in memory for the duration of the session. No configuration changes are required when moving between environments.

Because the plugin integrates directly with the agent lifecycle, lineage is captured within the same execution context where data flows occur. There is no separate tracking process to maintain.
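To make the graph model concrete, here is a minimal sketch of the "two nodes and one directed edge per flow" idea. It is a toy in-memory structure, not the plugin's internals; the LineageGraph class and its add_flow method are hypothetical names, and it assumes one edge per source-target pair.

```python
from dataclasses import dataclass, field


@dataclass
class LineageGraph:
    """Toy in-memory lineage graph (illustrative only, not the plugin's internals)."""

    nodes: set = field(default_factory=set)
    edges: dict = field(default_factory=dict)  # (source_id, target_id) -> flow_type

    def add_flow(self, source_id, target_id, flow_type="FLOWS_TO"):
        # Each registered flow adds or updates two nodes and one directed edge.
        self.nodes.update({source_id, target_id})
        self.edges[(source_id, target_id)] = flow_type


graph = LineageGraph()
graph.add_flow("table:raw_orders", "table:orders", "TRANSFORMS")
graph.add_flow("table:orders", "table:fulfillment_queue", "TRIGGERS")
# Assumption: re-registering the same pair updates the edge rather than duplicating it.
graph.add_flow("table:raw_orders", "table:orders", "TRANSFORMS")

print(len(graph.nodes), len(graph.edges))
```

Keying edges by the (source, target) pair is one simple way to get "add or update" behavior; a real backend may store richer edge records.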


Entity IDs: Naming the Nodes

Every node in the lineage graph is identified by an entity ID. The convention is type:name:

table:orders
table:raw_orders
table:public.customers
api:stripe_payments
api:github_events

The prefix before the colon declares the node type. The Lineage Plugin uses this prefix to infer node type automatically, so table:orders becomes a TABLE node and api:stripe_payments becomes an API node. The convention is flexible: you can use any prefix that makes sense for your system.

Entity IDs appear consistently throughout the API: as source and target in register_flow, as the root entity in trace_lineage, and as the subject of analyze_impact. Adopting a consistent naming scheme across your project pays dividends when the graph grows large.
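The prefix convention can be sketched in a few lines. This is an illustration of the type:name idea only; infer_node_type is a hypothetical helper, and the UNKNOWN fallback for IDs without a prefix is an assumption, not documented plugin behavior.

```python
def infer_node_type(entity_id, default="UNKNOWN"):
    """Split a 'type:name' entity ID into (NODE_TYPE, name)."""
    prefix, sep, name = entity_id.partition(":")
    if not sep or not prefix or not name:
        # Assumption: IDs without a usable prefix fall back to a default type.
        return default, entity_id
    return prefix.upper(), name


print(infer_node_type("table:orders"))
print(infer_node_type("api:stripe_payments"))
print(infer_node_type("table:public.customers"))
```

Splitting on the first colon only keeps schema-qualified names such as public.customers intact.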


Getting Started

from daita import Agent
from daita.plugins import lineage

# Create a lineage tracker
lineage_plugin = lineage()

# Attach it to an agent
agent = Agent(name="ETL Agent", model="claude-sonnet-4-6")
agent.add_plugin(lineage_plugin)

await agent.start()

The plugin is also available as a standalone object for use outside of agents, in scripts, migration tools, or data pipelines that are not LLM-driven:

from daita.plugins import lineage

tracker = lineage()

All four registration patterns and all query APIs are available in both contexts.


Four Ways to Register Lineage

1. Manual Flow Registration

register_flow is the foundation. It records a directed relationship between any two entities with a specified flow type and optional metadata.

# A basic ETL flow
await lineage_plugin.register_flow(
    source_id="table:raw_orders",
    target_id="table:orders",
    flow_type="TRANSFORMS",
    transformation="Normalize order amounts, apply tax rates, deduplicate on order_id"
)

# An API-to-table feed
await lineage_plugin.register_flow(
    source_id="api:stripe_payments",
    target_id="table:transactions",
    flow_type="SYNCS_TO",
    schedule="0 * * * *",  # cron: hourly
    transformation="Stripe webhook events ingested and mapped to transaction schema"
)

# A trigger relationship
await lineage_plugin.register_flow(
    source_id="table:orders",
    target_id="table:fulfillment_queue",
    flow_type="TRIGGERS",
    metadata={"trigger_condition": "status = 'paid'"}
)

Supported flow types:

Flow Type | Use Case
FLOWS_TO | Generic data movement; the default when no type is specified
TRANSFORMS | ETL operations, data normalization, enrichment
SYNCS_TO | Replication, data synchronization, mirroring
TRIGGERS | Event-driven relationships, reactive pipelines

The plugin also accepts natural-language variants of these types, which is useful when the flow type comes from LLM-generated output. "aggregate", "enrich", and "etl" all resolve to TRANSFORMS; "replicate" resolves to SYNCS_TO. The normalization is handled transparently.
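A rough sketch of how such normalization might work is shown below. Only the four aliases named above come from this post; the normalize_flow_type helper is hypothetical, and falling back to FLOWS_TO for unrecognized input is an assumption based on FLOWS_TO being the default type.

```python
CANONICAL_TYPES = {"FLOWS_TO", "TRANSFORMS", "SYNCS_TO", "TRIGGERS"}

# Aliases mentioned in the post; the real plugin may accept more.
ALIASES = {
    "aggregate": "TRANSFORMS",
    "enrich": "TRANSFORMS",
    "etl": "TRANSFORMS",
    "replicate": "SYNCS_TO",
}


def normalize_flow_type(raw=None):
    """Map a free-form flow type onto one of the canonical types."""
    if not raw:
        return "FLOWS_TO"  # default when no type is given
    key = raw.strip()
    if key.upper() in CANONICAL_TYPES:
        return key.upper()
    # Assumption: unknown variants fall back to the generic type.
    return ALIASES.get(key.lower(), "FLOWS_TO")


print(normalize_flow_type("ETL"))
print(normalize_flow_type("replicate"))
```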

register_flow returns the generated flow ID:

result = await lineage_plugin.register_flow(
    source_id="table:raw_events",
    target_id="table:session_events",
    flow_type="TRANSFORMS"
)
# result["flow_id"]  ->  "flow:table:raw_events:table:session_events:2026-02-23T..."
# result["success"]  ->  True

2. SQL Auto-Capture

For agents and scripts that execute SQL, capture_sql_lineage eliminates the need to manually annotate every query. It parses the SQL string, extracts source and target tables automatically, and registers the appropriate flows.

sql = """
    INSERT INTO orders
    SELECT o.*, c.segment
    FROM raw_orders o
    JOIN customer_segments c ON o.customer_id = c.customer_id
"""

result = await lineage_plugin.capture_sql_lineage(
    sql=sql,
    transformation="Enrich raw orders with customer segmentation data"
)

# result["source_tables"]  ->  ["raw_orders", "customer_segments"]
# result["target_tables"]  ->  ["orders"]
# result["flow_count"]     ->  2  (one flow per source-target pair)

The parser handles the SQL patterns most common in practice:

  • SELECT ... FROM and JOIN clauses for source tables
  • INSERT INTO and UPDATE for target tables
  • CREATE TABLE ... AS SELECT for derived table creation
  • WITH ... AS (CTEs) — CTE names are recognized as temporary and excluded from the registered source tables
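To illustrate the kind of extraction listed above, here is a deliberately simplified regex-based sketch. It is not the plugin's parser: sketch_parse_sql is a hypothetical function, it ignores quoting, subqueries, and dialect-specific syntax (for example CREATE TABLE IF NOT EXISTS or WITH RECURSIVE), and a real implementation would more likely use a proper SQL parser. It does show the core idea of separating sources from targets and dropping CTE names.

```python
import re

IDENT = r"([A-Za-z_][\w.]*)"


def unique(names):
    """Deduplicate while preserving first-seen order."""
    return list(dict.fromkeys(names))


def sketch_parse_sql(sql):
    """Very rough source/target extraction; illustrative only."""
    flags = re.IGNORECASE
    # CTE names look like "WITH name AS (" or ", name AS (".
    ctes = unique(re.findall(rf"(?:\bWITH\s+|,\s*){IDENT}\s+AS\s*\(", sql, flags))
    targets = unique(
        re.findall(rf"\b(?:INSERT\s+INTO|UPDATE|CREATE\s+TABLE)\s+{IDENT}", sql, flags)
    )
    cte_names = {name.lower() for name in ctes}
    # Tables after FROM/JOIN are sources, unless they are temporary CTE names.
    sources = unique(
        name
        for name in re.findall(rf"\b(?:FROM|JOIN)\s+{IDENT}", sql, flags)
        if name.lower() not in cte_names
    )
    return {
        "source_tables": sources,
        "target_tables": targets,
        "is_read_only": not targets,
        "cte_tables": ctes,
    }


parsed = sketch_parse_sql(
    "WITH recent AS (SELECT * FROM raw_events) "
    "INSERT INTO session_events SELECT * FROM recent"
)
print(parsed)
```

The number of flows to register then follows from the result: one per source-target pair, so two sources and one target yield two flows.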

If you know the target table context ahead of time but the SQL does not include an explicit INSERT INTO, you can pass context_table to provide it:

# A raw SELECT used to populate a staging table
await lineage_plugin.capture_sql_lineage(
    sql="SELECT * FROM source_events WHERE event_date > '2026-01-01'",
    context_table="staging_events"
)

For cases where you want to inspect what the parser finds without registering flows, parse_sql_lineage returns the parsed result directly:

parsed = lineage_plugin.parse_sql_lineage(sql)
# {
#   "source_tables": ["raw_orders", "customer_segments"],
#   "target_tables": ["orders"],
#   "is_read_only": False,
#   "cte_tables": []
# }

This is useful for dry runs, validation logic, or building custom registration behavior on top of the parser.


3. Decorator-Based Tracking

For Python functions that transform data, the @track decorator captures lineage automatically on every invocation, with no manual register_flow call inside the function body.

@lineage_plugin.track(
    source="table:raw_data",
    target="table:processed_data",
    transformation="Apply business rules, standardize field formats"
)
async def transform_data(df):
    # Normal function logic — lineage is captured before execution
    df["amount"] = df["amount"].round(2)
    df["currency"] = df["currency"].str.upper()
    return df

# Every call to transform_data() registers the flow automatically
result = await transform_data(input_df)

The decorator requires async functions. This is intentional: lineage registration is an I/O operation and the decorator awaits it before the function body runs. If you need to track a synchronous function, call register_flow explicitly after execution instead.

The decorator records the function name and module in the flow metadata, making it possible to trace which code path produced a given flow when debugging.
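The mechanics described here (await the registration, then run the function, and record the function name and module) can be sketched with a plain Python decorator. This is not the plugin's implementation: fake_register_flow and recorded_flows are stand-ins for the real graph, the metadata keys are illustrative, and raising TypeError for synchronous functions is an assumption about how the async requirement might be enforced.

```python
import asyncio
import functools
import inspect

recorded_flows = []  # stand-in for the lineage graph


async def fake_register_flow(**flow):
    """Hypothetical replacement for the plugin's register_flow."""
    recorded_flows.append(flow)


def track(source, target, transformation=None):
    def decorator(func):
        if not inspect.iscoroutinefunction(func):
            # Assumption: sync functions are rejected up front.
            raise TypeError("track() requires an async function")

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Registration is awaited before the function body runs.
            await fake_register_flow(
                source_id=source,
                target_id=target,
                transformation=transformation,
                metadata={"function": func.__name__, "module": func.__module__},
            )
            return await func(*args, **kwargs)

        return wrapper

    return decorator


@track(source="table:raw_data", target="table:processed_data")
async def transform_data(amounts):
    return [round(amount, 2) for amount in amounts]


result = asyncio.run(transform_data([1.234, 5.678]))
print(result, recorded_flows[0]["metadata"]["function"])
```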


4. Pipeline Registration

For multi-step ETL processes with a defined sequence of transformations, register_pipeline registers the entire pipeline in a single call, creating one flow per step with shared pipeline metadata.

result = await lineage_plugin.register_pipeline(
    name="customer_360_pipeline",
    steps=[
        {
            "source_id": "api:crm_api",
            "target_id": "table:raw_customers",
            "transformation": "Ingest CRM export and land to staging"
        },
        {
            "source_id": "table:raw_customers",
            "target_id": "table:customers_clean",
            "transformation": "Deduplicate, validate emails, normalize addresses"
        },
        {
            "source_id": "table:customers_clean",
            "target_id": "table:customer_360",
            "transformation": "Join with transaction history and support tickets"
        }
    ],
    schedule="0 6 * * *"  # Daily at 6am
)

# result["pipeline_name"]    ->  "customer_360_pipeline"
# result["steps_registered"] ->  3

Each step becomes a PIPELINE_STEP flow with the pipeline name embedded in its metadata. This makes it straightforward to later trace which pipeline a specific table-to-table relationship belongs to.
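Conceptually, pipeline registration expands a list of steps into one flow per step. The sketch below shows that expansion in isolation; expand_pipeline is a hypothetical helper, and the metadata keys (pipeline_name, step, schedule) are illustrative rather than the plugin's actual schema.

```python
def expand_pipeline(name, steps, schedule=None):
    """Turn pipeline steps into one PIPELINE_STEP flow each (illustrative)."""
    flows = []
    for index, step in enumerate(steps, start=1):
        flows.append(
            {
                "source_id": step["source_id"],
                "target_id": step["target_id"],
                "flow_type": "PIPELINE_STEP",
                "transformation": step.get("transformation"),
                # Shared pipeline metadata is attached to every step.
                "metadata": {"pipeline_name": name, "step": index, "schedule": schedule},
            }
        )
    return flows


flows = expand_pipeline(
    "customer_360_pipeline",
    [
        {"source_id": "api:crm_api", "target_id": "table:raw_customers"},
        {"source_id": "table:raw_customers", "target_id": "table:customers_clean"},
        {"source_id": "table:customers_clean", "target_id": "table:customer_360"},
    ],
    schedule="0 6 * * *",
)
print(len(flows), flows[1]["metadata"])
```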


Querying the Graph

Once flows are registered, the graph can be queried in three modes: full lineage (both directions), upstream only, or downstream only.

Full Lineage Trace

trace_lineage traverses the graph in both directions from a starting entity, up to a configurable depth:

result = await lineage_plugin.trace_lineage(
    entity_id="table:orders",
    direction="both",  # "upstream", "downstream", or "both"
    max_depth=5
)

# result["lineage"]["upstream"]    ->  list of upstream entities
# result["lineage"]["downstream"]  ->  list of downstream entities
# result["upstream_count"]         ->  integer
# result["downstream_count"]       ->  integer

Each entity in the result includes:

{
    "entity_id": "table:raw_orders",
    "flow_type": "TRANSFORMS",
    "transformation": "Normalize order amounts, apply tax rates",
    "depth": 1  # hops from the root entity
}

The depth field tells you how many steps removed the entity is from your starting point. Depth 1 entities are directly connected; depth 2 entities are one step further, and so on. Cycle detection is built in — the traversal will not loop infinitely if your graph contains circular dependencies.
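The depth and cycle-handling behavior can be illustrated with a breadth-first traversal over a plain edge list. This is a sketch of the general technique, not the plugin's traversal code; the trace function and its edge-list input are assumptions made for the example.

```python
from collections import deque


def trace(edges, root, direction="downstream", max_depth=5):
    """Breadth-first lineage walk with depth tracking and cycle protection."""
    neighbors = {}
    for source, target in edges:
        # For upstream traces, follow every edge backwards.
        start, end = (source, target) if direction == "downstream" else (target, source)
        neighbors.setdefault(start, []).append(end)

    visited = {root}  # visited set prevents infinite loops on cycles
    queue = deque([(root, 0)])
    found = []
    while queue:
        node, depth = queue.popleft()
        if depth == max_depth:
            continue
        for nxt in neighbors.get(node, []):
            if nxt in visited:
                continue
            visited.add(nxt)
            found.append({"entity_id": nxt, "depth": depth + 1})
            queue.append((nxt, depth + 1))
    return found


edges = [
    ("api:crm_api", "table:raw_customers"),
    ("table:raw_customers", "table:customers_clean"),
    ("table:customers_clean", "table:customer_360"),
    ("table:customer_360", "table:raw_customers"),  # deliberate cycle
]
downstream = trace(edges, "table:raw_customers", "downstream")
print(downstream)
```

Because breadth-first search reaches each entity by the shortest path first, the recorded depth is the minimum number of hops from the root.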

Focused Directional Queries

When you only need one direction, restricting the trace to "upstream" or "downstream" is more efficient than a full trace:

# Where does table:orders get its data from?
upstream = await lineage_plugin.trace_lineage(
    entity_id="table:orders",
    direction="upstream",
    max_depth=3
)

# What systems depend on table:orders?
downstream = await lineage_plugin.trace_lineage(
    entity_id="table:orders",
    direction="downstream",
    max_depth=3
)

These directional traces are also available as first-class LLM tools (trace_upstream and trace_downstream) when the plugin is attached to an agent (covered in the agent tools section below).


Impact Analysis

analyze_impact answers the question: "If I change this, what breaks?"

It walks the downstream graph from the target entity, counts all affected nodes, and assigns a risk level based on the breadth of impact.

result = await lineage_plugin.analyze_impact(
    entity_id="table:customers_clean",
    change_type="schema_change"
)

# {
#   "success": True,
#   "entity_id": "table:customers_clean",
#   "change_type": "schema_change",
#   "directly_affected_count": 3,
#   "total_affected_count": 14,
#   "affected_entities": [...],
#   "risk_level": "HIGH",
#   "recommendation": "High-risk schema_change. Review all affected systems, create migration plan, and notify stakeholders."
# }

Supported change types:

Change Type | Description
schema_change | Column added, removed, renamed, or retyped
deprecation | Entity being sunsetted; consumers need migration paths
deletion | Entity will be removed entirely
data_quality | Data in this entity may be corrupt or unreliable

Risk levels are determined by the total downstream count. The thresholds can be configured at plugin creation time:

# Custom thresholds (defaults: HIGH > 20, MEDIUM > 5)
tracker = lineage(risk_thresholds={"HIGH": 10, "MEDIUM": 3})

When a graph backend is available (local or cloud), impact scoring uses cumulative edge impact weights from the graph algorithms, which accounts for the structural weight of each relationship, not just the raw count. When running in standalone in-memory mode, the count-based fallback is used.
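The count-based fallback amounts to comparing the total downstream count against the two thresholds. A minimal sketch follows; risk_level is a hypothetical helper, the strict greater-than comparison follows the "HIGH > 20, MEDIUM > 5" defaults quoted above, and the LOW label for everything below the MEDIUM threshold is an assumption.

```python
DEFAULT_THRESHOLDS = {"HIGH": 20, "MEDIUM": 5}


def risk_level(total_affected, thresholds=None):
    """Classify risk from the total number of affected downstream entities."""
    limits = {**DEFAULT_THRESHOLDS, **(thresholds or {})}
    if total_affected > limits["HIGH"]:
        return "HIGH"
    if total_affected > limits["MEDIUM"]:
        return "MEDIUM"
    return "LOW"  # assumption: name of the lowest level


print(risk_level(14))
print(risk_level(14, {"HIGH": 10, "MEDIUM": 3}))
```

With the default thresholds, 14 affected entities would rate as MEDIUM under this count-only rule; with the custom thresholds from the example above, the same count rates as HIGH.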

The recommendation field provides a plain-English guidance string derived from the risk level and change type, ready to surface directly in agent output or dashboards without further processing.


Visualization Export

export_lineage generates a diagram of the lineage graph rooted at a given entity. Two formats are supported: Mermaid (the default, renders natively in GitHub, Notion, and most documentation tools) and DOT (Graphviz, for environments with richer graph rendering).

# Mermaid diagram
result = await lineage_plugin.export_lineage(
    entity_id="table:orders",
    format="mermaid",
    direction="both"
)

print(result["diagram"])

Output:

graph LR
    table_orders["table:orders"]
    table_raw_orders["table:raw_orders"] -->|TRANSFORMS| table_orders
    table_customer_segments["table:customer_segments"] -->|TRANSFORMS| table_orders
    table_orders -->|TRIGGERS| table_fulfillment_queue["table:fulfillment_queue"]
    table_orders -->|FLOWS_TO| table_order_summary["table:order_summary"]

This diagram renders directly in any Mermaid-compatible viewer. For DOT format:

result = await lineage_plugin.export_lineage(
    entity_id="table:orders",
    format="dot",
    direction="upstream"  # upstream only
)

The direction parameter controls which side of the graph is included. Use "upstream" to show data origins, "downstream" to show consumers, or "both" for the full neighborhood.
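For readers curious how a diagram like the Mermaid output above can be produced from a set of edges, here is a small sketch. It mirrors the output format shown in this post (node IDs with non-word characters replaced by underscores, labels declared on first use), but to_mermaid is a hypothetical function and not the plugin's exporter.

```python
import re


def to_mermaid(root, edges):
    """Render (source, flow_type, target) edges as a Mermaid 'graph LR' diagram."""

    def node_id(entity_id):
        # Mermaid node IDs cannot contain ':' or '.', so replace them.
        return re.sub(r"\W", "_", entity_id)

    declared = {root}
    lines = ["graph LR", f'    {node_id(root)}["{root}"]']

    def ref(entity_id):
        # Attach the readable label only the first time a node appears.
        if entity_id in declared:
            return node_id(entity_id)
        declared.add(entity_id)
        return f'{node_id(entity_id)}["{entity_id}"]'

    for source, flow_type, target in edges:
        left = ref(source)
        right = ref(target)
        lines.append(f"    {left} -->|{flow_type}| {right}")
    return "\n".join(lines)


diagram = to_mermaid(
    "table:orders",
    [
        ("table:raw_orders", "TRANSFORMS", "table:orders"),
        ("table:orders", "TRIGGERS", "table:fulfillment_queue"),
    ],
)
print(diagram)
```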


Keeping the Graph Clean: Stale Lineage Pruning

In long-running systems, the lineage graph can accumulate "ghost" entries: tables, APIs, or flows that no longer exist in the source system but were never explicitly removed. The prune_stale_lineage agent tool handles this by evicting entries that have not been refreshed within a specified age window.

The typical pattern is to attach the plugin to an agent, run a full pipeline scan registering all current flows, and instruct the agent to prune at the end. Anything not touched during the scan is considered stale:

from daita import Agent
from daita.plugins import lineage

tracker = lineage()

agent = Agent(
    name="Pipeline-Scanner",
    model="claude-sonnet-4-6",
    prompt="""You are a data pipeline maintenance agent.
    After registering all current pipeline flows, call prune_stale_lineage
    to remove any entries older than 48 hours that were not refreshed
    during this scan run.""",
    tools=[tracker]
)

await agent.start()

result = await agent.run("""
    Register these active flows and then prune stale entries:
    - table:raw_orders -> table:orders_clean (TRANSFORMS, daily ETL normalization)
    - table:orders_clean -> table:revenue_daily (TRANSFORMS, daily revenue rollup)
    - api:stripe_api -> table:raw_transactions (SYNCS_TO, hourly Stripe ingest)

    After registering all flows, prune anything not touched in the last 48 hours.
""")

print(result)
await agent.stop()

The agent calls register_flow for each active pipeline, then calls prune_stale_lineage with max_age_hours=48. Any nodes or edges not refreshed during that window (ghost entries from renamed or deprecated tables) are removed from the graph. Setting max_age_hours=0 removes everything not touched in the current run, useful when you are confident the scan is complete and canonical.
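The eviction rule itself is simple: anything whose last refresh is older than the cutoff is removed. The sketch below shows that rule on a plain dictionary of last-seen timestamps; prune_stale and the last_seen mapping are hypothetical, and the plugin's own bookkeeping may differ.

```python
from datetime import datetime, timedelta, timezone


def prune_stale(last_seen, max_age_hours, now=None):
    """Remove entries not refreshed within max_age_hours; return what was removed."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(hours=max_age_hours)
    stale = [key for key, seen in last_seen.items() if seen < cutoff]
    for key in stale:
        del last_seen[key]
    return stale


now = datetime(2026, 2, 24, 12, 0, tzinfo=timezone.utc)
last_seen = {
    "table:orders_clean": now - timedelta(hours=1),    # refreshed by the scan
    "table:legacy_orders": now - timedelta(hours=72),  # ghost entry
}
removed = prune_stale(last_seen, max_age_hours=48, now=now)
print(removed, sorted(last_seen))
```

Passing the current time explicitly keeps the example deterministic; in this sketch, max_age_hours=0 puts the cutoff at "now", so everything refreshed before the call is evicted.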


Backends: Standalone, Local, and Cloud

The Lineage Plugin operates across three backend modes without any code changes.

Standalone (In Memory)

When no backend is configured, flows are tracked in memory for the current session only. This is useful for scripts and one-off analyses where persistence is not required.

tracker = lineage()  # In memory, no persistence

Local Development

When the plugin is attached to an agent and initialized locally, it automatically uses a local graph store that persists to disk. The graph survives across agent runs and can be queried after the session ends.

Cloud (Production)

After daita push, the plugin upgrades automatically to a managed graph backend with O(log N) query performance, concurrent write safety, and shared access across agents. Multiple agents can write to and read from the same lineage graph simultaneously, enabling multi-agent data pipelines where each agent's work is visible to the others.

In all three modes, the developer API is identical. Backend selection is automatic based on the runtime environment, with no configuration required.


The Agent Tool Interface

When the Lineage Plugin is attached to an agent, all lineage operations are automatically exposed as LLM-callable tools. This means your agent can reason about data lineage autonomously: tracing flows, assessing impact, and registering new relationships without any explicit function calls in your application code.

The available tools are:

Tool Name | What the LLM Can Do
trace_lineage | Trace upstream and downstream from any entity
trace_upstream | Find all data sources for a given entity
trace_downstream | Find all consumers of a given entity
register_flow | Record a data flow between two entities
register_pipeline | Register a multi-step pipeline as a sequence of flows
analyze_impact | Assess blast radius before making a change
export_lineage | Generate a Mermaid or DOT diagram for a given entity
prune_stale_lineage | Remove ghost entries after a full scan

This enables a class of agents that actively manage data lineage as part of their workflow:

from daita import Agent
from daita.plugins import lineage

agent = Agent(
    name="Data Steward",
    model="claude-sonnet-4-6",
    prompt="""
    You are a data steward responsible for maintaining an accurate
    data lineage graph. When asked about data flows, trace the lineage
    and provide a clear explanation. When asked about a proposed change,
    run an impact analysis before approving it.
    """
)
agent.add_plugin(lineage())

await agent.start()

# The agent can now trace lineage, assess impact, and register flows
answer = await agent.run(
    "We want to rename the customer_id column in table:customers_clean. "
    "What systems would be affected and how serious is the risk?"
)
print(answer)

The agent will call analyze_impact on table:customers_clean, then trace_downstream to enumerate the consumers, and return a structured risk assessment, all autonomously and without any glue code.


Why This Matters for Data Engineering and Agent Systems

Data lineage is one of those capabilities that feels optional until it isn't. The first time a schema change breaks a production pipeline that no one knew was downstream, or the first time a compliance audit requires documentation of where sensitive data flows, the absence of lineage tracking becomes an urgent problem.

For agentic systems specifically, there are additional dimensions:

Agents make changes at speed. A human engineer making a schema change will at least pause to think about dependencies. An agent executing autonomously may not have that context. Automatic lineage capture and built-in impact analysis give the agent the information it needs to reason about consequences before acting.

Pipelines grow without documentation. As agents are added to a data platform, they introduce new flows that may not be tracked anywhere. The Lineage Plugin captures these automatically, building an accurate, up-to-date map of what is actually happening, not what was originally planned.

Debugging requires understanding. When a downstream table contains bad data, tracing the lineage back to the source is the first step in debugging. With a queryable graph, this becomes a single trace_lineage call rather than hours of manual investigation.

Compliance is built in. Data residency and data flow requirements are increasingly common. A continuously maintained lineage graph provides the foundation for demonstrating compliance without building a separate tracking system.


Conclusion

The Lineage Plugin brings production-grade data lineage tracking into the agent layer, where data flows are actually happening. Rather than treating lineage as a separate concern maintained by a separate team, it makes lineage capture a natural byproduct of normal agent operation.

The four registration patterns (manual flows, SQL auto-capture, decorator tracking, and pipeline registration) cover the full range of data movement patterns without forcing a single approach. The query APIs make the graph immediately useful for debugging, impact assessment, and visualization. And because the plugin operates across in-memory, local, and cloud backends with no configuration changes, it fits equally well into a local development script and a production multi-agent data platform.
