Building Streaming Tables with Delta Live Tables in Databricks

10 min read · Tags: databricks, delta-live-tables, streaming, pyspark, real-time

Real-time data pipelines have become the backbone of modern data platforms. Whether you're tracking user behavior, ingesting IoT sensor data, or monitoring geopolitical events (like we do at Harbinger Explorer), the ability to process data as it arrives — not hours later — is a competitive differentiator.

Delta Live Tables (DLT) is Databricks' declarative framework for building reliable, maintainable, and testable data pipelines. In this guide, we'll walk through building production-grade streaming tables from scratch, covering ingestion, transformation, monitoring, and operational best practices.


What Are Delta Live Tables?

Delta Live Tables is a framework that lets you define your data pipeline using Python or SQL. Instead of managing Spark Structured Streaming jobs manually, you declare what the data should look like, and DLT handles the how: orchestration, checkpointing, schema evolution, and error handling.

DLT pipelines consist of three main object types:

  • Streaming Tables — append-only tables that ingest from streaming sources (Kafka, Auto Loader, etc.)
  • Materialized Views — computed tables refreshed on a schedule or trigger
  • Live Tables — legacy term, now unified under Materialized Views in newer DLT versions

Setting Up Your First DLT Pipeline

Prerequisites

  • A Databricks workspace (Unity Catalog recommended)
  • A streaming source — we'll use Auto Loader (Databricks' cloud file ingestion system) reading from Azure Data Lake Storage

Step 1: Configure Auto Loader as Source

Auto Loader uses Structured Streaming under the hood. It incrementally ingests new files as they land in a cloud storage location without requiring you to track which files have been processed.

import dlt
from pyspark.sql.functions import col, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Define the schema for incoming JSON events
event_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("source", StringType(), True),
    StructField("location", StringType(), True),
    StructField("severity", DoubleType(), True),
    StructField("event_time", StringType(), True),
    StructField("raw_payload", StringType(), True),
])

@dlt.table(
    name="raw_events_bronze",
    comment="Bronze layer: raw events ingested via Auto Loader",
    table_properties={"quality": "bronze"}
)
def raw_events_bronze():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .schema(event_schema)  # apply the explicit schema defined above
            .load("/mnt/landing/events/")
            .withColumn("_ingested_at", current_timestamp())
            .withColumn("_source_file", col("_metadata.file_path"))
    )
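Conceptually, Auto Loader's exactly-once file discovery boils down to remembering which files it has already ingested. Here is a toy pure-Python model of that idea (not the real implementation, which persists this state durably at the checkpoint location):

```python
class IncrementalLoader:
    """Toy model of exactly-once file discovery."""

    def __init__(self):
        # Auto Loader keeps the equivalent of this set in its checkpoint state
        self.processed = set()

    def discover(self, listing):
        """Return only the files not seen in any previous listing."""
        new_files = [f for f in listing if f not in self.processed]
        self.processed.update(new_files)
        return new_files


loader = IncrementalLoader()
print(loader.discover(["a.json", "b.json"]))            # ['a.json', 'b.json']
print(loader.discover(["a.json", "b.json", "c.json"]))  # ['c.json']
```

Because that state survives restarts, files are never reprocessed even when the cluster is recycled between runs.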

Step 2: Silver Layer — Cleaned and Validated Data

The silver layer applies data quality expectations. DLT's expectation decorators (@dlt.expect, @dlt.expect_or_drop, @dlt.expect_or_fail) let you declare rules that warn on, drop, or fail the update for bad records.

@dlt.table(
    name="events_silver",
    comment="Silver layer: validated and cleaned events",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_event_id", "event_id IS NOT NULL")
@dlt.expect_or_drop("valid_severity", "severity BETWEEN 0.0 AND 10.0")
@dlt.expect("known_event_type", "event_type IN ('conflict', 'natural_disaster', 'economic', 'political')")
def events_silver():
    return (
        dlt.read_stream("raw_events_bronze")
            .select(
                col("event_id"),
                col("event_type"),
                col("source"),
                col("location"),
                col("severity"),
                col("event_time").cast("timestamp").alias("event_time"),
                col("raw_payload"),
                col("_ingested_at"),
            )
    )
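To make the warn/drop/fail semantics concrete, here is a small pure-Python sketch of how expectations behave (a toy model, not DLT's engine; the rule names and records are made up):

```python
def apply_expectations(records, rules):
    """rules: (name, predicate, action) triples, action in {'warn', 'drop', 'fail'}.
    Returns the surviving records plus per-rule violation counts."""
    kept, violations = [], {}
    for rec in records:
        keep = True
        for name, predicate, action in rules:
            if not predicate(rec):
                violations[name] = violations.get(name, 0) + 1
                if action == "fail":
                    raise ValueError(f"expectation {name!r} violated by {rec}")
                if action == "drop":
                    keep = False
                # 'warn' records the violation but keeps the row
        if keep:
            kept.append(rec)
    return kept, violations


rules = [
    ("valid_event_id", lambda r: r.get("event_id") is not None, "drop"),
    ("valid_severity", lambda r: 0.0 <= r.get("severity", -1.0) <= 10.0, "drop"),
]
records = [
    {"event_id": "e1", "severity": 5.0},
    {"event_id": None, "severity": 3.0},
]
kept, violations = apply_expectations(records, rules)
print(kept)        # [{'event_id': 'e1', 'severity': 5.0}]
print(violations)  # {'valid_event_id': 1}
```

In real DLT, the violation counts surface in the pipeline event log rather than being returned to your code.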

Step 3: Gold Layer — Aggregated Business Metrics

Gold layer tables are often Materialized Views (batch-computed), but you can also create streaming gold tables for real-time aggregations.

@dlt.table(
    name="events_by_region_gold",
    comment="Gold layer: event counts and average severity by region (1-hour windows)",
    table_properties={"quality": "gold"}
)
def events_by_region_gold():
    from pyspark.sql.functions import window, avg, count

    return (
        dlt.read_stream("events_silver")
            .withWatermark("event_time", "10 minutes")
            .groupBy(
                window(col("event_time"), "1 hour"),
                col("location"),
                col("event_type")
            )
            .agg(
                count("event_id").alias("event_count"),
                avg("severity").alias("avg_severity")
            )
            .select(
                col("window.start").alias("window_start"),
                col("window.end").alias("window_end"),
                col("location"),
                col("event_type"),
                col("event_count"),
                col("avg_severity")
            )
    )
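Two details above deserve intuition: each event lands in the 1-hour tumbling window that contains its event_time, and the 10-minute watermark tells the engine how long to wait for late events before finalizing a window (events arriving later than that may be dropped). A toy sketch of both rules:

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def window_start(ts, width=timedelta(hours=1)):
    """Floor a timestamp to the start of its tumbling window."""
    seconds = int((ts - EPOCH).total_seconds())
    return EPOCH + timedelta(seconds=seconds - seconds % int(width.total_seconds()))

def within_watermark(event_ts, max_event_ts, delay=timedelta(minutes=10)):
    """Events older than max_event_ts - delay may be discarded as too late."""
    return event_ts >= max_event_ts - delay


print(window_start(datetime(2024, 5, 1, 14, 37, 12)))  # 2024-05-01 14:00:00
print(within_watermark(datetime(2024, 5, 1, 13, 55),
                       datetime(2024, 5, 1, 14, 0)))    # True (only 5 min late)
```

Note that Structured Streaming treats the watermark as a bound, not a guarantee: late events inside the delay are always kept, while events beyond it are merely eligible for dropping.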

Deploying the Pipeline via CLI

You can deploy DLT pipelines using the Databricks CLI or REST API. Here's a pipeline.json configuration:

{
  "name": "harbinger-events-pipeline",
  "target": "harbinger_prod",
  "catalog": "main",
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 2,
        "max_workers": 8,
        "mode": "ENHANCED"
      }
    }
  ],
  "libraries": [
    {"notebook": {"path": "/Repos/harbinger/pipelines/events_pipeline"}}
  ],
  "continuous": true,
  "channel": "CURRENT",
  "edition": "ADVANCED"
}

Deploy with the CLI:

databricks pipelines create --json @pipeline.json

Start the pipeline:

databricks pipelines start-update <YOUR_PIPELINE_ID>
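The CLI is a thin wrapper over the Pipelines REST API (POST /api/2.0/pipelines creates a pipeline). If you prefer scripting the call directly, here is a minimal sketch using only the standard library; the host and token are placeholders, and you would actually send the request with urllib.request.urlopen(req):

```python
import json
import urllib.request

def build_create_request(host, token, config):
    """Build the pipeline-creation request the CLI issues under the hood."""
    return urllib.request.Request(
        f"{host}/api/2.0/pipelines",
        data=json.dumps(config).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = build_create_request(
    "https://example.cloud.databricks.com",  # placeholder workspace URL
    "<personal-access-token>",               # placeholder token
    {"name": "harbinger-events-pipeline"},
)
print(req.full_url, req.get_method())
```

The successful response body contains the new pipeline_id, which you then pass to the start command above.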

Monitoring Streaming Tables

DLT exposes metrics through the event log — a Delta table that records pipeline runs, data quality results, and performance metrics.

-- Query the DLT event log for data quality failures
SELECT
  timestamp,
  details:flow_progress.data_quality.dropped_records AS dropped_records,
  details:flow_progress.metrics.num_output_rows AS output_rows
FROM event_log('<pipeline_id>')
WHERE event_type = 'flow_progress'
  AND details:flow_progress.status = 'COMPLETED'
ORDER BY timestamp DESC
LIMIT 50;

For real-time monitoring, you can set up alerts using Databricks SQL alerts or integrate with your existing observability stack (Datadog, Grafana, etc.).
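If you export event-log rows into another system (for example via a scheduled query), computing a drop rate is straightforward. A sketch assuming each exported row carries its details column as a JSON string, matching the fields queried above:

```python
import json

def drop_rate(event_log_rows):
    """Fraction of records dropped by expectations across flow_progress rows."""
    dropped = output = 0
    for row in event_log_rows:
        flow = json.loads(row["details"]).get("flow_progress", {})
        dropped += flow.get("data_quality", {}).get("dropped_records", 0)
        output += flow.get("metrics", {}).get("num_output_rows", 0)
    total = dropped + output
    return dropped / total if total else 0.0


rows = [{"details": json.dumps({"flow_progress": {
    "data_quality": {"dropped_records": 5},
    "metrics": {"num_output_rows": 95},
}})}]
print(drop_rate(rows))  # 0.05
```

A steadily climbing drop rate is usually the first sign of an upstream schema or data-quality regression, making it a good alerting threshold.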


Advanced Patterns

Schema Evolution

Auto Loader can evolve the table schema as new fields appear in the source. Enable it with the cloudFiles.schemaEvolutionMode option, and use schema hints to pin the types of columns you already know (note that schema hints are an Auto Loader option, not a @dlt.table argument):

@dlt.table(name="events_evolving")
def events_evolving():
    return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
            .option("cloudFiles.schemaHints", "event_id STRING, event_type STRING")
            .load("/mnt/landing/events/")
    )
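The addNewColumns behavior can be modeled in a few lines of plain Python: fields never seen before are appended to the schema, while existing columns keep their declared types (in Auto Loader, values that don't match an existing type land in the _rescued_data column instead). A toy sketch:

```python
def evolve_schema(current, incoming):
    """Toy model of addNewColumns: append unseen fields, keep existing types."""
    evolved = dict(current)
    for name, dtype in incoming.items():
        evolved.setdefault(name, dtype)  # never overwrite a known column's type
    return evolved


schema = {"event_id": "string", "severity": "double"}
print(evolve_schema(schema, {"severity": "string", "country": "string"}))
# {'event_id': 'string', 'severity': 'double', 'country': 'string'}
```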

Change Data Capture (CDC) with APPLY CHANGES INTO

For CDC scenarios (e.g., syncing from a transactional database via Debezium):

# Declare the target streaming table first, then apply changes into it
dlt.create_streaming_table("events_scd1")

dlt.apply_changes(
    target="events_scd1",
    source="raw_cdc_stream",
    keys=["event_id"],
    sequence_by=col("_commit_timestamp"),
    apply_as_deletes=col("_op") == "DELETE",
    except_column_list=["_op", "_commit_timestamp"],
    stored_as_scd_type=1  # Type 1: overwrite in place, keep only the latest row
)
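The SCD Type 1 semantics here (latest version per key wins, deletes remove the row) can be sketched in plain Python. The field names mirror the snippet above, but the model is a simplification for intuition, not DLT's actual merge logic:

```python
def apply_changes_scd1(changes):
    """Toy SCD Type 1 merge: order by sequence column, last write wins."""
    target = {}
    for change in sorted(changes, key=lambda c: c["_commit_timestamp"]):
        key = change["event_id"]
        if change["_op"] == "DELETE":
            target.pop(key, None)
        else:  # INSERT and UPDATE both overwrite in place
            target[key] = {k: v for k, v in change.items()
                           if k not in ("_op", "_commit_timestamp")}
    return target


changes = [
    {"event_id": "e1", "severity": 2.0, "_op": "INSERT", "_commit_timestamp": 1},
    {"event_id": "e1", "severity": 7.5, "_op": "UPDATE", "_commit_timestamp": 2},
    {"event_id": "e2", "severity": 4.0, "_op": "INSERT", "_commit_timestamp": 3},
    {"event_id": "e2", "severity": 4.0, "_op": "DELETE", "_commit_timestamp": 4},
]
print(apply_changes_scd1(changes))  # {'e1': {'event_id': 'e1', 'severity': 7.5}}
```

Sorting by the sequence column is what makes out-of-order CDC delivery safe, which is exactly why apply_changes requires sequence_by.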

Common Pitfalls

  • Missing watermark on windowed aggregations. Symptom: AnalysisException when the stream starts, since append-mode streaming aggregations require a watermark. Fix: add .withWatermark() before the window operation.
  • Schema mismatch on restart. Symptom: pipeline fails after a source schema change. Fix: set cloudFiles.schemaEvolutionMode to "addNewColumns".
  • Checkpoint corruption. Symptom: pipeline stuck on restart. Fix: run a full refresh so DLT resets and rebuilds the stream's state.
  • Over-partitioning small tables. Symptom: slow query performance. Fix: run OPTIMIZE with ZORDER after ingestion.
  • Continuous mode on batch sources. Symptom: wasted cluster costs. Fix: use triggered mode for hourly or daily sources.

Cost Optimization Tips

  1. Use Enhanced Autoscaling — DLT's enhanced autoscaler is smarter than standard autoscaling for streaming workloads
  2. Triggered vs Continuous — Only use continuous mode when sub-minute latency is truly required
  3. Serverless DLT — Now GA; eliminates cluster management overhead and often cheaper for variable workloads
  4. Right-size your cluster — Monitor num_output_rows vs cluster cost; many streaming workloads run fine on 2-4 workers

Wrapping Up

Delta Live Tables abstracts away the complexity of managing Spark Structured Streaming checkpoints, schema evolution, and data quality — letting your team focus on business logic rather than infrastructure plumbing. Whether you're building a real-time fraud detection pipeline or ingesting geopolitical event streams, DLT gives you a reliable foundation.

At Harbinger Explorer, we use DLT pipelines to ingest thousands of global events per hour from sources like GDELT, ACLED, and proprietary feeds — all feeding into our geopolitical intelligence platform in near real-time.


Try Harbinger Explorer free for 7 days — experience real-time geopolitical intelligence powered by the same streaming architecture we just walked through. Start your free trial at harbingerexplorer.com.

