TickAtlas
Most Popular ~15 min setup

Python Integration

Build trading bots, data pipelines, and analysis tools with the TickAtlas Python client. Supports synchronous and async workflows with built-in retry logic and caching.

Installation

Install the requests library (or use the manual approach below):

bash
pip install requests

For async support, also install aiohttp:

bash
pip install aiohttp

Quick Start

python
import os
from tickatlas import TickAtlasClient

# Read the API key from the environment and build the client
api_key = os.environ["CLAW_API_KEY"]
client = TickAtlasClient(api_key)

# Single indicator lookup: RSI_14 for EURUSD
rsi = client.indicator("EURUSD", "RSI_14", "H1")
print(f"RSI: {rsi['value']} | Signal: {rsi['signal']}")

# All 42 indicators for the symbol in a single call
all_indicators = client.indicators("EURUSD", "H1")
indicator_map = all_indicators["indicators"]
for name, data in indicator_map.items():
    print(f"  {name}: {data['value']}")

# AI-generated market summary for gold
summary = client.summary("XAUUSD", "H1")
print(f"Bias: {summary['bias']} | Confidence: {summary['confidence']}")
print(f"Narrative: {summary['narrative']}")

# Screener: oversold symbols (RSI_14 capped at 30 via max_val)
oversold = client.screener("RSI_14", timeframe="H1", max_val=30)
matches = oversold["results"]
for hit in matches:
    print(f"  {hit['symbol']}: RSI = {hit['value']}")

Full Client Class

A production-ready client with automatic retries, rate limit handling, and all endpoints covered:

python
import time
from functools import lru_cache
from typing import Optional

import requests

class TickAtlasClient:
    """Synchronous Python client for the TickAtlas Financial Data API.

    Every endpoint method issues a GET request below ``BASE_URL`` through a
    shared ``requests.Session`` that carries the ``X-API-Key`` header, and
    returns the decoded JSON body as a dict.

    The client can be used as a context manager so the underlying session
    is closed deterministically::

        with TickAtlasClient(api_key) as client:
            client.quote("EURUSD")
    """

    BASE_URL = "https://tickatlas.com/v1"

    # Seconds to wait after a 429 when Retry-After is missing or unparseable.
    DEFAULT_RETRY_AFTER = 2
    # Per-request timeout, in seconds, handed to the HTTP session.
    REQUEST_TIMEOUT = 10

    def __init__(self, api_key: str, max_retries: int = 3):
        """Create a client.

        Args:
            api_key: Key sent in the ``X-API-Key`` header of every request.
            max_retries: Total number of attempts per request. Values below
                1 are treated as 1 so that a request is always issued.
        """
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": api_key})
        self.max_retries = max_retries

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def close(self) -> None:
        """Close the underlying HTTP session and release its connections."""
        self.session.close()

    @staticmethod
    def _retry_after_seconds(header_value: Optional[str],
                             default: float = DEFAULT_RETRY_AFTER) -> float:
        """Translate a ``Retry-After`` header into a non-negative delay.

        The header may be an integer number of seconds or an HTTP-date.
        Missing or unparseable values fall back to ``default``.
        """
        if header_value is None:
            return float(default)
        text = str(header_value).strip()
        try:
            # Clamp: time.sleep() rejects negative values.
            return float(max(0, int(text)))
        except ValueError:
            pass

        # Not an integer, so try the HTTP-date form. Imported locally to
        # keep the module-level dependency list unchanged.
        from datetime import datetime, timezone
        from email.utils import parsedate_to_datetime

        try:
            when = parsedate_to_datetime(text)
        except (TypeError, ValueError):
            return float(default)
        if when is None:
            return float(default)
        if when.tzinfo is None:
            # A date without an offset is taken to be UTC.
            when = when.replace(tzinfo=timezone.utc)
        delay = (when - datetime.now(timezone.utc)).total_seconds()
        return max(0.0, delay)

    def _request(self, endpoint: str, params: dict) -> dict:
        """GET ``BASE_URL/endpoint`` and return the decoded JSON body.

        Retry policy (at most ``max_retries`` attempts in total):

        * connection-level failures and 5xx responses are retried with
          exponential backoff (1s, 2s, 4s, ...);
        * 429 responses are retried after the delay given by the
          ``Retry-After`` header;
        * any other error status (e.g. 401, 403) is raised immediately,
          because repeating the request cannot succeed.

        Raises:
            requests.exceptions.RequestException: if the request still
                fails once the attempts are exhausted, including a final
                429 (which previously produced an empty dict).
        """
        url = f"{self.BASE_URL}/{endpoint}"
        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            last_attempt = attempt == attempts - 1
            try:
                resp = self.session.get(
                    url, params=params, timeout=self.REQUEST_TIMEOUT
                )
            except requests.exceptions.RequestException:
                if last_attempt:
                    raise
                time.sleep(2 ** attempt)
                continue

            status = resp.status_code
            if not last_attempt:
                if status == 429:
                    time.sleep(
                        self._retry_after_seconds(resp.headers.get("Retry-After"))
                    )
                    continue
                if status >= 500:
                    time.sleep(2 ** attempt)
                    continue

            # Either a success, a non-retryable error, or the final attempt:
            # surface HTTP errors to the caller instead of hiding them.
            resp.raise_for_status()
            return resp.json()

        # Unreachable: the final attempt always returns or raises.
        raise RuntimeError("retry loop exited without a result")

    def indicator(self, symbol: str, indicator: str, timeframe: str = "H1") -> dict:
        """Query the ``indicator`` endpoint for one indicator of a symbol."""
        return self._request("indicator", {
            "symbol": symbol, "indicator": indicator, "timeframe": timeframe
        })

    def indicators(self, symbol: str, timeframe: str = "H1") -> dict:
        """Query the ``indicators`` endpoint for a symbol and timeframe."""
        return self._request("indicators", {
            "symbol": symbol, "timeframe": timeframe
        })

    def summary(self, symbol: str, timeframe: str = "H1") -> dict:
        """Query the ``summary`` endpoint for a symbol and timeframe."""
        return self._request("summary", {
            "symbol": symbol, "timeframe": timeframe
        })

    def quote(self, symbol: str) -> dict:
        """Query the ``quote`` endpoint for a symbol."""
        return self._request("quote", {"symbol": symbol})

    def screener(self, indicator: str, timeframe: str = "H1",
                 min_val: Optional[float] = None,
                 max_val: Optional[float] = None) -> dict:
        """Query the ``screener`` endpoint for an indicator.

        ``min_val`` / ``max_val`` are sent as the ``min`` / ``max`` query
        parameters and are omitted entirely when left as ``None``.
        """
        params = {"indicator": indicator, "timeframe": timeframe}
        if min_val is not None:
            params["min"] = min_val
        if max_val is not None:
            params["max"] = max_val
        return self._request("screener", params)

    def multi(self, symbols: list, indicators: list, timeframe: str = "H1") -> dict:
        """Query the ``multi`` endpoint for several symbols and indicators.

        Both lists are sent as comma-separated strings.
        """
        return self._request("multi", {
            "symbols": ",".join(symbols),
            "indicators": ",".join(indicators),
            "timeframe": timeframe
        })

    def ohlc(self, symbol: str, timeframe: str = "H1", bars: int = 100) -> dict:
        """Query the ``ohlc`` endpoint, requesting ``bars`` bars for a symbol."""
        return self._request("ohlc", {
            "symbol": symbol, "timeframe": timeframe, "bars": bars
        })

    def heatmap(self, timeframe: str = "H1") -> dict:
        """Query the ``heatmap`` endpoint for a timeframe."""
        return self._request("heatmap", {"timeframe": timeframe})

    def calendar(self) -> dict:
        """Query the ``calendar`` endpoint (no parameters)."""
        return self._request("calendar", {})

Async Support with aiohttp

For high-throughput applications, use the async client to fetch multiple symbols concurrently:

python
import aiohttp
import asyncio

class AsyncTickAtlasClient:
    """Minimal asyncio client for the TickAtlas API, built on aiohttp.

    The ``aiohttp.ClientSession`` is created on first use and reused for
    later requests; call :meth:`close` when finished with the client.
    """

    BASE_URL = "https://tickatlas.com/v1"

    def __init__(self, api_key: str):
        # The session is created lazily by _get_session().
        self._session = None
        self.api_key = api_key

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, opening a new one if none is usable."""
        current = self._session
        if current is not None and not current.closed:
            return current
        self._session = aiohttp.ClientSession(
            headers={"X-API-Key": self.api_key}
        )
        return self._session

    async def _request(self, endpoint: str, params: dict) -> dict:
        """GET ``BASE_URL/endpoint`` and return the decoded JSON body.

        Error statuses are raised via ``raise_for_status``; the whole
        request is bounded by a 10 second total timeout.
        """
        session = await self._get_session()
        url = f"{self.BASE_URL}/{endpoint}"
        limit = aiohttp.ClientTimeout(total=10)
        async with session.get(url, params=params, timeout=limit) as resp:
            resp.raise_for_status()
            payload = await resp.json()
        return payload

    async def indicator(self, symbol: str, indicator: str, tf: str = "H1") -> dict:
        """Query the ``indicator`` endpoint for one indicator of a symbol."""
        query = {"symbol": symbol, "indicator": indicator, "timeframe": tf}
        return await self._request("indicator", query)

    async def close(self):
        """Close the session if one was opened and is still open."""
        session = self._session
        if not session or session.closed:
            return
        await session.close()

# Usage
async def main():
    """Fetch RSI_14 for three symbols concurrently and print each result."""
    client = AsyncTickAtlasClient("YOUR_API_KEY")
    try:
        # One coroutine per symbol; gather() runs them concurrently
        symbols = ("EURUSD", "GBPUSD", "XAUUSD")
        results = await asyncio.gather(
            *(client.indicator(symbol, "RSI_14") for symbol in symbols)
        )
        for r in results:
            print(f"{r['symbol']}: RSI = {r['value']}")
    finally:
        # Always release the HTTP session, even if a request failed
        await client.close()

asyncio.run(main())

Response Caching

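Reduce API calls and stay within rate limits by caching responses. The example below uses Redis (install the redis package and run a Redis server on localhost:6379). Adjust the TTL based on the timeframe: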

python
import json
import hashlib
import redis

redis_client = redis.Redis(host="localhost", port=6379, db=0)

def cached_indicator(client, symbol, indicator, timeframe, ttl=30):
    """Return an indicator reading, consulting Redis before calling the API.

    On a cache miss the value is fetched with ``client.indicator`` and
    stored as JSON under a key built from symbol, indicator and timeframe,
    with ``ttl`` passed to ``setex`` as the expiry.
    """
    cache_key = f"claw:{symbol}:{indicator}:{timeframe}"
    hit = redis_client.get(cache_key)
    if not hit:
        fresh = client.indicator(symbol, indicator, timeframe)
        redis_client.setex(cache_key, ttl, json.dumps(fresh))
        return fresh
    return json.loads(hit)

# M1 readings change quickly, so cache them only briefly
quote = cached_indicator(client, "EURUSD", "RSI_14", "M1", ttl=5)

# D1 readings change slowly, so a longer TTL is fine
daily = cached_indicator(client, "EURUSD", "RSI_14", "D1", ttl=300)

Error Handling

| Status | Meaning |
|--------|---------|
| 200 | Success |
| 401 | Invalid or missing API key |
| 403 | Endpoint not available on your plan |
| 429 | Rate limit exceeded — retry after the delay given in the Retry-After header |
| 500 | Server error — retry with backoff |