TickAtlas
Tutorial 13 min read · March 28, 2026

How to Build a Reliable Data Pipeline for Algorithmic Trading

Design a production-grade data pipeline that collects, validates, stores, and serves financial data for algorithmic trading systems.

CG
By the TickAtlas team

The Pipeline Architecture

A trading data pipeline has four stages: collection, validation, storage, and serving. Each stage can fail independently, and each failure mode requires a different response.

[Collection] → [Validation] → [Storage] → [Serving]
     |               |              |            |
  API calls     Sanity checks   PostgreSQL   Redis cache
  Scheduling    Gap detection   TimescaleDB  REST API
  Retry logic   Deduplication   Backfill     Rate limits

Stage 1: Collection

python
import requests
import schedule
import time
import logging
from datetime import datetime

# Named logger used by the pipeline snippets in this file.
logger = logging.getLogger("pipeline")
# Placeholder credential sent as the X-API-Key header; replace before running.
API_KEY = "YOUR_API_KEY"
# Root URL that collect_snapshot() appends the endpoint path to.
BASE_URL = "https://tickatlas.com/v1"

# collect_snapshot() requests every symbol x timeframe x indicator
# combination, i.e. 5 * 4 * 5 = 100 requests per run with these lists.
SYMBOLS = ["EURUSD", "GBPUSD", "USDJPY", "XAUUSD", "BTCUSD"]
TIMEFRAMES = ["M15", "H1", "H4", "D1"]
INDICATORS = ["RSI_14", "MACD", "EMA_50", "EMA_200", "ATR_14"]

def collect_snapshot():
    """Collect a full indicator snapshot for all symbols.

    Issues one GET request to ``{BASE_URL}/indicator`` for every combination
    of ``SYMBOLS``, ``TIMEFRAMES`` and ``INDICATORS``. A combination that
    fails (network error, HTTP error status, malformed payload) is logged and
    skipped so that one bad response does not discard the rest of the run.

    Returns:
        A list of record dicts with the keys ``collected_at``, ``symbol``,
        ``timeframe``, ``indicator``, ``values``, ``signal`` and ``ohlcv``.
        ``collected_at`` is one timezone-aware UTC ISO-8601 string shared by
        every record of the run; ``signal`` and ``ohlcv`` are ``None`` when
        the response does not contain them.
    """
    from datetime import timezone
    from itertools import product

    # Timezone-aware UTC: datetime.utcnow() returns a naive value (ambiguous
    # once stored) and is deprecated since Python 3.12.
    timestamp = datetime.now(timezone.utc)
    collected_at = timestamp.isoformat()
    records = []

    # A single session reuses the underlying connection instead of opening a
    # new one for each of the symbol/timeframe/indicator requests.
    with requests.Session() as session:
        session.headers["X-API-Key"] = API_KEY

        for symbol, tf, indicator in product(SYMBOLS, TIMEFRAMES, INDICATORS):
            try:
                resp = session.get(
                    f"{BASE_URL}/indicator",
                    params={
                        "symbol": symbol,
                        "indicator": indicator,
                        "timeframe": tf,
                    },
                    timeout=10,
                )
                resp.raise_for_status()
                data = resp.json()["data"]

                records.append({
                    "collected_at": collected_at,
                    "symbol": symbol,
                    "timeframe": tf,
                    "indicator": indicator,
                    "values": data["values"],
                    "signal": data.get("signal"),
                    "ohlcv": data.get("ohlcv"),
                })
            except Exception as e:
                # Deliberately broad: this is the boundary of a best-effort
                # job, and a single failing combination must not abort the
                # remaining requests.
                logger.error(f"Collection failed: {symbol}/{tf}/{indicator}: {e}")

    logger.info(f"Collected {len(records)} records at {timestamp}")
    return records

# Schedule collection: register collect_snapshot as a job recurring every 5 minutes.
# NOTE(review): this line only registers the job. Presumably a loop that
# calls schedule.run_pending() drives it elsewhere — none is visible in this
# snippet, confirm.
# NOTE(review): the list returned by collect_snapshot is not consumed here;
# verify how the records reach the validation and storage stages.
schedule.every(5).minutes.do(collect_snapshot)

Stage 2: Validation

python
from dataclasses import dataclass

@dataclass
class ValidationResult:
    """Outcome of validating a single collected record."""

    # True when no problems were found (validate_record sets it to
    # ``len(errors) == 0``).
    valid: bool
    # One human-readable message per problem found; empty for a valid record.
    errors: list[str]

def validate_record(record: dict) -> ValidationResult:
    """Validate a collected data record.

    Checks, in order: presence of the required fields, that every entry of
    ``values`` is a finite number, basic OHLCV sanity (high >= low,
    close > 0) and that an ``rsi`` value lies within 0..100.

    Malformed input never raises; every problem is reported through the
    returned result so that a validator failure cannot take down the caller.

    Args:
        record: Record dict as produced by the collection stage.

    Returns:
        A ``ValidationResult`` whose ``valid`` flag is True only when
        ``errors`` is empty.
    """
    import math

    errors = []

    def is_number(val) -> bool:
        # bool is a subclass of int, but True/False is never a meaningful
        # price or indicator reading.
        return isinstance(val, (int, float)) and not isinstance(val, bool)

    # Check required fields
    for field in ["symbol", "timeframe", "indicator", "values"]:
        if field not in record:
            errors.append(f"Missing field: {field}")

    # Validate indicator values are numeric
    values = record.get("values", {})
    if not isinstance(values, dict):
        # Previously a None / list payload crashed on .items().
        errors.append(f"Values is not a mapping: {values}")
        values = {}
    for key, val in values.items():
        if not is_number(val):
            errors.append(f"Non-numeric value for {key}: {val}")
        elif isinstance(val, float) and math.isnan(val):
            errors.append(f"NaN value for {key}")
        elif isinstance(val, float) and math.isinf(val):
            errors.append(f"Infinite value for {key}")

    # Validate OHLCV sanity
    ohlcv = record.get("ohlcv", {})
    if ohlcv and not isinstance(ohlcv, dict):
        errors.append(f"OHLCV is not a mapping: {ohlcv}")
    elif ohlcv:
        high = ohlcv.get("high", 0)
        low = ohlcv.get("low", 0)
        close = ohlcv.get("close", 0)
        for name, price in (("high", high), ("low", low), ("close", close)):
            if not is_number(price):
                errors.append(f"Non-numeric OHLCV {name}: {price}")
        # Only compare prices that are actually numbers; comparing None or a
        # string would raise TypeError.
        if is_number(high) and is_number(low) and high < low:
            errors.append("High is less than low")
        if is_number(close) and close <= 0:
            errors.append("Close price is zero or negative")

    # Validate RSI range (a non-numeric rsi is already reported above)
    if "rsi" in values:
        rsi = values["rsi"]
        if is_number(rsi) and not (0 <= rsi <= 100):
            errors.append(f"RSI out of range: {rsi}")

    return ValidationResult(valid=len(errors) == 0, errors=errors)

# Usage: validate every collected record and log the ones that fail.
for record in records:
    result = validate_record(record)
    if not result.valid:
        # A record can be invalid precisely because "symbol" is missing, so
        # look it up with .get() instead of indexing (which would raise
        # KeyError while trying to report the problem).
        logger.warning(f"Invalid record: {record.get('symbol', '<unknown>')}: {result.errors}")

Stage 3: Storage

python
import psycopg2
import json

def store_records(records: list[dict], conn_string: str):
    """Batch insert validated records into PostgreSQL.

    Every record is passed through ``validate_record``; only valid records
    are written to ``indicator_snapshots``. Rows that collide on
    ``(symbol, timeframe, indicator, collected_at)`` are skipped by the
    ``ON CONFLICT ... DO NOTHING`` clause, so re-running a batch is harmless.

    Args:
        records: Record dicts as produced by the collection stage.
        conn_string: Connection string handed to ``psycopg2.connect``.

    Raises:
        Whatever ``psycopg2`` raises while connecting, inserting or
        committing. The cursor and connection are closed in every case, and
        the connection is closed without committing when the insert fails.
    """
    insert_sql = """
        INSERT INTO indicator_snapshots
        (collected_at, symbol, timeframe, indicator, values, signal, ohlcv)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (symbol, timeframe, indicator, collected_at)
        DO NOTHING
    """

    # Validate before touching the database so no connection is held open
    # while records are being checked.
    batch = [
        (
            record["collected_at"],
            record["symbol"],
            record["timeframe"],
            record["indicator"],
            json.dumps(record["values"]),
            record.get("signal"),
            json.dumps(record.get("ohlcv", {})),
        )
        for record in records
        if validate_record(record).valid
    ]

    # Nothing to insert means no reason to open a connection at all.
    if batch:
        conn = psycopg2.connect(conn_string)
        try:
            cur = conn.cursor()
            try:
                cur.executemany(insert_sql, batch)
                conn.commit()
            finally:
                cur.close()
        finally:
            # Runs on failure too, so the connection is never leaked.
            conn.close()

    logger.info(f"Stored {len(batch)} records ({len(records) - len(batch)} invalid)")

Stage 4: Serving via Cache

python
import redis

# Module-level Redis client shared by update_cache() and get_latest().
# No host or port is passed, so the client library's defaults apply.
r = redis.Redis(decode_responses=True)

def update_cache(records: list[dict], ttl_seconds: int = 600):
    """Push latest values to Redis for fast access.

    Each record is written under the key
    ``latest:{symbol}:{timeframe}:{indicator}`` as a JSON document holding
    ``values``, ``signal``, ``ohlcv`` and ``collected_at``. All writes are
    queued on one Redis pipeline and sent together.

    Args:
        records: Record dicts as produced by the collection stage. When
            several records map to the same key, the last one wins.
        ttl_seconds: Expiry applied to every key, in seconds. Defaults to
            600, the value that was previously hard-coded.
    """
    pipe = r.pipeline()
    for record in records:
        key = f"latest:{record['symbol']}:{record['timeframe']}:{record['indicator']}"
        payload = json.dumps({
            "values": record["values"],
            "signal": record.get("signal"),
            "ohlcv": record.get("ohlcv"),
            "collected_at": record["collected_at"],
        })
        pipe.setex(key, ttl_seconds, payload)
    pipe.execute()

def get_latest(symbol: str, timeframe: str, indicator: str) -> "dict | None":
    """Fast retrieval from cache.

    Args:
        symbol: Instrument symbol, e.g. ``"EURUSD"``.
        timeframe: Timeframe code, e.g. ``"H1"``.
        indicator: Indicator name, e.g. ``"RSI_14"``.

    Returns:
        The decoded JSON document stored under
        ``latest:{symbol}:{timeframe}:{indicator}``, or ``None`` when the key
        is absent (or holds an empty value).
    """
    # The return annotation is a string so that the ``|`` union is not
    # evaluated at definition time on older interpreters.
    key = f"latest:{symbol}:{timeframe}:{indicator}"
    data = r.get(key)
    if not data:
        return None
    return json.loads(data)

Gap Detection

python
def detect_gaps(
    symbol: str,
    timeframe: str,
    expected_interval_minutes: int,
    indicator: str = "RSI_14",
    limit: int = 100,
):
    """Check for missing data points in the collection history.

    Loads the most recent ``collected_at`` timestamps of one
    symbol/timeframe/indicator series and reports every pair of consecutive
    snapshots that lie more than twice the expected interval apart.

    Args:
        symbol: Instrument symbol to inspect.
        timeframe: Timeframe code to inspect.
        expected_interval_minutes: Nominal spacing between two snapshots.
        indicator: Indicator whose rows serve as the heartbeat of the series.
            Defaults to ``"RSI_14"``, the value that was previously
            hard-coded.
        limit: Number of most recent snapshots to inspect. Defaults to 100,
            the value that was previously hard-coded.

    Returns:
        A list of dicts with the keys ``from`` and ``to`` (ISO-8601 strings
        of the older and newer snapshot) and ``gap_minutes`` (whole minutes
        between them), ordered newest gap first. Empty when fewer than two
        snapshots exist or no gap is found.
    """
    conn = psycopg2.connect(CONN_STRING)
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT collected_at FROM indicator_snapshots
                WHERE symbol = %s AND timeframe = %s AND indicator = %s
                ORDER BY collected_at DESC LIMIT %s
            """, (symbol, timeframe, indicator, limit))
            timestamps = [row[0] for row in cur.fetchall()]
        finally:
            cur.close()
    finally:
        # Runs even when the query fails, so the connection is never leaked.
        conn.close()

    # Twice the expected interval tolerates jitter while still flagging a
    # fully missed collection run.
    threshold_minutes = expected_interval_minutes * 2

    gaps = []
    # Rows arrive newest first, so each pair is (newer, older).
    for newer, older in zip(timestamps, timestamps[1:]):
        diff = (newer - older).total_seconds() / 60
        if diff > threshold_minutes:
            gaps.append({
                "from": older.isoformat(),
                "to": newer.isoformat(),
                "gap_minutes": int(diff),
            })

    return gaps

Related Reading

Try this with live data

Every account gets $2.50 in free PAYG credits. No card required — paste your API key and run the code above against live broker data.