← Back to Blog

Building Production-Ready Data Pipelines with Python

Python Data Pipeline Production · 1059 words

---

title: "Building Production-Ready Data Pipelines with Python"

description: "Learn to build robust data pipelines in Python with Airflow, scheduling, error handling, and monitoring. Complete production deployment guide."

date: "2026-07-03"

category: "python"

tags: ["python", "data-pipeline", "airflow", "etl", "production", "monitoring"]

---

Data pipelines that work in development often fail in production. This guide teaches you to build resilient, observable data pipelines that handle failures gracefully and scale with your data volume.

What You'll Build

A production-grade data pipeline system with Apache Airflow orchestration, built-in error handling, retry logic, monitoring dashboards, and automated alerting. You'll learn patterns used by data engineering teams at scale.

Why Production Pipelines Matter

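Fragile pipelines cause data delays, broken dashboards, and lost revenue. Production-ready pipelines guard against this with automatic retries, explicit data-quality checks, metrics, and alerting, each of which is covered in the tutorial below.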

Full Tutorial

Step 1: Project Structure


data-pipeline/
├── dags/
│   ├── __init__.py
│   ├── etl_dag.py
│   └── data_quality_dag.py
├── plugins/
│   ├── __init__.py
│   ├── custom_operators.py
│   ├── error_handlers.py
│   ├── hooks.py
│   └── monitoring.py
├── config/
│   └── pipelines.yaml
├── tests/
│   └── test_dags.py
└── docker-compose.yml

Step 2: Airflow DAG Definition


# dags/etl_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.utils.task_group import TaskGroup
import pandas as pd
import logging

logger = logging.getLogger(__name__)

# Task-level defaults handed to the DAG via ``default_args``.
default_args = dict(
    owner="data-engineering",
    depends_on_past=False,
    # E-mail on failure is switched on; e-mail on retry is switched off.
    email_on_failure=True,
    email_on_retry=False,
    # Three retries, starting at a 5-minute delay with exponential backoff
    # enabled and a 30-minute ceiling on the delay.
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)

def extract_data(**context):
    """Fetch records from the source API and publish them via XCom.

    The JSON response's ``"records"`` entry is turned into a DataFrame,
    serialised with ``DataFrame.to_json()`` and pushed under the
    ``raw_data`` XCom key.

    Args:
        **context: Airflow task context; only ``context["ti"]`` is used.

    Returns:
        The number of records extracted.

    Raises:
        requests.HTTPError: If the API answers with an error status
            (via ``raise_for_status``).
    """
    import requests

    source_url = "https://api.source.com/data"
    resp = requests.get(source_url, timeout=30)
    resp.raise_for_status()

    records = resp.json()["records"]
    frame = pd.DataFrame(records)
    record_count = len(frame)

    # Hand the serialised frame to the downstream task through XCom.
    task_instance = context["ti"]
    task_instance.xcom_push(key="raw_data", value=frame.to_json())

    logger.info(f"Extracted {record_count} records")
    return record_count

def transform_data(**context):
    """Clean the extracted records and validate them before loading.

    Pulls the JSON-serialised frame stored under the ``raw_data`` XCom key of
    the ``extract`` task, drops rows with a duplicate ``id``, fills missing
    ``amount`` values with 0, parses ``timestamp`` as datetimes, and pushes
    the result under the ``transformed_data`` XCom key.

    Args:
        **context: Airflow task context; only ``context["ti"]`` is used.

    Returns:
        The number of rows remaining after transformation.

    Raises:
        ValueError: If no upstream payload is available, the dataset is
            empty, or any ``amount`` is negative.
    """
    from io import StringIO

    ti = context["ti"]
    raw_json = ti.xcom_pull(task_ids="extract", key="raw_data")
    if raw_json is None:
        # xcom_pull returned nothing for this task id / key; fail with a
        # clear message instead of an opaque parser error.
        raise ValueError("No raw_data XCom found from task 'extract'")

    # Wrap the payload in StringIO: passing a literal JSON string to
    # read_json is deprecated since pandas 2.1.
    df = pd.read_json(StringIO(raw_json))

    # Checked up front: de-duplication can never empty a non-empty frame, and
    # an empty frame has no "id" column to de-duplicate on.
    if df.empty:
        raise ValueError("Empty dataset after transformation")

    # Remove duplicates
    df = df.drop_duplicates(subset=["id"])

    # Handle missing values
    df["amount"] = df["amount"].fillna(0)
    df["timestamp"] = pd.to_datetime(df["timestamp"])

    # Validate data quality with real exceptions; `assert` statements are
    # stripped when Python runs with -O and would silently skip the check.
    if (df["amount"] < 0).any():
        raise ValueError("Negative amounts found")

    ti.xcom_push(key="transformed_data", value=df.to_json())
    logger.info(f"Transformed to {len(df)} clean records")
    return len(df)

def load_data(**context):
    """Append the transformed records to the ``processed_data`` table.

    Pulls the JSON-serialised frame stored under the ``transformed_data``
    XCom key of the ``transform`` task and writes it through the SQLAlchemy
    engine of the ``data_warehouse`` Postgres connection.

    Args:
        **context: Airflow task context; only ``context["ti"]`` is used.

    Raises:
        ValueError: If no upstream payload is available.
    """
    from io import StringIO

    ti = context["ti"]
    transformed_json = ti.xcom_pull(task_ids="transform", key="transformed_data")
    if transformed_json is None:
        # Fail with a clear message instead of an opaque parser error.
        raise ValueError("No transformed_data XCom found from task 'transform'")

    # Wrap the payload in StringIO: passing a literal JSON string to
    # read_json is deprecated since pandas 2.1.
    df = pd.read_json(StringIO(transformed_json))

    pg_hook = PostgresHook(postgres_conn_id="data_warehouse")
    engine = pg_hook.get_sqlalchemy_engine()

    # NOTE(review): if_exists="append" means a retry after a partial write
    # could insert the same rows twice; confirm the table tolerates this.
    df.to_sql(
        "processed_data",
        engine,
        if_exists="append",
        index=False,
        chunksize=1000,
    )

    logger.info(f"Loaded {len(df)} records to warehouse")

# DAG wiring: extract -> transform -> load inside one task group, followed by
# a success notification.
# `schedule` replaces `schedule_interval`, which is deprecated since Airflow 2.4.
with DAG(
    "etl_pipeline",
    default_args=default_args,
    description="Production ETL pipeline",
    schedule="0 */6 * * *",  # Every 6 hours
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["production", "etl"],
) as dag:

    # prefix_group_id=False keeps the task ids as plain "extract" /
    # "transform" / "load". With the default prefixing they would become
    # "extract_transform_load.<id>", and the xcom_pull(task_ids="extract") /
    # xcom_pull(task_ids="transform") calls in the callables would find nothing.
    with TaskGroup("extract_transform_load", prefix_group_id=False) as etl_group:
        extract = PythonOperator(
            task_id="extract",
            python_callable=extract_data,
        )

        transform = PythonOperator(
            task_id="transform",
            python_callable=transform_data,
        )

        load = PythonOperator(
            task_id="load",
            python_callable=load_data,
        )

        extract >> transform >> load

    notify = BashOperator(
        task_id="notify_success",
        bash_command='echo "Pipeline completed successfully at $(date)"',
    )

    # The notification is downstream of the whole group.
    etl_group >> notify

Step 3: Custom Operator for Complex Tasks


# plugins/custom_operators.py
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
import subprocess
import json
import time

class DataQualityOperator(BaseOperator):
    """Run SQL checks and fail the task when any of them does not pass.

    Each check is a SQL statement executed with ``PostgresHook.get_first``.
    A check passes only when a row is returned and its first column is
    truthy; no row, ``NULL``, ``0`` and ``False`` all count as failures.

    Args:
        sql_checks: SQL statements to run, one per check.
        redshift_conn_id: Airflow connection id handed to ``PostgresHook``
            (the name says Redshift, but the hook used is the Postgres one).
    """

    # No @apply_defaults here: in Airflow 2 the decorator is a deprecated
    # no-op because BaseOperator applies default_args automatically.
    def __init__(self, sql_checks: list[str], redshift_conn_id: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.sql_checks = sql_checks
        self.redshift_conn_id = redshift_conn_id

    def execute(self, context):
        """Run every check, log each failure, and raise if any check failed.

        Raises:
            AirflowException: If one or more checks failed.
        """
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        failed_checks = []

        # Run all checks before raising so every failure is logged in one run.
        for check in self.sql_checks:
            result = hook.get_first(check)
            # `not result[0]` also catches NULL/False, which `== 0` let through.
            if not result or not result[0]:
                failed_checks.append(check)
                self.log.error(f"Data quality check failed: {check}")

        if failed_checks:
            raise AirflowException(
                f"Data quality checks failed: {len(failed_checks)} checks failed"
            )

        self.log.info("All data quality checks passed")

class FileProcessorOperator(BaseOperator):
    """Concatenate every CSV matching a glob pattern into one CSV file.

    Args:
        input_pattern: Glob pattern selecting the input CSV files.
        output_path: Path the combined CSV is written to.
    """

    # No @apply_defaults here: in Airflow 2 the decorator is a deprecated
    # no-op because BaseOperator applies default_args automatically.
    def __init__(self, input_pattern: str, output_path: str, **kwargs):
        super().__init__(**kwargs)
        self.input_pattern = input_pattern
        self.output_path = output_path

    def execute(self, context):
        """Read the matching files, skip unreadable ones, write the result.

        Raises:
            AirflowException: If no file could be read (including when the
                pattern matches nothing).
        """
        import glob
        import pandas as pd

        # glob returns paths in arbitrary order; sort so the row order of the
        # combined file is the same on every run.
        files = sorted(glob.glob(self.input_pattern))
        self.log.info(f"Processing {len(files)} files")

        dfs = []
        for file in files:
            # Deliberately best-effort: one unreadable file is logged and
            # skipped rather than failing the whole batch.
            try:
                dfs.append(pd.read_csv(file))
            except Exception as e:
                self.log.warning(f"Failed to process {file}: {e}")

        if not dfs:
            raise AirflowException("No files processed successfully")

        combined = pd.concat(dfs, ignore_index=True)
        combined.to_csv(self.output_path, index=False)
        self.log.info(f"Combined {len(dfs)} files into {self.output_path}")

Step 4: Error Handling Patterns


# plugins/error_handlers.py
from airflow.models import Variable
import requests
import logging

logger = logging.getLogger(__name__)

class AlertHandler:
    """Send pipeline alerts to Slack, usable as Airflow task callbacks.

    The webhook URL is read from the ``slack_webhook_url`` Airflow Variable;
    when it is empty, alerts are skipped. ``pagerduty_key`` is read from the
    ``pagerduty_key`` Variable but is not used by any method of this class.
    """

    # Slack attachment colour per severity; unknown severities use the
    # warning colour.
    _SEVERITY_COLORS = {"info": "#36a64f", "warning": "#ff9900", "critical": "#ff0000"}
    _DEFAULT_COLOR = "#ff9900"

    def __init__(self):
        self.slack_webhook = Variable.get("slack_webhook_url", default_var="")
        self.pagerduty_key = Variable.get("pagerduty_key", default_var="")

    def _build_payload(self, message: str, severity: str) -> dict:
        """Return the Slack webhook payload for one alert."""
        # Imported locally: this module does not import `time` at file level,
        # so the previous bare `time.time()` raised NameError.
        import time

        return {
            "attachments": [{
                "color": self._SEVERITY_COLORS.get(severity, self._DEFAULT_COLOR),
                "title": f"Pipeline Alert - {severity.upper()}",
                "text": message,
                "ts": time.time(),
            }]
        }

    def send_slack_alert(self, message: str, severity: str = "warning"):
        """Post an alert to Slack; failures are logged, never raised.

        Args:
            message: Text of the alert.
            severity: ``"info"``, ``"warning"`` or ``"critical"``; selects the
                attachment colour and appears upper-cased in the title.
        """
        if not self.slack_webhook:
            return

        payload = self._build_payload(message, severity)
        # Alerting is best-effort: a broken webhook must not raise out of a
        # task callback, so every failure is caught and logged.
        try:
            response = requests.post(self.slack_webhook, json=payload, timeout=10)
            # Treat HTTP error statuses as failures too, so they get logged.
            response.raise_for_status()
        except Exception as e:
            logger.error(f"Failed to send Slack alert: {e}")

    def on_retry_callback(self, context):
        """Send an info alert naming the retrying task and its attempt number."""
        ti = context["ti"]
        self.send_slack_alert(
            f"Task {ti.task_id} retrying "
            f"(attempt {ti.try_number})",
            severity="info"
        )

    def on_failure_callback(self, context):
        """Send a critical alert naming the failed task and its attempt count."""
        ti = context["ti"]
        self.send_slack_alert(
            f"Task {ti.task_id} failed "
            f"after {ti.try_number} attempts",
            severity="critical"
        )

Step 5: Monitoring Configuration


# plugins/monitoring.py
from prometheus_client import Counter, Histogram, Gauge
import time

# Histogram of task durations in seconds, labelled by DAG and task.
TASK_DURATION = Histogram(
    name="etl_task_duration_seconds",
    documentation="Time spent on ETL tasks",
    labelnames=["dag_id", "task_id"],
    buckets=[10, 30, 60, 120, 300, 600],
)

# Counter of task status events, labelled by DAG, task and status.
TASK_STATUS = Counter(
    name="etl_task_status_total",
    documentation="ETL task completion status",
    labelnames=["dag_id", "task_id", "status"],
)

# Counter of processed records, labelled by DAG and source.
RECORDS_PROCESSED = Counter(
    name="etl_records_processed_total",
    documentation="Total records processed",
    labelnames=["dag_id", "source"],
)

class PipelineMonitor:
    """Static helpers that record pipeline events on the module's metrics."""

    @staticmethod
    def _count_status(dag_id: str, task_id: str, status: str):
        """Increment TASK_STATUS by one for the given task and status."""
        series = TASK_STATUS.labels(dag_id=dag_id, task_id=task_id, status=status)
        series.inc()

    @staticmethod
    def record_task_start(dag_id: str, task_id: str):
        """Count one ``started`` event for the task."""
        PipelineMonitor._count_status(dag_id, task_id, "started")

    @staticmethod
    def record_task_complete(dag_id: str, task_id: str, duration: float):
        """Observe the task's duration, then count one ``completed`` event."""
        timer = TASK_DURATION.labels(dag_id=dag_id, task_id=task_id)
        timer.observe(duration)
        PipelineMonitor._count_status(dag_id, task_id, "completed")

    @staticmethod
    def record_task_failure(dag_id: str, task_id: str):
        """Count one ``failed`` event for the task."""
        PipelineMonitor._count_status(dag_id, task_id, "failed")

    @staticmethod
    def record_records(dag_id: str, source: str, count: int):
        """Add ``count`` to the processed-records counter for the source."""
        processed = RECORDS_PROCESSED.labels(dag_id=dag_id, source=source)
        processed.inc(count)

Step 6: Docker Deployment


# docker-compose.yml
# Local development stack: one Postgres metadata database plus the Airflow
# webserver and scheduler, both on LocalExecutor.
# NOTE: the credentials below are development placeholders; replace them with
# real secrets before using this anywhere shared.
# NOTE: this file does not initialise the metadata database or create a UI
# user; do that (e.g. `airflow db migrate`) before first use.
version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  airflow-webserver:
    image: apache/airflow:2.8.0
    depends_on: [postgres]
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@postgres/airflow
    ports: ["8080:8080"]
    volumes:
      - ./dags:/opt/airflow/dags
      - ./plugins:/opt/airflow/plugins
    # The service must name the Airflow CLI subcommand to run, as the
    # scheduler service below does; without it the webserver is not started.
    command: webserver

  airflow-scheduler:
    image: apache/airflow:2.8.0
    depends_on: [postgres]
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
      - ./plugins:/opt/airflow/plugins
    command: scheduler

Pipeline Monitoring Checklist

Get the Code

Ready to use these tools? Browse our collection of tested, production-ready Python scripts:

🔗 Browse Products: [Anna's Digital Products](https://petroleum-board-hawaii-lol.trycloudflare.com)

All products include:

---

Get the Production-Ready Version

Don't want to build it yourself? We have production-ready versions of these tools at [https://petroleum-board-hawaii-lol.trycloudflare.com](https://petroleum-board-hawaii-lol.trycloudflare.com).

What you get:

[Browse the collection →](https://petroleum-board-hawaii-lol.trycloudflare.com)

🛒 Ready to deploy?

Browse 120+ Python tools with crypto payments and instant delivery.

Browse Products →