Building a REST API Data Pipeline in Python
You're about to pull data from a REST API for the first time, and it's going fine — until it isn't. Rate limits kick in, the next-page token isn't where the docs said it would be, and suddenly a field that was always a string is now sometimes null. Building a robust REST API data pipeline means planning for all of this upfront, not discovering it at 2am during an on-call incident. This guide walks you through the full pipeline: from first HTTP request to structured, schema-validated output.
Architecture Overview
① Auth Manager handles token lifecycle — refresh tokens before they expire, not just at startup. A two-hour pipeline that 401s halfway through wastes the entire run.
② Rate Limit Handler wraps every request with retry logic and exponential backoff. This is not optional for production pipelines.
③ Pagination Loop fetches pages until no next-page cursor exists — know your API's pagination pattern before you write a line of code.
④ Schema Validator catches schema drift early — fail loudly and specifically rather than silently corrupting downstream tables.
Step 1: Set Up Your HTTP Session
Don't use bare requests.get() calls for production pipelines. Use a Session object — it reuses TCP connections, lets you set default headers once, and centralizes auth management.
```python
import requests
import time
import logging
from typing import Optional, Generator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def build_session(api_key: str) -> requests.Session:
    """Create a reusable session with auth headers and sensible defaults."""
    session = requests.Session()
    session.headers.update({
        "Authorization": f"Bearer {api_key}",
        "Accept": "application/json",
        "Content-Type": "application/json",
    })
    return session
```
Always set a timeout on every request call — never leave it at the default (which is infinite). A hung API call will block your entire pipeline indefinitely.
Step 2: Add Retry Logic with Exponential Backoff
Rate limits and transient errors are guaranteed to happen at scale. Build your retry logic before you need it.
```python
import random
from requests.exceptions import ConnectionError, Timeout


def fetch_with_retry(
    session: requests.Session,
    url: str,
    params: Optional[dict] = None,
    max_retries: int = 5,
    base_delay: float = 1.0,
) -> dict:
    """
    Fetch a URL with exponential backoff + jitter.

    Respects Retry-After headers on 429 responses.
    Raises RuntimeError after max_retries failed attempts.
    """
    for attempt in range(max_retries):
        try:
            response = session.get(url, params=params, timeout=30)

            # Explicit 429 handling: respect Retry-After if present
            if response.status_code == 429:
                retry_after = int(
                    response.headers.get("Retry-After", base_delay * (2 ** attempt))
                )
                logger.warning(
                    f"Rate limited. Waiting {retry_after}s "
                    f"(attempt {attempt + 1}/{max_retries})"
                )
                time.sleep(retry_after)
                continue

            response.raise_for_status()
            return response.json()

        except (ConnectionError, Timeout) as e:
            if attempt == max_retries - 1:
                raise
            # Jitter prevents thundering herd when multiple workers retry simultaneously
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            logger.warning(f"Request failed ({e}). Retrying in {delay:.1f}s...")
            time.sleep(delay)

    raise RuntimeError(f"Failed to fetch {url} after {max_retries} attempts")
```
The jitter (random.uniform(0, 1)) is especially important when multiple workers are retrying in parallel — without it, they all retry at the same moment and compound the load.
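To make the schedule concrete, here's a small sketch that computes the same delays fetch_with_retry sleeps between attempts (exponential base plus up to one second of jitter):

```python
import random


def backoff_delays(max_retries: int = 5, base_delay: float = 1.0) -> list:
    """Delay schedule mirroring fetch_with_retry: exponential growth
    plus up to one second of random jitter per attempt."""
    return [
        base_delay * (2 ** attempt) + random.uniform(0, 1)
        for attempt in range(max_retries)
    ]

# Without jitter the schedule is exactly 1, 2, 4, 8, 16 seconds: five
# parallel workers would all retry in lockstep and compound the load.
```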
Step 3: Handle Pagination
APIs use different pagination patterns. The three most common: offset/limit, cursor-based, and page number. Here's a generator for cursor-based pagination — the most common in modern REST APIs.
```python
def paginate_cursor(
    session: requests.Session,
    base_url: str,
    params: Optional[dict] = None,
    cursor_key: str = "next_cursor",
    data_key: str = "data",
    page_size: int = 100,
) -> Generator[list, None, None]:
    """
    Paginate through a cursor-based API endpoint.
    Yields one page of records at a time.

    Args:
        cursor_key: JSON key for the next-page cursor in the response body
        data_key: JSON key containing the list of records per page
        page_size: Records per request (adjust to API max)
    """
    params = {**(params or {}), "limit": page_size}

    while True:
        response = fetch_with_retry(session, base_url, params=params)
        records = response.get(data_key, [])

        if not records:
            logger.info("No records returned — pagination complete.")
            break

        yield records
        logger.info(f"Fetched page with {len(records)} records")

        next_cursor = response.get(cursor_key)
        if not next_cursor:
            break  # No more pages

        # URL stays the same; only the cursor param changes
        params["cursor"] = next_cursor
```
If your API uses offset/limit instead: track offset += len(records) and stop when len(records) < page_size. If it uses page numbers: increment page and stop when the response indicates the last page.
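The offset/limit variant can be sketched the same way. To keep the example self-contained, fetch_page below stands in for fetch_with_retry bound to a session and URL, and the response shape (records under a "data" key) is an assumption; adjust both to your API:

```python
from typing import Callable, Generator, Optional


def paginate_offset(
    fetch_page: Callable[[dict], dict],
    params: Optional[dict] = None,
    data_key: str = "data",
    page_size: int = 100,
) -> Generator[list, None, None]:
    """Offset/limit pagination: advance the offset by the number of
    records actually returned, and stop on an empty or short page."""
    params = {**(params or {}), "limit": page_size, "offset": 0}

    while True:
        records = fetch_page(params).get(data_key, [])
        if not records:
            break
        yield records
        if len(records) < page_size:
            break  # short page: we've reached the end
        params["offset"] += len(records)
```

Advancing by len(records) rather than a fixed page_size keeps the offset honest if the API caps the limit below what you asked for.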
Step 4: Validate and Flatten Responses
APIs change without notice. Fields get added, removed, or silently change type. Catching schema drift at ingestion is far cheaper than debugging it in a downstream transformation.
```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ApiRecord:
    """
    Validated and flattened record structure.
    Adjust fields to match your API's actual schema.
    """
    id: str
    name: str
    created_at: str
    value: Optional[float] = None
    tags: List[str] = field(default_factory=list)


def validate_record(raw: dict) -> Optional[ApiRecord]:
    """
    Validate a raw API record against expected schema.
    Returns None (with a warning) for invalid records
    instead of crashing the entire pipeline run.
    """
    required_fields = {"id", "name", "created_at"}
    missing = required_fields - raw.keys()
    if missing:
        logger.warning(
            f"Record {raw.get('id', '[no id]')} missing fields: {missing}"
        )
        return None

    # Type drift guard: a non-numeric value should skip the record,
    # not crash the run
    try:
        value = float(raw["value"]) if raw.get("value") is not None else None
    except (TypeError, ValueError):
        logger.warning(f"Record {raw['id']} has a non-numeric value field")
        return None

    return ApiRecord(
        id=str(raw["id"]),
        name=str(raw["name"]),
        created_at=str(raw["created_at"]),
        value=value,
        tags=raw.get("tags", []),
    )
```
Track the skip rate: if more than a small percentage of records fail validation, something changed in the API schema and you want to know before the bad data propagates.
Step 5: Assemble the Full Pipeline
```python
import json
from pathlib import Path


def run_pipeline(api_key: str, base_url: str, output_path: str) -> None:
    """Full ingestion pipeline: authenticate → paginate → validate → write."""
    session = build_session(api_key)
    valid_records = []
    skipped = 0

    for page in paginate_cursor(session, base_url):
        for raw in page:
            record = validate_record(raw)
            if record:
                valid_records.append(record.__dict__)
            else:
                skipped += 1

    skip_rate = skipped / max(len(valid_records) + skipped, 1)
    logger.info(
        f"Pipeline complete: {len(valid_records)} valid, "
        f"{skipped} skipped ({skip_rate:.1%})"
    )
    if skip_rate > 0.05:  # Alert if >5% of records fail validation
        logger.error("Skip rate exceeded 5% — check for schema changes upstream")

    # Create the output directory if it doesn't exist yet
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w") as f:
        json.dump(valid_records, f, indent=2, default=str)


if __name__ == "__main__":
    run_pipeline(
        api_key="your-api-key",
        base_url="https://api.example.com/v1/records",
        output_path="output/records.json",
    )
```
Common Pitfalls
Rate Limits
The naive approach waits for a 429 before slowing down. Better: read X-RateLimit-Remaining headers proactively and throttle before you hit zero. Some APIs have tiered limits — per-second AND per-day — and will return different error messages for each. Read the docs carefully on this one; many rate limit bugs don't surface during development because test volumes are low.
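A minimal sketch of that proactive check, assuming the common X-RateLimit-Remaining / X-RateLimit-Reset header pair (header names vary by provider, so confirm against your API's docs):

```python
import time


def throttle_if_needed(
    headers: dict,
    min_remaining: int = 5,
    min_pause: float = 2.0,
) -> float:
    """Sleep before the quota hits zero. Returns seconds slept so the
    caller can log it. Header names here are assumptions."""
    remaining = headers.get("X-RateLimit-Remaining")
    if remaining is None or int(remaining) > min_remaining:
        return 0.0
    # Prefer the server's reset timestamp (epoch seconds) when provided
    reset = headers.get("X-RateLimit-Reset")
    wait = min_pause
    if reset is not None:
        wait = max(float(reset) - time.time(), min_pause)
    time.sleep(wait)
    return wait
```

Call it after each response, e.g. `throttle_if_needed(response.headers)`, before issuing the next request.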
Authentication
OAuth tokens expire. If your pipeline runs for 2+ hours and you get a 401 halfway through, the entire run is wasted. Implement a token refresh check before each batch of requests — not just at startup. Store credentials in environment variables or a secrets manager, never in code or committed config files.
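One way to sketch that pre-batch check, with a hypothetical TokenState holder and refresh_fn standing in for your OAuth client's actual refresh call:

```python
import time
from dataclasses import dataclass
from typing import Callable


@dataclass
class TokenState:
    """Hypothetical token holder; adapt to your OAuth client."""
    access_token: str
    expires_at: float  # epoch seconds


def ensure_fresh(
    state: TokenState,
    refresh_fn: Callable[[], TokenState],
    margin: float = 300.0,
) -> TokenState:
    """Refresh if the token expires within `margin` seconds.
    Call before each batch of requests, not just at startup."""
    if time.time() >= state.expires_at - margin:
        return refresh_fn()
    return state
```

The five-minute margin means a token is refreshed before it can expire mid-batch, rather than after a request has already 401'd.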
Schema Drift
API providers add fields without warning, change field types silently, or nest data differently between API versions. Your validator should log and skip unexpected shapes rather than crash — but also alert loudly. If 10% of your records are suddenly invalid, that's a breaking change upstream, not a data quality quirk.
Pagination Edge Cases
Two common gotchas: some APIs return an empty page with a valid next cursor on the last real page — check both records == [] AND next_cursor is None. And don't trust total_count for loop termination; some APIs return an estimate. Always paginate until naturally exhausted rather than counting to a target.
When to Move Beyond Manual Ingestion
For one-off scripts or a handful of sources, this code is enough. But when you're ingesting 20+ APIs with credential rotation, schema versioning, and failure monitoring, manual scripts become a significant maintenance burden. Tools that automate the crawl-and-catalog step start to pay off at that scale.
Harbinger Explorer includes an API crawling feature with a guided setup wizard: point it at an endpoint, configure auth, and it handles pagination, scheduling, and schema tracking automatically. Useful when the goal is analysis, not pipeline maintenance.
Next Steps
Solid API ingestion is the first stage. Once data lands reliably, the next problems are transformation and making it queryable by the rest of the team.
Continue Reading
- dbt vs Spark SQL: How to Choose — pick the right transformation layer once your API data lands
- Self-Service Analytics: Why Most Teams Get It Wrong — make pipeline data usable by non-engineers
- AI Agents vs BI Dashboards: What's Actually Changing — rethink how your team queries the data you're ingesting