Building Streaming Tables with Delta Live Tables in Databricks
Real-time data pipelines have become the backbone of modern data platforms. Whether you're tracking user behavior, ingesting IoT sensor data, or monitoring geopolitical events (like we do at Harbinger Explorer), the ability to process data as it arrives — not hours later — is a competitive differentiator.
Delta Live Tables (DLT) is Databricks' declarative framework for building reliable, maintainable, and testable data pipelines. In this guide, we'll walk through building production-grade streaming tables from scratch, covering ingestion, transformation, monitoring, and operational best practices.
What Are Delta Live Tables?
Delta Live Tables is a framework that lets you define your data pipeline using Python or SQL. Instead of managing Spark Structured Streaming jobs manually, you declare what the data should look like, and DLT handles the how: orchestration, checkpointing, schema evolution, and error handling.
DLT pipelines consist of three main object types:
- Streaming Tables — append-only tables that ingest from streaming sources (Kafka, Auto Loader, etc.)
- Materialized Views — computed tables refreshed on a schedule or trigger
- Live Tables — legacy term, now unified under Materialized Views in newer DLT versions
Setting Up Your First DLT Pipeline
Prerequisites
- A Databricks workspace (Unity Catalog recommended)
- A streaming source — we'll use Auto Loader (Databricks' cloud file ingestion system) reading from Azure Data Lake Storage
Step 1: Configure Auto Loader as Source
Auto Loader uses Structured Streaming under the hood. It incrementally ingests new files as they land in a cloud storage location without requiring you to track which files have been processed.
```python
import dlt
from pyspark.sql.functions import col, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Define the schema for incoming JSON events
event_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("source", StringType(), True),
    StructField("location", StringType(), True),
    StructField("severity", DoubleType(), True),
    StructField("event_time", StringType(), True),
    StructField("raw_payload", StringType(), True),
])

@dlt.table(
    name="raw_events_bronze",
    comment="Bronze layer: raw events ingested via Auto Loader",
    table_properties={"quality": "bronze"},
)
def raw_events_bronze():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/mnt/checkpoints/raw_events_schema")
        .schema(event_schema)  # apply the explicit schema instead of relying on inference
        .load("/mnt/landing/events/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )
```
Step 2: Silver Layer — Cleaned and Validated Data
The silver layer applies data quality expectations. DLT's expectation decorators let you declare rules per record: `@dlt.expect` warns and tracks violations, `@dlt.expect_or_drop` discards bad records, and `@dlt.expect_or_fail` stops the update.
```python
@dlt.table(
    name="events_silver",
    comment="Silver layer: validated and cleaned events",
    table_properties={"quality": "silver"},
)
@dlt.expect_or_drop("valid_event_id", "event_id IS NOT NULL")
@dlt.expect_or_drop("valid_severity", "severity BETWEEN 0.0 AND 10.0")
@dlt.expect("known_event_type", "event_type IN ('conflict', 'natural_disaster', 'economic', 'political')")
def events_silver():
    return (
        dlt.read_stream("raw_events_bronze")
        .select(
            col("event_id"),
            col("event_type"),
            col("source"),
            col("location"),
            col("severity"),
            col("event_time").cast("timestamp").alias("event_time"),
            col("raw_payload"),
            col("_ingested_at"),
        )
    )
```
Step 3: Gold Layer — Aggregated Business Metrics
Gold layer tables are often Materialized Views (batch-computed), but you can also create streaming gold tables for real-time aggregations.
```python
@dlt.table(
    name="events_by_region_gold",
    comment="Gold layer: event counts and average severity by region (1-hour windows)",
    table_properties={"quality": "gold"},
)
def events_by_region_gold():
    from pyspark.sql.functions import window, avg, count
    return (
        dlt.read_stream("events_silver")
        .withWatermark("event_time", "10 minutes")
        .groupBy(
            window(col("event_time"), "1 hour"),
            col("location"),
            col("event_type"),
        )
        .agg(
            count("event_id").alias("event_count"),
            avg("severity").alias("avg_severity"),
        )
        .select(
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col("location"),
            col("event_type"),
            col("event_count"),
            col("avg_severity"),
        )
    )
```
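To build intuition for the tumbling-window aggregation above, here's a plain-Python sketch (no Spark involved; the `hourly_windows` function and the event shape are illustrative, not part of DLT) that buckets events into one-hour windows and computes the same count and average severity:

```python
from collections import defaultdict
from datetime import datetime, timedelta

def hourly_windows(events):
    """Bucket events into 1-hour tumbling windows per (location, event_type),
    mirroring the groupBy/window aggregation in events_by_region_gold."""
    buckets = defaultdict(list)
    for e in events:
        # Truncate the event time down to the start of its hour
        start = e["event_time"].replace(minute=0, second=0, microsecond=0)
        buckets[(start, e["location"], e["event_type"])].append(e["severity"])
    return {
        (start, start + timedelta(hours=1), loc, etype): (len(sevs), sum(sevs) / len(sevs))
        for (start, loc, etype), sevs in buckets.items()
    }

events = [
    {"event_time": datetime(2024, 5, 1, 9, 15), "location": "Sudan", "event_type": "conflict", "severity": 6.0},
    {"event_time": datetime(2024, 5, 1, 9, 45), "location": "Sudan", "event_type": "conflict", "severity": 8.0},
]
# Both events fall in the 09:00-10:00 window: count 2, average severity 7.0
```

The difference in production is that Spark maintains this state incrementally across micro-batches, and the watermark decides when a window is final and can be emitted.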
Deploying the Pipeline via CLI
You can deploy DLT pipelines using the Databricks CLI or REST API. Here's a pipeline.json configuration:
```json
{
  "name": "harbinger-events-pipeline",
  "target": "harbinger_prod",
  "catalog": "main",
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 2,
        "max_workers": 8,
        "mode": "ENHANCED"
      }
    }
  ],
  "libraries": [
    {"notebook": {"path": "/Repos/harbinger/pipelines/events_pipeline"}}
  ],
  "continuous": true,
  "channel": "CURRENT",
  "edition": "ADVANCED"
}
```
Deploy with the CLI:

```bash
databricks pipelines create --json @pipeline.json
```

Start the pipeline:

```bash
databricks pipelines start --pipeline-id <YOUR_PIPELINE_ID>
```
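If you'd rather automate this from Python than shell out to the CLI, the Pipelines REST API exposes the same operations; `POST /api/2.0/pipelines/{pipeline_id}/updates` triggers a new update. A minimal stdlib sketch (the host, token, and pipeline ID are placeholders you'd supply):

```python
import json
import urllib.request

def start_pipeline_request(host: str, pipeline_id: str, token: str) -> urllib.request.Request:
    """Build a request that triggers a pipeline update via the Pipelines REST API,
    equivalent to `databricks pipelines start`."""
    url = f"{host}/api/2.0/pipelines/{pipeline_id}/updates"
    return urllib.request.Request(
        url,
        data=json.dumps({"full_refresh": False}).encode(),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

# To actually send it (requires a live workspace):
# with urllib.request.urlopen(start_pipeline_request(host, pid, token)) as resp:
#     print(json.load(resp))
```

Setting `"full_refresh": true` instead would recompute all tables from scratch, which is occasionally useful after a logic change but expensive on large sources.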
Monitoring Streaming Tables
DLT exposes metrics through the event log — a Delta table that records pipeline runs, data quality results, and performance metrics.
```sql
-- Query the DLT event log for data quality failures
SELECT
  timestamp,
  details:flow_progress.data_quality.dropped_records AS dropped_records,
  details:flow_progress.metrics.num_output_rows AS output_rows
FROM event_log('<pipeline_id>')
WHERE event_type = 'flow_progress'
  AND details:flow_progress.status = 'COMPLETED'
ORDER BY timestamp DESC
LIMIT 50;
```
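Once you have those rows (for example, exported by a scheduled job), a small helper can turn them into an alertable signal. This is a hypothetical sketch, not part of DLT; the `flag_high_drop_rate` helper, the row shape, and the 5% threshold are all illustrative:

```python
def drop_rate(dropped: int, output: int) -> float:
    """Fraction of records dropped by expectations in one flow update."""
    total = dropped + output
    return dropped / total if total else 0.0

def flag_high_drop_rate(rows, threshold=0.05):
    """Return (timestamp, rate) pairs whose expectation drop rate exceeds threshold.

    `rows` are dicts shaped like the event-log query results above:
    {"timestamp": ..., "dropped_records": int, "output_rows": int}
    """
    return [
        (r["timestamp"], drop_rate(r["dropped_records"], r["output_rows"]))
        for r in rows
        if drop_rate(r["dropped_records"], r["output_rows"]) > threshold
    ]

sample = [{"timestamp": "2024-05-01T09:00:00Z", "dropped_records": 10, "output_rows": 90}]
# 10 of 100 records dropped -> rate 0.1, above the 5% threshold, so it is flagged
```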
For real-time monitoring, you can set up alerts using Databricks SQL alerts or integrate with your existing observability stack (Datadog, Grafana, etc.).
Advanced Patterns
Schema Evolution
Auto Loader handles schema evolution automatically when you enable it: `cloudFiles.schemaEvolutionMode` controls how new columns are handled, and `cloudFiles.schemaHints` pins types for specific columns while letting the rest evolve:

```python
@dlt.table(table_properties={"delta.enableChangeDataFeed": "true"})
def evolving_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaHints", "event_id STRING, event_type STRING")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .load("/mnt/landing/events/")
    )
```
Change Data Capture (CDC) with APPLY CHANGES INTO
For CDC scenarios (e.g., syncing from a transactional database via Debezium):
```python
# The target streaming table must be declared before applying changes into it
dlt.create_streaming_table("events_scd1")

dlt.apply_changes(
    target="events_scd1",
    source="raw_cdc_stream",
    keys=["event_id"],
    sequence_by=col("_commit_timestamp"),
    apply_as_deletes=col("_op") == "DELETE",
    except_column_list=["_op", "_commit_timestamp"],
    stored_as_scd_type=1,
)
```
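To build intuition for what `apply_changes` does with SCD type 1, consider a plain-Python simulation (illustrative only, not DLT code): change events are replayed in `sequence_by` order, later upserts overwrite earlier ones per key, and deletes remove the row entirely.

```python
def apply_changes_scd1(changes):
    """Toy model of SCD type 1 apply_changes: replay change events in
    sequence order; upserts overwrite the current row, deletes remove it."""
    state = {}
    for change in sorted(changes, key=lambda c: c["_commit_timestamp"]):
        key = change["event_id"]
        if change["_op"] == "DELETE":
            state.pop(key, None)
        else:
            # Drop the CDC metadata columns, as except_column_list does
            state[key] = {k: v for k, v in change.items()
                          if k not in ("_op", "_commit_timestamp")}
    return state

changes = [
    {"event_id": "e1", "_op": "INSERT", "_commit_timestamp": 1, "severity": 3.0},
    {"event_id": "e1", "_op": "UPDATE", "_commit_timestamp": 2, "severity": 7.5},
    {"event_id": "e2", "_op": "INSERT", "_commit_timestamp": 3, "severity": 5.0},
    {"event_id": "e2", "_op": "DELETE", "_commit_timestamp": 4},
]
# After replay, only e1 remains, carrying the updated severity of 7.5
```

The real implementation does this incrementally and out-of-order-safely using the `sequence_by` column, which is why a reliable ordering column (commit timestamp, LSN) from your CDC tool matters.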
Common Pitfalls
| Pitfall | Symptom | Fix |
|---|---|---|
| Missing watermark on windowed aggregations | AnalysisException when appending windowed aggregates | Add .withWatermark() before window ops |
| Schema mismatch on restart | Pipeline fails after schema change | Enable cloudFiles.schemaEvolutionMode = "addNewColumns" |
| Checkpoint corruption | Pipeline stuck on restart | Delete checkpoint dir and let DLT rebuild |
| Over-partitioning small tables | Slow query performance | Use OPTIMIZE + ZORDER after ingestion |
| Continuous mode on batch sources | Wasted cluster costs | Use triggered mode for hourly/daily sources |
Cost Optimization Tips
- Use Enhanced Autoscaling — DLT's enhanced autoscaler is smarter than standard autoscaling for streaming workloads
- Triggered vs Continuous — Only use continuous mode when sub-minute latency is truly required
- Serverless DLT — Now GA; eliminates cluster management overhead and often cheaper for variable workloads
- Right-size your cluster — Monitor `num_output_rows` vs cluster cost; many streaming workloads run fine on 2-4 workers
Wrapping Up
Delta Live Tables abstracts away the complexity of managing Spark Structured Streaming checkpoints, schema evolution, and data quality — letting your team focus on business logic rather than infrastructure plumbing. Whether you're building a real-time fraud detection pipeline or ingesting geopolitical event streams, DLT gives you a reliable foundation.
At Harbinger Explorer, we use DLT pipelines to ingest thousands of global events per hour from sources like GDELT, ACLED, and proprietary feeds — all feeding into our geopolitical intelligence platform in near real-time.
Try Harbinger Explorer free for 7 days — experience real-time geopolitical intelligence powered by the same streaming architecture we just walked through. Start your free trial at harbingerexplorer.com.