Python for Data Engineers: The Essential Skills You Actually Use Every Day
Python is the most in-demand skill in data engineering job descriptions. But the Python a data engineer uses looks very different from what a web developer or data scientist writes. You do not need to master Django or TensorFlow. You need to read files, transform data, call APIs, automate tasks, and interact with cloud services.
This post covers the Python skills that matter for data engineering — not theory, but the code patterns you will write every week on the job.
Table of Contents
- Why Python for Data Engineering?
- Reading and Writing Files (CSV, JSON, Parquet)
- Working with Pandas DataFrames
- Data Cleaning with Pandas
- Working with Databases (SQLAlchemy and pyodbc)
- Calling REST APIs with requests
- Working with AWS (boto3) and Azure (azure SDK)
- Environment Variables and Configuration
- Logging Best Practices
- Error Handling Patterns
- Working with Dates and Times
- File System Operations
- Virtual Environments and Dependency Management
- Common Python Patterns in Data Pipelines
- Interview Questions
- Wrapping Up
Why Python for Data Engineering?
Every data engineering tool has a Python interface:
- Apache Spark: PySpark
- Azure Data Factory: Python SDK (azure-mgmt-datafactory)
- AWS Glue: Python shell jobs and PySpark
- Databricks: Python notebooks
- Airflow: DAGs are Python files
- dbt: Jinja + Python models
- Cloud SDKs: boto3 (AWS), azure-storage-blob (Azure), google-cloud (GCP)
Python is the glue language that connects everything. Even if your main pipeline tool is ADF or Spark, you will write Python for automation, testing, data validation, API integration, and operational scripts.
Reading and Writing Files (CSV, JSON, Parquet)
CSV Files
import csv
# Read CSV
with open('data.csv', 'r') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row['name'], row['salary'])
# Write CSV
data = [
    {'name': 'Alice', 'salary': 70000},
    {'name': 'Bob', 'salary': 60000},
]
with open('output.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=['name', 'salary'])
    writer.writeheader()
    writer.writerows(data)
JSON Files
import json
# Read JSON
with open('config.json', 'r') as f:
    config = json.load(f)
print(config['database']['host'])
# Write JSON
data = {'pipeline': 'daily_etl', 'tables': ['Customer', 'Address']}
with open('output.json', 'w') as f:
    json.dump(data, f, indent=2)
# Parse JSON string (from API response)
response_text = '{"status": "success", "count": 42}'
parsed = json.loads(response_text)
print(parsed['count']) # 42
Parquet Files (with pandas)
import pandas as pd
# Read Parquet
df = pd.read_parquet('data.parquet')
# Write Parquet
df.to_parquet('output.parquet', engine='pyarrow', compression='snappy')
# Read specific columns only (efficient -- Parquet supports column pruning)
df = pd.read_parquet('data.parquet', columns=['name', 'salary'])
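For files too large to load at once, the same chunked-reading idea that `pd.read_csv(chunksize=...)` provides can be sketched with the standard library alone. `read_in_batches` is an illustrative helper name, not a library function:

```python
import csv
from itertools import islice

def read_in_batches(path, batch_size=1000):
    """Yield lists of row dicts from a CSV, batch_size rows at a time."""
    with open(path, newline='') as f:
        reader = csv.DictReader(f)
        while True:
            batch = list(islice(reader, batch_size))
            if not batch:
                break
            yield batch

# Usage: process each batch independently instead of holding the whole file in memory
# for batch in read_in_batches('big_file.csv', batch_size=50000):
#     load_to_staging(batch)   # hypothetical downstream step
```

Each batch is an ordinary list of dicts, so memory use is bounded by the batch size rather than the file size.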
Working with Pandas DataFrames
Pandas is the most widely used Python library for data manipulation in data engineering.
Creating DataFrames
import pandas as pd
# From a dictionary
df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Carol'],
    'department': ['Engineering', 'Sales', 'Engineering'],
    'salary': [70000, 60000, 85000]
})
# From a CSV file
df = pd.read_csv('employees.csv')
# From a SQL query
from sqlalchemy import create_engine
engine = create_engine('mssql+pyodbc://server/db?driver=ODBC+Driver+18+for+SQL+Server')
df = pd.read_sql('SELECT * FROM employees', engine)
Essential DataFrame Operations
# View data
df.head() # First 5 rows
df.shape # (rows, columns)
df.dtypes # Column data types
df.describe() # Summary statistics
df.info() # Column info + null counts
# Select columns
df['name'] # Single column (Series)
df[['name', 'salary']] # Multiple columns (DataFrame)
# Filter rows
df[df['salary'] > 70000] # Salary above 70K
df[df['department'] == 'Engineering'] # Engineering only
df[(df['salary'] > 60000) & (df['department'] == 'Sales')] # Combined
# Sort
df.sort_values('salary', ascending=False) # Highest salary first
df.sort_values(['department', 'salary'], ascending=[True, False])
# Group and aggregate
df.groupby('department')['salary'].mean() # Average salary per dept
df.groupby('department').agg(
    avg_salary=('salary', 'mean'),
    max_salary=('salary', 'max'),
    emp_count=('name', 'count')
)
# Add new columns
df['bonus'] = df['salary'] * 0.10
df['full_name'] = df['first_name'] + ' ' + df['last_name']
df['salary_band'] = pd.cut(df['salary'], bins=[0, 60000, 80000, 100000],
                           labels=['Junior', 'Mid', 'Senior'])
# Rename columns
df.rename(columns={'name': 'employee_name', 'salary': 'annual_salary'}, inplace=True)
# Drop columns
df.drop(columns=['temp_column', 'unused_field'], inplace=True)
Merging DataFrames (SQL JOIN equivalent)
# INNER JOIN
merged = pd.merge(employees_df, departments_df,
                  left_on='department_id', right_on='id',
                  how='inner')
# LEFT JOIN
merged = pd.merge(employees_df, departments_df,
                  left_on='department_id', right_on='id',
                  how='left')
# Multiple join keys
merged = pd.merge(df1, df2, on=['year', 'month'], how='inner')
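Joins on dirty keys silently multiply rows. Two `merge` parameters worth knowing are `validate`, which fails fast if the join cardinality is not what you expect, and `indicator`, which adds a `_merge` column showing where each row came from. A small self-contained sketch:

```python
import pandas as pd

employees = pd.DataFrame({'emp_id': [1, 2, 3], 'dept_id': [10, 10, 20]})
departments = pd.DataFrame({'dept_id': [10, 20], 'dept_name': ['Eng', 'Sales']})

merged = pd.merge(
    employees, departments,
    on='dept_id',
    how='left',
    validate='many_to_one',  # raises MergeError if dept_id is duplicated on the right
    indicator=True           # adds a _merge column: 'both', 'left_only', or 'right_only'
)
print(merged['_merge'].value_counts())
```

Filtering on `_merge == 'left_only'` after a left join is a quick way to find orphaned keys before they reach production tables.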
Data Cleaning with Pandas
Handling Missing Values
# Check for nulls
df.isnull().sum() # Count nulls per column
# Drop rows with any null
df.dropna(inplace=True)
# Drop rows where specific column is null
df.dropna(subset=['email'], inplace=True)
# Fill nulls with a value
df['salary'] = df['salary'].fillna(0)          # assignment, not inplace --
df['department'] = df['department'].fillna('Unknown')  # chained inplace fillna is deprecated in recent pandas
# Fill with mean/median
df['salary'] = df['salary'].fillna(df['salary'].median())
Removing Duplicates
# Drop exact duplicate rows
df.drop_duplicates(inplace=True)
# Drop duplicates based on specific columns (keep first occurrence)
df.drop_duplicates(subset=['email'], keep='first', inplace=True)
# Drop duplicates keeping the last occurrence
df.drop_duplicates(subset=['customer_id'], keep='last', inplace=True)
Data Type Conversion
# Convert types
df['salary'] = df['salary'].astype(int)
df['hire_date'] = pd.to_datetime(df['hire_date'])
df['is_active'] = df['is_active'].astype(bool)
# Handle messy numeric data
df['revenue'] = pd.to_numeric(df['revenue'], errors='coerce') # Invalid values become NaN
String Cleaning
# Strip whitespace
df['name'] = df['name'].str.strip()
# Standardize case
df['email'] = df['email'].str.lower()
df['name'] = df['name'].str.title()
# Replace values
df['status'] = df['status'].replace({'Y': 'Active', 'N': 'Inactive'})
# Extract with regex
df['area_code'] = df['phone'].str.extract(r'(\d{3})-\d{3}-\d{4}')
Working with Databases (SQLAlchemy and pyodbc)
SQLAlchemy (Recommended)
from sqlalchemy import create_engine, text
# Connect to Azure SQL Database
engine = create_engine(
    'mssql+pyodbc://username:password@server.database.windows.net/dbname'
    '?driver=ODBC+Driver+18+for+SQL+Server'
    '&encrypt=yes&TrustServerCertificate=no'
)
# Read data into DataFrame
df = pd.read_sql('SELECT * FROM employees WHERE salary > 50000', engine)
# Write DataFrame to SQL table
df.to_sql('employees_staging', engine, if_exists='replace', index=False)
# Execute raw SQL
with engine.connect() as conn:
    conn.execute(text("UPDATE employees SET status = 'Active' WHERE id = 1"))
    conn.commit()
Connection Strings for Common Databases
# Azure SQL Database
'mssql+pyodbc://user:pass@server.database.windows.net/db?driver=ODBC+Driver+18+for+SQL+Server'
# PostgreSQL
'postgresql://user:pass@host:5432/dbname'
# MySQL
'mysql+pymysql://user:pass@host:3306/dbname'
# SQLite (local file)
'sqlite:///local_database.db'
# Snowflake
'snowflake://user:pass@account/db/schema?warehouse=compute_wh'
Calling REST APIs with requests
import requests
# GET request
response = requests.get('https://api.example.com/users')
data = response.json()
# GET with parameters
response = requests.get(
    'https://api.example.com/users',
    params={'page': 1, 'limit': 100},
    headers={'Authorization': 'Bearer YOUR_TOKEN'}
)
# POST request (create resource)
payload = {'name': 'Alice', 'email': 'alice@example.com'}
response = requests.post(
    'https://api.example.com/users',
    json=payload,
    headers={'Content-Type': 'application/json'}
)
# Error handling
response = requests.get('https://api.example.com/data')
response.raise_for_status() # Raises exception for 4xx/5xx status codes
# Retry pattern with session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
session = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503])
session.mount('https://', HTTPAdapter(max_retries=retries))
response = session.get('https://api.example.com/data')
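Another pattern that comes up constantly with REST APIs is pagination. The loop below keeps the paging logic separate from the HTTP call so it is easy to test; `fetch_page` stands in for whatever call your API actually requires (the names and endpoint are illustrative):

```python
def fetch_all_pages(fetch_page, limit=100):
    """Collect results from a page-numbered API until a short page is returned."""
    results = []
    page = 1
    while True:
        items = fetch_page(page=page, limit=limit)
        results.extend(items)
        if len(items) < limit:  # a short (or empty) page means we are done
            break
        page += 1
    return results

# With requests, fetch_page might look like (hypothetical endpoint):
# def fetch_page(page, limit):
#     r = requests.get('https://api.example.com/users',
#                      params={'page': page, 'limit': limit})
#     r.raise_for_status()
#     return r.json()
```

Because `fetch_all_pages` only depends on a callable, you can unit-test the loop with a stub and swap in the real `requests` call in production.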
Working with AWS (boto3) and Azure (azure SDK)
AWS S3 with boto3
import boto3
s3 = boto3.client('s3')
# Upload file
s3.upload_file('local_file.csv', 'my-bucket', 'data/file.csv')
# Download file
s3.download_file('my-bucket', 'data/file.csv', 'local_file.csv')
# List objects
response = s3.list_objects_v2(Bucket='my-bucket', Prefix='data/')
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])
# Read CSV directly from S3 into pandas
import io
obj = s3.get_object(Bucket='my-bucket', Key='data/file.csv')
df = pd.read_csv(io.BytesIO(obj['Body'].read()))
Azure Blob Storage
from azure.storage.blob import BlobServiceClient
connection_string = "DefaultEndpointsProtocol=https;AccountName=..."
blob_service = BlobServiceClient.from_connection_string(connection_string)
container = blob_service.get_container_client('datalake')
# Upload
with open('data.parquet', 'rb') as f:
    container.upload_blob('bronze/data.parquet', f, overwrite=True)
# Download
blob = container.download_blob('bronze/data.parquet')
with open('local_data.parquet', 'wb') as f:
    f.write(blob.readall())
# List blobs
for blob in container.list_blobs(name_starts_with='bronze/'):
    print(blob.name, blob.size, blob.last_modified)
Environment Variables and Configuration
Never hardcode credentials. Use environment variables:
import os
# Read environment variables
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_PASSWORD = os.environ['DB_PASSWORD'] # Raises KeyError if not set
API_KEY = os.environ.get('API_KEY')
# Set in terminal before running:
# export DB_HOST=server.database.windows.net
# export DB_PASSWORD=secret123
Using .env files with python-dotenv
# .env file (never commit to Git!)
# DB_HOST=server.database.windows.net
# DB_PASSWORD=secret123
from dotenv import load_dotenv
load_dotenv() # Loads .env file into environment
DB_HOST = os.environ.get('DB_HOST')
Configuration file pattern
# config.py
import os
class Config:
    DB_HOST = os.environ.get('DB_HOST', 'localhost')
    DB_PORT = int(os.environ.get('DB_PORT', '5432'))
    DB_NAME = os.environ.get('DB_NAME', 'mydb')
    DEBUG = os.environ.get('DEBUG', 'false').lower() == 'true'
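One refinement worth making to this pattern: fail at startup if a required variable is missing, rather than deep inside the pipeline at 3 a.m. A small sketch (`require_env` is an illustrative helper name):

```python
import os

def require_env(*names):
    """Return the values of required environment variables, failing fast if any is unset."""
    missing = [n for n in names if not os.environ.get(n)]
    if missing:
        raise RuntimeError(f'Missing required environment variables: {", ".join(missing)}')
    return [os.environ[n] for n in names]

# Usage at the top of a pipeline script:
# db_host, db_password = require_env('DB_HOST', 'DB_PASSWORD')
```

Collecting all missing names into one error message saves the fix-one-rerun-find-the-next cycle.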
Logging Best Practices
Never use print() in production scripts. Use the logging module:
import logging
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()  # Also print to console
    ]
)
logger = logging.getLogger(__name__)
# Usage
logger.info('Pipeline started')
logger.info(f'Processing {len(df)} rows')
logger.warning('Missing values found in column: email')
logger.error('Failed to connect to database: %s', e)  # e.g. inside an except block
logger.debug('Row details: %s', row) # Only shown if level=DEBUG
Log levels
DEBUG -- Detailed diagnostic information
INFO -- Confirmation that things are working
WARNING -- Something unexpected but not critical
ERROR -- Something failed but the program continues
CRITICAL -- Program cannot continue
Error Handling Patterns
Basic try/except
try:
    df = pd.read_csv('data.csv')
    logger.info(f'Loaded {len(df)} rows')
except FileNotFoundError:
    logger.error('Data file not found')
    raise
except pd.errors.EmptyDataError:
    logger.warning('Data file is empty')
    df = pd.DataFrame()
Retry pattern for network operations
import time
def fetch_with_retry(url, max_retries=3, delay=5):
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.warning(f'Attempt {attempt + 1} failed: {e}')
            if attempt < max_retries - 1:
                time.sleep(delay * (attempt + 1))  # Linear backoff; use delay * 2 ** attempt for exponential
            else:
                logger.error(f'All {max_retries} attempts failed')
                raise
Context managers for cleanup
from contextlib import contextmanager
@contextmanager
def database_connection(connection_string):
    engine = create_engine(connection_string)
    try:
        yield engine
        logger.info('Database operation completed')
    except Exception as e:
        logger.error(f'Database error: {e}')
        raise
    finally:
        engine.dispose()
        logger.info('Connection closed')
# Usage
with database_connection('postgresql://...') as engine:
    df = pd.read_sql('SELECT * FROM users', engine)
Working with Dates and Times
from datetime import datetime, timedelta, date, timezone
# Current date/time
now = datetime.now()
today = date.today()
utc_now = datetime.now(timezone.utc)  # datetime.utcnow() is deprecated since Python 3.12
# Format dates
now.strftime('%Y-%m-%d') # '2026-04-07'
now.strftime('%Y-%m-%d %H:%M:%S') # '2026-04-07 14:30:00'
now.strftime('%Y%m%d') # '20260407'
# Parse date strings
parsed = datetime.strptime('2026-04-07', '%Y-%m-%d')
parsed = datetime.strptime('07/04/2026 14:30', '%d/%m/%Y %H:%M')
# Date arithmetic
yesterday = today - timedelta(days=1)
last_week = today - timedelta(weeks=1)
six_months_ago = today - timedelta(days=180)
# Pandas date operations
df['date'] = pd.to_datetime(df['date_string'])
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day_of_week'] = df['date'].dt.day_name()
df['is_weekend'] = df['date'].dt.dayofweek >= 5
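These pieces combine naturally when a pipeline backfills one partition per day. The helper below generates the `yyyy/mm/dd`-style paths a data lake layout typically uses; the helper name and path layout are illustrative:

```python
from datetime import date, timedelta

def daily_partitions(start, end, prefix='output'):
    """Yield one yyyy/mm/dd partition path per day, from start to end inclusive."""
    current = start
    while current <= end:
        yield f"{prefix}/{current.strftime('%Y/%m/%d')}"
        current += timedelta(days=1)

# Usage: backfill a date range, one partition at a time
# for path in daily_partitions(date(2026, 4, 1), date(2026, 4, 3)):
#     run_for_partition(path)   # hypothetical per-day processing step
```

Generating paths from `date` objects instead of string manipulation means month and year boundaries are handled for free.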
File System Operations
import os
import glob
import shutil
from pathlib import Path
# List files in directory
files = os.listdir('/data/input/')
csv_files = glob.glob('/data/input/*.csv')
parquet_files = glob.glob('/data/input/**/*.parquet', recursive=True)
# Check if file/directory exists
os.path.exists('/data/input/file.csv')
os.path.isfile('/data/input/file.csv')
os.path.isdir('/data/input/')
# Create directories
os.makedirs('/data/output/2026/04', exist_ok=True)
# Move/copy/delete files
shutil.copy('source.csv', 'destination.csv')
shutil.move('old_location.csv', 'new_location.csv')
os.remove('file_to_delete.csv')
# Get file info
size = os.path.getsize('data.csv') # bytes
modified_time = os.path.getmtime('data.csv') # timestamp
# Using pathlib (modern Python)
path = Path('/data/input/file.csv')
path.exists()
path.stem # 'file'
path.suffix # '.csv'
path.parent # Path('/data/input')
Virtual Environments and Dependency Management
Create a virtual environment
# Create
python -m venv venv
# Activate (Mac/Linux)
source venv/bin/activate
# Activate (Windows)
venv\Scripts\activate
# Install packages
pip install pandas pyarrow requests sqlalchemy boto3
# Save dependencies
pip freeze > requirements.txt
# Install from requirements
pip install -r requirements.txt
# Deactivate
deactivate
requirements.txt example for data engineering
pandas==2.2.0
pyarrow==15.0.0
requests==2.31.0
sqlalchemy==2.0.25
pyodbc==5.1.0
boto3==1.34.0
azure-storage-blob==12.19.0
python-dotenv==1.0.0
Common Python Patterns in Data Pipelines
ETL Script Template
import logging
import pandas as pd
from datetime import datetime, timezone
from sqlalchemy import create_engine

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def extract(source_engine, query):
    logger.info('Extracting data...')
    df = pd.read_sql(query, source_engine)
    logger.info(f'Extracted {len(df)} rows')
    return df

def transform(df):
    logger.info('Transforming data...')
    df = df.drop_duplicates()
    df['name'] = df['name'].str.strip().str.title()
    df['load_date'] = datetime.now(timezone.utc)
    logger.info(f'Transformed to {len(df)} rows')
    return df

def load(df, target_engine, table_name):
    logger.info(f'Loading {len(df)} rows to {table_name}...')
    df.to_sql(table_name, target_engine, if_exists='append', index=False)
    logger.info('Load complete')

def main():
    try:
        source = create_engine('postgresql://source_server/source_db')
        target = create_engine('postgresql://target_server/target_db')
        df = extract(source, 'SELECT * FROM customers')
        df = transform(df)
        load(df, target, 'customers_staging')
        logger.info('Pipeline completed successfully')
    except Exception as e:
        logger.error(f'Pipeline failed: {e}')
        raise

if __name__ == '__main__':
    main()
Batch Processing Pattern
def process_in_batches(df, batch_size=10000):
    total = len(df)
    for start in range(0, total, batch_size):
        batch = df.iloc[start:start + batch_size]
        logger.info(f'Processing batch {start//batch_size + 1}: '
                    f'rows {start} to {min(start + batch_size, total)}')
        yield batch
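The generator above pairs naturally with an incremental load. A self-contained sketch (the `df.to_sql(chunksize=...)` parameter is the simpler built-in alternative, but the generator gives you per-batch logging and retry hooks):

```python
import pandas as pd

def process_in_batches(df, batch_size=10000):
    """Yield the DataFrame in consecutive slices of batch_size rows."""
    for start in range(0, len(df), batch_size):
        yield df.iloc[start:start + batch_size]

df = pd.DataFrame({'id': range(25)})
batch_sizes = [len(b) for b in process_in_batches(df, batch_size=10)]
print(batch_sizes)  # [10, 10, 5]

# In a real pipeline each batch would be written incrementally, e.g.:
# for batch in process_in_batches(df, batch_size=10000):
#     batch.to_sql('staging', engine, if_exists='append', index=False)
```

Because `iloc` slicing is positional, this works regardless of what the DataFrame's index looks like.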
Configuration-Driven Pipeline
import json
# Load pipeline config
with open('pipeline_config.json') as f:
    config = json.load(f)
# Process each table defined in config
for table in config['tables']:
    logger.info(f"Processing: {table['schema']}.{table['name']}")
    query = f"SELECT * FROM {table['schema']}.{table['name']}"
    df = pd.read_sql(query, engine)
    output_path = f"output/{table['name']}.parquet"
    df.to_parquet(output_path)
    logger.info(f"Written {len(df)} rows to {output_path}")
Interview Questions
Q: How do you read a Parquet file in Python?
A: Use pandas: df = pd.read_parquet('file.parquet'). You can also read specific columns for efficiency: pd.read_parquet('file.parquet', columns=['name', 'salary']), which leverages Parquet's columnar layout (column pruning).
Q: How do you handle missing values in pandas?
A: Use df.isnull().sum() to check, df.dropna() to remove rows, df.fillna(value) to replace with a specific value, or df.fillna(df['col'].median()) to fill with statistical values.
Q: How do you connect Python to a SQL database?
A: Use SQLAlchemy with create_engine() for database-agnostic connections. Then use pd.read_sql() to read data into DataFrames and df.to_sql() to write DataFrames to tables.
Q: Why use environment variables instead of hardcoding credentials?
A: Security — hardcoded credentials in code get committed to Git and exposed. Environment variables keep secrets outside the codebase. Use .env files locally and cloud secret managers (Key Vault, AWS Secrets Manager) in production.
Q: What is the difference between json.load() and json.loads()?
A: json.load() reads from a file object. json.loads() parses a JSON string. Similarly, json.dump() writes to a file and json.dumps() converts to a string.
Q: How would you process a 10GB CSV file in Python?
A: Use pd.read_csv('file.csv', chunksize=100000) to read in chunks. Process each chunk individually and write results incrementally. For even larger files, switch to PySpark or Dask.
Wrapping Up
Python for data engineering is not about knowing every library. It is about mastering a core set of patterns that you use repeatedly: reading files, transforming DataFrames, calling APIs, connecting to databases, handling errors, and logging everything.
Build these patterns into your muscle memory, and you can tackle any data engineering task that comes your way.
Related posts:
- Building a REST API with FastAPI on AWS Lambda
- SQL Window Functions
- Parquet vs CSV vs JSON
If this guide helped you level up your Python skills, share it with a fellow data engineer. Questions? Drop a comment below.
Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.