---
title: "Building Production-Ready Data Pipelines with Python"
description: "Learn to build robust data pipelines in Python with Airflow, scheduling, error handling, and monitoring. Complete production deployment guide."
date: "2026-07-03"
category: "python"
tags: ["python", "data-pipeline", "airflow", "etl", "production", "monitoring"]
---
Data pipelines that work in development often fail in production. This guide teaches you to build resilient, observable data pipelines that handle failures gracefully and scale with your data volume.
You'll build a production-grade data pipeline system with Apache Airflow orchestration, built-in error handling, retry logic, Prometheus metrics for monitoring, and automated alerting. You'll learn patterns used by data engineering teams at scale.
Fragile pipelines cause data delays, broken dashboards, and lost revenue. Production-ready pipelines provide:
data-pipeline/
├── dags/
│ ├── __init__.py
│ ├── etl_dag.py
│ └── data_quality_dag.py
├── plugins/
│ ├── __init__.py
│ ├── custom_operators.py
│ ├── error_handlers.py
│ ├── hooks.py
│ └── monitoring.py
├── config/
│ └── pipelines.yaml
├── tests/
│ └── test_dags.py
└── docker-compose.yml
# dags/etl_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.utils.task_group import TaskGroup
import pandas as pd
import logging
logger = logging.getLogger(__name__)

# Defaults applied to every task in the DAG: up to three retries starting
# 5 minutes apart with exponential backoff capped at 30 minutes, and an
# email on final failure but not on each retry.
# NOTE(review): no "email" recipient is configured here, so
# email_on_failure has no address to send to — confirm the intended list.
default_args = dict(
    owner="data-engineering",
    depends_on_past=False,
    email_on_failure=True,
    email_on_retry=False,
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)
def extract_data(url: str = "https://api.source.com/data", **context):
    """Fetch records from the source API and hand them to the next task.

    The JSON response is expected to be an object with a ``"records"``
    entry; that entry is loaded into a DataFrame and pushed to XCom under
    the key ``raw_data`` as ``DataFrame.to_json()`` output.

    NOTE(review): XCom values are stored in the Airflow metadata database,
    so this pattern only suits small payloads; large extracts should be
    staged in object storage and passed downstream by reference.

    :param url: Source endpoint. Defaults to the original hard-coded URL and
        can be overridden through the operator's ``op_kwargs``.
    :param context: Airflow task context; ``context["ti"]`` is required.
    :returns: Number of extracted records.
    :raises requests.HTTPError: if the API answers with a non-2xx status.
    :raises ValueError: if the payload has no ``"records"`` entry.
    """
    import requests

    response = requests.get(url, timeout=30)
    response.raise_for_status()
    data = response.json()
    # Fail with a descriptive error instead of a bare KeyError/TypeError when
    # the API contract changes.
    if not isinstance(data, dict) or "records" not in data:
        raise ValueError(f"Unexpected payload from {url}: missing 'records'")
    df = pd.DataFrame(data["records"])
    # Store in XCom for the transform task.
    context["ti"].xcom_push(key="raw_data", value=df.to_json())
    logger.info("Extracted %d records", len(df))
    return len(df)
def transform_data(**context):
    """Deduplicate, clean and validate the records produced by ``extract``.

    Pulls the ``raw_data`` XCom pushed by task ``extract``, drops rows with a
    duplicate ``id`` (keeping the first occurrence), fills missing
    ``amount`` values with 0, parses ``timestamp`` to datetime, and pushes
    the result to XCom as ``transformed_data``.

    :param context: Airflow task context; ``context["ti"]`` is required.
    :returns: Number of clean records.
    :raises ValueError: if the upstream XCom is missing, the dataset is
        empty, or any ``amount`` is negative.
    """
    from io import StringIO

    ti = context["ti"]
    raw_json = ti.xcom_pull(task_ids="extract", key="raw_data")
    if raw_json is None:
        raise ValueError("No 'raw_data' XCom found for task 'extract'")
    # pandas >= 2.1 deprecates passing literal JSON strings to read_json.
    df = pd.read_json(StringIO(raw_json))

    # Check emptiness first: an empty extract has no "id" column, and
    # drop_duplicates would otherwise fail with an unhelpful KeyError.
    # Deduplication cannot empty a non-empty frame, so one check suffices.
    if df.empty:
        raise ValueError("Empty dataset received from extract")

    # Remove duplicates
    df = df.drop_duplicates(subset=["id"])
    # Handle missing values
    df["amount"] = df["amount"].fillna(0)
    df["timestamp"] = pd.to_datetime(df["timestamp"])

    # Validate data quality with explicit raises: `assert` statements are
    # stripped under `python -O`, which would silently disable this gate.
    if (df["amount"] < 0).any():
        raise ValueError("Negative amounts found")

    ti.xcom_push(key="transformed_data", value=df.to_json())
    logger.info("Transformed to %d clean records", len(df))
    return len(df)
def load_data(**context):
    """Append the transformed records to the ``processed_data`` table.

    Pulls the ``transformed_data`` XCom pushed by task ``transform`` and
    writes it through the ``data_warehouse`` Airflow connection.

    NOTE(review): rows are appended, so manually re-running an already
    successful load still inserts duplicates; consider an upsert keyed on
    ``id`` if that matters.

    :param context: Airflow task context; ``context["ti"]`` is required.
    :returns: Number of rows written.
    :raises ValueError: if the upstream XCom is missing.
    """
    from io import StringIO

    ti = context["ti"]
    transformed_json = ti.xcom_pull(task_ids="transform", key="transformed_data")
    if transformed_json is None:
        raise ValueError("No 'transformed_data' XCom found for task 'transform'")
    # pandas >= 2.1 deprecates passing literal JSON strings to read_json.
    df = pd.read_json(StringIO(transformed_json))

    pg_hook = PostgresHook(postgres_conn_id="data_warehouse")
    engine = pg_hook.get_sqlalchemy_engine()
    # Write every chunk inside one transaction: if the task dies midway,
    # nothing is committed, so an Airflow retry does not append the chunks
    # that were already written a second time.
    with engine.begin() as conn:
        df.to_sql(
            "processed_data",
            conn,
            if_exists="append",
            index=False,
            chunksize=1000,
        )
    logger.info("Loaded %d records to warehouse", len(df))
    return len(df)
# ETL DAG run every 6 hours: extract -> transform -> load inside one task
# group, followed by a shell notification once the group succeeds.
with DAG(
    "etl_pipeline",
    default_args=default_args,
    description="Production ETL pipeline",
    # `schedule` replaces `schedule_interval`, which is deprecated since
    # Airflow 2.4 and removed in Airflow 3.
    schedule="0 */6 * * *",  # Every 6 hours
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["production", "etl"],
) as dag:
    # prefix_group_id=False keeps the child task_ids as plain "extract",
    # "transform" and "load". With the default (True) Airflow registers them
    # as "extract_transform_load.extract" etc., and the
    # xcom_pull(task_ids="extract") / xcom_pull(task_ids="transform") calls
    # in transform_data and load_data would get None back.
    with TaskGroup("extract_transform_load", prefix_group_id=False) as etl_group:
        extract = PythonOperator(
            task_id="extract",
            python_callable=extract_data,
        )
        transform = PythonOperator(
            task_id="transform",
            python_callable=transform_data,
        )
        load = PythonOperator(
            task_id="load",
            python_callable=load_data,
        )
        extract >> transform >> load

    notify = BashOperator(
        task_id="notify_success",
        bash_command='echo "Pipeline completed successfully at $(date)"',
    )

    etl_group >> notify
# plugins/custom_operators.py
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
import subprocess
import json
import time
class DataQualityOperator(BaseOperator):
    """Run SQL data-quality checks and fail the task if any of them fails.

    Each statement in ``sql_checks`` is executed with
    ``PostgresHook.get_first``. A check passes only when it returns a row
    whose first column is neither ``0`` nor ``NULL``, so checks should be
    written to yield a nonzero value on success (for example a ``COUNT(*)``
    of rows that must exist).

    :param sql_checks: SQL statements to run, one per check.
    :param redshift_conn_id: Airflow connection id handed to ``PostgresHook``.
    :raises AirflowException: from ``execute`` when at least one check fails.
    """

    # No @apply_defaults: since Airflow 2.0 BaseOperator applies default_args
    # automatically and the decorator only emits a deprecation warning (it is
    # removed in Airflow 3).
    def __init__(self, sql_checks: list[str], redshift_conn_id: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.sql_checks = sql_checks
        self.redshift_conn_id = redshift_conn_id

    def execute(self, context):
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        failed_checks = []
        for check in self.sql_checks:
            result = hook.get_first(check)
            # A missing row, a zero, or a NULL (e.g. MIN()/MAX() over an empty
            # table) all count as failures.
            first = result[0] if result else None
            if first is None or first == 0:
                failed_checks.append(check)
                self.log.error("Data quality check failed: %s", check)

        if failed_checks:
            raise AirflowException(
                f"Data quality checks failed: {len(failed_checks)} checks failed"
            )
        self.log.info("All data quality checks passed")
class FileProcessorOperator(BaseOperator):
    """Concatenate every CSV matching a glob pattern into one CSV file.

    Files that fail to read are logged and skipped (best effort); the task
    only fails when no file could be read at all.

    :param input_pattern: ``glob.glob`` pattern selecting the input CSVs.
    :param output_path: Path of the combined CSV to write.
    :raises AirflowException: from ``execute`` if no input file was read.
    """

    # No @apply_defaults: since Airflow 2.0 BaseOperator applies default_args
    # automatically and the decorator is deprecated (removed in Airflow 3).
    def __init__(self, input_pattern: str, output_path: str, **kwargs):
        super().__init__(**kwargs)
        self.input_pattern = input_pattern
        self.output_path = output_path

    def execute(self, context):
        import glob
        import os

        import pandas as pd

        output_abs = os.path.abspath(self.output_path)
        # Sort for a deterministic row order (glob order is arbitrary), and
        # skip the output file itself so that a rerun whose pattern also
        # matches the output does not ingest the previous result.
        files = sorted(
            path for path in glob.glob(self.input_pattern)
            if os.path.abspath(path) != output_abs
        )
        self.log.info("Processing %d files", len(files))

        dfs = []
        for file in files:
            try:
                dfs.append(pd.read_csv(file))
            except Exception as e:
                # Best effort: one unreadable file should not fail the batch.
                self.log.warning("Failed to process %s: %s", file, e)

        if not dfs:
            raise AirflowException("No files processed successfully")

        combined = pd.concat(dfs, ignore_index=True)
        combined.to_csv(self.output_path, index=False)
        self.log.info("Combined %d files into %s", len(dfs), self.output_path)
# plugins/error_handlers.py
from airflow.models import Variable
import requests
import logging
logger = logging.getLogger(__name__)


class AlertHandler:
    """Send pipeline alerts to Slack; the callbacks suit Airflow task hooks.

    The Slack webhook URL and PagerDuty key are read from the Airflow
    Variables ``slack_webhook_url`` and ``pagerduty_key`` (empty string when
    unset). Only Slack delivery is implemented in this class;
    ``pagerduty_key`` is loaded but not used by any method here.
    """

    def __init__(self):
        self.slack_webhook = Variable.get("slack_webhook_url", default_var="")
        self.pagerduty_key = Variable.get("pagerduty_key", default_var="")

    def send_slack_alert(self, message: str, severity: str = "warning"):
        """Post *message* to Slack as a colour-coded attachment.

        Does nothing when no webhook is configured. Delivery is best effort:
        any error while posting (including a non-2xx answer from Slack) is
        logged and never propagated to the caller.

        :param message: Alert body text.
        :param severity: ``"info"``, ``"warning"`` or ``"critical"``; any
            other value falls back to the warning colour.
        """
        # Imported locally: this module has no top-level `import time`, so
        # referencing it unqualified raised NameError before any alert was sent.
        import time

        if not self.slack_webhook:
            return

        color = {"info": "#36a64f", "warning": "#ff9900", "critical": "#ff0000"}
        payload = {
            "attachments": [{
                "color": color.get(severity, "#ff9900"),
                "title": f"Pipeline Alert - {severity.upper()}",
                "text": message,
                "ts": time.time(),
            }]
        }
        try:
            response = requests.post(self.slack_webhook, json=payload, timeout=10)
            # Slack rejects bad payloads with a non-2xx status; surface it in the log.
            response.raise_for_status()
        except Exception as e:
            logger.error(f"Failed to send Slack alert: {e}")

    def on_retry_callback(self, context):
        """Airflow ``on_retry_callback``: post an info alert for a retry."""
        ti = context["ti"]
        self.send_slack_alert(
            f"Task {ti.task_id} retrying (attempt {ti.try_number})",
            severity="info",
        )

    def on_failure_callback(self, context):
        """Airflow ``on_failure_callback``: post a critical alert for a failure."""
        ti = context["ti"]
        self.send_slack_alert(
            f"Task {ti.task_id} failed after {ti.try_number} attempts",
            severity="critical",
        )
# plugins/monitoring.py
from prometheus_client import Counter, Histogram, Gauge
import time
# Prometheus metrics used by PipelineMonitor.

# Per-task wall-clock duration; bucket upper bounds are in seconds
# (10 s up to 10 min).
TASK_DURATION = Histogram(
    name="etl_task_duration_seconds",
    documentation="Time spent on ETL tasks",
    labelnames=("dag_id", "task_id"),
    buckets=(10, 30, 60, 120, 300, 600),
)

# Count of task lifecycle events, split by status label.
TASK_STATUS = Counter(
    name="etl_task_status_total",
    documentation="ETL task completion status",
    labelnames=("dag_id", "task_id", "status"),
)

# Running total of records handled, split by DAG and data source.
RECORDS_PROCESSED = Counter(
    name="etl_records_processed_total",
    documentation="Total records processed",
    labelnames=("dag_id", "source"),
)
class PipelineMonitor:
    """Static helpers that record ETL task events in the module's metrics.

    Writes to the module-level ``TASK_STATUS``, ``TASK_DURATION`` and
    ``RECORDS_PROCESSED`` metrics; no instance state is kept.
    """

    @staticmethod
    def _count_status(dag_id: str, task_id: str, status: str) -> None:
        """Increment the TASK_STATUS series for (dag_id, task_id, status)."""
        TASK_STATUS.labels(dag_id=dag_id, task_id=task_id, status=status).inc()

    @staticmethod
    def record_task_start(dag_id: str, task_id: str):
        """Count one "started" event for the task."""
        PipelineMonitor._count_status(dag_id, task_id, "started")

    @staticmethod
    def record_task_complete(dag_id: str, task_id: str, duration: float):
        """Observe the task's duration, then count a "completed" event."""
        duration_series = TASK_DURATION.labels(dag_id=dag_id, task_id=task_id)
        duration_series.observe(duration)
        PipelineMonitor._count_status(dag_id, task_id, "completed")

    @staticmethod
    def record_task_failure(dag_id: str, task_id: str):
        """Count one "failed" event for the task."""
        PipelineMonitor._count_status(dag_id, task_id, "failed")

    @staticmethod
    def record_records(dag_id: str, source: str, count: int):
        """Add *count* to the processed-records total for (dag_id, source)."""
        records_series = RECORDS_PROCESSED.labels(dag_id=dag_id, source=source)
        records_series.inc(count)
# docker-compose.yml
# The top-level `version` key is obsolete in Compose v2 (ignored with a
# warning), so it is omitted.

# Settings shared by every Airflow container.
x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment: &airflow-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@postgres/airflow
  volumes:
    - ./dags:/opt/airflow/dags
    - ./plugins:/opt/airflow/plugins
    # Shared log volume: with LocalExecutor, tasks run in the scheduler
    # container, and the webserver must be able to read their log files.
    - airflow-logs:/opt/airflow/logs
  depends_on:
    airflow-init:
      condition: service_completed_successfully

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db:/var/lib/postgresql/data
    # `depends_on` alone only waits for the container to start, not for
    # Postgres to accept connections.
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 10

  # One-shot container: the image entrypoint migrates the metadata DB and
  # creates the UI user, then runs `airflow version` and exits.
  # Change the credentials for anything beyond local use.
  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-env
      _AIRFLOW_DB_MIGRATE: "true"
      _AIRFLOW_WWW_USER_CREATE: "true"
      _AIRFLOW_WWW_USER_USERNAME: airflow
      _AIRFLOW_WWW_USER_PASSWORD: airflow
    depends_on:
      postgres:
        condition: service_healthy

  airflow-webserver:
    <<: *airflow-common
    # The image's default command does not start the webserver; it must be
    # requested explicitly.
    command: webserver
    ports: ["8080:8080"]

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler

volumes:
  airflow-logs:
  postgres-db:
Ready to use these tools? Browse our collection of tested, production-ready Python scripts:
🔗 Browse Products: [Anna's Digital Products](https://petroleum-board-hawaii-lol.trycloudflare.com)
All products include:
---
Don't want to build it yourself? We have production-ready versions of these tools at [https://petroleum-board-hawaii-lol.trycloudflare.com](https://petroleum-board-hawaii-lol.trycloudflare.com).
What you get:
[Browse the collection →](https://petroleum-board-hawaii-lol.trycloudflare.com)
Browse 120+ Python tools with crypto payments and instant delivery.
Browse Products →