
Building a REST API Data Pipeline in Python

6 min read·Tags: rest api, data pipeline, python, api ingestion, etl, pagination, data engineering

You're about to pull data from a REST API for the first time, and it's going fine — until it isn't. Rate limits kick in, the next-page token isn't where the docs said it would be, and suddenly a field that was always a string is now sometimes null. Building a robust REST API data pipeline means planning for all of this upfront, not discovering it at 2am during an on-call incident. This guide walks you through the full pipeline: from first HTTP request to structured, schema-validated output.

Architecture Overview

[Architecture diagram: requests flow from the Auth Manager through the Rate Limit Handler and Pagination Loop to the Schema Validator, then into the storage target]

Auth Manager handles token lifecycle — refresh tokens before they expire, not just at startup. A two-hour pipeline that 401s halfway through wastes the entire run.
Rate Limit Handler wraps every request with retry logic and exponential backoff. This is not optional for production pipelines.
Pagination Loop fetches pages until no next-page cursor exists — know your API's pagination pattern before you write a line of code.
Schema Validator catches schema drift early — fail loudly and specifically rather than silently corrupting downstream tables.



Step 1: Set Up Your HTTP Session

Don't use bare requests.get() calls for production pipelines. Use a Session object — it reuses TCP connections, lets you set default headers once, and centralizes auth management.

import requests
import time
import logging
from typing import Optional, Generator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def build_session(api_key: str) -> requests.Session:
    """Create a reusable session with auth headers and sensible defaults."""
    session = requests.Session()
    session.headers.update({
        "Authorization": f"Bearer {api_key}",
        "Accept": "application/json",
        "Content-Type": "application/json",
    })
    return session

Always set a timeout on every request call — never leave it at the default (which is infinite). A hung API call will block your entire pipeline indefinitely.
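One way to make that rule structural rather than a matter of discipline is a small Session subclass that injects a default timeout into every call. This is a sketch, not part of the requests API — TimeoutSession is this article's invention:

```python
import requests


class TimeoutSession(requests.Session):
    """A Session that applies a default timeout unless the caller overrides it."""

    def __init__(self, timeout: float = 30.0):
        super().__init__()
        self.timeout = timeout

    def request(self, method, url, **kwargs):
        # setdefault preserves any explicit per-call timeout
        kwargs.setdefault("timeout", self.timeout)
        return super().request(method, url, **kwargs)
```

With this in place, a forgotten timeout argument degrades to 30 seconds instead of waiting forever.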


Step 2: Add Retry Logic with Exponential Backoff

Rate limits and transient errors are guaranteed to happen at scale. Build your retry logic before you need it.

import random
from requests.exceptions import ConnectionError, Timeout


def fetch_with_retry(
    session: requests.Session,
    url: str,
    params: Optional[dict] = None,
    max_retries: int = 5,
    base_delay: float = 1.0,
) -> dict:
    """
    Fetch a URL with exponential backoff + jitter.
    Respects Retry-After headers on 429 responses.
    Raises RuntimeError after max_retries failed attempts.
    """
    for attempt in range(max_retries):
        try:
            response = session.get(url, params=params, timeout=30)

            # Explicit 429 handling: respect Retry-After if present
            if response.status_code == 429:
                retry_after = int(
                    response.headers.get("Retry-After", base_delay * (2 ** attempt))
                )
                logger.warning(
                    f"Rate limited. Waiting {retry_after}s "
                    f"(attempt {attempt + 1}/{max_retries})"
                )
                time.sleep(retry_after)
                continue

            # Treat server errors (5xx) as transient: route them through
            # the same backoff path as connection failures below
            if response.status_code >= 500:
                raise ConnectionError(f"Server error {response.status_code}")

            response.raise_for_status()
            return response.json()

        except (ConnectionError, Timeout) as e:
            if attempt == max_retries - 1:
                raise
            # Jitter prevents thundering herd when multiple workers retry simultaneously
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            logger.warning(f"Request failed ({e}). Retrying in {delay:.1f}s...")
            time.sleep(delay)

    raise RuntimeError(f"Failed to fetch {url} after {max_retries} attempts")

The jitter (random.uniform(0, 1)) is especially important when multiple workers are retrying in parallel — without it, they all retry at the same moment and compound the load.


Step 3: Handle Pagination

APIs use different pagination patterns. The three most common: offset/limit, cursor-based, and page number. Here's a generator for cursor-based pagination — the most common in modern REST APIs.

def paginate_cursor(
    session: requests.Session,
    base_url: str,
    params: Optional[dict] = None,
    cursor_key: str = "next_cursor",
    data_key: str = "data",
    page_size: int = 100,
) -> Generator[list, None, None]:
    """
    Paginate through a cursor-based API endpoint.
    Yields one page of records at a time.

    Args:
        cursor_key: JSON key for the next-page cursor in the response body
        data_key:   JSON key containing the list of records per page
        page_size:  Records per request (adjust to API max)
    """
    params = {**(params or {}), "limit": page_size}

    while True:
        response = fetch_with_retry(session, base_url, params=params)

        records = response.get(data_key, [])
        if not records:
            logger.info("No records returned — pagination complete.")
            break

        logger.info(f"Fetched page with {len(records)} records")
        yield records

        next_cursor = response.get(cursor_key)
        if not next_cursor:
            break  # No more pages

        params["cursor"] = next_cursor
        # URL stays the same; only the cursor param changes

If your API uses offset/limit instead: track offset += len(records) and stop when len(records) < page_size. If it uses page numbers: increment page and stop when the response indicates the last page.
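As a sketch of the offset/limit variant: the fetch call is injected as a callable (so it can wrap fetch_with_retry from Step 2, or anything else). The helper name and the injection style are this article's invention, not a library API:

```python
from typing import Callable, Generator, Optional


def paginate_offset(
    fetch: Callable[[dict], dict],
    params: Optional[dict] = None,
    data_key: str = "data",
    page_size: int = 100,
) -> Generator[list, None, None]:
    """
    Paginate an offset/limit endpoint. `fetch` takes the current params dict
    and returns the parsed JSON body, e.g.:
        lambda p: fetch_with_retry(session, base_url, params=p)
    """
    params = {**(params or {}), "limit": page_size, "offset": 0}
    while True:
        records = fetch(params).get(data_key, [])
        if not records:
            break
        yield records
        if len(records) < page_size:
            break  # a short page signals the final page
        params["offset"] += len(records)
```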


Step 4: Validate and Flatten Responses

APIs change without notice. Fields get added, removed, or silently change type. Catching schema drift at ingestion is far cheaper than debugging it in a downstream transformation.

from dataclasses import dataclass, field
from typing import List


@dataclass
class ApiRecord:
    """
    Validated and flattened record structure.
    Adjust fields to match your API's actual schema.
    """
    id: str
    name: str
    created_at: str
    value: Optional[float] = None
    tags: List[str] = field(default_factory=list)


def validate_record(raw: dict) -> Optional[ApiRecord]:
    """
    Validate a raw API record against expected schema.
    Returns None (with a warning) for invalid records
    instead of crashing the entire pipeline run.
    """
    required_fields = {"id", "name", "created_at"}
    missing = required_fields - raw.keys()

    if missing:
        logger.warning(
            f"Record {raw.get('id', '[no id]')} missing fields: {missing}"
        )
        return None

    # Type coercion can fail on drifted fields (e.g. value becomes a dict);
    # skip-and-warn here too, as the docstring promises
    try:
        return ApiRecord(
            id=str(raw["id"]),
            name=str(raw["name"]),
            created_at=str(raw["created_at"]),
            value=float(raw["value"]) if raw.get("value") is not None else None,
            tags=list(raw.get("tags") or []),
        )
    except (TypeError, ValueError) as e:
        logger.warning(f"Record {raw.get('id', '[no id]')} failed type coercion: {e}")
        return None

Track the skip rate: if more than a small percentage of records fail validation, something changed in the API schema and you want to know before the bad data propagates.


Step 5: Assemble the Full Pipeline

import json


def run_pipeline(api_key: str, base_url: str, output_path: str) -> None:
    """Full ingestion pipeline: authenticate → paginate → validate → write."""
    session = build_session(api_key)
    valid_records = []
    skipped = 0

    for page in paginate_cursor(session, base_url):
        for raw in page:
            record = validate_record(raw)
            if record:
                valid_records.append(record.__dict__)
            else:
                skipped += 1

    skip_rate = skipped / max(len(valid_records) + skipped, 1)
    logger.info(
        f"Pipeline complete: {len(valid_records)} valid, "
        f"{skipped} skipped ({skip_rate:.1%})"
    )

    if skip_rate > 0.05:  # Alert if >5% of records fail validation
        logger.error("Skip rate exceeded 5% — check for schema changes upstream")

    with open(output_path, "w") as f:
        json.dump(valid_records, f, indent=2, default=str)


if __name__ == "__main__":
    run_pipeline(
        api_key="your-api-key",
        base_url="https://api.example.com/v1/records",
        output_path="output/records.json",
    )

Common Pitfalls

Rate Limits

The naive approach waits for a 429 before slowing down. Better: read X-RateLimit-Remaining headers proactively and throttle before you hit zero. Some APIs have tiered limits — per-second AND per-day — and will return different error messages for each. Read the docs carefully on this one; many rate limit bugs don't surface during development because test volumes are low.

Authentication

OAuth tokens expire. If your pipeline runs for 2+ hours and you get a 401 halfway through, the entire run is wasted. Implement a token refresh check before each batch of requests — not just at startup. Store credentials in environment variables or a secrets manager, never in code or committed config files.
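One way to sketch that refresh-before-each-batch check. TokenManager is a hypothetical helper, and `refresh` stands in for whatever call your provider's token endpoint requires — it just has to return the new token and its lifetime in seconds:

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Tuple


@dataclass
class TokenManager:
    """Refresh OAuth access tokens before they expire, not after a 401.
    The skew means we refresh a minute early rather than race the expiry."""
    refresh: Callable[[], Tuple[str, float]]  # returns (token, expires_in_seconds)
    skew: float = 60.0
    _token: str = field(default="", init=False)
    _expires_at: float = field(default=0.0, init=False)

    def get_token(self) -> str:
        if time.time() >= self._expires_at - self.skew:
            self._token, expires_in = self.refresh()
            self._expires_at = time.time() + expires_in
        return self._token
```

Before each batch, re-apply the header: `session.headers["Authorization"] = f"Bearer {tm.get_token()}"`. Within a token's lifetime this is a cheap dictionary check; near expiry it triggers one refresh call.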

Schema Drift

API providers add fields without warning, change field types silently, or nest data differently between API versions. Your validator should log and skip unexpected shapes rather than crash — but also alert loudly. If 10% of your records are suddenly invalid, that's a breaking change upstream, not a data quality quirk.

Pagination Edge Cases

Two common gotchas: some APIs return an empty page with a valid next cursor on the last real page — check both records == [] AND next_cursor is None. And don't trust total_count for loop termination; some APIs return an estimate. Always paginate until naturally exhausted rather than counting to a target.


When to Move Beyond Manual Ingestion

For one-off scripts or a handful of sources, this code is enough. But when you're ingesting 20+ APIs with credential rotation, schema versioning, and failure monitoring, manual scripts become a significant maintenance burden. Tools that automate the crawl-and-catalog step start to pay off at that scale.

Harbinger Explorer includes an API crawling feature with a guided setup wizard: point it at an endpoint, configure auth, and it handles pagination, scheduling, and schema tracking automatically. Useful when the goal is analysis, not pipeline maintenance.


Next Steps

Solid API ingestion is the first stage. Once data lands reliably, the next problems are transformation and making it queryable by the rest of the team.
