How to Build a Reliable Data Pipeline for Algorithmic Trading
Design a production-grade data pipeline that collects, validates, stores, and serves financial data for algorithmic trading systems.
The Pipeline Architecture
A trading data pipeline has four stages: collection, validation, storage, and serving. Each stage can fail independently, and each failure mode requires a different response.
[Collection]  →  [Validation]   →  [Storage]    →  [Serving]
     |                |                |               |
 API calls       Sanity checks     PostgreSQL      Redis cache
 Scheduling      Gap detection     TimescaleDB     REST API
 Retry logic     Deduplication     Backfill        Rate limits
Stage 1: Collection
import logging
import math
import time
from datetime import datetime, timezone
from itertools import product
from typing import Optional

import requests
import schedule
# Logger shared by every pipeline stage in this file.
logger = logging.getLogger("pipeline")

# Placeholder credential; collect_snapshot() sends it in the X-API-Key header.
API_KEY = "YOUR_API_KEY"
# Root URL of the data API; collect_snapshot() appends "/indicator" to it.
BASE_URL = "https://tickatlas.com/v1"

# collect_snapshot() issues one request per (symbol, timeframe, indicator)
# combination, i.e. 5 * 4 * 5 = 100 requests per run with the lists below.
SYMBOLS = ["EURUSD", "GBPUSD", "USDJPY", "XAUUSD", "BTCUSD"]
TIMEFRAMES = ["M15", "H1", "H4", "D1"]
INDICATORS = ["RSI_14", "MACD", "EMA_50", "EMA_200", "ATR_14"]
def collect_snapshot():
    """Collect full indicator snapshot for all symbols.

    Issues one GET to ``{BASE_URL}/indicator`` for every combination of
    SYMBOLS x TIMEFRAMES x INDICATORS. A failed combination is logged and
    skipped so one bad request does not abort the rest of the snapshot.

    Returns:
        list[dict]: One record per successful request with the keys
        ``collected_at`` (ISO-8601 string, UTC, shared by the whole
        snapshot), ``symbol``, ``timeframe``, ``indicator``, ``values``,
        ``signal`` and ``ohlcv`` (the last two are None when the response
        omits them).
    """
    # Timezone-aware UTC: datetime.utcnow() is deprecated and returns a naive
    # value that is easy to misread as local time downstream.
    timestamp = datetime.now(timezone.utc)
    collected_at = timestamp.isoformat()
    records = []

    # One Session for the whole snapshot so the HTTP connection is reused
    # across requests instead of being re-established each time.
    with requests.Session() as session:
        session.headers["X-API-Key"] = API_KEY
        for symbol, tf, indicator in product(SYMBOLS, TIMEFRAMES, INDICATORS):
            try:
                resp = session.get(
                    f"{BASE_URL}/indicator",
                    params={
                        "symbol": symbol,
                        "indicator": indicator,
                        "timeframe": tf,
                    },
                    timeout=10,
                )
                resp.raise_for_status()
                data = resp.json()["data"]
                records.append({
                    "collected_at": collected_at,
                    "symbol": symbol,
                    "timeframe": tf,
                    "indicator": indicator,
                    "values": data["values"],
                    "signal": data.get("signal"),
                    "ohlcv": data.get("ohlcv"),
                })
            except (requests.RequestException, KeyError, TypeError, ValueError) as e:
                # Transport/HTTP errors plus malformed payloads (bad JSON,
                # missing "data"/"values", non-mapping body). Programming
                # errors are deliberately not swallowed.
                logger.error(f"Collection failed: {symbol}/{tf}/{indicator}: {e}")

    logger.info(f"Collected {len(records)} records at {timestamp}")
    return records
# Schedule collection
# NOTE(review): this line only registers collect_snapshot as a 5-minute job.
# No loop that drives the scheduler is visible in this file, and the list
# returned by collect_snapshot() is not captured here — confirm how the job is
# run and how its records reach validation and storage.
schedule.every(5).minutes.do(collect_snapshot)

# Stage 2: Validation
from dataclasses import dataclass
@dataclass
class ValidationResult:
    """Outcome of validating one collected record."""

    # True when no validation rule was violated.
    valid: bool
    # One message per violated rule, in the order the checks ran.
    errors: list[str]


def _is_number(value) -> bool:
    """Return True for int/float values; bool is rejected although it subclasses int."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _is_finite_number(value) -> bool:
    """Return True for numbers that are neither NaN nor infinite."""
    if not _is_number(value):
        return False
    # Only floats can be non-finite; skipping ints avoids OverflowError on huge values.
    return not isinstance(value, float) or math.isfinite(value)


def validate_record(record: dict) -> ValidationResult:
    """Validate a collected data record.

    Malformed input is always reported through ``errors``; the function does
    not raise on non-numeric values or on ``values``/``ohlcv`` that are not
    mappings.

    Args:
        record: Record as produced by the collection stage. ``symbol``,
            ``timeframe``, ``indicator`` and ``values`` are required;
            ``ohlcv`` is optional and may be None.

    Returns:
        ValidationResult whose ``valid`` flag is True only when ``errors``
        is empty.
    """
    errors = []

    # Check required fields
    for field in ("symbol", "timeframe", "indicator", "values"):
        if field not in record:
            errors.append(f"Missing field: {field}")

    # Validate indicator values are numeric and finite
    values = record.get("values", {})
    if not isinstance(values, dict):
        errors.append(f"Values is not a mapping: {values!r}")
        values = {}
    for key, val in values.items():
        if not _is_number(val):
            errors.append(f"Non-numeric value for {key}: {val}")
        elif isinstance(val, float) and math.isnan(val):
            errors.append(f"NaN value for {key}")
        elif isinstance(val, float) and math.isinf(val):
            errors.append(f"Infinite value for {key}")

    # Validate OHLCV sanity
    ohlcv = record.get("ohlcv")
    if ohlcv:
        if not isinstance(ohlcv, dict):
            errors.append(f"OHLCV is not a mapping: {ohlcv!r}")
        else:
            # Missing prices default to 0, as before, so an absent close is
            # still reported as "zero or negative".
            high = ohlcv.get("high", 0)
            low = ohlcv.get("low", 0)
            close = ohlcv.get("close", 0)
            for name, price in (("high", high), ("low", low), ("close", close)):
                if not _is_finite_number(price):
                    errors.append(f"Invalid OHLCV {name}: {price}")
            # Compare only real numbers; comparing None or str would raise TypeError.
            if _is_number(high) and _is_number(low) and high < low:
                errors.append("High is less than low")
            if _is_number(close) and close <= 0:
                errors.append("Close price is zero or negative")

    # Validate RSI range (a non-numeric RSI was already reported above)
    if "rsi" in values:
        rsi = values["rsi"]
        if _is_number(rsi) and not (0 <= rsi <= 100):
            errors.append(f"RSI out of range: {rsi}")

    return ValidationResult(valid=not errors, errors=errors)
# Usage
# NOTE(review): `records` is not assigned at module level in the visible code;
# presumably it is the list returned by collect_snapshot() — confirm.
for record in records:
    result = validate_record(record)
    if not result.valid:
        # A record can be invalid precisely because "symbol" is missing, so
        # look it up defensively instead of raising KeyError while logging.
        logger.warning(f"Invalid record: {record.get('symbol', '<unknown>')}: {result.errors}")

# Stage 3: Storage
import psycopg2
import json
def store_records(records: list[dict], conn_string: str):
    """Batch insert validated records into PostgreSQL.

    Every record is checked with validate_record(); only valid ones are
    written to ``indicator_snapshots``. Rows that collide on
    (symbol, timeframe, indicator, collected_at) are ignored by the
    ``ON CONFLICT ... DO NOTHING`` clause. No connection is opened when
    there is nothing valid to store.

    Args:
        records: Records as produced by the collection stage.
        conn_string: Connection string passed to ``psycopg2.connect``.

    Raises:
        Any exception raised by psycopg2 while connecting or inserting. The
        transaction is rolled back and the connection closed before it
        propagates.
    """
    insert_sql = """
        INSERT INTO indicator_snapshots
            (collected_at, symbol, timeframe, indicator, values, signal, ohlcv)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (symbol, timeframe, indicator, collected_at)
        DO NOTHING
    """

    batch = [
        (
            record["collected_at"],
            record["symbol"],
            record["timeframe"],
            record["indicator"],
            json.dumps(record["values"]),
            record.get("signal"),
            # Records may carry an explicit None for "ohlcv", which a .get()
            # default does not replace; `or {}` stores an empty object for both.
            json.dumps(record.get("ohlcv") or {}),
        )
        for record in records
        if validate_record(record).valid
    ]

    if batch:
        conn = psycopg2.connect(conn_string)
        try:
            # `with conn` commits on success and rolls back on error but does
            # not close the connection, hence the explicit finally below.
            with conn:
                with conn.cursor() as cur:
                    cur.executemany(insert_sql, batch)
        finally:
            conn.close()

    logger.info(f"Stored {len(batch)} records ({len(records) - len(batch)} invalid)")

# Stage 4: Serving via Cache
import redis
# Module-level Redis client shared by update_cache() and get_latest(); no
# host/port is given, so the client library's defaults apply.
# decode_responses=True is set so replies are text, which get_latest() passes
# straight to json.loads.
r = redis.Redis(decode_responses=True)
def update_cache(records: list[dict], ttl_seconds: int = 600):
    """Push latest values to Redis for fast access.

    Each record is written under ``latest:{symbol}:{timeframe}:{indicator}``
    as a JSON object holding ``values``, ``signal``, ``ohlcv`` and
    ``collected_at``. All writes are queued on one pipeline and sent with a
    single ``execute()`` call.

    Args:
        records: Records as produced by the collection stage. They are cached
            as given; this function does not validate them.
        ttl_seconds: Expiry applied to every key, in seconds. Defaults to 600,
            the value that was previously hard-coded.
    """
    pipe = r.pipeline()
    for record in records:
        key = f"latest:{record['symbol']}:{record['timeframe']}:{record['indicator']}"
        payload = {
            "values": record["values"],
            "signal": record.get("signal"),
            "ohlcv": record.get("ohlcv"),
            "collected_at": record["collected_at"],
        }
        pipe.setex(key, ttl_seconds, json.dumps(payload))
    pipe.execute()
def get_latest(symbol: str, timeframe: str, indicator: str) -> Optional[dict]:
    """Fast retrieval from cache.

    Args:
        symbol: Instrument symbol, e.g. "EURUSD".
        timeframe: Timeframe code, e.g. "H1".
        indicator: Indicator name, e.g. "RSI_14".

    Returns:
        The decoded JSON object stored under
        ``latest:{symbol}:{timeframe}:{indicator}``, or None when the key is
        absent or holds an empty value.
    """
    key = f"latest:{symbol}:{timeframe}:{indicator}"
    data = r.get(key)
    return json.loads(data) if data else None

# Gap Detection
def detect_gaps(
    symbol: str,
    timeframe: str,
    expected_interval_minutes: int,
    *,
    indicator: str = "RSI_14",
    limit: int = 100,
    conn_string: Optional[str] = None,
):
    """Check for missing data points in the collection history.

    Loads the most recent ``collected_at`` values for one
    symbol/timeframe/indicator series and reports every pair of consecutive
    snapshots that are more than twice the expected interval apart.

    Args:
        symbol: Instrument symbol to inspect.
        timeframe: Timeframe code to inspect.
        expected_interval_minutes: Nominal spacing between snapshots. A gap is
            reported only when the spacing exceeds twice this value.
        indicator: Indicator series used as the collection heartbeat.
        limit: Maximum number of most-recent snapshots to examine.
        conn_string: Connection string for ``psycopg2.connect``. When None,
            the module-level ``CONN_STRING`` is used.

    Returns:
        list[dict]: One entry per gap, newest first, with ``from`` and ``to``
        (ISO-8601 strings of the older and newer snapshot) and
        ``gap_minutes`` (spacing truncated to whole minutes).
    """
    dsn = CONN_STRING if conn_string is None else conn_string
    conn = psycopg2.connect(dsn)
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT collected_at FROM indicator_snapshots
                WHERE symbol = %s AND timeframe = %s AND indicator = %s
                ORDER BY collected_at DESC LIMIT %s
                """,
                (symbol, timeframe, indicator, limit),
            )
            timestamps = [row[0] for row in cur.fetchall()]
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()

    gaps = []
    # Rows are newest-first, so each pair is (newer, older).
    for newer, older in zip(timestamps, timestamps[1:]):
        diff = (newer - older).total_seconds() / 60
        if diff > expected_interval_minutes * 2:
            gaps.append({
                "from": older.isoformat(),
                "to": newer.isoformat(),
                "gap_minutes": int(diff),
            })
    return gaps

# Related Reading
Try this with live data
Every account gets $2.50 in free PAYG credits. No card required — paste your API key and run the code above against live broker data.
Keep reading
All articles
- Tutorial · 11 min read
24/7 Crypto Monitoring: Building Always-On Analysis Systems
Build a monitoring system that watches crypto markets around the clock, detects significant moves, and sends alerts when conditions match your criteria.
March 28, 2026
- Tutorial 12 min read
How to Build an AI Market Analyst That Runs 24/7
Build a production-ready AI market analyst that monitors forex and crypto markets around the clock, generates daily briefings, and alerts you to opportunities via Telegram.
March 28, 2026
- Tutorial 10 min read
Using ATR for Dynamic Stop-Loss Placement
Learn how to use Average True Range (ATR) to set volatility-adjusted stop losses that adapt to market conditions, with full code examples.
March 28, 2026