# Data Pipeline Overview

An Airflow-based pipeline that pulls securities data from yfinance and stores it in PostgreSQL with the TimescaleDB extension.
## Architecture

```mermaid
graph TB
    subgraph "Airflow DAGs"
        H[yfinance_historical_parallel.py]
        D[yfinance_daily_parallel.py]
    end
    subgraph "YfinancePipeline"
        S1[scrape_sp500_tickers]
        S2[scrape_russell3000_tickers]
        S3[scrape_nasdaq_tickers]
        V[validate_ticker]
        SD[scrape_date_range]
        VO[validate_ohlcv]
        M[scrape_metadata]
    end
    subgraph "YfinanceClient"
        IS[insert_security]
        IO[insert_ohlcv]
        IM[insert_metadata]
        GT[get_tickers]
    end
    subgraph "Database"
        SEC[(securities table)]
        OHLCV[(ohlcv_data hypertable)]
        META[(stock_metadata)]
    end
    H --> S1 & S2 & S3
    S1 & S2 & S3 --> V
    V --> IS
    IS --> SEC
    SEC --> SD
    SD --> VO
    VO --> IO
    IO --> OHLCV
    OHLCV --> M
    M --> IM
    IM --> META
    D --> GT
    GT --> SD
```
## Architecture Components

The pipeline has three main layers: DAGs orchestrate workflows, the Pipeline scrapes and validates data, and the Client handles database operations.

| Component | Purpose | Key Features |
|---|---|---|
| DAGs | Workflow orchestration | Airflow `.expand()` for parallelization, max 20 concurrent tasks |
| Pipeline | Data scraping & validation | yfinance integration, Great Expectations checks |
| Client | Database operations | Bulk inserts with `execute_values`, upsert patterns |
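
To make the layering concrete, here is a minimal skeleton of the two code layers. The class and method names come from the architecture diagram above; the signatures and bodies are assumptions for illustration only.

```python
# Hypothetical sketch of the layer boundaries. Class and method names are
# taken from the architecture diagram; signatures are assumptions.

class YfinanceClient:
    """Database layer: owns the connection and all SQL."""

    def insert_security(self, ticker: str, name: str) -> None: ...
    def insert_ohlcv(self, rows: list[tuple]) -> None: ...
    def insert_metadata(self, ticker: str, metadata: dict) -> None: ...
    def get_tickers(self) -> list[str]: ...


class YfinancePipeline:
    """Scraping/validation layer: talks to yfinance, never to SQL directly."""

    def __init__(self, client: YfinanceClient) -> None:
        self.client = client

    def validate_ticker(self, ticker: str) -> bool: ...
    def scrape_date_range(self, ticker: str, start: str, end: str): ...
    def validate_ohlcv(self, df) -> None: ...
```

The DAGs sit on top of `YfinancePipeline` and only orchestrate; they contain no scraping or SQL logic themselves.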
## Key Features

| Feature | Implementation | Benefit |
|---|---|---|
| Parallel Execution | Airflow `.expand()` with 20 concurrent tasks | Process multiple tickers simultaneously |
| Bulk Inserts | `psycopg2.extras.execute_values` with 1000-row batches | 10-100x faster than individual inserts |
| Data Validation | 18 Great Expectations checks | Catch bad data before database insertion |
| Error Handling | Raise exceptions instead of returning error dicts | Airflow handles retries automatically |
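
The parallel-execution pattern is Airflow dynamic task mapping. Below is a minimal sketch of how `.expand()` fans out one task per ticker while capping concurrency; the DAG id, task names, and `process_ticker` body are illustrative, not the pipeline's actual code.

```python
# Minimal sketch of dynamic task mapping with .expand().
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def ticker_fanout_example():
    @task
    def get_tickers() -> list[str]:
        return ["AAPL", "MSFT", "GOOG"]  # placeholder for the real scrape

    # max_active_tis_per_dag caps concurrency (20 in this pipeline) so the
    # fan-out doesn't hammer the yfinance API.
    @task(max_active_tis_per_dag=20)
    def process_ticker(ticker: str) -> None:
        print(f"processing {ticker}")

    # One mapped task instance per ticker, scheduled in parallel.
    process_ticker.expand(ticker=get_tickers())


ticker_fanout_example()
```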
## DAG Workflows

### `yfinance_historical_parallel.py`

Manual trigger for initial setup or backfills:

- Scrape ticker lists from Wikipedia, iShares, and NASDAQ
- Validate each ticker with a test download
- Register valid tickers in the `securities` table
- Download OHLCV data for the date range
- Validate OHLCV with Great Expectations
- Bulk insert to the `ohlcv_data` hypertable (see the sketch after this list)
- Download and insert metadata
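
A hedged sketch of the bulk-insert step: `psycopg2.extras.execute_values` batches rows (`page_size=1000`, matching the pipeline's batch size) and `ON CONFLICT` turns the insert into an upsert. The column names and conflict key are assumptions based on the schema names in the diagram.

```python
# Bulk upsert into the ohlcv_data hypertable; columns are assumptions.
from psycopg2.extras import execute_values


def bulk_insert_ohlcv(conn, rows: list[tuple]) -> None:
    """rows: (ticker, ts, open, high, low, close, volume) tuples."""
    sql = """
        INSERT INTO ohlcv_data (ticker, ts, open, high, low, close, volume)
        VALUES %s
        ON CONFLICT (ticker, ts) DO UPDATE
            SET open = EXCLUDED.open, high = EXCLUDED.high,
                low = EXCLUDED.low, close = EXCLUDED.close,
                volume = EXCLUDED.volume
    """
    # execute_values sends rows in batches of 1000, one statement per batch,
    # which is far faster than executing one INSERT per row.
    with conn, conn.cursor() as cur:
        execute_values(cur, sql, rows, page_size=1000)
```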
### `yfinance_daily_parallel.py`

Auto-runs weekdays at 9:30 PM UTC:

- Get tickers from the `securities` table
- Download the latest OHLCV data (see the sketch after this list)
- Validate and insert
- Update metadata
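
An illustrative sketch of the daily download step, showing the error-handling convention from the Key Features table: raise an exception on bad data instead of returning an error dict, so Airflow's retry machinery takes over. The function name is an assumption.

```python
# Daily OHLCV fetch; raises (rather than returning an error dict) on failure.
import yfinance as yf


def download_latest_ohlcv(ticker: str):
    df = yf.download(ticker, period="1d", auto_adjust=False, progress=False)
    if df.empty:
        # Raising lets Airflow mark the task instance failed and retry it.
        raise ValueError(f"No OHLCV data returned for {ticker}")
    return df
```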
## Configuration

| Variable | Example | Purpose |
|---|---|---|
| `SEC_MASTER_DB_URL` | `postgresql://user:pass@localhost:5432/sec_master_dev` | Database connection string |
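
A minimal sketch of how the connection string might be consumed; the environment variable name comes from the table above, everything else is illustrative.

```python
# Read the connection string from the environment and open a connection.
import os

import psycopg2

db_url = os.environ["SEC_MASTER_DB_URL"]
conn = psycopg2.connect(db_url)  # psycopg2 accepts libpq URIs directly
```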
## Deployment

```bash
# Start Airflow and PostgreSQL
docker compose -f infrastructure/docker-compose.data-pipeline.yml up -d

# Access Airflow UI
# http://localhost:8080 (admin/admin)
```
## Performance

| Metric | Value | Purpose |
|---|---|---|
| Batch size | 1000 rows | Bulk insert optimization |
| Parallelism | 20 tasks | Max concurrent ticker processing |
| Retry strategy | 2-3 attempts | Exponential backoff for transient failures |
| Rate limiting | `max_active_tis_per_dag` | Avoid API throttling |
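
A sketch of how the retry strategy could be expressed as Airflow `default_args`; the exact values in the real DAGs may differ (the table above says 2-3 attempts).

```python
# Retry settings with exponential backoff, passed as DAG default_args.
from datetime import timedelta

default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=1),
    # With exponential backoff, the delay roughly doubles on each retry.
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
}
```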