Data Pipeline Monitoring: Catch Failures Before Users Do
Your Slack lights up at 7:42 AM: "The revenue dashboard is showing yesterday's numbers." You check Airflow — a task failed silently at 3 AM, no alert fired, and now the entire C-suite is looking at stale data. Sound familiar? Data pipeline monitoring is the discipline that prevents exactly this scenario — and most teams don't take it seriously until they've been burned.
This guide walks through concrete monitoring strategies, tooling, and code examples so you can build observability into your pipelines from day one.
Why Pipeline Monitoring Matters More Than You Think
Data pipelines are invisible infrastructure. When they work, nobody notices. When they break, everyone notices — usually hours too late.
The core problem: data pipelines fail silently. Unlike a web app that throws a 500 error to a user, a broken pipeline just... doesn't produce fresh data. The downstream dashboard still loads, the ML model still serves predictions — they're just wrong.
| Failure Type | Example | Typical Detection Time (No Monitoring) |
|---|---|---|
| Hard failure | Task throws an exception, DAG stops | Hours (next manual check) |
| Soft failure | Pipeline completes but produces 0 rows | Days (stakeholder complaint) |
| Data drift | Schema change upstream breaks joins | Days to weeks |
| Latency creep | Pipeline takes 4x longer gradually | Weeks (until SLA breach) |
| Silent corruption | Wrong values, duplicates sneak in | Weeks to months |
Hard failures are the easy ones. It's the soft failures — the pipeline that "succeeds" but produces garbage — that cause real damage.
The Four Pillars of Data Pipeline Monitoring
Effective monitoring covers four dimensions. Miss one and you have a blind spot.
1. Execution Monitoring (Did It Run?)
The baseline: track whether your pipeline ran, when it started, when it finished, and whether it succeeded.
Every orchestrator gives you this for free — Airflow, Dagster, Prefect all track task states. The mistake teams make is relying solely on the orchestrator UI and not routing these signals to an alerting system.
Here's a practical Airflow example using callbacks to push alerts:
```python
# airflow_callbacks.py — Alerting on task failure via Slack webhook
# Compatible with: Apache Airflow 2.x
import requests
from airflow.models import Variable


def slack_failure_alert(context):
    """Send a Slack alert when any task fails.

    Attach this to on_failure_callback at the DAG or task level.
    Requires an Airflow Variable 'slack_webhook_url'.
    """
    webhook_url = Variable.get("slack_webhook_url")
    task_instance = context.get("task_instance")
    dag_id = task_instance.dag_id
    task_id = task_instance.task_id
    # 'execution_date' still works across Airflow 2.x but is deprecated
    # from 2.2 onward in favor of 'logical_date'.
    execution_date = context.get("execution_date").isoformat()
    log_url = task_instance.log_url
    payload = {
        "text": (
            f":rotating_light: *Pipeline Failure*\n"
            f"*DAG:* `{dag_id}`\n"
            f"*Task:* `{task_id}`\n"
            f"*Execution Date:* {execution_date}\n"
            f"*Logs:* <{log_url}|View Logs>"
        )
    }
    requests.post(webhook_url, json=payload, timeout=10)


def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Alert when a task misses its SLA.

    Attach to sla_miss_callback at the DAG level. Note: Airflow passes
    `task_list` as a newline-joined string, not a list of task objects,
    so we read task ids from the SlaMiss records in `slas` instead.
    """
    webhook_url = Variable.get("slack_webhook_url")
    task_names = ", ".join(sla.task_id for sla in slas)
    payload = {
        "text": (
            f":warning: *SLA Miss*\n"
            f"*DAG:* `{dag.dag_id}`\n"
            f"*Tasks:* `{task_names}`\n"
            f"*Execution Date:* {slas[0].execution_date.isoformat()}"
        )
    }
    requests.post(webhook_url, json=payload, timeout=10)
```
Key lesson: SLA monitoring is more useful than failure monitoring. A task that retries 5 times and eventually succeeds at 11 AM instead of 6 AM is technically a "success" — but your stakeholders disagree.
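Even without orchestrator-level SLA support, the core check is just a timestamp comparison you can run anywhere. A minimal sketch, assuming a fixed daily delivery deadline; `finished_by_deadline` is an illustrative helper, not an Airflow API:

```python
from datetime import datetime, time, timezone

def finished_by_deadline(finished_at: datetime, deadline: time) -> bool:
    """Return True if the run finished at or before the daily deadline.

    Treats the deadline as a wall-clock time (UTC) on the same day as
    the run's finish timestamp.
    """
    cutoff = datetime.combine(finished_at.date(), deadline, tzinfo=timezone.utc)
    return finished_at <= cutoff

# A run that retried and finally succeeded at 11:00 is a "success" to the
# orchestrator, but it misses a 06:00 delivery deadline:
late_run = datetime(2024, 5, 1, 11, 0, tzinfo=timezone.utc)
on_time_run = datetime(2024, 5, 1, 5, 45, tzinfo=timezone.utc)
print(finished_by_deadline(late_run, time(6, 0)))     # False
print(finished_by_deadline(on_time_run, time(6, 0)))  # True
```

Running a check like this as a final pipeline step means lateness is evaluated against the stakeholder's clock, not the orchestrator's retry logic.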
2. Data Quality Monitoring (Is the Output Correct?)
This is where most teams have the biggest gap. The pipeline ran successfully — but did it produce good data?
At minimum, check these after every pipeline run:
- Row counts: Did we get a reasonable number of rows? (Not zero, not 10x the usual)
- Null rates: Are critical columns unexpectedly null?
- Uniqueness: Are primary keys actually unique?
- Freshness: Is the most recent timestamp within expected range?
- Value ranges: Are numeric values within plausible bounds?
Here's a practical SQL-based validation suite you can run as a post-pipeline step:
```sql
-- Data quality checks for a daily pipeline output
-- Dialect: PostgreSQL (works with minor tweaks on Spark SQL / BigQuery)

-- Check 1: Row count within expected range
WITH row_check AS (
    SELECT
        COUNT(*) AS row_count,
        CASE
            WHEN COUNT(*) = 0 THEN 'CRITICAL: Zero rows produced'
            WHEN COUNT(*) < 1000 THEN 'WARNING: Unusually low row count'
            WHEN COUNT(*) > 1000000 THEN 'WARNING: Unusually high row count'
            ELSE 'OK'
        END AS status
    FROM analytics.daily_revenue
    WHERE load_date = CURRENT_DATE
),
-- Check 2: Null rate on critical columns
null_check AS (
    SELECT
        ROUND(
            100.0 * COUNT(*) FILTER (WHERE revenue IS NULL) / NULLIF(COUNT(*), 0),
            2
        ) AS revenue_null_pct,
        ROUND(
            100.0 * COUNT(*) FILTER (WHERE customer_id IS NULL) / NULLIF(COUNT(*), 0),
            2
        ) AS customer_id_null_pct
    FROM analytics.daily_revenue
    WHERE load_date = CURRENT_DATE
),
-- Check 3: Freshness — most recent source timestamp
freshness_check AS (
    SELECT
        MAX(event_timestamp) AS latest_event,
        CASE
            WHEN MAX(event_timestamp) < NOW() - INTERVAL '24 hours'
                THEN 'CRITICAL: Data older than 24h'
            WHEN MAX(event_timestamp) < NOW() - INTERVAL '6 hours'
                THEN 'WARNING: Data older than 6h'
            ELSE 'OK'
        END AS freshness_status
    FROM analytics.daily_revenue
    WHERE load_date = CURRENT_DATE
),
-- Check 4: Primary key uniqueness
pk_check AS (
    SELECT
        COUNT(*) AS total_rows,
        COUNT(DISTINCT transaction_id) AS unique_keys,
        CASE
            WHEN COUNT(*) != COUNT(DISTINCT transaction_id)
                THEN 'CRITICAL: Duplicate primary keys detected'
            ELSE 'OK'
        END AS uniqueness_status
    FROM analytics.daily_revenue
    WHERE load_date = CURRENT_DATE
)
SELECT
    r.row_count,
    r.status AS row_status,
    n.revenue_null_pct,
    n.customer_id_null_pct,
    f.freshness_status,
    p.uniqueness_status
FROM row_check r, null_check n, freshness_check f, pk_check p;
```
Adapt the thresholds to your actual data. Static thresholds work for v1 — you can graduate to anomaly detection (standard deviation from rolling average) later.
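When you outgrow static thresholds, the "standard deviation from rolling average" approach can be sketched in a few lines of standard-library Python. The function name and sample counts are illustrative:

```python
from statistics import mean, stdev

def is_anomalous(history: list[float], current: float, n_sigma: float = 2.0) -> bool:
    """Flag `current` if it falls outside mean ± n_sigma * stdev of `history`.

    `history` holds recent row counts (or durations) from past runs. With
    fewer than two data points there is no spread to measure, so we
    conservatively return False.
    """
    if len(history) < 2:
        return False
    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        return current != mu
    return abs(current - mu) > n_sigma * sigma

daily_rows = [10_200, 9_800, 10_500, 10_100, 9_900, 10_300, 10_000]
print(is_anomalous(daily_rows, 10_400))  # False — within normal spread
print(is_anomalous(daily_rows, 4_000))   # True — likely upstream problem
```

Feed it the last 30 runs from a metrics table and it replaces hand-tuned row-count bounds with one that adapts as your data grows.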
3. Performance Monitoring (Is It Fast Enough?)
Pipeline duration matters. A pipeline that takes 2 hours today and 6 hours next month will eventually breach your SLA — but nobody notices until it does.
Track these metrics over time:
| Metric | Why It Matters | Alert Threshold (Example) |
|---|---|---|
| Total DAG duration | SLA compliance | > 2x median of last 30 runs |
| Individual task duration | Isolate bottleneck | > 3x median for that task |
| Data volume processed | Capacity planning | > 2x average (investigate source) |
| Memory / CPU usage | Infrastructure cost | > 80% sustained |
| Queue wait time | Orchestrator capacity | > 15 min (scale workers) |
A simple approach: log task durations to a metrics table and build a dashboard. Here's a lightweight Python decorator for this:
```python
# task_metrics.py — Log task execution time to a metrics table
# Compatible with: Python 3.9+, psycopg2
import time
import functools
import psycopg2
from datetime import datetime, timezone


def track_duration(dag_id: str, task_id: str, conn_string: str):
    """Decorator that logs task execution duration to a PostgreSQL metrics table.

    Usage:
        @track_duration("daily_revenue", "transform", CONN_STRING)
        def transform_data():
            ...

    Requires table:
        CREATE TABLE pipeline_metrics (
            dag_id TEXT,
            task_id TEXT,
            started_at TIMESTAMPTZ,
            duration_seconds FLOAT,
            status TEXT
        );
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.monotonic()
            started_at = datetime.now(timezone.utc)
            status = "success"
            try:
                return func(*args, **kwargs)
            except Exception as e:
                status = f"failed: {type(e).__name__}"
                raise
            finally:
                duration = time.monotonic() - start
                try:
                    # The connection context manager commits on success
                    # and rolls back on error; close it explicitly after.
                    with psycopg2.connect(conn_string) as conn:
                        with conn.cursor() as cur:
                            cur.execute(
                                """INSERT INTO pipeline_metrics
                                   (dag_id, task_id, started_at, duration_seconds, status)
                                   VALUES (%s, %s, %s, %s, %s)""",
                                (dag_id, task_id, started_at, duration, status),
                            )
                    conn.close()
                except Exception:
                    pass  # Don't let metrics logging break the pipeline
        return wrapper
    return decorator
```
4. Metadata & Lineage Monitoring (What Changed?)
When something breaks, you need to answer: what changed? Schema changes, volume spikes, new source systems — these are upstream events that break your pipelines.
Practical steps:
- Schema change detection: Before each run, compare the current source schema against the last known schema. Alert on new, removed, or type-changed columns.
- Volume anomaly detection: Track daily row counts per source. A 50% drop or 200% spike deserves an alert.
- Lineage tracking: Know which downstream tables and dashboards are affected when a source pipeline fails.
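The volume rule in the list above is a one-function check. In this sketch, `volume_alert` and its thresholds are illustrative, and a "200% spike" is treated as today's count reaching 3x or more of yesterday's:

```python
from typing import Optional

def volume_alert(previous_count: int, current_count: int) -> Optional[str]:
    """Compare today's row count against yesterday's.

    Returns an alert string on a >=50% drop or a spike to >=3x the
    previous run (a 200% increase); returns None if volume looks normal.
    """
    if previous_count == 0:
        return "WARNING: no baseline — previous run produced 0 rows"
    ratio = current_count / previous_count
    if ratio <= 0.5:
        return f"CRITICAL: volume dropped {100 * (1 - ratio):.0f}% vs previous run"
    if ratio >= 3.0:
        return f"WARNING: volume spiked to {ratio:.1f}x previous run"
    return None

print(volume_alert(100_000, 40_000))   # CRITICAL: volume dropped 60% vs previous run
print(volume_alert(100_000, 350_000))  # WARNING: volume spiked to 3.5x previous run
print(volume_alert(100_000, 105_000))  # None
```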
You don't need a full-blown data catalog for this. A simple schema snapshot comparison works:
```python
# schema_monitor.py — Detect upstream schema changes
# Compatible with: Python 3.9+, SQLAlchemy 2.x
import json
from pathlib import Path

from sqlalchemy import inspect


def get_current_schema(engine, table_name: str, schema: str = "public") -> dict:
    """Extract column names and types from a database table."""
    inspector = inspect(engine)
    columns = inspector.get_columns(table_name, schema=schema)
    return {col["name"]: str(col["type"]) for col in columns}


def detect_schema_changes(
    engine,
    table_name: str,
    snapshot_dir: str = "/tmp/schema_snapshots",
    schema: str = "public",
) -> dict:
    """Compare current schema against last snapshot. Returns changes dict.

    Returns:
        {
            "added": ["new_col"],
            "removed": ["old_col"],
            "type_changed": {"col_name": {"old": "INTEGER", "new": "VARCHAR"}}
        }
    """
    snapshot_path = Path(snapshot_dir) / f"{schema}_{table_name}.json"
    current = get_current_schema(engine, table_name, schema)
    changes = {"added": [], "removed": [], "type_changed": {}}

    if snapshot_path.exists():
        previous = json.loads(snapshot_path.read_text())
        current_cols = set(current.keys())
        previous_cols = set(previous.keys())
        changes["added"] = sorted(current_cols - previous_cols)
        changes["removed"] = sorted(previous_cols - current_cols)
        for col in current_cols & previous_cols:
            if current[col] != previous[col]:
                changes["type_changed"][col] = {
                    "old": previous[col],
                    "new": current[col],
                }

    # Save current as new snapshot
    snapshot_path.parent.mkdir(parents=True, exist_ok=True)
    snapshot_path.write_text(json.dumps(current, indent=2))
    return changes
```
Building Your Monitoring Stack: Practical Recommendations
You don't need to buy a dedicated observability platform on day one. Here's a phased approach:
Phase 1: Foundation (Week 1)
- Configure failure callbacks in your orchestrator (Airflow, Dagster, Prefect)
- Set up SLA monitoring for your top 5 critical pipelines
- Route all alerts to Slack/PagerDuty — not just to the orchestrator UI
- Add row count checks as post-pipeline tasks
Phase 2: Data Quality (Month 1)
- Implement SQL-based quality checks (null rates, uniqueness, freshness)
- Add schema change detection on critical source tables
- Build a simple dashboard tracking pipeline durations over time
- Establish on-call rotation for data pipeline alerts
Phase 3: Advanced (Quarter 1)
- Adopt a framework like Great Expectations or dbt tests for systematic quality checks
- Implement anomaly detection on row counts and durations (rolling average ± 2 standard deviations)
- Add lineage tracking to understand blast radius of failures
- Build runbooks for common failure scenarios
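For the lineage item, even a plain dict of table dependencies is enough to compute the blast radius of a failure before you invest in a catalog. A sketch with made-up table names; `blast_radius` is an illustrative helper:

```python
from collections import deque

def blast_radius(lineage: dict, failed: str) -> set:
    """Return every asset transitively downstream of `failed`.

    `lineage` maps each table to the list of tables that read
    directly from it. Uses a breadth-first traversal.
    """
    affected = set()
    queue = deque([failed])
    while queue:
        node = queue.popleft()
        for child in lineage.get(node, []):
            if child not in affected:
                affected.add(child)
                queue.append(child)
    return affected

# Hypothetical lineage: raw events feed staging, which feeds marts and dashboards.
downstream_of = {
    "raw_events": ["stg_events"],
    "stg_events": ["daily_revenue", "user_sessions"],
    "daily_revenue": ["revenue_dashboard"],
}
print(sorted(blast_radius(downstream_of, "raw_events")))
# ['daily_revenue', 'revenue_dashboard', 'stg_events', 'user_sessions']
```

Pair the output with alert routing and a failure notification can say exactly which dashboards are stale, not just which task died.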
Common Mistakes to Avoid
Alert fatigue is the #1 killer. If your team gets 50 alerts a day, they'll ignore all of them — including the critical ones. Start with fewer, high-signal alerts and expand gradually.
Don't monitor only the happy path. Testing that your pipeline produces rows is necessary but insufficient. Check for duplicates, nulls in critical fields, and values that are technically valid but logically wrong (negative revenue, timestamps in the future).
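Those "technically valid but logically wrong" checks are cheap to codify. A sketch over rows represented as dicts; `logical_issues` and the field names are illustrative:

```python
from datetime import datetime, timezone

def logical_issues(rows: list) -> list:
    """Flag values that pass type checks but fail business logic:
    negative revenue and event timestamps in the future.
    """
    now = datetime.now(timezone.utc)
    issues = []
    for i, row in enumerate(rows):
        if row.get("revenue", 0) < 0:
            issues.append(f"row {i}: negative revenue {row['revenue']}")
        ts = row.get("event_timestamp")
        if ts is not None and ts > now:
            issues.append(f"row {i}: event_timestamp in the future ({ts.isoformat()})")
    return issues

rows = [
    {"revenue": 120.0, "event_timestamp": datetime(2024, 1, 2, tzinfo=timezone.utc)},
    {"revenue": -35.5, "event_timestamp": datetime(2024, 1, 2, tzinfo=timezone.utc)},
]
print(logical_issues(rows))  # ['row 1: negative revenue -35.5']
```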
Don't skip the "it succeeded but produced bad data" case. This is the most dangerous failure mode. A pipeline that errors out is annoying; a pipeline that writes corrupt data to production is catastrophic.
Don't build monitoring as an afterthought. The best time to add monitoring is when you build the pipeline. The second-best time is now.
Don't alert on things nobody can act on. Every alert should have a clear owner and a documented response. "Pipeline X failed" is useless without "here's the runbook."
Data Pipeline Monitoring Checklist
Use this as a template when setting up monitoring for a new pipeline:
| Check | Implemented? | Alert Channel | Runbook Link |
|---|---|---|---|
| Task failure alerts | | | |
| SLA miss alerts | | | |
| Row count validation | | | |
| Null rate checks | | | |
| Primary key uniqueness | | | |
| Data freshness check | | | |
| Schema change detection | | | |
| Duration tracking | | | |
| Volume anomaly detection | | | |
If you're working with diverse data sources — APIs, CSVs, web data — and want a lightweight way to set up and monitor data pipelines without heavy infrastructure, Harbinger Explorer lets you crawl APIs and query data directly in the browser with DuckDB WASM. Its guided setup wizard handles source configuration, and the AI chat interface can help you explore data quality issues interactively during your investigation workflow.
Next Steps
Start with execution monitoring (Phase 1) — it takes an afternoon and catches the most common failures. Then layer in data quality checks over the following weeks. The goal isn't perfection on day one; it's building a monitoring culture where "how will we know if this breaks?" is a standard question during pipeline development.
The teams that monitor well aren't the ones with the fanciest tools. They're the ones who decided that stale data at 7 AM is unacceptable — and built systems to prevent it.
Continue Reading
Data Lakehouse Architecture Explained
How data lakehouse architecture works, when to use it over a warehouse or lake, and the common pitfalls that trip up data engineering teams.
dbt vs Spark SQL: How to Choose
dbt or Spark SQL for your transformation layer? A side-by-side comparison of features, pricing, and use cases — with code examples for both and honest trade-offs for analytics engineers.
Delta Live Tables vs Classic ETL: Which Fits Your Pipeline?
DLT vs classic ETL compared honestly: declarative expectations, streaming, debugging, testing, and pricing. Includes DLT code example with expectations syntax.