PySpark Reading from APIs and Databases: JDBC Connections, REST API Pagination, Parallel Reads, Connection Pooling, and Production Ingestion Patterns
Most PySpark tutorials read from Parquet or CSV files that magically exist in a folder. In real projects, data lives in databases (SQL Server, PostgreSQL, Oracle, MySQL) and REST APIs (Salesforce, Stripe, internal microservices). Getting that data INTO your Lakehouse is the first step of every pipeline — and it is surprisingly tricky to do well at scale.
This post covers reading from databases via JDBC (including parallel reads for large tables) and from REST APIs (including pagination, rate limiting, and converting JSON responses to DataFrames) — the patterns you need for production data ingestion.
Think of database reads like filling a swimming pool. A single JDBC connection is one garden hose — it works, but filling 10 million gallons takes forever. Parallel reads are 10 fire hoses working simultaneously — each fills a section of the pool. The pool fills 10x faster, but you need to coordinate so the hoses do not overlap or flood one section.
Table of Contents
- Reading from Databases (JDBC)
- Basic JDBC Read
- JDBC Connection Properties
- Reading with a Custom SQL Query
- Parallel JDBC Reads (The Performance Key)
- How Parallel Reads Work
- Choosing the Partition Column
- Writing Database Data to Delta
- Reading from REST APIs
- Basic API Read to DataFrame
- Handling Pagination
- Rate Limiting and Retry
- Parallel API Calls with foreachPartition
- API to Delta Pipeline
- Connection Security
- Using Key Vault for Credentials
- JDBC with Managed Identity
- Production Ingestion Patterns
- Pattern 1: Full Load from Database
- Pattern 2: Incremental Load with Watermark
- Pattern 3: API to Bronze to Silver
- Common Mistakes
- Interview Questions
- Wrapping Up
Reading from Databases (JDBC)
Basic JDBC Read
# Read an entire table via JDBC
# Credentials are inline for brevity only; see Connection Security below for Key Vault
jdbc_url = "jdbc:sqlserver://myserver.database.windows.net:1433;database=mydb"
user = "admin"
password = "password123"
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
# Reusable connection options (later examples add "dbtable" per read)
jdbc_options = {
    "url": jdbc_url,
    "user": user,
    "password": password,
    "driver": driver
}
df = (spark.read
    .format("jdbc")
    .options(**jdbc_options)
    .option("dbtable", "SalesLT.Customer")
    .load()
)
print(f"Rows: {df.count()}, Columns: {len(df.columns)}")  # count() reads the whole table
df.printSchema()
df.show(5)
JDBC Connection Properties
| Property | Description | Example |
|---|---|---|
| url | JDBC connection URL | jdbc:sqlserver://server:1433;database=db |
| dbtable | Table or subquery | SalesLT.Customer or (SELECT * FROM t WHERE year=2026) t |
| user | Database username | admin |
| password | Database password | Use Key Vault, not hardcoded! |
| driver | JDBC driver class | com.microsoft.sqlserver.jdbc.SQLServerDriver |
| fetchsize | Rows fetched per network round trip | 10000 (default varies by driver) |
| queryTimeout | Timeout in seconds | 300 |
Common JDBC URLs:
# SQL Server / Azure SQL
"jdbc:sqlserver://server.database.windows.net:1433;database=mydb;encrypt=true"
# PostgreSQL
"jdbc:postgresql://host:5432/mydb"
# MySQL
"jdbc:mysql://host:3306/mydb"
# Oracle
"jdbc:oracle:thin:@//host:1521/service_name"
Reading with a Custom SQL Query
# Instead of reading the full table, push a query to the database
# The database executes the query and sends only the result to Spark
query = """
(SELECT CustomerID, FirstName, LastName, City
FROM SalesLT.Customer
WHERE ModifiedDate >= '2026-01-01'
AND City IS NOT NULL) AS filtered_customers
"""
df = (spark.read.format("jdbc")
.option("url", jdbc_url)
.option("dbtable", query) # Note: subquery must be aliased
.option("user", user)
.option("password", password)
.option("driver", driver)
.load()
)
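# Benefits:
# 1. Only transfers the data you need (less network I/O)
# 2. The database applies the WHERE filter, not Spark (Spark pushes simple
#    DataFrame filters down on its own, but a subquery guarantees it and can
#    also include joins and aggregations that run inside the database)
# 3. Only the selected columns are transferred (column pruning)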
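Spark's JDBC source also documents a `query` option: Spark wraps the statement in its own aliased subquery, so no alias is needed. The catch is that it cannot be combined with `dbtable` or with `partitionColumn`. A small sketch that encodes those two rules; `jdbc_query_options` is a hypothetical helper, not a Spark API, and the server name is the placeholder used above.

```python
def jdbc_query_options(base_options, sql):
    """Return JDBC reader options that use the `query` option instead of `dbtable`."""
    # Spark rejects `query` combined with `partitionColumn`
    if "partitionColumn" in base_options:
        raise ValueError("'query' cannot be combined with 'partitionColumn'; "
                         "use an aliased subquery in 'dbtable' instead")
    # Spark also rejects `dbtable` and `query` together, so drop `dbtable`
    options = {key: value for key, value in base_options.items() if key != "dbtable"}
    options["query"] = sql  # Spark wraps this in its own aliased subquery
    return options


base_options = {
    "url": "jdbc:sqlserver://myserver.database.windows.net:1433;database=mydb",
    "dbtable": "SalesLT.Customer",
}
opts = jdbc_query_options(
    base_options,
    "SELECT CustomerID, City FROM SalesLT.Customer WHERE City IS NOT NULL",
)
print(sorted(opts))
# In a notebook: df = spark.read.format("jdbc").options(**opts).load()
```

When you need a parallel read over a filtered subset, stay with the aliased subquery in `dbtable`, which does work with `partitionColumn`.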
Parallel JDBC Reads (The Performance Key)
How Parallel Reads Work
Default JDBC read (single connection):
[Spark] ←── one connection ──→ [Database]
All 10M rows flow through ONE task
Time: 20 minutes (illustrative)
Parallel JDBC read (multiple connections):
[Spark Task 1] ←── connection 1 ──→ [Database: rows 1-2.5M]
[Spark Task 2] ←── connection 2 ──→ [Database: rows 2.5M-5M]
[Spark Task 3] ←── connection 3 ──→ [Database: rows 5M-7.5M]
[Spark Task 4] ←── connection 4 ──→ [Database: rows 7.5M-10M]
4 tasks, each reading 2.5M rows simultaneously
Time: ~5 minutes (up to 4x faster, if the database and network keep up)
# Parallel JDBC read: the MOST IMPORTANT optimization for database reads
df = (spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "SalesLT.Customer")
    .option("user", user)
    .option("password", password)
    .option("driver", driver)
    # Parallel read settings (all four must be set together):
    .option("partitionColumn", "CustomerID")  # Column to split on (numeric, date, or timestamp)
    .option("lowerBound", 1)                  # Only sets the stride; does NOT filter rows
    .option("upperBound", 1000000)            # Only sets the stride; does NOT filter rows
    .option("numPartitions", 10)              # Max number of parallel connections
    .option("fetchsize", 10000)               # Rows per network fetch
    .load()
)
# This generates 10 parallel queries, roughly:
# SELECT * FROM SalesLT.Customer WHERE CustomerID < 100001 OR CustomerID IS NULL
# SELECT * FROM SalesLT.Customer WHERE CustomerID >= 100001 AND CustomerID < 200001
# ...
# SELECT * FROM SalesLT.Customer WHERE CustomerID >= 900001
# The first and last ranges are open-ended, so every row is read (~100K rows each if IDs are evenly spread)
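To see where those WHERE clauses come from, here is a simplified pure-Python re-implementation of the stride logic Spark uses for partitioned JDBC reads (modeled on Spark's internal `JDBCRelation` code). It is a sketch under assumptions: integer columns and non-negative bounds only, and `jdbc_partition_predicates` is a hypothetical name, not a Spark API.

```python
def jdbc_partition_predicates(column, lower, upper, num_partitions):
    """Approximate the WHERE clause of each partition in a partitioned JDBC read."""
    if lower > upper:
        raise ValueError("lowerBound must not exceed upperBound")
    # One partition and no WHERE clause when there is nothing to split
    if num_partitions <= 1 or lower == upper:
        return [None]
    # Spark reduces the partition count when the range is narrower than numPartitions
    if upper - lower < num_partitions:
        num_partitions = upper - lower
    # Integer stride, matching Spark's arithmetic for non-negative bounds
    stride = upper // num_partitions - lower // num_partitions
    predicates = []
    current = lower
    for i in range(num_partitions):
        lower_clause = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper_clause = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper_clause is None:
            predicates.append(lower_clause)  # last partition: no upper limit
        elif lower_clause is None:
            # first partition: no lower limit, and it also picks up NULLs
            predicates.append(f"{upper_clause} OR {column} IS NULL")
        else:
            predicates.append(f"{lower_clause} AND {upper_clause}")
    return predicates


for predicate in jdbc_partition_predicates("CustomerID", 1, 1000000, 10):
    print(predicate)
```

Because the edge partitions are open-ended, IDs above `upperBound` all land in the last partition. Bounds that do not match the real MIN and MAX therefore overload the first or last task instead of dropping rows.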
Choosing the Partition Column
| Good Partition Column | Bad Partition Column | Why |
|---|---|---|
| Auto-increment ID | String column (name, email) | Must be numeric, date, or timestamp for range splits |
| Evenly distributed ID | Skewed column (status: 95% active) | Uneven distribution = one partition gets 95% of data |
| Date/timestamp column, or date as integer (20260115) | Boolean column (0/1) | At most 2 non-empty partitions, whatever numPartitions is |
# Find good bounds for your partition column
bounds = spark.read.format("jdbc").options(**jdbc_options).option(
"dbtable", "(SELECT MIN(CustomerID) as min_id, MAX(CustomerID) as max_id FROM SalesLT.Customer) t"
).load().first()
print(f"Lower: {bounds['min_id']}, Upper: {bounds['max_id']}")
# Use these values for lowerBound and upperBound
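When no column splits evenly by range, PySpark's `DataFrameReader.jdbc` also accepts a `predicates` list, where each WHERE clause becomes one partition. A sketch assuming the table is split by calendar month on `ModifiedDate`; the helper name is hypothetical, and rows outside the generated ranges (other years, or NULL dates) are not read at all.

```python
from datetime import date


def monthly_predicates(column, year):
    """Build one non-overlapping WHERE clause per calendar month of `year`."""
    predicates = []
    for month in range(1, 13):
        start = date(year, month, 1)
        end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
        # Half-open range [start, end) so no row can land in two partitions
        predicates.append(
            f"{column} >= '{start.isoformat()}' AND {column} < '{end.isoformat()}'"
        )
    return predicates


predicates = monthly_predicates("ModifiedDate", 2026)
print(len(predicates), predicates[0])
# In a notebook, each predicate becomes one partition (one JDBC query):
# df = spark.read.jdbc(url=jdbc_url, table="SalesLT.Customer", predicates=predicates,
#                      properties={"user": user, "password": password, "driver": driver})
```

The predicates are the only thing that decides which rows are read, so they must not overlap and must together cover everything you need.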
Writing Database Data to Delta
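# Read from database → write to Delta (Bronze layer)
df = spark.read.format("jdbc").options(**jdbc_options).option("dbtable", "SalesLT.Customer").load()
df.write.format("delta").mode("overwrite").saveAsTable("bronze.crm_customer")
# Count from the Delta table; df.count() would query the database a second time
print(f"Loaded {spark.table('bronze.crm_customer').count()} rows to bronze.crm_customer")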
Reading from REST APIs
Basic API Read to DataFrame
import requests
import json
from pyspark.sql import Row
# Simple API call → DataFrame
response = requests.get(
"https://api.example.com/customers",
headers={"Authorization": "Bearer YOUR_TOKEN"},
timeout=30
)
response.raise_for_status()
data = response.json()
# Convert list of dicts to DataFrame
df = spark.createDataFrame([Row(**record) for record in data["results"]])
df.show(5)
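`Row(**record)` assumes every record has the same keys. Many APIs omit fields that are null, which can break schema inference or produce inconsistent rows. A minimal sketch, assuming flat (non-nested) records, that pads missing keys with `None` before conversion; `normalize_records` and the sample data are hypothetical.

```python
def normalize_records(records):
    """Give every record the same keys so Row(**r) produces one consistent schema."""
    all_keys = sorted({key for record in records for key in record})
    # Keys missing from a record become None
    return [{key: record.get(key) for key in all_keys} for record in records]


api_results = [
    {"id": 1, "name": "Ana", "city": "Toronto"},
    {"id": 2, "name": "Raj"},  # API omitted the null "city" field
]
for record in normalize_records(api_results):
    print(record)
# In a notebook: df = spark.createDataFrame([Row(**r) for r in normalize_records(api_results)])
```

A column that is `None` in every record still cannot be inferred, so pass an explicit schema in that case. For nested JSON, `spark.read.json(spark.sparkContext.parallelize([json.dumps(r) for r in records]))` is another option that infers one schema across all records.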
Handling Pagination
def fetch_all_pages(base_url, headers, page_size=100):
    """Fetch all pages from a paginated REST API."""
    all_records = []
    page = 1
    while True:
        response = requests.get(
            base_url,
            params={"page": page, "per_page": page_size},
            headers=headers,
            timeout=30
        )
        response.raise_for_status()
        data = response.json()
        records = data.get("results", [])
        if not records:
            break
        all_records.extend(records)
        print(f"  Page {page}: {len(records)} records (total: {len(all_records)})")
        # Check for next page
        if not data.get("has_more", False):
            break
        page += 1
    return all_records
# Usage
records = fetch_all_pages(
    "https://api.example.com/orders",
    headers={"Authorization": "Bearer TOKEN"}
)
df = spark.createDataFrame([Row(**r) for r in records])
print(f"Total records from API: {df.count()}")
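Many APIs paginate with a cursor or a `next` URL instead of page numbers. A sketch assuming each response looks like `{"results": [...], "next": "<url or null>"}`; the field names and the helper are hypothetical. The HTTP call is passed in as a function so the loop can be tried offline, and `max_pages` stops a runaway loop if the API keeps returning the same cursor.

```python
def fetch_all_by_next_link(first_url, get_json, max_pages=10_000):
    """Follow 'next' links until the API stops returning one.

    get_json: callable(url) -> parsed JSON dict, for example
        lambda url: requests.get(url, headers=headers, timeout=30).json()
    """
    records = []
    url = first_url
    pages = 0
    while url:
        pages += 1
        if pages > max_pages:
            raise RuntimeError(f"Stopped after {max_pages} pages; is the 'next' link changing?")
        page = get_json(url)
        records.extend(page.get("results", []))
        url = page.get("next")  # None or a missing key ends the loop
    return records


# Offline usage: two fake pages stand in for the API
fake_api = {
    "https://api.example.com/orders": {
        "results": [{"id": 1}, {"id": 2}],
        "next": "https://api.example.com/orders?cursor=abc",
    },
    "https://api.example.com/orders?cursor=abc": {"results": [{"id": 3}], "next": None},
}
all_orders = fetch_all_by_next_link("https://api.example.com/orders", fake_api.__getitem__)
print(all_orders)
```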
Rate Limiting and Retry
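import time
import requests
def fetch_with_retry(url, headers, max_retries=3, delay=2):
    """Fetch URL, retrying on HTTP 429 and request errors with exponential backoff."""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, timeout=30)
            if response.status_code == 429:  # Rate limited
                retry_after = response.headers.get("Retry-After", "")
                # Use Retry-After when it is a number of seconds; otherwise back off exponentially
                wait = int(retry_after) if retry_after.isdigit() else delay * (2 ** attempt)
                print(f"  Rate limited. Waiting {wait} seconds...")
                time.sleep(wait)
                continue
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            # HTTPError (4xx/5xx) is a RequestException too, so those are retried as well
            print(f"  Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(delay * (2 ** attempt))  # Exponential backoff: delay, 2x, 4x, ...
            else:
                raise
    # Every attempt was rate limited: fail loudly instead of returning None
    raise RuntimeError(f"Still rate limited after {max_retries} attempts: {url}")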
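The `Retry-After` header may carry either a number of seconds or an HTTP-date (RFC 9110), and the retry loop above only understands the first form. A stdlib-only sketch of a parser for both; `retry_after_seconds` and its fallback behavior are assumptions, not part of `requests`.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def retry_after_seconds(header_value, fallback, now=None):
    """Convert a Retry-After header value into seconds to wait."""
    if not header_value:
        return fallback
    value = header_value.strip()
    if value.isdigit():  # delta-seconds form, e.g. "120"
        return int(value)
    try:  # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        retry_at = parsedate_to_datetime(value)
    except (TypeError, ValueError):  # unparseable: use the caller's backoff
        return fallback
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0, int((retry_at - now).total_seconds()))


print(retry_after_seconds("120", fallback=4))
print(retry_after_seconds("Wed, 21 Oct 2015 07:28:00 GMT", fallback=4,
                          now=datetime(2015, 10, 21, 7, 27, tzinfo=timezone.utc)))
```

Inside the retry loop it would replace the `isdigit()` check: `wait = retry_after_seconds(response.headers.get("Retry-After"), delay * (2 ** attempt))`.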
Parallel API Calls with foreachPartition
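# For APIs where you need to fetch data for many entities
# Example: fetch order details for 10,000 customer IDs
customer_ids = spark.table("silver.customers").select("customer_id")
# Repartition to control parallelism: 10 partitions = at most 10 concurrent API callers
customer_ids_partitioned = customer_ids.repartition(10)
def fetch_orders_for_partition(rows):
    """Runs once per partition on an executor: one HTTP session, many API calls."""
    import json
    import time
    import requests
    session = requests.Session()  # reuses pooled connections for every call in this partition
    for row in rows:
        resp = session.get(f"https://api.example.com/orders/{row.customer_id}",
                           headers={"Authorization": "Bearer TOKEN"}, timeout=30)
        if resp.status_code == 200:
            yield json.dumps(resp.json())  # one JSON string per order payload
        # Non-200 responses are skipped here; log or collect them in production
        time.sleep(0.1)  # Rate limiting within partition
# mapPartitions returns the results as a new RDD that Spark can turn into a DataFrame
orders_json = customer_ids_partitioned.rdd.mapPartitions(fetch_orders_for_partition)
orders_df = spark.read.json(orders_json)
# foreachPartition returns nothing: use it when each partition should write its
# results directly to storage instead of returning them to Spark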
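The `time.sleep(0.1)` above applies per partition, so the cluster-wide request rate grows with the number of partitions running at once: if all 10 run concurrently, each sleeping 0.1 s, they can send up to roughly 10 × 10 = 100 requests per second. A hypothetical helper that works backward from an API's documented rate limit:

```python
def per_partition_delay(max_requests_per_second, num_partitions):
    """Seconds each partition should sleep between calls so that all partitions
    together stay at or below max_requests_per_second.

    Conservative: ignores request latency, which only lowers the real rate.
    """
    if max_requests_per_second <= 0 or num_partitions <= 0:
        raise ValueError("rate limit and partition count must be positive")
    return num_partitions / max_requests_per_second


print(per_partition_delay(100, 10))  # 10 partitions sharing a 100 req/s limit -> 0.1
print(per_partition_delay(20, 10))   # the same 10 partitions under a 20 req/s limit -> 0.5
```

Lowering the partition count is the other lever: fewer partitions means fewer concurrent callers and a shorter required sleep per call.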
API to Delta Pipeline
# API → Bronze pipeline (Pattern 3 below adds the Silver step)
from pyspark.sql import Row
from pyspark.sql.functions import current_timestamp, lit
def ingest_api_to_bronze(api_url, headers, table_name):
    """Fetch API data and write to Bronze Delta table."""
    records = fetch_all_pages(api_url, headers)
    if not records:
        print(f"No records from API. Skipping {table_name}.")
        return 0
    df = spark.createDataFrame([Row(**r) for r in records])
    df = df.withColumn("_ingested_at", current_timestamp())
    df = df.withColumn("_source", lit("api"))
    df.write.format("delta").mode("append").saveAsTable(f"bronze.{table_name}")
    row_count = len(records)  # already known locally; no extra Spark job needed
    print(f"Loaded {row_count} rows to bronze.{table_name}")
    return row_count
# Run
ingest_api_to_bronze(
    "https://api.example.com/customers",
    {"Authorization": "Bearer TOKEN"},
    "api_customers"
)
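One way to make the Bronze write resilient to schema drift (fields appearing, disappearing, or changing type between API calls) is to land each record as a raw JSON string and parse it in Silver. A sketch of that idea as an alternative to the `Row(**r)` conversion above, not the original pipeline; the `_raw` column name and the `to_bronze_rows` helper are hypothetical.

```python
import json
from datetime import datetime, timezone


def to_bronze_rows(records, source="api"):
    """Wrap each API record as one raw JSON string plus ingestion metadata."""
    ingested_at = datetime.now(timezone.utc).isoformat()  # one timestamp per batch
    return [
        {
            "_raw": json.dumps(record, sort_keys=True),  # exact payload, no schema needed
            "_source": source,
            "_ingested_at": ingested_at,
        }
        for record in records
    ]


rows = to_bronze_rows([{"id": 1, "tags": ["vip"]}, {"id": 2, "status": "new"}])
print(rows[0]["_raw"])
# In a notebook the column set is now fixed, whatever the API returns:
# spark.createDataFrame(rows).write.format("delta").mode("append").saveAsTable("bronze.api_customers_raw")
```

Silver can then parse `_raw` with `from_json` and an explicit schema, so a new or missing field never fails the Bronze load.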
Connection Security
Using Key Vault for Credentials
# NEVER hardcode passwords in notebooks!
# Use Databricks secret scopes backed by Azure Key Vault
jdbc_password = dbutils.secrets.get(scope="keyvault-scope", key="sql-password")
api_token = dbutils.secrets.get(scope="keyvault-scope", key="api-token")
df = (spark.read.format("jdbc")
.option("url", jdbc_url)
.option("user", "admin")
.option("password", jdbc_password) # From Key Vault, not hardcoded
.option("driver", driver)
.option("dbtable", "SalesLT.Customer")
.load()
)
JDBC with Managed Identity
# Azure SQL with Managed Identity (no password needed!)
jdbc_url = (
"jdbc:sqlserver://myserver.database.windows.net:1433;"
"database=mydb;"
"encrypt=true;"
"authentication=ActiveDirectoryMSI"
)
df = (spark.read.format("jdbc")
.option("url", jdbc_url)
.option("dbtable", "SalesLT.Customer")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.load()
)
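# No user/password in code: the driver requests a token for the managed identity
# assigned to the compute. That identity must be available on the cluster and be
# granted access in the database (e.g., CREATE USER [identity-name] FROM EXTERNAL PROVIDER)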
Production Ingestion Patterns
Pattern 1: Full Load from Database
def full_load_table(table_name, schema_name, partition_col=None, num_partitions=10):
    """Full load a table from SQL to Delta Bronze."""
    source = f"{schema_name}.{table_name}"
    options = {
        "url": jdbc_url, "user": user, "password": password,
        "driver": driver, "dbtable": source,
        "fetchsize": "10000"
    }
    if partition_col:
        bounds = spark.read.format("jdbc").options(**options).option(
            "dbtable", f"(SELECT MIN({partition_col}) mn, MAX({partition_col}) mx FROM {source}) t"
        ).load().first()
        if bounds["mn"] is not None:  # empty table: fall back to a single-connection read
            options.update({
                "partitionColumn": partition_col,
                "lowerBound": str(bounds["mn"]),
                "upperBound": str(bounds["mx"]),
                "numPartitions": str(num_partitions)
            })
    df = spark.read.format("jdbc").options(**options).load()
    df.write.format("delta").mode("overwrite").saveAsTable(f"bronze.{table_name}")
    # Count from Delta, not df.count(), to avoid re-reading the source database
    return spark.table(f"bronze.{table_name}").count()
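`full_load_table` takes `num_partitions` as a fixed argument. One way to choose it, sketched under assumptions (a target of about 500,000 rows per partition and a cap of 16 concurrent connections; both numbers are illustrative, not recommendations from this post), is to size it from the row count and cap it at what the source database can tolerate. The helper name is hypothetical.

```python
import math


def choose_num_partitions(row_count, target_rows_per_partition=500_000, max_connections=16):
    """Pick a JDBC numPartitions value from the table size, capped by max_connections."""
    if row_count <= 0:
        return 1
    wanted = math.ceil(row_count / target_rows_per_partition)
    return max(1, min(wanted, max_connections))


for rows in (80_000, 2_000_000, 50_000_000):
    print(rows, "rows ->", choose_num_partitions(rows), "partitions")
```

The row count can come from a `(SELECT COUNT(*) AS n FROM ...) t` subquery in `dbtable`, the same pattern used above for the MIN/MAX bounds.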
Pattern 2: Incremental Load with Watermark
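from pyspark.sql.utils import AnalysisException
def incremental_load(table_name, schema_name, watermark_col):
    """Load only rows newer than the last watermark."""
    source = f"{schema_name}.{table_name}"
    # 1. Last loaded watermark from the audit table
    try:
        last_wm = spark.sql(f"""
            SELECT MAX(watermark_value) AS wm
            FROM audit.load_watermarks
            WHERE table_name = '{table_name}'
        """).first()["wm"]
    except AnalysisException:  # audit table does not exist yet
        last_wm = None
    if last_wm is None:  # First run for this table
        last_wm = "1900-01-01"
    # 2. Fix the upper bound BEFORE reading, so rows that arrive during the load
    #    are picked up by the next run instead of being skipped
    new_wm = spark.read.format("jdbc").options(**jdbc_options).option(
        "dbtable", f"(SELECT MAX({watermark_col}) AS wm FROM {source}) t"
    ).load().first()["wm"]
    if new_wm is None:  # source table is empty
        return 0
    query = f"""
        (SELECT * FROM {source}
         WHERE {watermark_col} > '{last_wm}' AND {watermark_col} <= '{new_wm}') incremental_data
    """
    df = spark.read.format("jdbc").options(**jdbc_options).option("dbtable", query).load()
    row_count = df.count()
    if row_count > 0:
        df.write.format("delta").mode("append").saveAsTable(f"bronze.{table_name}")
        spark.sql(f"INSERT INTO audit.load_watermarks VALUES ('{table_name}', '{new_wm}', current_timestamp())")
    return row_count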
Pattern 3: API to Bronze to Silver
def api_to_bronze_to_silver(api_url, headers, bronze_table, silver_table, clean_fn):
    """End-to-end: API → Bronze (raw) → Silver (cleaned)."""
    # Bronze: raw API data, appended on every run
    records = fetch_all_pages(api_url, headers)
    if not records:
        print("No records from API. Nothing to load.")
        return
    raw_df = spark.createDataFrame([Row(**r) for r in records])
    raw_df = raw_df.withColumn("_ingested_at", current_timestamp())
    raw_df.write.format("delta").mode("append").saveAsTable(f"bronze.{bronze_table}")
    # Silver: rebuilt from ALL Bronze history; cleaning only this batch and
    # overwriting would drop every earlier batch from Silver
    clean_df = clean_fn(spark.table(f"bronze.{bronze_table}"))
    clean_df.write.format("delta").mode("overwrite").saveAsTable(f"silver.{silver_table}")
    print(f"Bronze batch: {len(records)} rows | Silver: {spark.table(f'silver.{silver_table}').count()} rows")
Common Mistakes
- Single-threaded JDBC reads on large tables: always use parallel reads (partitionColumn, numPartitions) for tables over 100K rows. A single connection can be up to 10x slower than 10 parallel connections, as long as the database can serve the concurrent queries.
- Hardcoding database credentials: use Databricks secret scopes backed by Azure Key Vault. Passwords in notebooks get committed to Git and are visible to anyone with workspace access.
- Not pushing filters to the database: reading the full table and filtering in Spark can transfer unnecessary data (Spark pushes simple filters down, but not everything). Use a subquery in dbtable to filter at the source: (SELECT * FROM table WHERE date > '2026-01-01') t.
- Not handling API pagination: most APIs return 100 or 1000 records per page. Reading just the first page misses the rest of your data. Always loop until has_more is false or the result is empty.
- Choosing a skewed partition column: lowerBound and upperBound only set the range width, so if 90% of the rows fall in the first 1% of the ID range, one partition does most of the work. Check the distribution before choosing the column and bounds; if no column is evenly spread, pass explicit predicates to spark.read.jdbc instead.
Interview Questions
Q: How do you optimize JDBC reads for large tables in PySpark?
A: Use parallel reads by specifying partitionColumn (a numeric, date, or timestamp column such as an ID), lowerBound, upperBound, and numPartitions. Spark generates N parallel queries, each reading one range of the partition column; the bounds only set the range width, so rows outside them still land in the first or last partition. Also increase fetchsize (e.g., 10000) to reduce network round trips, and push filters to the database using a subquery in dbtable.
Q: How do you handle REST API pagination in a data pipeline?
A: Loop through pages: send a request with page number and page size, collect the results, check if more pages exist (via has_more, next_url, or empty results), and continue until all data is fetched. Add retry with exponential backoff for rate limiting (HTTP 429). Convert the collected records to a Spark DataFrame and write to Delta.
Q: Why should you never hardcode database credentials in notebooks?
A: Notebooks are often committed to Git (Databricks Repos) and visible to other workspace users. Hardcoded passwords are a security breach waiting to happen. Use Databricks secret scopes backed by Azure Key Vault or AWS Secrets Manager. Access secrets via dbutils.secrets.get(). For Azure SQL, prefer Managed Identity authentication, which requires no password at all.
Wrapping Up
Reading from databases and APIs is the starting point of every data pipeline. Master parallel JDBC reads for databases (often several times faster with numPartitions, as far as the source database can serve concurrent queries), paginated API fetching for REST services, and always secure credentials with Key Vault or Managed Identity. These ingestion patterns feed your Bronze layer, the foundation of your entire Lakehouse.
Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.