Data Pipeline Monitoring: A Practical Guide to Catching Failures Before Your Stakeholders Do

Your Slack lights up at 7:42 AM: "The revenue dashboard is showing yesterday's numbers." You check Airflow — a task failed silently at 3 AM, no alert fired, and now the entire C-suite is looking at stale data. Sound familiar? Data pipeline monitoring is the discipline that prevents exactly this scenario — and most teams don't take it seriously until they've been burned.

This guide walks through concrete monitoring strategies, tooling, and code examples so you can build observability into your pipelines from day one.

Why Pipeline Monitoring Matters More Than You Think

Data pipelines are invisible infrastructure. When they work, nobody notices. When they break, everyone notices — usually hours too late.

The core problem: data pipelines fail silently. Unlike a web app that throws a 500 error to a user, a broken pipeline just... doesn't produce fresh data. The downstream dashboard still loads, the ML model still serves predictions — they're just wrong.

| Failure Type | Example | Typical Detection Time (No Monitoring) |
| --- | --- | --- |
| Hard failure | Task throws an exception, DAG stops | Hours (next manual check) |
| Soft failure | Pipeline completes but produces 0 rows | Days (stakeholder complaint) |
| Data drift | Schema change upstream breaks joins | Days to weeks |
| Latency creep | Pipeline takes 4x longer gradually | Weeks (until SLA breach) |
| Silent corruption | Wrong values, duplicates sneak in | Weeks to months |

Hard failures are the easy ones. It's the soft failures — the pipeline that "succeeds" but produces garbage — that cause real damage.

The Four Pillars of Data Pipeline Monitoring

Effective monitoring covers four dimensions. Miss one and you have a blind spot.

1. Execution Monitoring (Did It Run?)

The baseline: track whether your pipeline ran, when it started, when it finished, and whether it succeeded.

Every orchestrator gives you this for free — Airflow, Dagster, Prefect all track task states. The mistake teams make is relying solely on the orchestrator UI and not routing these signals to an alerting system.

Here's a practical Airflow example using callbacks to push alerts:

# airflow_callbacks.py — Alerting on task failure via Slack webhook
# Compatible with: Apache Airflow 2.x

import requests
from airflow.models import Variable


def slack_failure_alert(context):
    """Send a Slack alert when any task fails.
    
    Attach this to on_failure_callback at the DAG or task level.
    Requires an Airflow Variable 'slack_webhook_url'.
    """
    webhook_url = Variable.get("slack_webhook_url")
    
    task_instance = context.get("task_instance")
    dag_id = task_instance.dag_id
    task_id = task_instance.task_id
    execution_date = context.get("execution_date").isoformat()
    log_url = task_instance.log_url
    
    payload = {
        "text": (
            f":rotating_light: *Pipeline Failure*\n"
            f"*DAG:* `{dag_id}`\n"
            f"*Task:* `{task_id}`\n"
            f"*Execution Date:* {execution_date}\n"
            f"*Logs:* <{log_url}|View Logs>"
        )
    }
    
    requests.post(webhook_url, json=payload, timeout=10)


def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Alert when a task misses its SLA.
    
    Attach to sla_miss_callback at the DAG level.
    """
    webhook_url = Variable.get("slack_webhook_url")

    # In Airflow 2.x, task_list is a newline-joined string of task ids,
    # not a list of task objects — derive the names from the SlaMiss records.
    task_names = ", ".join(sla.task_id for sla in slas)
    payload = {
        "text": (
            f":warning: *SLA Miss*\n"
            f"*DAG:* `{dag.dag_id}`\n"
            f"*Tasks:* `{task_names}`\n"
            f"*Expected by:* {slas[0].execution_date.isoformat()}"
        )
    }
    
    requests.post(webhook_url, json=payload, timeout=10)

Key lesson: SLA monitoring is more useful than failure monitoring. A task that retries 5 times and eventually succeeds at 11 AM instead of 6 AM is technically a "success" — but your stakeholders disagree.
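Wiring these hooks into a DAG is one line per callback. Here's a minimal configuration sketch, assuming the callbacks above live in an `airflow_callbacks.py` module on the scheduler's PYTHONPATH (the DAG id, schedule, and 2-hour SLA are illustrative):

```python
# daily_revenue_dag.py — wiring the alert callbacks into a DAG (sketch)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

from airflow_callbacks import slack_failure_alert, sla_miss_alert

with DAG(
    dag_id="daily_revenue",
    start_date=datetime(2024, 1, 1),
    schedule_interval="0 4 * * *",      # renamed to `schedule` in Airflow 2.4+
    catchup=False,
    sla_miss_callback=sla_miss_alert,   # SLA misses are a DAG-level hook
    default_args={
        "on_failure_callback": slack_failure_alert,  # fires on any task failure
        "sla": timedelta(hours=2),      # each task must finish within 2h of the run start
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
) as dag:
    extract = PythonOperator(task_id="extract", python_callable=lambda: None)
```

Note that `sla` in `default_args` applies per task, relative to the DAG's scheduled start, so a task that retries past the window triggers the SLA alert even if it eventually succeeds.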

2. Data Quality Monitoring (Is the Output Correct?)

This is where most teams have the biggest gap. The pipeline ran successfully — but did it produce good data?

At minimum, check these after every pipeline run:

  • Row counts: Did we get a reasonable number of rows? (Not zero, not 10x the usual)
  • Null rates: Are critical columns unexpectedly null?
  • Uniqueness: Are primary keys actually unique?
  • Freshness: Is the most recent timestamp within expected range?
  • Value ranges: Are numeric values within plausible bounds?

Here's a practical SQL-based validation suite you can run as a post-pipeline step:

-- Data quality checks for a daily pipeline output
-- Dialect: PostgreSQL (works with minor tweaks on Spark SQL / BigQuery)

-- Check 1: Row count within expected range
WITH row_check AS (
    SELECT 
        COUNT(*) AS row_count,
        CASE 
            WHEN COUNT(*) = 0 THEN 'CRITICAL: Zero rows produced'
            WHEN COUNT(*) < 1000 THEN 'WARNING: Unusually low row count'
            WHEN COUNT(*) > 1000000 THEN 'WARNING: Unusually high row count'
            ELSE 'OK'
        END AS status
    FROM analytics.daily_revenue
    WHERE load_date = CURRENT_DATE
),

-- Check 2: Null rate on critical columns
null_check AS (
    SELECT
        ROUND(
            100.0 * COUNT(*) FILTER (WHERE revenue IS NULL) / NULLIF(COUNT(*), 0), 
            2
        ) AS revenue_null_pct,
        ROUND(
            100.0 * COUNT(*) FILTER (WHERE customer_id IS NULL) / NULLIF(COUNT(*), 0), 
            2
        ) AS customer_id_null_pct
    FROM analytics.daily_revenue
    WHERE load_date = CURRENT_DATE
),

-- Check 3: Freshness — most recent source timestamp
freshness_check AS (
    SELECT
        MAX(event_timestamp) AS latest_event,
        CASE
            WHEN MAX(event_timestamp) < NOW() - INTERVAL '24 hours' 
            THEN 'CRITICAL: Data older than 24h'
            WHEN MAX(event_timestamp) < NOW() - INTERVAL '6 hours' 
            THEN 'WARNING: Data older than 6h'
            ELSE 'OK'
        END AS freshness_status
    FROM analytics.daily_revenue
    WHERE load_date = CURRENT_DATE
),

-- Check 4: Primary key uniqueness
pk_check AS (
    SELECT
        COUNT(*) AS total_rows,
        COUNT(DISTINCT transaction_id) AS unique_keys,
        CASE
            WHEN COUNT(*) != COUNT(DISTINCT transaction_id)
            THEN 'CRITICAL: Duplicate primary keys detected'
            ELSE 'OK'
        END AS uniqueness_status
    FROM analytics.daily_revenue
    WHERE load_date = CURRENT_DATE
)

SELECT 
    r.row_count,
    r.status AS row_status,
    n.revenue_null_pct,
    n.customer_id_null_pct,
    f.freshness_status,
    p.uniqueness_status
FROM row_check r, null_check n, freshness_check f, pk_check p;

Adapt the thresholds to your actual data. Static thresholds work for v1 — you can graduate to anomaly detection (standard deviation from rolling average) later.
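When static thresholds start paging you for normal seasonality, the rolling-statistics upgrade mentioned above is small. A minimal sketch in pure Python; the 30-run history and the ±2σ band are illustrative choices, not a fixed recipe:

```python
# row_count_anomaly.py — flag a row count outside rolling mean ± 2 std devs
from statistics import mean, stdev


def is_anomalous(history: list, today: int, n_sigmas: float = 2.0) -> bool:
    """Return True if `today` falls outside mean ± n_sigmas * stdev of `history`.

    `history` holds recent daily row counts (e.g. the last 30 runs).
    With fewer than 2 points there is no spread to measure, so we fall
    back to 'not anomalous' and let static thresholds catch it.
    """
    if len(history) < 2:
        return False
    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        return today != mu
    return abs(today - mu) > n_sigmas * sigma


# Example: a stable series around 10k rows, then a near-total collapse
recent = [10_100, 9_900, 10_050, 10_200, 9_950, 10_000]
print(is_anomalous(recent, 10_080))  # → False (inside the band)
print(is_anomalous(recent, 1_200))   # → True (well outside the band)
```

Run this as a post-pipeline step against a metrics table of daily counts, and route a `True` result to the same Slack webhook as the failure callbacks.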

3. Performance Monitoring (Is It Fast Enough?)

Pipeline duration matters. A pipeline that takes 2 hours today and 6 hours next month will eventually breach your SLA — but nobody notices until it does.

Track these metrics over time:

| Metric | Why It Matters | Alert Threshold (Example) |
| --- | --- | --- |
| Total DAG duration | SLA compliance | > 2x median of last 30 runs |
| Individual task duration | Isolate bottleneck | > 3x median for that task |
| Data volume processed | Capacity planning | > 2x average (investigate source) |
| Memory / CPU usage | Infrastructure cost | > 80% sustained |
| Queue wait time | Orchestrator capacity | > 15 min (scale workers) |

A simple approach: log task durations to a metrics table and build a dashboard. Here's a lightweight Python decorator for this:

# task_metrics.py — Log task execution time to a metrics table
# Compatible with: Python 3.9+, psycopg2

import time
import functools
import psycopg2
from datetime import datetime, timezone


def track_duration(dag_id: str, task_id: str, conn_string: str):
    """Decorator that logs task execution duration to a PostgreSQL metrics table.
    
    Usage:
        @track_duration("daily_revenue", "transform", CONN_STRING)
        def transform_data():
            ...
    
    Requires table:
        CREATE TABLE pipeline_metrics (
            dag_id TEXT,
            task_id TEXT,
            started_at TIMESTAMPTZ,
            duration_seconds FLOAT,
            status TEXT
        );
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.monotonic()
            started_at = datetime.now(timezone.utc)
            status = "success"
            
            try:
                result = func(*args, **kwargs)
                return result
            except Exception as e:
                status = f"failed: {type(e).__name__}"
                raise
            finally:
                duration = time.monotonic() - start
                try:
                    conn = psycopg2.connect(conn_string)
                    with conn.cursor() as cur:
                        cur.execute(
                            """INSERT INTO pipeline_metrics 
                               (dag_id, task_id, started_at, duration_seconds, status)
                               VALUES (%s, %s, %s, %s, %s)""",
                            (dag_id, task_id, started_at, duration, status)
                        )
                    conn.commit()
                    conn.close()
                except Exception:
                    pass  # Don't let metrics logging break the pipeline
        
        return wrapper
    return decorator
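Once durations accumulate in `pipeline_metrics`, the "> 2x median of last 30 runs" threshold from the table above takes only a few lines to evaluate. A sketch, assuming you have already fetched the recent durations from the table (the query itself is omitted):

```python
# duration_check.py — flag a run slower than 2x the median of recent runs
from statistics import median


def breaches_threshold(recent_durations: list,
                       latest: float,
                       multiplier: float = 2.0) -> bool:
    """Return True if `latest` exceeds multiplier * median of the history."""
    if not recent_durations:
        return False  # no baseline yet; nothing to compare against
    return latest > multiplier * median(recent_durations)


# Example: runs normally take ~30 min; the latest took 70 min
history = [1800.0, 1750.0, 1900.0, 1820.0, 1780.0]  # seconds
print(breaches_threshold(history, 4200.0))  # → True
print(breaches_threshold(history, 1850.0))  # → False
```

The median is deliberately preferred over the mean here: one pathological 6-hour run in the history would drag a mean-based baseline upward and mask the next slowdown.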

4. Metadata & Lineage Monitoring (What Changed?)

When something breaks, you need to answer: what changed? Schema changes, volume spikes, new source systems — these are upstream events that break your pipelines.

Practical steps:

  • Schema change detection: Before each run, compare the current source schema against the last known schema. Alert on new, removed, or type-changed columns.
  • Volume anomaly detection: Track daily row counts per source. A 50% drop or 200% spike deserves an alert.
  • Lineage tracking: Know which downstream tables and dashboards are affected when a source pipeline fails.

You don't need a full-blown data catalog for this. A simple schema snapshot comparison works:

# schema_monitor.py — Detect upstream schema changes
# Compatible with: Python 3.9+, SQLAlchemy 2.x

from sqlalchemy import create_engine, inspect
import json
from pathlib import Path


def get_current_schema(engine, table_name: str, schema: str = "public") -> dict:
    """Extract column names and types from a database table."""
    inspector = inspect(engine)
    columns = inspector.get_columns(table_name, schema=schema)
    return {
        col["name"]: str(col["type"]) 
        for col in columns
    }


def detect_schema_changes(
    engine, 
    table_name: str, 
    snapshot_dir: str = "/tmp/schema_snapshots",
    schema: str = "public"
) -> dict:
    """Compare current schema against last snapshot. Returns changes dict.
    
    Returns:
        {
            "added": ["new_col"],
            "removed": ["old_col"],
            "type_changed": {"col_name": {"old": "INTEGER", "new": "VARCHAR"}}
        }
    """
    snapshot_path = Path(snapshot_dir) / f"{schema}_{table_name}.json"
    current = get_current_schema(engine, table_name, schema)
    
    changes = {"added": [], "removed": [], "type_changed": {}}
    
    if snapshot_path.exists():
        previous = json.loads(snapshot_path.read_text())
        
        current_cols = set(current.keys())
        previous_cols = set(previous.keys())
        
        changes["added"] = sorted(current_cols - previous_cols)
        changes["removed"] = sorted(previous_cols - current_cols)
        
        for col in current_cols & previous_cols:
            if current[col] != previous[col]:
                changes["type_changed"][col] = {
                    "old": previous[col],
                    "new": current[col]
                }
    
    # Save current as new snapshot
    snapshot_path.parent.mkdir(parents=True, exist_ok=True)
    snapshot_path.write_text(json.dumps(current, indent=2))
    
    return changes
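The volume rule from the list above (alert on a 50% drop or a 200% spike) reduces to a ratio against a trailing baseline. A minimal sketch; the function name and default thresholds are illustrative:

```python
# volume_monitor.py — flag large day-over-day volume swings per source
from statistics import mean
from typing import Optional


def volume_alert(baseline_counts: list, today: int,
                 drop_pct: float = 50.0, spike_pct: float = 200.0) -> Optional[str]:
    """Compare today's row count against the mean of recent daily counts.

    Returns an alert string, or None if the volume looks normal.
    """
    if not baseline_counts:
        return None  # no history yet
    baseline = mean(baseline_counts)
    if baseline == 0:
        return "WARNING: baseline volume is zero" if today else None
    change_pct = 100.0 * (today - baseline) / baseline
    if change_pct <= -drop_pct:
        return f"CRITICAL: volume dropped {abs(change_pct):.0f}% vs baseline"
    if change_pct >= spike_pct:
        return f"WARNING: volume spiked {change_pct:.0f}% vs baseline"
    return None


print(volume_alert([10_000, 10_500, 9_800], 2_000))   # big drop → CRITICAL alert
print(volume_alert([10_000, 10_500, 9_800], 10_200))  # → None (normal)
```

Run one check per source table, and include the source name in the alert so on-call can tell a broken extractor from an upstream outage at a glance.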

Building Your Monitoring Stack: Practical Recommendations

You don't need to buy a dedicated observability platform on day one. Here's a phased approach:

Phase 1: Foundation (Week 1)

  • Configure failure callbacks in your orchestrator (Airflow, Dagster, Prefect)
  • Set up SLA monitoring for your top 5 critical pipelines
  • Route all alerts to Slack/PagerDuty — not just to the orchestrator UI
  • Add row count checks as post-pipeline tasks

Phase 2: Data Quality (Month 1)

  • Implement SQL-based quality checks (null rates, uniqueness, freshness)
  • Add schema change detection on critical source tables
  • Build a simple dashboard tracking pipeline durations over time
  • Establish on-call rotation for data pipeline alerts

Phase 3: Advanced (Quarter 1)

  • Adopt a framework like Great Expectations or dbt tests for systematic quality checks
  • Implement anomaly detection on row counts and durations (rolling average ± 2 standard deviations)
  • Add lineage tracking to understand blast radius of failures
  • Build runbooks for common failure scenarios

Common Mistakes to Avoid

Alert fatigue is the #1 killer. If your team gets 50 alerts a day, they'll ignore all of them — including the critical ones. Start with fewer, high-signal alerts and expand gradually.

Don't monitor only the happy path. Testing that your pipeline produces rows is necessary but insufficient. Check for duplicates, nulls in critical fields, and values that are technically valid but logically wrong (negative revenue, timestamps in the future).

Don't skip the "it succeeded but produced bad data" case. This is the most dangerous failure mode. A pipeline that errors out is annoying; a pipeline that writes corrupt data to production is catastrophic.

Don't build monitoring as an afterthought. The best time to add monitoring is when you build the pipeline. The second-best time is now.

Don't alert on things nobody can act on. Every alert should have a clear owner and a documented response. "Pipeline X failed" is useless without "here's the runbook."

Data Pipeline Monitoring Checklist

Use this as a template when setting up monitoring for a new pipeline:

| Check | Implemented? | Alert Channel | Runbook Link |
| --- | --- | --- | --- |
| Task failure alerts | | | |
| SLA miss alerts | | | |
| Row count validation | | | |
| Null rate checks | | | |
| Primary key uniqueness | | | |
| Data freshness check | | | |
| Schema change detection | | | |
| Duration tracking | | | |
| Volume anomaly detection | | | |

If you're working with diverse data sources — APIs, CSVs, web data — and want a lightweight way to set up and monitor data pipelines without heavy infrastructure, Harbinger Explorer lets you crawl APIs and query data directly in the browser with DuckDB WASM. Its guided setup wizard handles source configuration, and the AI chat interface can help you explore data quality issues interactively during your investigation workflow.

Next Steps

Start with execution monitoring (Phase 1) — it takes an afternoon and catches the most common failures. Then layer in data quality checks over the following weeks. The goal isn't perfection on day one; it's building a monitoring culture where "how will we know if this breaks?" is a standard question during pipeline development.

The teams that monitor well aren't the ones with the fanciest tools. They're the ones who decided that stale data at 7 AM is unacceptable — and built systems to prevent it.
