In the modern data stack, "Where did this data come from?" is the single most expensive question you can ask.
If you are a Data Engineer, you have lived this nightmare: A dashboard breaks. The CEO is asking why the revenue numbers are wrong. You spend the next 4 hours tracing a CSV export back to a Spark job, which reads from a View, which joins three tables, one of which hasn't updated in 48 hours.
This is the "Data Lineage" problem.
Traditional documentation fails because data relationships are not linear; they form a network. To solve this, we need to treat our infrastructure as a Graph.
In this engineering guide, based on research from Fujitsu's Infrastructure Guild, we will move beyond static diagrams. We will architect a Graph-Based Lineage Engine using Neo4j and Python to programmatically trace dependencies, find root causes, and clean up dead data.
Most data platforms are treated as isolated silos (S3 buckets, SQL tables, Airflow DAGs). We need to connect them.
The Concept:
Every data asset becomes a node, and every dependency becomes a directed edge. Here is the schema we will implement programmatically:
Nodes: Table, View, Dashboard, Job, and User.
Relationships: a Job READS_FROM and WRITES_TO tables, a Table TRANSFORMS_TO its downstream views and dashboards, and a User OWNS jobs.
First, let's create a reusable Python client to interact with our Graph. We aren't just writing queries; we are building an API for our data platform.
```python
from neo4j import GraphDatabase

class LineageGraph:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def run_query(self, query, parameters=None):
        # Open a session per query and return plain dicts,
        # so callers never touch driver internals.
        with self.driver.session() as session:
            result = session.run(query, parameters)
            return [record.data() for record in result]

# Initialize connection
graph_db = LineageGraph("bolt://localhost:7687", "neo4j", "your_password")
```
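To make the examples below reproducible, here is a minimal seed script. The assets and edges (Raw_Leads, Job_101, Alice, and friends) mirror the scenario used throughout this guide; treat it as a sketch of one possible load with hypothetical property values, not a full ingestion pipeline.

```python
# Hypothetical sample data mirroring the examples in this guide.
seed_query = """
MERGE (raw:Table {name: 'Raw_Leads', type: 'Table', created_at: '2024-01-15'})
MERGE (clean:Table {name: 'Clean_Leads', type: 'Table', created_at: '2024-02-01'})
MERGE (view:View {name: 'Regional_Sales_View', type: 'View'})
MERGE (dash:Dashboard {name: 'Q3_Revenue_Report', type: 'Dashboard'})
MERGE (job:Job {name: 'Job_101', type: 'ETL Job'})
MERGE (alice:User {name: 'Alice'})
MERGE (job)-[:READS_FROM]->(raw)
MERGE (job)-[:WRITES_TO]->(clean)
MERGE (raw)-[:TRANSFORMS_TO]->(clean)
MERGE (clean)-[:TRANSFORMS_TO]->(view)
MERGE (view)-[:TRANSFORMS_TO]->(dash)
MERGE (alice)-[:OWNS]->(job)
"""
graph_db.run_query(seed_query)
```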
This is where the graph shines. If the Raw_Leads table is corrupted, what downstream dashboards are broken?
In a standard SQL database, this requires complex recursive joins. In Python + Cypher, it is a simple traversal.
The Code: We define a function that takes a table name and walks the graph forward (-[:TRANSFORMS_TO*]->) to find every dependent asset.
```python
def get_downstream_impact(graph_client, table_name):
    """
    Finds all assets (Views, Files, Dashboards) that depend on a specific table.
    """
    # Follow TRANSFORMS_TO edges forward. READS_FROM edges point INTO a
    # table (Job -> Table), so they don't belong in a forward walk; jobs
    # reading these assets can be found with an extra incoming match.
    cypher_query = """
    MATCH (source:Table {name: $name})-[:TRANSFORMS_TO*]->(downstream)
    RETURN DISTINCT downstream.name as asset_name,
           labels(downstream) as asset_type
    """
    results = graph_client.run_query(cypher_query, parameters={"name": table_name})

    print(f"Blast Radius for '{table_name}':")
    for record in results:
        print(f"  → [{record['asset_type'][0]}] {record['asset_name']}")

# Usage
# get_downstream_impact(graph_db, "Raw_Leads")
```
Output:
```
Blast Radius for 'Raw_Leads':
  → [Table] Clean_Leads
  → [View] Regional_Sales_View
  → [Dashboard] Q3_Revenue_Report
```
When a report is wrong, you need to trace it backwards to the source. Who changed the code? Which ETL job touched it last?
The Code: We walk the graph in reverse (<-[:WRITES_TO|TRANSFORMS_TO*]-) to find the upstream lineage and the owner responsible.
```python
def trace_root_cause(graph_client, artifact_name):
    """
    Traces backwards from a broken report to find the source tables and owners.
    """
    cypher_query = """
    MATCH (destination {name: $name})<-[:WRITES_TO|TRANSFORMS_TO*]-(upstream)
    OPTIONAL MATCH (upstream)<-[:OWNS]-(owner:User)
    RETURN upstream.name as source,
           upstream.type as type,
           owner.name as owner
    """
    results = graph_client.run_query(cypher_query, parameters={"name": artifact_name})

    print(f"Root Cause Trace for '{artifact_name}':")
    for record in results:
        owner = record['owner'] if record['owner'] else "Unknown"
        print(f"  ← Modified by [{record['type']}] {record['source']} (Owner: {owner})")

# Usage
# trace_root_cause(graph_db, "Q3_Revenue_Report")
```
Scenario: This script might reveal that Q3_Revenue_Report reads from Clean_Leads, which was updated by Job_101, owned by Alice. You now know exactly who to Slack.
Organizations struggle to delete old data because they don't know who uses it. We can programmatically calculate the "Centrality" or "Popularity" of a table.
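As a minimal sketch of that idea (the helper name here is ours, not a library API), counting incoming READS_FROM edges gives each table a simple degree-centrality score:

```python
def rank_table_popularity(graph_client, limit=10):
    """
    Ranks tables by incoming READS_FROM edges (simple degree centrality).
    """
    cypher_query = """
    MATCH (t:Table)
    OPTIONAL MATCH (reader)-[:READS_FROM]->(t)
    RETURN t.name as table_name, count(reader) as readers
    ORDER BY readers DESC
    LIMIT $limit
    """
    results = graph_client.run_query(cypher_query, parameters={"limit": limit})
    for record in results:
        print(f"  {record['table_name']}: {record['readers']} reader(s)")

# Usage
# rank_table_popularity(graph_db)
```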
If a table has zero incoming READS_FROM edges and no outgoing TRANSFORMS_TO edges in the graph, it is an "Orphan."
The Code:
```python
def find_unused_assets(graph_client):
    """
    Identifies tables that have no downstream dependencies (Orphans).
    """
    cypher_query = """
    MATCH (t:Table)
    WHERE NOT (t)-[:TRANSFORMS_TO]->()
      AND NOT ()-[:READS_FROM]->(t)
    RETURN t.name as table_name, t.created_at as created_date
    """
    results = graph_client.run_query(cypher_query)

    print("🗑️ Candidate Tables for Deletion (No Dependencies):")
    for record in results:
        print(f"  - {record['table_name']} (Created: {record['created_date']})")

# Usage
# find_unused_assets(graph_db)
```
Developer Insight: You can hook this function into a Slack bot that runs every Monday: "Here are 5 tables that haven't been queried in 6 months. Delete them?"
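Here is a rough sketch of that bot, assuming a standard Slack incoming webhook (the URL below is a placeholder) and the requests library; schedule it with cron or Airflow:

```python
import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder

def post_orphans_to_slack(graph_client):
    """Posts the weekly orphan-table report to a Slack channel."""
    cypher_query = """
    MATCH (t:Table)
    WHERE NOT (t)-[:TRANSFORMS_TO]->() AND NOT ()-[:READS_FROM]->(t)
    RETURN t.name as table_name
    """
    orphans = [r["table_name"] for r in graph_client.run_query(cypher_query)]
    if orphans:
        text = "Candidate tables for deletion:\n" + "\n".join(f"• {n}" for n in orphans)
        requests.post(SLACK_WEBHOOK_URL, json={"text": text})

# Usage (e.g., every Monday morning)
# post_orphans_to_slack(graph_db)
```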
By wrapping Cypher queries in Python, we move from "Manual Documentation" to "Programmable Lineage."
The ROI of this code: impact analysis ("blast radius") in seconds instead of hours of manual tracing, root-cause traces that arrive with an owner's name attached, and a standing list of dead tables you can finally delete with confidence.
Your Next Step: Export your information_schema and query logs, load them into Neo4j, and run the Python scripts above to finally see what your data platform actually looks like.
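As a starting point, here is a sketch of that export step, assuming a Postgres-compatible warehouse reachable via psycopg2; the function name and DSN are illustrative, and relationship edges would come later from query logs and ETL configs:

```python
import psycopg2

def load_tables_from_information_schema(graph_client, dsn):
    """
    Creates a :Table node for every base table in the warehouse.
    """
    conn = psycopg2.connect(dsn)
    try:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT table_schema, table_name
                FROM information_schema.tables
                WHERE table_type = 'BASE TABLE'
            """)
            for schema, name in cur.fetchall():
                graph_client.run_query(
                    "MERGE (:Table {name: $name, schema: $schema})",
                    parameters={"name": name, "schema": schema},
                )
    finally:
        conn.close()

# Usage
# load_tables_from_information_schema(graph_db, "dbname=warehouse user=reader")
```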