
Idempotent Data Pipelines: Patterns for Safe Retries

10 min read · Tags: idempotency, data pipelines, exactly-once, retry safety, data engineering, pipeline patterns, deduplication

Your Airflow job failed at 3 AM, halfway through loading 6 hours of transaction data. You click "Clear and Re-run." Now you have duplicates in production. A data analyst notices revenue is inflated by 40% for the night. This is a non-idempotent pipeline, and it's the most common source of silent data corruption in production systems.

Idempotent pipelines can be run any number of times for the same input and always produce the same output. Failures become non-events — you retry, and nothing breaks.


What Idempotency Means in Data Engineering

In mathematics, a function f is idempotent if f(f(x)) = f(x). In data pipelines: running the same pipeline twice for the same time range produces the same result as running it once.

The critical insight: idempotency is about the effect on the target, not the logic of the pipeline. A pipeline can be computationally complex as long as its writes are idempotent.
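A toy sketch of that property, using a plain dict as the "target" (function and field names here are illustrative, not from any pipeline framework):

```python
def upsert_rows(target: dict, rows: list) -> dict:
    # Keyed writes are idempotent: applying the same rows twice
    # leaves the target in the same state as applying them once.
    for row in rows:
        target[row["id"]] = row["amount"]
    return target

state = {}
rows = [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}]
upsert_rows(state, rows)
once = dict(state)       # snapshot after first run
upsert_rows(state, rows)  # "retry"
assert state == once      # f(f(x)) == f(x)
```

An append-based version of `upsert_rows` (`target.append(row)`) would fail this assertion immediately, which is exactly the failure mode the next section shows.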


Why Pipelines Aren't Idempotent by Default

Most naive pipelines fail idempotency in one of three ways:

# WRONG — non-idempotent append
def load_daily_orders(date: str):
    df = extract_orders(date)
    df.to_sql("orders", engine, if_exists="append")  # Retrying = duplicates

# WRONG — non-idempotent aggregation
def aggregate_daily_sales(date: str):
    row = db.execute(f"SELECT SUM(amount) FROM orders WHERE date = '{date}'").fetchone()
    db.execute(f"INSERT INTO daily_sales (date, revenue) VALUES ('{date}', {row[0]})")
    # Retrying = duplicate rows in daily_sales

# WRONG — side-effectful writes without deduplication
def send_events_to_kafka(date: str):
    events = db.execute(f"SELECT * FROM events WHERE date = '{date}'")
    for event in events:
        producer.send("events_topic", event)  # Retrying = duplicate messages downstream

Core Idempotency Patterns

Pattern 1: Partition Overwrite

The most reliable pattern for batch pipelines on partitioned storage. Write to a temporary location, then atomically replace the entire target partition.

# PySpark — idempotent partition overwrite (Delta Lake / Spark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.appName("idempotent_load").getOrCreate()

# Set partition overwrite mode to dynamic — only replaces written partitions
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

def load_orders_for_date(processing_date: str):
    # Idempotent: re-running for the same date always produces the same result.
    df = spark.read.parquet(f"s3://raw/orders/date={processing_date}/")

    # Reading a single partition path drops the partition column, so add it back
    transformed = df.select("order_id", "customer_id", "amount") \
        .withColumn("processing_date", lit(processing_date))

    # Dynamic overwrite: replaces only the partition being written
    transformed.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("processing_date") \
        .save("s3://datalake/orders/")

    # Running this twice for the same date -> same result, no duplicates

Why it works: The second run overwrites the same partition the first run wrote. The result is identical to a single run.

Requirement: Your data must be partitionable by the processing unit (date, hour, batch ID). This is almost always possible for batch pipelines.


Pattern 2: UPSERT / MERGE

For pipelines that update existing rows (slowly changing dimensions, CDC), use MERGE instead of INSERT:

# PySpark — idempotent MERGE using Delta Lake
from delta.tables import DeltaTable

def upsert_customers(new_data_df):
    # Idempotent: running twice with the same DataFrame produces the same target state.
    delta_table = DeltaTable.forPath(spark, "s3://datalake/customers/")

    (delta_table.alias("target")
        .merge(
            new_data_df.alias("source"),
            "target.customer_id = source.customer_id"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
    # Second run with same data: MATCH fires, target row is overwritten with same values
    # Net effect: no change. Idempotent.

-- Spark SQL / Databricks SQL — MERGE statement
MERGE INTO customers AS target
USING new_customer_data AS source
  ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Safe to run multiple times for the same source data

Pattern 3: Truncate-and-Reload for Small Tables

For dimension tables or small reference tables where full reloads are practical:

# Python / SQLAlchemy — idempotent truncate-then-insert
from sqlalchemy import create_engine, text

def reload_product_catalog(products: list):
    # Idempotent: truncate + reload produces the same state regardless of run count.
    # Use only for small reference tables where full reload is acceptable.
    engine = create_engine("postgresql://user:pass@host/db")
    with engine.begin() as conn:  # begin() = automatic commit/rollback
        conn.execute(text("TRUNCATE TABLE product_catalog"))
        conn.execute(
            text("INSERT INTO product_catalog (id, name, category) VALUES (:id, :name, :category)"),
            products
        )
    # Wrapped in a transaction: either both TRUNCATE and INSERT succeed, or neither does

Important: Wrap truncate + insert in a single transaction. If the INSERT fails after TRUNCATE without a rollback, you'll have an empty table.
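For larger tables where TRUNCATE is too blunt, the same idea works scoped to one processing unit: delete the rows for that unit, then reinsert them, inside a single transaction. A minimal sketch with SQLite to keep it self-contained (table and column names are illustrative):

```python
import sqlite3

def reload_partition(conn, date: str, rows: list):
    # Delete-then-insert scoped to one date, in one transaction:
    # re-running for the same date always yields the same rows.
    with conn:  # sqlite3 connection as context manager: commit on success, rollback on error
        conn.execute("DELETE FROM daily_sales WHERE date = ?", (date,))
        conn.executemany(
            "INSERT INTO daily_sales (date, revenue) VALUES (?, ?)", rows
        )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_sales (date TEXT, revenue REAL)")

rows = [("2024-03-15", 100.0), ("2024-03-15", 50.0)]
reload_partition(conn, "2024-03-15", rows)
reload_partition(conn, "2024-03-15", rows)  # retry: no duplicates
```

The transaction guarantee is the same as in the SQLAlchemy example above: either the DELETE and all INSERTs land together, or the table is untouched.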


Pattern 4: Natural Key Deduplication

When you can't control write semantics (e.g., raw data landing in object storage from multiple producers), deduplicate at read time using a natural key:

# PySpark — deduplication on natural key before writing
from pyspark.sql.functions import row_number, desc
from pyspark.sql.window import Window

def deduplicate_and_load(raw_df, natural_keys: list, event_time_col: str):
    # Keep only the most recent record per natural key.
    # Idempotent: same raw data -> same deduplicated output.
    window = Window.partitionBy(*natural_keys).orderBy(desc(event_time_col))

    deduped = (
        raw_df
        .withColumn("_row_num", row_number().over(window))
        .filter("_row_num = 1")
        .drop("_row_num")
    )

    return deduped

# Usage
raw = spark.read.parquet("s3://raw/events/date=2024-03-15/")
clean = deduplicate_and_load(raw, natural_keys=["event_id"], event_time_col="ingested_at")
clean.write.format("delta").mode("overwrite").save("s3://datalake/events/")

Pattern 5: Idempotency Keys for External APIs

When pipelines write to external systems (REST APIs, messaging queues), use idempotency keys:

# Python — idempotency key for external API write
import hashlib
import requests

def send_notification(user_id: str, message: str, event_date: str):
    # Idempotent: the same (user_id, event_date, message) always produces the
    # same idempotency key, so retries are deduplicated by the API and at most
    # one notification is sent.
    
    # Deterministic idempotency key from stable inputs
    idempotency_key = hashlib.sha256(
        f"{user_id}:{event_date}:{message}".encode()
    ).hexdigest()

    response = requests.post(
        "https://api.notifications.example.com/send",
        json={"user_id": user_id, "message": message},
        headers={
            "Idempotency-Key": idempotency_key,
            "Authorization": "Bearer TOKEN"
        }
    )

    if response.status_code == 200:
        return "sent"
    elif response.status_code == 409:
        return "already_sent"  # Duplicate detected by the API — safe to ignore
    else:
        response.raise_for_status()

Not all external APIs support idempotency keys — check the API docs. Stripe, Twilio, and most modern payment APIs support this pattern.
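On the server side, idempotency keys are typically backed by a key-to-response store: the first request executes the side effect, and later requests with the same key replay the stored response instead. A simplified in-memory sketch of that mechanic (not any specific API's implementation; a real store would be a database or cache with a TTL):

```python
_responses: dict = {}  # illustrative in-memory store; production needs persistence + TTL

def handle_send(idempotency_key: str, payload: dict) -> tuple:
    # Duplicate key: replay the stored response, do NOT re-execute the side effect
    if idempotency_key in _responses:
        return 409, _responses[idempotency_key]
    response = {"status": "sent", "payload": payload}  # the side effect happens once
    _responses[idempotency_key] = response
    return 200, response

status1, _ = handle_send("abc123", {"user_id": "u1"})
status2, _ = handle_send("abc123", {"user_id": "u1"})  # client retry
assert (status1, status2) == (200, 409)
```

This matches the client above: a 200 means the notification was sent on this call, a 409 means an earlier call with the same key already sent it.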


Exactly-Once in Streaming Pipelines

For streaming pipelines, idempotency manifests as exactly-once semantics — each event is processed and written exactly once, even in failure scenarios.

Guarantee | Description | Achieved by
At-most-once | Events may be lost, never duplicated | Fire-and-forget (no retries)
At-least-once | Events may be duplicated, never lost | Retries + consumer commits after processing
Exactly-once | Events processed and written exactly once | Transactional writes + idempotent producers

# PySpark Structured Streaming — exactly-once to Delta Lake
query = (
    stream_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/orders_stream")
    # Delta Lake + Spark Structured Streaming = exactly-once by default
    # The checkpoint tracks Kafka offsets; Delta Lake uses transaction log
    # for atomic writes. Together: exactly-once end-to-end.
    .start("s3://datalake/orders_stream/")
)

Spark Structured Streaming + Delta Lake achieves exactly-once end-to-end by combining Kafka offset checkpointing (tracks what was read) with Delta Lake's transactional writes (ensures atomic, idempotent writes).
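On the producer side of Kafka itself, exactly-once starts with enabling the idempotent producer (and, for atomicity across partitions, transactions). A configuration sketch using librdkafka-style setting names as exposed by confluent-kafka — the broker address and transactional id are placeholders, so verify the exact names against your client's documentation:

```python
# confluent-kafka producer settings for exactly-once writes (sketch)
producer_config = {
    "bootstrap.servers": "broker:9092",       # placeholder broker address
    "enable.idempotence": True,   # broker de-duplicates producer-side retries
    "acks": "all",                # required for idempotent producing
    "transactional.id": "orders-pipeline-1",  # stable id enables transactions
}
```

With `enable.idempotence` set, a retried produce of the same batch is written once; the `transactional.id` additionally lets consumers reading with `read_committed` isolation see each transaction's writes atomically.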


Pipeline Idempotency Checklist

Scenario | Pattern
Batch write to partitioned storage | Dynamic partition overwrite
Updating/inserting rows | MERGE / UPSERT
Small dimension/reference tables | Truncate + reload in a transaction
Raw data with possible duplicates | Natural key deduplication at read time
External API writes | Idempotency keys
Streaming pipeline | Checkpointing + transactional writes
Orchestration retries | Make all tasks idempotent; enable automatic retries safely

Testing Idempotency

Idempotency is easy to test — run the pipeline twice and compare the results:

# pytest — idempotency test
def test_pipeline_is_idempotent(spark, test_data):
    # Running the pipeline twice produces the same result as running it once.
    
    # First run
    load_orders_for_date("2024-03-15")
    result_after_first_run = spark.read.format("delta").load("s3://test/orders/").count()

    # Second run — same input
    load_orders_for_date("2024-03-15")
    result_after_second_run = spark.read.format("delta").load("s3://test/orders/").count()

    assert result_after_first_run == result_after_second_run, \
        "Pipeline is not idempotent: row count changed on second run"

Add this test to your pipeline CI. A failing idempotency test is much cheaper than finding duplicates in production.
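A count comparison can miss a run that drops one row and duplicates another, since the totals still match. Comparing the full row contents is stronger; a small plain-Python helper illustrating the idea (in Spark you would compare DataFrames instead):

```python
from collections import Counter

def same_contents(rows_a: list, rows_b: list) -> bool:
    # Compare row multisets, not just counts: a pipeline that drops one row
    # and duplicates another would still pass a count-based check.
    return Counter(map(tuple, rows_a)) == Counter(map(tuple, rows_b))

assert same_contents([(1, "a"), (2, "b")], [(2, "b"), (1, "a")])       # order-insensitive
assert not same_contents([(1, "a"), (1, "a")], [(1, "a"), (2, "b")])   # same count, different rows
```

The second assertion is exactly the case a count-only idempotency test would wave through.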


Common Mistakes

1. Assuming partition overwrite is atomic without checking. In plain Parquet on S3, partition overwrites are NOT atomic — a reader can see a half-written partition during the write window. Use Delta Lake or Iceberg for atomic partition replacements.

2. Using database sequences as primary keys in idempotent loads. Auto-increment keys generate new values on every insert. If you truncate and reload, the keys change. Use natural keys (order_id, customer_id) or deterministic hash keys as primary keys for idempotent tables.

3. Idempotent writes, non-idempotent side effects. Your write to the warehouse is idempotent. But your pipeline also sends a Slack notification and triggers a downstream API call on success. Those side effects need their own idempotency handling.

4. Forgetting that "overwrite" isn't always safe. Overwriting a partition is safe if you own that partition entirely. If multiple pipelines write to the same partition, overwriting will delete other pipelines' data.
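For mistake 2, a deterministic hash of the natural key is a common replacement for auto-increment keys: reloading regenerates identical keys every time. A minimal sketch (function name and key length are illustrative choices):

```python
import hashlib

def surrogate_key(*parts) -> str:
    # Deterministic: the same natural-key parts always hash to the same key,
    # so truncate-and-reload reproduces identical primary keys.
    return hashlib.sha256("|".join(map(str, parts)).encode()).hexdigest()[:16]

k1 = surrogate_key("ORD-1001", "2024-03-15")
k2 = surrogate_key("ORD-1001", "2024-03-15")  # reload: same key
assert k1 == k2
```

Truncating the digest to 16 hex characters is a space/collision trade-off; keep the full 64-character digest if the table is large.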


If you're running ad-hoc validation queries against your pipeline outputs — checking for duplicates, verifying row counts after a retry, or exploring the output schema — Harbinger Explorer lets you query data files directly in the browser via DuckDB WASM. It's useful for quick idempotency spot-checks without spinning up a full Spark session: just point it at your output files and ask "how many distinct order_ids are there?"


The Bottom Line

Idempotent pipelines are not a nice-to-have — they're the difference between a system that recovers gracefully from failures and one that requires manual cleanup after every incident.

The implementation is almost always simpler than teams expect: dynamic partition overwrite covers most batch cases. MERGE handles incremental updates. Natural key deduplication catches the rest. Build these patterns in from the start, and retrying a failed job becomes a non-event.

Next step: Combine idempotency with good monitoring. Read Data Pipeline Monitoring: What to Track to build the observability layer that tells you when to retry.



