# Python Integration
Build trading bots, data pipelines, and analysis tools with the TickAtlas Python client. Supports synchronous and async workflows with built-in retry logic and caching.
## Installation
Install the requests library (or use the manual approach below):
pip install requests

For async support, also install aiohttp:
pip install aiohttp

## Quick Start
import os

from tickatlas import TickAtlasClient

# Build the client; the API key is read from the CLAW_API_KEY environment variable
client = TickAtlasClient(os.environ["CLAW_API_KEY"])

# One indicator: RSI_14 for EURUSD on the H1 timeframe
rsi = client.indicator("EURUSD", "RSI_14", "H1")
print(f"RSI: {rsi['value']} | Signal: {rsi['signal']}")

# Every indicator for a symbol (all 42) in a single call
all_indicators = client.indicators("EURUSD", "H1")
indicator_map = all_indicators["indicators"]
for name, data in indicator_map.items():
    print(f" {name}: {data['value']}")

# AI-powered market summary for XAUUSD
summary = client.summary("XAUUSD", "H1")
print(f"Bias: {summary['bias']} | Confidence: {summary['confidence']}")
print(f"Narrative: {summary['narrative']}")

# Screener: symbols whose H1 RSI_14 is at most 30 (oversold)
oversold = client.screener("RSI_14", timeframe="H1", max_val=30)
for hit in oversold["results"]:
    print(f" {hit['symbol']}: RSI = {hit['value']}")

## Full Client Class
A production-ready client with automatic retries, rate limit handling, and all endpoints covered:
import requests
import time
from functools import lru_cache
class TickAtlasClient:
    """Synchronous Python client for the TickAtlas Financial Data API.

    Every endpoint method sends a GET request to ``BASE_URL/<endpoint>``
    through one shared ``requests.Session`` (which carries the ``X-API-Key``
    header) and returns the decoded JSON body.

    Rate-limited (429) responses, server errors and transport failures are
    retried for up to ``max_retries`` attempts.  Client errors such as 401 or
    403 are raised immediately because retrying cannot fix them.  The final
    failure is always raised to the caller; it is never replaced by an empty
    result.

    The client can be used as a context manager so that the session is
    closed deterministically::

        with TickAtlasClient(api_key) as client:
            client.quote("EURUSD")
    """

    BASE_URL = "https://tickatlas.com/v1"

    def __init__(self, api_key: str, max_retries: int = 3):
        """Create the HTTP session and attach the API key header.

        Args:
            api_key: Value sent in the ``X-API-Key`` header of every request.
            max_retries: Maximum number of attempts per request.  Values
                below 1 are treated as 1 when a request is made.
        """
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": api_key})
        self.max_retries = max_retries

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    @staticmethod
    def _retry_after_seconds(header_value, default: float = 2.0) -> float:
        """Convert a ``Retry-After`` header value into a sleep duration.

        Returns *default* when the header is missing, is not a plain number
        (for example the HTTP-date form), or is negative / not finite.
        """
        try:
            seconds = float(header_value)
        except (TypeError, ValueError):
            return default
        # The chained comparison also rejects NaN and infinity.
        return seconds if 0 <= seconds < float("inf") else default

    @staticmethod
    def _is_retryable(exc) -> bool:
        """Return True when retrying after *exc* could plausibly succeed."""
        response = getattr(exc, "response", None)
        status = getattr(response, "status_code", None)
        if status is None:
            # No HTTP response at all: connection error, timeout, etc.
            return True
        return status == 429 or status >= 500

    @staticmethod
    def _csv(values) -> str:
        """Join *values* with commas; a plain string is passed through as-is."""
        if isinstance(values, str):
            return values
        return ",".join(values)

    def _request(self, endpoint: str, params: dict) -> dict:
        """GET ``BASE_URL/endpoint`` and return the decoded JSON body.

        Args:
            endpoint: Path segment appended to ``BASE_URL``.
            params: Query-string parameters.

        Returns:
            The parsed JSON response.

        Raises:
            requests.exceptions.RequestException: If the last attempt fails,
                or as soon as a non-retryable HTTP error (e.g. 401, 403) is
                received.
        """
        url = f"{self.BASE_URL}/{endpoint}"
        attempts = max(1, self.max_retries)  # always try at least once
        for attempt in range(attempts):
            is_last = attempt == attempts - 1
            try:
                resp = self.session.get(url, params=params, timeout=10)
                if resp.status_code == 429 and not is_last:
                    # Rate limited: wait as long as the server asks, then retry.
                    retry_after = resp.headers.get("Retry-After")
                    time.sleep(self._retry_after_seconds(retry_after))
                    continue
                # On the last attempt a 429 falls through and is raised here.
                resp.raise_for_status()
                return resp.json()
            except requests.exceptions.RequestException as exc:
                if is_last or not self._is_retryable(exc):
                    raise
                time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
        # Not reachable: the last attempt either returns or raises.
        raise RuntimeError("retry loop ended without a result")

    def indicator(self, symbol: str, indicator: str, timeframe: str = "H1") -> dict:
        """Query the ``indicator`` endpoint for one indicator of one symbol."""
        return self._request("indicator", {
            "symbol": symbol, "indicator": indicator, "timeframe": timeframe
        })

    def indicators(self, symbol: str, timeframe: str = "H1") -> dict:
        """Query the ``indicators`` endpoint for a symbol and timeframe."""
        return self._request("indicators", {
            "symbol": symbol, "timeframe": timeframe
        })

    def summary(self, symbol: str, timeframe: str = "H1") -> dict:
        """Query the ``summary`` endpoint for a symbol and timeframe."""
        return self._request("summary", {
            "symbol": symbol, "timeframe": timeframe
        })

    def quote(self, symbol: str) -> dict:
        """Query the ``quote`` endpoint for a symbol."""
        return self._request("quote", {"symbol": symbol})

    def screener(self, indicator: str, timeframe: str = "H1",
                 min_val: float = None, max_val: float = None) -> dict:
        """Query the ``screener`` endpoint.

        ``min_val`` / ``max_val`` are sent as the ``min`` / ``max`` query
        parameters and are omitted from the request when left as ``None``.
        """
        params = {"indicator": indicator, "timeframe": timeframe}
        if min_val is not None:
            params["min"] = min_val
        if max_val is not None:
            params["max"] = max_val
        return self._request("screener", params)

    def multi(self, symbols: list, indicators: list, timeframe: str = "H1") -> dict:
        """Query the ``multi`` endpoint for several symbols and indicators.

        Both collections are sent as comma-separated strings.  A plain string
        is sent unchanged instead of being split into characters.
        """
        return self._request("multi", {
            "symbols": self._csv(symbols),
            "indicators": self._csv(indicators),
            "timeframe": timeframe
        })

    def ohlc(self, symbol: str, timeframe: str = "H1", bars: int = 100) -> dict:
        """Query the ``ohlc`` endpoint, requesting ``bars`` bars."""
        return self._request("ohlc", {
            "symbol": symbol, "timeframe": timeframe, "bars": bars
        })

    def heatmap(self, timeframe: str = "H1") -> dict:
        """Query the ``heatmap`` endpoint for a timeframe."""
        return self._request("heatmap", {"timeframe": timeframe})

    def calendar(self) -> dict:
        """Query the ``calendar`` endpoint (no parameters)."""
        return self._request("calendar", {})

## Async Support with aiohttp
For high-throughput applications, use the async client to fetch multiple symbols concurrently:
import aiohttp
import asyncio
class AsyncTickAtlasClient:
    """Asynchronous TickAtlas client built on a lazily created aiohttp session.

    Requests are GETs against ``BASE_URL/<endpoint>`` with the ``X-API-Key``
    header set on the session.  Rate-limited (429) responses, server errors
    and transport failures are retried for up to ``max_retries`` attempts;
    other HTTP errors are raised immediately.  The final failure is always
    raised to the caller.

    Call ``close()`` when finished, or use the client as an async context
    manager::

        async with AsyncTickAtlasClient(api_key) as client:
            await client.indicator("EURUSD", "RSI_14")
    """

    BASE_URL = "https://tickatlas.com/v1"

    def __init__(self, api_key: str, max_retries: int = 3):
        """Store the credentials; no network resources are created yet.

        Args:
            api_key: Value sent in the ``X-API-Key`` header.
            max_retries: Maximum number of attempts per request.  Values
                below 1 are treated as 1 when a request is made.
        """
        self.api_key = api_key
        self.max_retries = max_retries
        self._session = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating it if missing or closed."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                headers={"X-API-Key": self.api_key}
            )
        return self._session

    @staticmethod
    def _retry_after_seconds(header_value, default: float = 2.0) -> float:
        """Convert a ``Retry-After`` header value into a sleep duration.

        Returns *default* when the header is missing, is not a plain number
        (for example the HTTP-date form), or is negative / not finite.
        """
        try:
            seconds = float(header_value)
        except (TypeError, ValueError):
            return default
        # The chained comparison also rejects NaN and infinity.
        return seconds if 0 <= seconds < float("inf") else default

    async def _request(self, endpoint: str, params: dict) -> dict:
        """GET ``BASE_URL/endpoint`` and return the decoded JSON body.

        Raises:
            aiohttp.ClientError: If the last attempt fails, or as soon as a
                non-retryable HTTP error (e.g. 401, 403) is received.
            asyncio.TimeoutError: If the last attempt exceeds the 10 second
                total timeout.
        """
        session = await self._get_session()
        url = f"{self.BASE_URL}/{endpoint}"
        timeout = aiohttp.ClientTimeout(total=10)
        attempts = max(1, self.max_retries)  # always try at least once
        for attempt in range(attempts):
            is_last = attempt == attempts - 1
            try:
                async with session.get(url, params=params, timeout=timeout) as resp:
                    if resp.status == 429 and not is_last:
                        # Rate limited: wait as long as the server asks.
                        delay = self._retry_after_seconds(
                            resp.headers.get("Retry-After")
                        )
                    else:
                        # On the last attempt a 429 is raised here as well.
                        resp.raise_for_status()
                        return await resp.json()
            except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                # Only ClientResponseError carries an HTTP status; errors
                # without one are transport failures and worth retrying.
                status = getattr(exc, "status", None)
                retryable = status is None or status == 429 or status >= 500
                if is_last or not retryable:
                    raise
                delay = 2 ** attempt  # exponential backoff: 1s, 2s, 4s, ...
            # Sleep after leaving the response context manager.
            await asyncio.sleep(delay)
        # Not reachable: the last attempt either returns or raises.
        raise RuntimeError("retry loop ended without a result")

    async def indicator(self, symbol: str, indicator: str, tf: str = "H1") -> dict:
        """Query the ``indicator`` endpoint; ``tf`` is sent as ``timeframe``."""
        return await self._request("indicator", {
            "symbol": symbol, "indicator": indicator, "timeframe": tf
        })

    async def close(self):
        """Close the aiohttp session if one is open."""
        if self._session and not self._session.closed:
            await self._session.close()
# Usage
async def main():
    """Fetch RSI_14 for three symbols concurrently and print each result."""
    client = AsyncTickAtlasClient("YOUR_API_KEY")
    symbols = ("EURUSD", "GBPUSD", "XAUUSD")
    try:
        # One coroutine per symbol; gather() awaits them all concurrently
        # and returns the results in the same order as the symbols.
        pending = [client.indicator(symbol, "RSI_14") for symbol in symbols]
        results = await asyncio.gather(*pending)
        for r in results:
            print(f"{r['symbol']}: RSI = {r['value']}")
    finally:
        # Always release the HTTP session, even if a request failed.
        await client.close()
asyncio.run(main())

## Response Caching
Reduce API calls and stay within rate limits by caching responses. Adjust TTL based on timeframe:
import json
import hashlib
import redis
redis_client = redis.Redis(host="localhost", port=6379, db=0)
def cached_indicator(client, symbol, indicator, timeframe, ttl=30):
    """Return an indicator value, served from Redis when a cached copy exists.

    Args:
        client: Object exposing ``indicator(symbol, indicator, timeframe)``.
        symbol: Symbol to look up; part of the cache key.
        indicator: Indicator name; part of the cache key.
        timeframe: Timeframe; part of the cache key.
        ttl: Expiry passed to Redis ``SETEX`` for a freshly fetched value.

    Returns:
        The decoded cached JSON on a hit, otherwise the value returned by
        ``client.indicator``.
    """
    cache_key = f"claw:{symbol}:{indicator}:{timeframe}"
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    data = client.indicator(symbol, indicator, timeframe)
    # An empty response carries no indicator data; caching it would keep
    # serving that empty result until the TTL expires, so store only
    # non-empty payloads.
    if data:
        redis_client.setex(cache_key, ttl, json.dumps(data))
    return data
# Fast-moving timeframes such as M1 get a short TTL
quote = cached_indicator(client, "EURUSD", "RSI_14", "M1", ttl=5)
# Higher timeframes can be cached longer
daily = cached_indicator(client, "EURUSD", "RSI_14", "D1", ttl=300)

## Error Handling
| Status | Meaning |
|---|---|
| 200 | Success |
| 401 | Invalid or missing API key |
| 403 | Endpoint not available on your plan |
| 429 | Rate limit exceeded — retry after Retry-After header |
| 500 | Server error — retry with backoff |