PySpark Reading from APIs and Databases: JDBC Connections, REST API Pagination, Parallel Reads, Connection Pooling, and Production Ingestion Patterns

Most PySpark tutorials read from Parquet or CSV files that magically exist in a folder. In real projects, data lives in databases (SQL Server, PostgreSQL, Oracle, MySQL) and REST APIs (Salesforce, Stripe, internal microservices). Getting that data INTO your Lakehouse is the first step of every pipeline — and it is surprisingly tricky to do well at scale.

This post covers reading from databases via JDBC (including parallel reads for large tables) and from REST APIs (including pagination, rate limiting, and converting JSON responses to DataFrames) — the patterns you need for production data ingestion.

Think of database reads like filling a swimming pool. A single JDBC connection is one garden hose — it works, but filling 10 million gallons takes forever. Parallel reads are 10 fire hoses working simultaneously — each fills a section of the pool. The pool fills 10x faster, but you need to coordinate so the hoses do not overlap or flood one section.

Table of Contents

  • Reading from Databases (JDBC)
  • Basic JDBC Read
  • JDBC Connection Properties
  • Reading with a Custom SQL Query
  • Parallel JDBC Reads (The Performance Key)
  • How Parallel Reads Work
  • Choosing the Partition Column
  • Writing Database Data to Delta
  • Reading from REST APIs
  • Basic API Read to DataFrame
  • Handling Pagination
  • Rate Limiting and Retry
  • Parallel API Calls with foreachPartition
  • API to Delta Pipeline
  • Connection Security
  • Using Key Vault for Credentials
  • JDBC with Managed Identity
  • Production Ingestion Patterns
  • Pattern 1: Full Load from Database
  • Pattern 2: Incremental Load with Watermark
  • Pattern 3: API to Bronze to Silver
  • Common Mistakes
  • Interview Questions
  • Wrapping Up

Reading from Databases (JDBC)

Basic JDBC Read

# Read an entire table via JDBC
jdbc_url = "jdbc:sqlserver://myserver.database.windows.net:1433;database=mydb;encrypt=true"
user = "admin"
password = "password123"  # Placeholder only: read it from a secret scope in real code (see Connection Security)
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

# Shared connection options, reused (with dbtable overridden) in the examples below
jdbc_options = {
    "url": jdbc_url,
    "user": user,
    "password": password,
    "driver": driver,
    "dbtable": "SalesLT.Customer",
}

df = spark.read.format("jdbc").options(**jdbc_options).load()

# load() only fetches the schema (a zero-row query); count() and show() each run a real query
print(f"Rows: {df.count()}, Columns: {len(df.columns)}")
df.printSchema()
df.show(5)

JDBC Connection Properties

| Property | Description | Example |
|---|---|---|
| url | JDBC connection URL | jdbc:sqlserver://server:1433;database=db |
| dbtable | Table name or aliased subquery | SalesLT.Customer or (SELECT * FROM t WHERE year=2026) t |
| user | Database username | admin |
| password | Database password | Use Key Vault, not hardcoded! |
| driver | JDBC driver class | com.microsoft.sqlserver.jdbc.SQLServerDriver |
| fetchsize | Rows fetched per network round trip | 10000 (default varies by driver) |
| queryTimeout | Seconds a statement may run before it is cancelled (0 = no limit) | 300 |

Common JDBC URLs:

# SQL Server / Azure SQL
"jdbc:sqlserver://server.database.windows.net:1433;database=mydb;encrypt=true"

# PostgreSQL
"jdbc:postgresql://host:5432/mydb"

# MySQL
"jdbc:mysql://host:3306/mydb"

# Oracle
"jdbc:oracle:thin:@//host:1521/service_name"

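If one notebook talks to several engines, a small lookup keeps the URL formats above in one place. A minimal sketch: `build_jdbc_url`, `JDBC_URL_TEMPLATES`, and `DEFAULT_PORTS` are hypothetical names, and the templates simply mirror the URLs listed above (SQL Server gets `encrypt=true`, as in the Azure example).

```python
# Hypothetical helper: templates mirror the JDBC URL formats listed above
JDBC_URL_TEMPLATES = {
    "sqlserver": "jdbc:sqlserver://{host}:{port};database={database};encrypt=true",
    "postgresql": "jdbc:postgresql://{host}:{port}/{database}",
    "mysql": "jdbc:mysql://{host}:{port}/{database}",
    "oracle": "jdbc:oracle:thin:@//{host}:{port}/{database}",  # database = Oracle service name
}

# Default ports used in the URLs above
DEFAULT_PORTS = {"sqlserver": 1433, "postgresql": 5432, "mysql": 3306, "oracle": 1521}


def build_jdbc_url(engine, host, database, port=None):
    """Return a JDBC URL for the engine, falling back to its default port."""
    if engine not in JDBC_URL_TEMPLATES:
        raise ValueError(f"Unsupported engine: {engine}")
    port = port or DEFAULT_PORTS[engine]
    return JDBC_URL_TEMPLATES[engine].format(host=host, port=port, database=database)


print(build_jdbc_url("postgresql", "host", "mydb"))  # jdbc:postgresql://host:5432/mydb
```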
Reading with a Custom SQL Query

# Instead of reading the full table, push a query to the database
# The database executes the query and sends only the result to Spark

query = """
(SELECT CustomerID, FirstName, LastName, City
 FROM SalesLT.Customer
 WHERE ModifiedDate >= '2026-01-01'
 AND City IS NOT NULL) AS filtered_customers
"""

df = (spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", query)    # Note: subquery must be aliased
    .option("user", user)
    .option("password", password)
    .option("driver", driver)
    .load()
)

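# Benefits:
# 1. The database executes the whole query, so joins, aggregations, and
#    vendor-specific functions run at the source, not in Spark
# 2. Only the filtered rows and the selected columns cross the network
# Note: for a plain dbtable read, Spark already pushes simple filters and column
# selections down (pushDownPredicate is true by default); a subquery matters most
# for logic Spark cannot translate into the database's SQL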
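Spark also accepts the SQL through a separate `query` option instead of `dbtable`; Spark then wraps it in parentheses and assigns the alias itself. According to the Spark JDBC documentation, `query` cannot be combined with `dbtable` or with `partitionColumn`, so a parallel read still needs the aliased subquery form. The sketch below encodes those two rules in a hypothetical helper, `with_query`, that derives read options from an existing options dict.

```python
def with_query(base_options, sql):
    """Return a copy of JDBC read options that uses Spark's `query` option instead of `dbtable`.

    Hypothetical helper. Spark rejects `query` together with `dbtable` or `partitionColumn`,
    so dbtable is dropped and partitioned option sets are refused.
    """
    if "partitionColumn" in base_options:
        raise ValueError("'query' cannot be combined with partitionColumn; use an aliased subquery in dbtable")
    options = {k: v for k, v in base_options.items() if k != "dbtable"}
    options["query"] = sql.strip()  # No alias needed: Spark adds one
    return options


base = {"url": "jdbc:postgresql://host:5432/mydb", "user": "reader", "dbtable": "SalesLT.Customer"}
opts = with_query(base, """
    SELECT CustomerID, FirstName, LastName, City
    FROM SalesLT.Customer
    WHERE ModifiedDate >= '2026-01-01' AND City IS NOT NULL
""")
# In a notebook: df = spark.read.format("jdbc").options(**opts).load()
print(sorted(opts))  # ['query', 'url', 'user']
```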

Parallel JDBC Reads (The Performance Key)

How Parallel Reads Work

Default JDBC read (single connection):
  [Spark] ←── one connection ──→ [Database]
  All 10M rows flow through ONE thread
  Time: 20 minutes

Parallel JDBC read (multiple connections):
  [Spark Executor 1] ←── connection 1 ──→ [Database: rows 1-2.5M]
  [Spark Executor 2] ←── connection 2 ──→ [Database: rows 2.5M-5M]
  [Spark Executor 3] ←── connection 3 ──→ [Database: rows 5M-7.5M]
  [Spark Executor 4] ←── connection 4 ──→ [Database: rows 7.5M-10M]
  4 threads, each reads 2.5M rows simultaneously
  Time: 5 minutes (4x faster)
# Parallel JDBC read — the MOST IMPORTANT optimization for database reads
df = (spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "SalesLT.Customer")
    .option("user", user)
    .option("password", password)
    .option("driver", driver)
    # Parallel read settings:
    .option("partitionColumn", "CustomerID")    # Column to split on (numeric, date, or timestamp)
    .option("lowerBound", 1)                     # lowerBound/upperBound only size the ranges;
    .option("upperBound", 1000000)               # they do NOT filter rows outside them
    .option("numPartitions", 10)                 # Partitions = max parallel connections
    .option("fetchsize", 10000)                  # Rows per network fetch
    .load()
)

# This generates 10 parallel queries, each covering a stride of roughly 100K IDs:
# SELECT * FROM SalesLT.Customer WHERE CustomerID < ~100000 OR CustomerID IS NULL   (first: no lower limit)
# SELECT * FROM SalesLT.Customer WHERE CustomerID >= ~100000 AND CustomerID < ~200000
# ...
# SELECT * FROM SalesLT.Customer WHERE CustomerID >= ~900000                         (last: no upper limit)
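Generating the range predicates yourself makes it clear why the bounds do not filter anything. This is a simplified model, not Spark's code: it uses plain integer division for the stride, whereas Spark computes the stride with high-precision decimals and may shift the first boundary slightly, so exact boundary values can differ. `stride_predicates` is a hypothetical name.

```python
def stride_predicates(column, lower_bound, upper_bound, num_partitions):
    """Simplified model of Spark's JDBC range partitioning; boundary values are approximate.

    The first range has no lower limit (and also picks up NULLs) and the last range has
    no upper limit, so rows outside [lower_bound, upper_bound] are still read.
    """
    if lower_bound > upper_bound:
        raise ValueError("lower_bound must not exceed upper_bound")
    # Spark similarly never uses more partitions than the width of the range
    num_partitions = min(num_partitions, upper_bound - lower_bound)
    if num_partitions <= 1:
        return [None]  # Single partition: no WHERE clause at all
    stride = (upper_bound - lower_bound) // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i > 0 else None
        current += stride
        upper = f"{column} < {current}" if i < num_partitions - 1 else None
        if lower is None:
            predicates.append(f"{upper} OR {column} IS NULL")
        elif upper is None:
            predicates.append(lower)
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


for predicate in stride_predicates("CustomerID", 1, 1_000_000, 10):
    print(predicate)
```

Only the middle ranges are closed on both sides; a CustomerID of 5,000,000 would still be read, by the last partition.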

Choosing the Partition Column

| Good Partition Column | Bad Partition Column | Why |
|---|---|---|
| Auto-increment ID | String column (name, email) | Must be numeric, date, or timestamp for range splits |
| Evenly distributed ID | Skewed column (status: 95% active) | Uneven distribution = one partition gets 95% of data |
| Date or timestamp column, or date as integer (20260115) | Boolean column (0/1) | Only creates 2 partitions max |

# Find good bounds for your partition column (jdbc_options = dict of url, user, password, driver)
bounds = spark.read.format("jdbc").options(**jdbc_options).option(
    "dbtable", "(SELECT MIN(CustomerID) as min_id, MAX(CustomerID) as max_id FROM SalesLT.Customer) t"
).load().first()

print(f"Lower: {bounds['min_id']}, Upper: {bounds['max_id']}")
# Use these values for lowerBound and upperBound
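The MIN/MAX row above can be turned into partitioning options in one place. The sketch below (hypothetical `parallel_read_options`) returns no partitioning options for an empty table, where MIN and MAX come back as `None`, or for a single distinct value, and optionally caps the partition count using an estimated row count. `rows_per_partition` is an assumed sizing knob, not a Spark option.

```python
def parallel_read_options(column, min_value, max_value, max_partitions=10,
                          estimated_rows=None, rows_per_partition=500_000):
    """Build Spark JDBC partitioning options from MIN/MAX of the partition column.

    Returns {} (a single-connection read) when the table is empty or has one distinct value.
    Hypothetical helper: rows_per_partition is a sizing assumption, not a Spark setting.
    """
    if min_value is None or max_value is None or min_value == max_value:
        return {}
    num_partitions = max_partitions
    if estimated_rows is not None:
        # Ceiling division, so a partial partition still gets its own connection
        needed = -(-estimated_rows // rows_per_partition)
        num_partitions = max(1, min(max_partitions, needed))
    if num_partitions == 1:
        return {}
    return {
        "partitionColumn": column,
        "lowerBound": str(min_value),
        "upperBound": str(max_value),
        "numPartitions": str(num_partitions),
    }


print(parallel_read_options("CustomerID", 1, 1_000_000, estimated_rows=2_000_000))
```

In a notebook the inputs would be `bounds['min_id']` and `bounds['max_id']`, merged into the reader with `.options(**parallel_read_options(...))`; an empty dict simply leaves the read single-threaded.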

Writing Database Data to Delta

# Read from database → write to Delta (Bronze layer)
df = spark.read.format("jdbc").options(**jdbc_options).load()

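df.write.format("delta").mode("overwrite").saveAsTable("bronze.crm_customer")
# Count the Delta table, not df: df.count() would query the database a second time
print(f"Loaded {spark.table('bronze.crm_customer').count()} rows to bronze.crm_customer")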

Reading from REST APIs

Basic API Read to DataFrame

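import requests
from pyspark.sql import Row

# Simple API call → DataFrame
response = requests.get(
    "https://api.example.com/customers",
    headers={"Authorization": "Bearer YOUR_TOKEN"},  # Placeholder: read real tokens from a secret scope
    timeout=30  # Always set a timeout; without one a stalled request can hang the job
)
response.raise_for_status()  # Fail on 4xx/5xx instead of parsing an error body
data = response.json()

# Convert list of dicts to DataFrame (assumes a top-level "results" list whose
# records all share the same flat keys)
df = spark.createDataFrame([Row(**record) for record in data["results"]])
df.show(5)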
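`Row(**record)` works when every record has the same flat keys. Real API payloads often omit keys or nest objects, which can break schema inference. One option, sketched below with a hypothetical `normalize_records` helper, is to align every record to the union of keys (missing values become `None`) and serialize nested dicts and lists to JSON strings, which can be parsed later in Silver with `from_json`. A column that is `None` in every record still cannot be inferred and needs an explicit schema.

```python
import json


def _flatten_value(value):
    """JSON-encode nested objects/arrays; pass scalars (and None) through unchanged."""
    return json.dumps(value, sort_keys=True) if isinstance(value, (dict, list)) else value


def normalize_records(records):
    """Align records to one key set and JSON-encode nested values (hypothetical helper)."""
    # Union of all keys in first-seen order, so column order is stable
    columns = list(dict.fromkeys(key for record in records for key in record))
    # Missing keys become None via dict.get
    return [{key: _flatten_value(record.get(key)) for key in columns} for record in records]


sample = [
    {"id": 1, "name": "Ana", "address": {"city": "Toronto"}},
    {"id": 2, "tags": ["vip"]},
]
print(normalize_records(sample))
# In a notebook: df = spark.createDataFrame([Row(**r) for r in normalize_records(data["results"])])
```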

Handling Pagination

def fetch_all_pages(base_url, headers, page_size=100):
    """Fetch all pages from a paginated REST API."""
    all_records = []
    page = 1

    while True:
        response = requests.get(
            base_url,
            params={"page": page, "per_page": page_size},
            headers=headers,
            timeout=30
        )
        response.raise_for_status()
        data = response.json()

        records = data.get("results", [])
        if not records:
            break

        all_records.extend(records)
        print(f"  Page {page}: {len(records)} records (total: {len(all_records)})")

        # Check for next page
        if not data.get("has_more", False):
            break
        page += 1

    return all_records

# Usage
records = fetch_all_pages(
    "https://api.example.com/orders",
    headers={"Authorization": "Bearer TOKEN"}
)
df = spark.createDataFrame([Row(**r) for r in records])
print(f"Total records from API: {df.count()}")
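Not every API paginates by page number; many return a cursor or a next URL instead. A minimal sketch, assuming a hypothetical response shape with `results` and `next_cursor` fields. The HTTP call is injected as `fetch_page` so the loop can be exercised without a network, and `max_pages` guards against an API that never stops returning a cursor.

```python
def fetch_all_cursor_pages(fetch_page, max_pages=10_000):
    """Follow cursor-based pagination until the API stops returning a cursor.

    fetch_page(cursor) -> dict with "results" (list) and "next_cursor" (str or None).
    The field names are assumptions; adapt them to the API's actual response.
    """
    all_records = []
    cursor = None  # The first request carries no cursor
    for _ in range(max_pages):
        page = fetch_page(cursor)
        all_records.extend(page.get("results", []))
        cursor = page.get("next_cursor")
        if not cursor:
            return all_records
    raise RuntimeError(f"Stopped after {max_pages} pages; the cursor never ended")


# Fake API with three pages, standing in for requests.get(...).json()
pages = {
    None: {"results": [{"id": 1}, {"id": 2}], "next_cursor": "c2"},
    "c2": {"results": [{"id": 3}], "next_cursor": "c3"},
    "c3": {"results": [], "next_cursor": None},
}
records = fetch_all_cursor_pages(lambda cursor: pages[cursor])
print(len(records))  # 3
```

Against a real API, `fetch_page` would wrap something like `requests.get(base_url, params={"cursor": cursor}, headers=headers, timeout=30).json()`, with the parameter name taken from the API's documentation.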

Rate Limiting and Retry

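import time
import requests

def fetch_with_retry(url, headers, max_retries=3, delay=2):
    """Fetch URL, retrying rate limits (429) and request errors with exponential backoff."""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, timeout=30)

            if response.status_code == 429:  # Rate limited
                # Retry-After may be seconds or an HTTP date; use backoff if it is not a number
                retry_after = response.headers.get("Retry-After", "")
                wait = int(retry_after) if retry_after.isdigit() else delay * (2 ** attempt)
                print(f"  Rate limited. Waiting {wait} seconds...")
                time.sleep(wait)
                continue

            response.raise_for_status()
            return response.json()

        except requests.exceptions.RequestException as e:
            print(f"  Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(delay * (2 ** attempt))  # Exponential backoff: delay, 2x, 4x, ...
            else:
                raise
    # Only reached if every attempt was rate limited: fail loudly instead of returning None
    raise RuntimeError(f"Still rate limited after {max_retries} attempts: {url}")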
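Retrying handles 429 responses after they happen; pacing requests on the client keeps you under the limit in the first place. Below is a minimal sketch of a minimum-interval throttle, a hypothetical `Throttle` class that assumes the API publishes a requests-per-second limit. The clock and sleep functions are injectable so the behavior can be checked without real waiting.

```python
import time


class Throttle:
    """Space calls at least 1/rate_per_second seconds apart (hypothetical helper)."""

    def __init__(self, rate_per_second, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = 1.0 / rate_per_second
        self.clock = clock
        self.sleep = sleep
        self.next_allowed = None  # Earliest time the next call may start

    def wait(self):
        """Block until the next call is allowed, then reserve the following slot."""
        now = self.clock()
        if self.next_allowed is not None and now < self.next_allowed:
            self.sleep(self.next_allowed - now)
            now = self.next_allowed
        self.next_allowed = now + self.min_interval


throttle = Throttle(rate_per_second=5)  # Assumed limit: 5 requests per second
# Before each request:
#     throttle.wait()
#     data = fetch_with_retry(url, headers)
```

The throttle lives in one Python process, so in a Spark job each partition gets its own instance and the combined rate is roughly partitions × `rate_per_second`.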

Parallel API Calls with foreachPartition

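# For APIs where you need to fetch data for many entities
# Example: fetch order details for 10,000 customer IDs

customer_ids = spark.table("silver.customers").select("customer_id")

# Repartition to control parallelism: 10 partitions = at most 10 concurrent callers
customer_ids_partitioned = customer_ids.repartition(10)

def fetch_orders_for_partition(rows):
    """Runs once per partition on an executor; yields (customer_id, raw JSON) pairs."""
    import time
    import requests  # Imported on the executor
    with requests.Session() as session:  # One pooled HTTP connection per partition
        session.headers.update({"Authorization": "Bearer TOKEN"})
        for row in rows:
            resp = session.get(f"https://api.example.com/orders/{row.customer_id}", timeout=30)
            if resp.status_code == 200:
                yield (str(row.customer_id), resp.text)
            time.sleep(0.1)  # Crude rate limiting within each partition

# mapPartitions keeps the results distributed (no collect to the driver); raw JSON suits Bronze
orders_raw = spark.createDataFrame(
    customer_ids_partitioned.rdd.mapPartitions(fetch_orders_for_partition),
    schema="customer_id string, raw_json string",
)
# When you only need side effects (e.g. writing each partition straight to storage),
# DataFrame.foreachPartition(fn) runs fn once per partition and returns nothing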
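Inside each partition the calls above run one after another. When the API tolerates some concurrency, a small thread pool per partition can overlap the network waits. This swaps in a different technique (a `concurrent.futures` thread pool) that the pattern above does not use; `fetch_concurrently` and `fetch_one` are hypothetical names, and total concurrency becomes partitions × `max_workers`, so size both against the API's rate limit.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_concurrently(ids, fetch_one, max_workers=4):
    """Call fetch_one(id) for every id using a bounded thread pool.

    Returns (successes, failures): successes as (id, result) pairs in input order,
    failures as (id, exception) pairs. Hypothetical helper for use inside mapPartitions.
    """
    ids = list(ids)
    successes, failures = [], []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fetch_one, i) for i in ids]
        for i, future in zip(ids, futures):
            try:
                successes.append((i, future.result()))
            except Exception as exc:  # Keep going; record the failure for a retry pass
                failures.append((i, exc))
    return successes, failures


def fake_fetch(customer_id):
    # Stand-in for an HTTP call; fails for one id to show error capture
    if customer_id == 3:
        raise ValueError("HTTP 500")
    return {"customer_id": customer_id, "orders": customer_id * 2}


ok, bad = fetch_concurrently(range(1, 5), fake_fetch)
print(len(ok), len(bad))  # 3 1
```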

API to Delta Pipeline

# Complete API → Bronze pipeline
from pyspark.sql import Row
from pyspark.sql.functions import current_timestamp, lit

def ingest_api_to_bronze(api_url, headers, table_name):
    """Fetch API data and write to Bronze Delta table."""
    records = fetch_all_pages(api_url, headers)

    if not records:
        print(f"No records from API. Skipping {table_name}.")
        return 0

    df = spark.createDataFrame([Row(**r) for r in records])
    df = df.withColumn("_ingested_at", current_timestamp())
    df = df.withColumn("_source", lit("api"))

    df.write.format("delta").mode("append").saveAsTable(f"bronze.{table_name}")

    row_count = len(records)  # Records are already on the driver; no extra Spark job needed
    print(f"Loaded {row_count} rows to bronze.{table_name}")
    return row_count

# Run
ingest_api_to_bronze(
    "https://api.example.com/customers",
    {"Authorization": "Bearer TOKEN"},
    "api_customers"
)
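Schema drift in API payloads (new, renamed, or retyped fields) can make `createDataFrame` or an append to a typed Bronze table fail. A common alternative for Bronze is to store each record as a raw JSON string plus load metadata and parse it in Silver. A sketch with hypothetical names (`to_bronze_envelope`, `_batch_id`); the ingestion time is taken once per batch on the driver.

```python
import json
import uuid
from datetime import datetime, timezone


def to_bronze_envelope(records, source="api", batch_id=None):
    """Wrap raw API records as rows of (payload JSON string + load metadata)."""
    batch_id = batch_id or str(uuid.uuid4())
    ingested_at = datetime.now(timezone.utc).isoformat()  # One timestamp for the whole batch
    return [
        {
            "payload": json.dumps(record, sort_keys=True),  # Raw record; parse later with from_json
            "_source": source,
            "_batch_id": batch_id,
            "_ingested_at": ingested_at,
        }
        for record in records
    ]


rows = to_bronze_envelope([{"id": 1, "name": "Ana"}], batch_id="run-001")
print(rows[0]["payload"])  # {"id": 1, "name": "Ana"}
# In a notebook: spark.createDataFrame(rows).write.format("delta").mode("append").saveAsTable(...)
```

Because every Bronze row has the same four string columns, new or changed API fields never break the append; schema decisions move to Silver.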

Connection Security

Using Key Vault for Credentials

# NEVER hardcode passwords in notebooks!
# Use Databricks secret scopes backed by Azure Key Vault

jdbc_password = dbutils.secrets.get(scope="keyvault-scope", key="sql-password")
api_token = dbutils.secrets.get(scope="keyvault-scope", key="api-token")

df = (spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("user", "admin")
    .option("password", jdbc_password)  # From Key Vault, not hardcoded
    .option("driver", driver)
    .option("dbtable", "SalesLT.Customer")
    .load()
)

JDBC with Managed Identity

# Azure SQL with Managed Identity (no password needed!)
jdbc_url = (
    "jdbc:sqlserver://myserver.database.windows.net:1433;"
    "database=mydb;"
    "encrypt=true;"
    "authentication=ActiveDirectoryMSI"
)

df = (spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "SalesLT.Customer")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load()
)
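# No user/password in the code: the SQL Server JDBC driver requests a token for the
# managed identity of the machine it runs on. This only works if the driver and executor
# VMs actually have a managed identity that has been granted access to the database;
# otherwise use a service principal (authentication=ActiveDirectoryServicePrincipal) instead.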

Production Ingestion Patterns

Pattern 1: Full Load from Database

def full_load_table(table_name, schema_name, partition_col=None, num_partitions=10):
    """Full load a table from SQL to Delta Bronze."""
    options = {
        "url": jdbc_url, "user": user, "password": password,
        "driver": driver, "dbtable": f"{schema_name}.{table_name}",
        "fetchsize": "10000"
    }
    if partition_col:
        bounds = spark.read.format("jdbc").options(**options).option(
            "dbtable", f"(SELECT MIN({partition_col}) mn, MAX({partition_col}) mx FROM {schema_name}.{table_name}) t"
        ).load().first()
        # MIN/MAX are NULL for an empty table: fall back to a single-connection read
        if bounds["mn"] is not None:
            options.update({
                "partitionColumn": partition_col,
                "lowerBound": str(bounds["mn"]),
                "upperBound": str(bounds["mx"]),
                "numPartitions": str(num_partitions)
            })

    df = spark.read.format("jdbc").options(**options).load()
    df.write.format("delta").mode("overwrite").saveAsTable(f"bronze.{table_name}")
    # Count from Delta; df.count() would re-read the whole table from the database
    return spark.table(f"bronze.{table_name}").count()

Pattern 2: Incremental Load with Watermark

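def incremental_load(table_name, schema_name, watermark_col):
    """Load only rows newer than the last watermark (assumes audit.load_watermarks exists)."""
    # Last loaded watermark; MAX() is NULL when this table has never been loaded
    last_wm = spark.sql(f"""
        SELECT MAX(watermark_value) AS wm
        FROM audit.load_watermarks
        WHERE table_name = '{table_name}'
    """).first()["wm"] or "1900-01-01"  # First run

    # Fix the upper bound BEFORE reading, so rows committed while this load runs
    # land in the next window instead of being skipped
    new_wm = (spark.read.format("jdbc").options(**jdbc_options)
        .option("dbtable", f"(SELECT MAX({watermark_col}) AS wm FROM {schema_name}.{table_name}) t")
        .load().first()["wm"])
    if new_wm is None:
        return 0  # Source table is empty

    query = f"""
    (SELECT * FROM {schema_name}.{table_name}
     WHERE {watermark_col} > '{last_wm}'
       AND {watermark_col} <= '{new_wm}') incremental_data
    """
    df = spark.read.format("jdbc").options(**jdbc_options).option("dbtable", query).load()

    # The window is bounded, so count() and the write see the same rows
    # (assuming the watermark column only ever increases)
    row_count = df.count()
    if row_count > 0:
        df.write.format("delta").mode("append").saveAsTable(f"bronze.{table_name}")
        spark.sql(f"INSERT INTO audit.load_watermarks VALUES ('{table_name}', '{new_wm}', current_timestamp())")

    return row_count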
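Interpolating a Python `datetime` straight into SQL produces microsecond precision (`2026-01-15 10:30:00.123456`), which some column types reject; SQL Server's legacy `datetime` type, for instance, accepts at most three fractional digits in a string literal. A sketch with hypothetical helpers that formats watermarks to milliseconds (truncating) and builds the bounded window. Truncating the upper bound is safe here: any rows between the truncated and the true maximum fall into the next run's window.

```python
from datetime import datetime


def format_watermark(value):
    """Render a watermark as 'YYYY-MM-DD HH:MM:SS.fff' (truncated to milliseconds)."""
    if isinstance(value, datetime):
        return value.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    return str(value)  # Already a string (e.g. the '1900-01-01' first-run default) or a date


def build_incremental_query(schema_name, table_name, watermark_col, last_wm, new_wm):
    """Aliased subquery for the half-open window (last_wm, new_wm] (hypothetical helper)."""
    return (
        f"(SELECT * FROM {schema_name}.{table_name} "
        f"WHERE {watermark_col} > '{format_watermark(last_wm)}' "
        f"AND {watermark_col} <= '{format_watermark(new_wm)}') incremental_data"
    )


print(build_incremental_query("SalesLT", "Customer", "ModifiedDate",
                              "1900-01-01", datetime(2026, 1, 15, 10, 30, 0, 123456)))
```

Storing `format_watermark(new_wm)` in the audit table (assuming its `watermark_value` column is a string) keeps the next run's lower bound identical to the upper bound used here.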

Pattern 3: API to Bronze to Silver

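from pyspark.sql import Row
from pyspark.sql.functions import current_timestamp

def api_to_bronze_to_silver(api_url, headers, bronze_table, silver_table, clean_fn):
    """End-to-end: API → Bronze (raw, appended) → Silver (cleaned, rebuilt from all of Bronze)."""
    # Bronze: append this batch of raw API data
    records = fetch_all_pages(api_url, headers)
    if not records:
        print("No records from API. Skipping.")
        return
    raw_df = spark.createDataFrame([Row(**r) for r in records])
    raw_df = raw_df.withColumn("_ingested_at", current_timestamp())
    raw_df.write.format("delta").mode("append").saveAsTable(f"bronze.{bronze_table}")

    # Silver: rebuild from the full Bronze history, not just this batch, so the
    # overwrite does not drop earlier loads (clean_fn should deduplicate)
    clean_df = clean_fn(spark.table(f"bronze.{bronze_table}"))
    clean_df.write.format("delta").mode("overwrite").saveAsTable(f"silver.{silver_table}")

    print(f"Bronze batch: {len(records)} rows | Silver: {spark.table(f'silver.{silver_table}').count()} rows")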

Common Mistakes

  1. Single-threaded JDBC reads on large tables: without partitionColumn and numPartitions, Spark reads the whole table through one connection in a single task. For large tables (as a rough guide, over 100K rows) use parallel reads; 10 parallel connections can be many times faster than one, provided the database can serve them.
  2. Hardcoding database credentials: use Databricks secret scopes backed by Azure Key Vault. Passwords in notebooks get committed to Git and are visible to anyone with workspace access.
  3. Not pushing work to the database: Spark pushes simple filters and column selections down automatically, but joins, aggregations, and filters it cannot translate (UDFs, for example) run only after the rows have been transferred. Put that logic in a subquery in dbtable so it runs at the source: (SELECT * FROM table WHERE date > '2026-01-01') t.
  4. Not handling API pagination: most APIs return a fixed page of records (often 100 or 1000). Reading just the first page misses the rest of your data. Loop until the API signals the last page (has_more is false, there is no next cursor or URL, or the result is empty).
  5. Choosing a skewed partition column: if 90% of IDs are between 1 and 100 and 10% are between 100 and 10M, one partition receives about 90% of the rows while the others sit nearly idle. Check the distribution before choosing lowerBound and upperBound.
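Mistake 5 can be checked before a load: pull a sample of partition-column values (for example with a `TABLESAMPLE` or `TOP` query, depending on the database) and count how many land in each range. The sketch below uses an integer stride as a rough approximation of Spark's range logic; `stride_histogram`, `is_skewed`, and the 2x-of-average threshold are hypothetical choices.

```python
def stride_histogram(values, lower_bound, upper_bound, num_partitions):
    """Count sampled values per stride range (approximates Spark's JDBC range partitions).

    Values below lower_bound land in the first range and values above upper_bound
    in the last, mirroring the open-ended first and last partitions.
    """
    stride = max(1, (upper_bound - lower_bound) // num_partitions)
    counts = [0] * num_partitions
    for v in values:
        index = (v - lower_bound) // stride
        counts[min(max(index, 0), num_partitions - 1)] += 1
    return counts


def is_skewed(counts, factor=2.0):
    """Flag skew when the busiest partition holds more than `factor` x the average."""
    average = sum(counts) / len(counts)
    return average > 0 and max(counts) > factor * average


# 90 small IDs and 10 large ones, bounds 1..10,000,000 across 10 partitions
sample = list(range(1, 91)) + list(range(1_000_000, 10_000_001, 1_000_000))
counts = stride_histogram(sample, 1, 10_000_000, 10)
print(counts, is_skewed(counts))
```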

Interview Questions

Q: How do you optimize JDBC reads for large tables in PySpark? A: Use parallel reads by specifying partitionColumn (a numeric, date, or timestamp column such as an ID), lowerBound, upperBound, and numPartitions. Spark generates N parallel queries, each reading one range of the partition column; the bounds only set the range size and do not filter rows. Also increase fetchsize (e.g., 10000) to reduce network round trips, and push filters, joins, and aggregations to the database using a subquery in dbtable.

Q: How do you handle REST API pagination in a data pipeline? A: Loop through pages: send a request with page number and page size, collect the results, check if more pages exist (via has_more, next_url, or empty results), and continue until all data is fetched. Add retry with exponential backoff for rate limiting (HTTP 429). Convert the collected records to a Spark DataFrame and write to Delta.

Q: Why should you never hardcode database credentials in notebooks? A: Notebooks are often committed to Git (Databricks Repos) and visible to other workspace users. Hardcoded passwords are a security breach waiting to happen. Use Databricks secret scopes backed by Azure Key Vault or AWS Secrets Manager. Access secrets via dbutils.secrets.get(). For Azure SQL, prefer Managed Identity authentication, which requires no password at all.

Wrapping Up

Reading from databases and APIs is the starting point of every data pipeline. Master parallel JDBC reads for databases (numPartitions can make large reads several times faster, as long as the database keeps up), paginated API fetching for REST services, and always secure credentials with Key Vault or Managed Identity. These ingestion patterns feed your Bronze layer, the foundation of your entire Lakehouse.

Related posts: Connecting Databricks to Azure SQL; Secret Scopes & Key Vault; Connecting to Blob/ADLS; How Real Companies Receive Data


Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.
